在看 kube-scheduler
元件的過程中遇到了 kube-scheduler
對於 client-go
的呼叫,泛泛的理解呼叫過程總有種隔靴搔癢的感覺,於是調轉頭先把 client-go
理清楚在回來看 kube-scheduler
。
為什麼要看 client-go
,並且要深入到原理,原始碼層面去看。很簡單,因為它很重要。重要在兩方面:
kubernetes
元件通過 client-go
和 kube-apiserver
互動。client-go
簡單,易用,大部分基於 Kubernetes
做二次開發的應用,在和 kube-apiserver
互動時會使用 client-go
。當然,不僅在於使用,理解層面,對於我們學習程式碼開發,架構等也有幫助。
client-go
支援四種使用者端物件,分別是 RESTClient
,ClientSet
,DynamicClient
和 DiscoveryClient
:
元件或者二次開發的應用可以通過這四種使用者端物件和 kube-apiserver
互動。其中,RESTClient
是最基礎的使用者端物件,它封裝了 HTTP Request
,實現了 RESTful
風格的 API
。ClientSet
基於 RESTClient
,封裝了對於 Resource
和 Version
的請求方法。DynamicClient
相比於 ClientSet
提供了全資源,包括自定義資源的請求方法。DiscoveryClient
用於發現 kube-apiserver
支援的資源組,資源版本和資源資訊。
每種使用者端適用的場景不同,主要是對 HTTP Request
做了層層封裝,具體的程式碼實現可參考 client-go 使用者端物件。
僅僅封裝 HTTP Request
是不夠的,元件通過 client-go
和 kube-apiserver
互動,必然對實時性,可靠性等有很高要求。試想,如果 ETCD
中儲存的資料和元件通過 client-go
從 ETCD
獲取的資料不匹配的話,那將會是一個非常嚴重的問題。
如何實現 client-go
的實時性,可靠性?client-go
給出的答案是:informer
機制。
client-go informer 流程圖
informer
機制的核心元件包括:
Reflector
: 主要負責兩類任務:
client-go
使用者端物件 list kube-apiserver
資源,並且 watch kube-apiserver
資源變更。Delta FIFO
佇列。Informer
: 主要負責三類任務:
Reflector
放入佇列的資源拿出來。indexer
元件。indexer
元件之後觸發回撥函數,處理回撥事件。Indexer
: indexer
元件負責將資源資訊存入到本地記憶體資料庫(實際是 map
物件),該資料庫作為快取存在,其資源資訊和 ETCD
中的資源資訊完全一致(得益於 watch
機制)。因此,client-go
可以從本地 indexer
中讀取相應的資源,而不用每次都從 kube-apiserver
中獲取資源資訊。這也實現了 client-go
對於實時性的要求。接下來從原始碼角度看各個元件的處理流程,力圖做到知其然,知其所以然。
直接閱讀 informer
原始碼是非常晦澀難懂的,這裡通過 informer
的程式碼範例開始學習:
package main
import (
"log"
"time"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 解析 kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}
// 建立 ClientSet 使用者端物件
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
stopCh := make(chan struct{})
defer close(stopCh)
// 建立 sharedInformers
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
// 建立 informer
informer := sharedInformers.Core().V1().Pods().Informer()
// 建立 Event 回撥 handler
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("New Pod Added to Store: %s", mObj.GetName())
},
UpdateFunc: func(oldObj, newObj interface{}) {
oObj := oldObj.(v1.Object)
nObj := newObj.(v1.Object)
log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
},
DeleteFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("Pod Deleted from Store: %s", mObj.GetName())
},
})
// 執行 informer
informer.Run(stopCh)
}
執行結果如下:
# go run informer.go
2023/12/14 12:00:26 New Pod Added to Store: prometheus-alertmanager-0
2023/12/14 12:01:26 prometheus-alertmanager-0 Pod Updated to prometheus-alertmanager-0
上述程式碼範例分為三部分:建立 informer
,建立 informer
的 EventHandler
,執行 informer
。下面,通過這三部分流程介紹 client-go
的核心元件。
informer
建立 informer
分為兩步。
1)建立工廠 sharedInformerFactory
// sharedInformers factory
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
// client-go/informers/factory.go
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// Apply all options
for _, opt := range options {
factory = opt(factory)
}
return factory
}
sharedInformerFactory
實現了 SharedInformerFactory
介面,該工廠負責建立 informer
。
2)建立 informer
// 建立 informer
informer := sharedInformers.Core().V1().Pods().Informer()
// 呼叫 Core 方法
func (f *sharedInformerFactory) Core() core.Interface {
return core.New(f, f.namespace, f.tweakListOptions)
}
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// 呼叫 V1 方法
func (g *group) V1() v1.Interface {
return v1.New(g.factory, g.namespace, g.tweakListOptions)
}
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// 呼叫 Pods 方法
func (v *version) Pods() PodInformer {
return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
經過層層構建建立 podInformer
物件,該物件實現了 PodInformer
介面,呼叫介面的 Informer
方法建立 informer
物件:
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
podInformer.Informer
實際呼叫的是 sharedInformerFactory.InformerFor
:
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
// 反射出資源物件 obj 的 type
informerType := reflect.TypeOf(obj)
// 讀取並判斷資源物件的 informer
informer, exists := f.informers[informerType]
if exists {
return informer
}
...
// 呼叫 newFunc 建立 informer
informer = newFunc(f.client, resyncPeriod)
// 將 type:informer 加入到 factory 的 informers 中
f.informers[informerType] = informer
return informer
}
從 InformerFor
方法可以看出,sharedInformerFactory
的 share 體現在同一個資源型別共用 informer
。
這麼設計在於,每個 informer
包括一個 Reflector
,Reflector
通過存取 kube-apiserver
實現 ListAndWatch
操作。共用 informer
實際是共用 Reflector
,這種共用機制將減少 Reflector
對於 kube-apiserver
的存取,降低 kube-apiserver
的負載,節約資源。
繼續看,建立 informer
的 newFunc
函數做了什麼:
informer = newFunc(f.client, resyncPeriod)
// client-go/informers/core/v1/pod.go
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
newFunc
實際呼叫的是 NewFilteredPodInformer
函數,在函數內建立 cache.ListAndWatch
物件,物件中包括 ListFunc
和 WatchFunc
回撥函數,回撥函數內呼叫 ClientSet
實現 list 和 watch 資源物件。
繼續看 cache.NewSharedIndexInformer
:
// client-go/tools/cache/shared_informer.go
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
return NewSharedIndexInformerWithOptions(
lw,
exampleObject,
SharedIndexInformerOptions{
ResyncPeriod: defaultEventHandlerResyncPeriod,
Indexers: indexers,
},
)
}
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
realClock := &clock.RealClock{}
return &sharedIndexInformer{
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
processor: &sharedProcessor{clock: realClock},
listerWatcher: lw,
objectType: exampleObject,
objectDescription: options.ObjectDescription,
resyncCheckPeriod: options.ResyncPeriod,
defaultEventHandlerResyncPeriod: options.ResyncPeriod,
clock: realClock,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
}
}
在 NewSharedIndexInformerWithOptions
函數內建立 informer sharedIndexInformer
。可以看到,sharedIndexInformer
內包括了 indexer
核心元件。
informer
建立完成。接下來為 informer
新增回撥函數 EventHandler
。
EventHandler
程式碼實現如下:
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("New Pod Added to Store: %s", mObj.GetName())
},
UpdateFunc: func(oldObj, newObj interface{}) {
oObj := oldObj.(v1.Object)
nObj := newObj.(v1.Object)
log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
},
DeleteFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("Pod Deleted from Store: %s", mObj.GetName())
},
})
建立 EventHandler
的 handler
中包括三種回撥函數:AddFunc
,UpdateFunc
和 DeleteFunc
,三種回撥函數分別在資源有增加,變更,刪除時觸發。
在 sharedIndexInformer.AddEventHandler
內,將 handler
傳遞給 sharedIndexInformer.AddEventHandlerWithResyncPeriod
方法,該方法主要建立 listener
物件:
// client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
...
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
if !s.started {
return s.processor.addListener(listener), nil
}
...
}
// client-go/tools/cache/shared_informer.go
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
ret.determineNextResync(now)
return ret
}
func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
...
p.listeners[listener] = true
...
return listener
}
listener
物件包含通道 addCh
和 nextCh
,以及 handler
等物件。最後將 listener
存入 sharedIndexInformer.sharedProcessor
中。
建立完 informer
的 EventHandler
,接下來該執行 informer
了。