這裡分類和彙總了欣宸的全部原創(含配套原始碼):https://github.com/zq2599/blog_demos
type Controller struct {
indexer cache.Indexer
queue workqueue.RateLimitingInterface
informer cache.Controller
}
func (c *Controller) processNextItem() bool {
// 阻塞等待,直到佇列中有資料可以被取出,
// 另外有可能是多協程並行獲取資料,此key會被放入processing中,表示正在被處理
key, quit := c.queue.Get()
// 如果最外層呼叫了佇列的Shutdown,這裡的quit就會返回true,
// 呼叫processNextItem的地方發現processNextItem返回false,就不會再次呼叫processNextItem了
if quit {
return false
}
// 表示該key已經被處理完成(從processing中移除)
defer c.queue.Done(key)
// 呼叫業務方法,實現具體的業務需求
err := c.syncToStdout(key.(string))
// Handle the error if something went wrong during the execution of the business logic
// 判斷業務邏輯處理是否出現異常,如果出現就重新放入佇列,以此實現重試,如果已經重試過5次,就放棄
c.handleErr(err, key)
// 呼叫processNextItem的地方發現processNextItem返回true,就會再次呼叫processNextItem
return true
}
func (c *Controller) syncToStdout(key string) error {
// 根據key從本地儲存中獲取完整的pod資訊
// 由於有長連線與apiserver保持同步,因此原生的pod資訊與kubernetes叢集內保持一致
obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
return err
}
if !exists {
fmt.Printf("Pod %s does not exist anymore\n", key)
} else {
// 這裡就是真正的業務邏輯程式碼了,一般會比較spce和status的差異,然後做出處理使得status與spce保持一致,
// 此處為了程式碼簡單僅僅列印一行紀錄檔
fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
}
return nil
}
func (c *Controller) handleErr(err error, key interface{}) {
// 沒有錯誤時的處理邏輯
if err == nil {
// 確認這個key已經被成功處理,在佇列中徹底清理掉
// 假設之前在處理該key的時候曾報錯導致重新進入佇列等待重試,那麼也會因為這個Forget方法而不再被重試
c.queue.Forget(key)
return
}
// 程式碼走到這裡表示前面執行業務邏輯的時候發生了錯誤,
// 檢查已經重試的次數,如果不操作5次就繼續重試,這裡可以根據實際需求客製化
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing pod %v: %v", key, err)
c.queue.AddRateLimited(key)
return
}
// 如果重試超過了5次就徹底放棄了,也像執行成功那樣呼叫Forget做徹底清理(否則就沒完沒了了)
c.queue.Forget(key)
// 向外部報告錯誤,走通用的錯誤處理流程
runtime.HandleError(err)
klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
return &Controller{
informer: informer,
indexer: indexer,
queue: queue,
}
}
func (c *Controller) runWorker() {
for c.processNextItem() {
}
}
func (c *Controller) Run(workers int, stopCh chan struct{}) {
defer runtime.HandleCrash()
// 只要工作佇列的ShutDown方法被呼叫,processNextItem方法就會返回false,runWorker的無限迴圈就會結束
defer c.queue.ShutDown()
klog.Info("Starting Pod controller")
// informer的Run方法執行後,就開始接受apiserver推播的資源變更事件,並更新本地儲存
go c.informer.Run(stopCh)
// 等待本地儲存和apiserver完成同步
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
// 啟動worker,並行從工作佇列取資料,然後執行業務邏輯
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
klog.Info("Stopping Pod controller")
}
package action
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
type ControllerDemo struct{}
func (controllerDemo ControllerDemo) DoAction(clientset *kubernetes.Clientset) error {
// 建立ListWatch物件,指定要監控的資源型別是pod,namespace是default
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
// 建立工作佇列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// 建立informer,並將返回的儲存物件儲存在變數indexer中
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
// 響應新增資源事件的方法,可以按照業務需求來客製化,
// 這裡的做法比較常見:寫入工作佇列
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
// 響應修改資源事件的方法,可以按照業務需求來客製化,
// 這裡的做法比較常見:寫入工作佇列
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
// 響應修改資源事件的方法,可以按照業務需求來客製化,
// 這裡的做法比較常見:寫入工作佇列,注意刪除的時候生成key的方法和新增修改不一樣
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})
// 建立Controller物件,將所需的三個變數物件傳入
controller := NewController(queue, indexer, informer)
// Now let's start the controller
stop := make(chan struct{})
defer close(stop)
// 在協程中啟動controller
go controller.Run(1, stop)
// Wait forever
select {}
return nil
}
go get k8s.io/apimachinery/pkg/util/[email protected]
名稱 | 連結 | 備註 |
---|---|---|
專案主頁 | https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
git倉庫地址(https) | https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協定 |
git倉庫地址(ssh) | [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協定 |