client-go實戰之九:手寫一個kubernetes的controller

2023-10-23 09:00:28

歡迎存取我的GitHub

這裡分類和彙總了欣宸的全部原創(含配套原始碼):https://github.com/zq2599/blog_demos

本篇概覽

  • 本文是《client-go實戰》系列的第九篇,前面咱們已經瞭解了client-go的基本功能,現在要來一次經典的綜合實戰了,接下來咱們會手寫一個kubernetes的controller,其功能是:監聽某種資源的變化,一旦資源發生變化(例如增加或者刪除),apiserver就會有廣播發出,controller使用client-go可以訂閱這個廣播,然後在收到廣播後進行各種業務操作,
  • 本次實戰程式碼量略大,但如果隨本文一步步先設計再開發,並不會覺得有太多,總的來說由以下內容構成
  1. 程式碼整體架構一覽
  2. 對著架構細說流程
  3. 全域性重點的小結
  4. 編碼實戰

程式碼整體架構一覽

  • 首先,再次明確本次實戰的目標:開發出類似kubernetes的controller那樣的功能,實時監聽pod資源的變化,針對每個變化做出響應
  • 今天的實戰源自client-go的官方demo,其主要架構如下
  • 可能您會覺得上圖有些複雜,沒關係,接下來咱們細說此圖,為後面的編碼打好理論基礎

對著架構細說流程

  • 首先將上述架構圖中涉及的內容進行分類,共有三部分
  1. 最左側的Kubernetes API Server+etcd是第一部分,它們都是kubernetes的內部元件
  2. 第二部分是整個informer,informer是client-go庫的核心模組
  3. 第三部分是WorkQueue和Conrol Loop,它們都是controller的業務邏輯程式碼
  • 上面三部分合作,就能做到監聽資源變化並做出響應
  • 另外,informer內部很複雜也很精巧,後面會有專門的文章去細說,本篇只會提到與controller有關係的informer細節,其餘的能不提就不提(不然內容太多,這篇文章寫不完了)
  • 分類完畢後,再來聊流程
  1. controller會通過client-go的list&watch機制與API Server建立長連線(http2的stream),只要pod資源發生變化,API Server就會通過長連線推播到controller
  2. API Server推的資料到達Reflector,它將資料寫入Delta FIFO Queue
  3. Delta FIFO Queue是個先入先出的佇列,除了pod資訊還儲存了操作型別(增加、修改、刪除),informer內部不斷從這個佇列獲取資料,再執行AddFunc、UpdateFunc、DeleteFunc等方法
  4. 完整的pod資料被存放在Local Store中,外部通過Indexer隨時可以獲取到
  5. controller中準備一個或多個工作佇列,在執行AddFunc、UpdateFunc、DeleteFunc等方法時,可以將客製化化的資料放入工作佇列中
  6. controller中啟動一個或多個協程,持續從工作佇列中取資料,執行業務邏輯,執行過程中如果需要pod的詳細資料,可以通過indexder獲取
  • 差不多了,我有種胸有成竹的感覺,迫不及待想寫程式碼,但還是忍忍吧,先規劃再動手

編碼規劃

  • 所謂規劃就是把步驟捋清楚,先寫啥再寫啥,如下圖所示
  • 捋順了,開始寫程式碼吧

編碼之一:定義Controller資料結構(controller.go)

type Controller struct {
	indexer  cache.Indexer
	queue    workqueue.RateLimitingInterface
	informer cache.Controller
}
  • 從上述程式碼可見Controller結構體有三個成員,indexer是informer內負責存取完整資源資訊的物件,queue是用於業務邏輯的工作佇列

編碼之二:編寫業務邏輯程式碼(controller.go)

  • 業務邏輯程式碼共有四部分
  1. 把資源變化資訊存入工作佇列,這裡可能按實際需求客製化(例如有的資料不關注就丟棄了)
  2. 從工作佇列中取出資料
  3. 取出資料後的處理邏輯,這邊是純粹的業務需求了,各人的實現都不一樣
  4. 例外處理
  • 步驟1,存入工作佇列的操作,留待初始化informer的時候再做,
  • 步驟4,例外處理稍後也有單獨段落細說
  • 這裡只聚焦步驟2和3:怎麼取,取出後怎麼用
  • 先寫步驟2的程式碼:從工作佇列中取取資料,用名為processNextItem的方法來實現(對每一行程式碼進行中文註釋著實不易,支援的話請點個贊)
func (c *Controller) processNextItem() bool {
	// 阻塞等待,直到佇列中有資料可以被取出,
	// 另外有可能是多協程並行獲取資料,此key會被放入processing中,表示正在被處理
	key, quit := c.queue.Get()
	// 如果最外層呼叫了佇列的Shutdown,這裡的quit就會返回true,
	// 呼叫processNextItem的地方發現processNextItem返回false,就不會再次呼叫processNextItem了
	if quit {
		return false
	}

	// 表示該key已經被處理完成(從processing中移除)
	defer c.queue.Done(key)

	// 呼叫業務方法,實現具體的業務需求
	err := c.syncToStdout(key.(string))
	// Handle the error if something went wrong during the execution of the business logic

	// 判斷業務邏輯處理是否出現異常,如果出現就重新放入佇列,以此實現重試,如果已經重試過5次,就放棄
	c.handleErr(err, key)

	// 呼叫processNextItem的地方發現processNextItem返回true,就會再次呼叫processNextItem
	return true
}
  • 接下來寫業務處理的程式碼,就是上面呼叫的syncToStdout方法,常規套路是檢查spec和status的差距,然後讓status和spec保持一致,(例如spec中指定副本數為2,而status中記錄了真實的副本數是1,所以業務處理就是增加一個副本數),這裡僅僅是為了展示業務處理程式碼在哪些,所以就簡(fu)化(yan)一些了,只列印pod的名稱
func (c *Controller) syncToStdout(key string) error {
	// 根據key從本地儲存中獲取完整的pod資訊
	// 由於有長連線與apiserver保持同步,因此原生的pod資訊與kubernetes叢集內保持一致
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
		return err
	}

	if !exists {
		fmt.Printf("Pod %s does not exist anymore\n", key)
	} else {
		// 這裡就是真正的業務邏輯程式碼了,一般會比較spce和status的差異,然後做出處理使得status與spce保持一致,
		// 此處為了程式碼簡單僅僅列印一行紀錄檔
		fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
	}
	return nil
}

編碼之三:編寫錯誤處理程式碼(controller.go)

  • 回顧前面的processNextItem方法內容,在呼叫syncToStdout執行完業務邏輯後就立即呼叫handleErr方法了,此方法的作用是檢查syncToStdout的返回值是否有錯誤,然後做針對性處理
func (c *Controller) handleErr(err error, key interface{}) {
	// 沒有錯誤時的處理邏輯
	if err == nil {
		// 確認這個key已經被成功處理,在佇列中徹底清理掉
		// 假設之前在處理該key的時候曾報錯導致重新進入佇列等待重試,那麼也會因為這個Forget方法而不再被重試
		c.queue.Forget(key)
		return
	}

	// 程式碼走到這裡表示前面執行業務邏輯的時候發生了錯誤,
	// 檢查已經重試的次數,如果不操作5次就繼續重試,這裡可以根據實際需求客製化
	if c.queue.NumRequeues(key) < 5 {
		klog.Infof("Error syncing pod %v: %v", key, err)
		c.queue.AddRateLimited(key)
		return
	}

	// 如果重試超過了5次就徹底放棄了,也像執行成功那樣呼叫Forget做徹底清理(否則就沒完沒了了)
	c.queue.Forget(key)
	// 向外部報告錯誤,走通用的錯誤處理流程
	runtime.HandleError(err)
	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}
  • 好了,和業務有關的程式碼已經完成,接下來就是搭建controller框架,把基本功能串起來

編碼之四:編寫Controller主流程(controller.go)

  • 編寫一個完整的Controller,最基本的是構造方法,Controller的構造方法也很簡單,儲存三個重要的成員變數即可
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
	return &Controller{
		informer: informer,
		indexer:  indexer,
		queue:    queue,
	}
}
  • 先定義個名為runWorker的簡單方法,裡面是個無限迴圈,只要消費訊息的processNextItem方法返回true,就無限迴圈下去
func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}
  • 然後是Controller主流程程式碼,簡介清晰,啟動informer,開始接受apiserver推播,寫入工作佇列,然後開啟無限迴圈從工作佇列取資料並處理
func (c *Controller) Run(workers int, stopCh chan struct{}) {
	defer runtime.HandleCrash()

	// 只要工作佇列的ShutDown方法被呼叫,processNextItem方法就會返回false,runWorker的無限迴圈就會結束
	defer c.queue.ShutDown()
	klog.Info("Starting Pod controller")

	// informer的Run方法執行後,就開始接受apiserver推播的資源變更事件,並更新本地儲存
	go c.informer.Run(stopCh)

	// 等待本地儲存和apiserver完成同步
	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}

	// 啟動worker,並行從工作佇列取資料,然後執行業務邏輯
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
	klog.Info("Stopping Pod controller")
}
  • 現在一個完整的Controller已經完成了,接下來編寫呼叫Controller的程式碼,將其所需的三個物件傳入,再呼叫它的Run方法

編碼之五:編寫呼叫Controller的程式碼(controller_demo.go)

  • 為了能讓整個工程的main方法呼叫Controller,這裡新增controller_demo.go方法,裡面新增名為ControllerDemo的資料結構,建立Controller物件以及為其準備成員變數的操作都在ControllerDemo.DoAction方法中
package action

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

type ControllerDemo struct{}

func (controllerDemo ControllerDemo) DoAction(clientset *kubernetes.Clientset) error {

	// 建立ListWatch物件,指定要監控的資源型別是pod,namespace是default
	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

	// 建立工作佇列
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// 建立informer,並將返回的儲存物件儲存在變數indexer中
	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		// 響應新增資源事件的方法,可以按照業務需求來客製化,
		// 這裡的做法比較常見:寫入工作佇列
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		// 響應修改資源事件的方法,可以按照業務需求來客製化,
		// 這裡的做法比較常見:寫入工作佇列
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		// 響應修改資源事件的方法,可以按照業務需求來客製化,
		// 這裡的做法比較常見:寫入工作佇列,注意刪除的時候生成key的方法和新增修改不一樣
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})

	// 建立Controller物件,將所需的三個變數物件傳入
	controller := NewController(queue, indexer, informer)

	// Now let's start the controller
	stop := make(chan struct{})
	defer close(stop)
	// 在協程中啟動controller
	go controller.Run(1, stop)

	// Wait forever
	select {}
	return nil
}

編碼之六:main方法中支援(main.go)

  • 然後是整個工程的main方法,裡面增加一段程式碼,支援新增的ControllerDemo,如下圖黃框所示
  • 最後,如果您使用的是vscode,記得修改launch.json,如下圖黃色箭頭,這樣main方法執行的時候就會執行Controller的程式碼了

執行和驗證

  • 現在工程目錄執行以下命令,獲取必要的包
go get k8s.io/apimachinery/pkg/util/[email protected]
  • 確保kubernetes環境正常,.kube/config設定也能正常使用,然後執行main.go
  • 使用kubectl edit xxx修改kubernetes環境中的pod,例如我這裡改的是下圖黃色箭頭的值
  • 修改完畢儲存退出後,執行mian.go的控制檯立即有內容輸出,如下圖黃色箭頭,是咱們前面的syncToStdout方法的輸入,符合預期
  • 至此,整個Controller已經開發完成了,相信您已經熟悉了informer和kubernetes的controller的基本套路,加上前面的文章打下的基礎,再去做kubernetes二次開發,或者operator開發等都能輕鬆駕馭了

本篇涉及知識點串講

  • 前幾篇的風格,都是抓住一個問題深入研究和實踐,但是到了本篇似乎多個知識點同時湧出,並且還要緊密配合完成業務目標,可能年輕的您一下子略有不適應,我這裡再次將本次開發中的重點進行總結,經歷過一番實戰,再來看這些總結,相信您很容易就融會貫通了
  • 先給出資料流檢視,結合前面的實戰,您應該能一眼看懂
  • 接下來開始梳理重點
  1. 建立一個名為podListWatcher的ListWatch物件,用於對指定資源型別建立監聽(本例中監聽的資源是pod)
  2. 建立一個名為queue的工作佇列,就是個先進先出的記憶體物件,沒啥特別之處
  3. 通過podListWatcher建立一個informer,這個informer的功能對podListWatcher監聽的事件作相應
  4. 在建立informer的時候還會返回一個名為indexer的本地快取,這裡面儲存了所有pod資訊(由於pod的變動全部都會被informer收到,因此indexer中儲存了最新的pod資訊)
  5. 在新協程中啟動informer,這裡面對應兩件事情:第一,建立Reflector物件,這個Reflector物件會把podListWatcher監聽到的資料放入一個DeltaFIFO佇列(注意不是步驟2中的工作佇列),第二是迴圈地取出fifo佇列中的資料,再呼叫AddFunc、UpdateFunc、DeleteFunc等方法
  6. 步驟5中提到的AddFunc、UpdateFunc、DeleteFunc可以在建立informer的時候,由業務開發者自定義,一般會再次將key放入工作佇列中
  7. 在新協程消費工作佇列queue的資料,這裡可以根據業務需求寫入也任務邏輯程式碼
  • 基於以上詳細描述,再來個精簡版,介紹重點物件,如果您對詳細描述不感興趣,可以只看精簡版,掌握其中關鍵即可
  1. podListWatcher:用於監聽指定型別資源的變化
  2. queue:工作佇列,從裡面取出的key,其資源都有事件發生
  3. informer:接受監聽到的事件,再呼叫指定的回撥方法
  4. Reflector:informer內部三大物件之一,用於接受事件再寫入一個內部fifo佇列
  5. DeltaFIFO:informer內部三大物件之二,先入先出佇列,還儲存了操作型別
  6. indexer:informer內部三大物件之三,這裡面儲存的是指定資源的完整資料,和apiserver側保持同步
  7. 接受訊息的協程:informer在這個協程中啟動,也在這個協程中將資料寫入工作佇列
  8. 處理工作佇列的協程:負責從工作佇列中取出資料處理
  9. 工作佇列queue和informer內部的fifo是不同的佇列,是兩回事,為了滿足業務需求,我們可以在一個controller中建立多個工作佇列,也可以不要工作佇列(在informer的三個回撥方法中完成業務邏輯)

以下是官方參考資訊

原始碼下載

名稱 連結 備註
專案主頁 https://github.com/zq2599/blog_demos 該專案在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該專案原始碼的倉庫地址,https協定
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該專案原始碼的倉庫地址,ssh協定
  • 這個git專案中有多個資料夾,本篇的原始碼在tutorials/client-go-tutorials資料夾下,如下圖紅框所示:
  • 寫到這裡,client-go基本功的學習已經完成了,接下來咱們還要繼續深入研究,讓這個優秀的庫在手中發揮更大的威力,欣宸原創,敬請期待

歡迎關注部落格園:程式設計師欣宸

學習路上,你不孤單,欣宸原創一路相伴...