GoRoutineMap 定義了一種型別,可以執行具有名稱的 goroutine 並跟蹤它們的狀態。它防止建立具有相同名稱的多個goroutine,並且在上一個具有該名稱的 goroutine 完成後的一段退避時間內可能阻止重新建立 goroutine。
使用GoRoutineMap場景:
使用GoRoutineMap大體步驟如下:
1)通過goRoutineMap.NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {....}方法建立GoRoutineMap結構體物件,用於管理goroutine 並跟蹤它們的狀態;
2)呼叫GoRoutineMap結構體物件Run(operationName, operation)方法,其能夠防止建立具有相同名稱的多個goroutine,並使用協程的方式執行函數邏輯,如果函數成功執行,則退出該協程;如果函數執行報錯,在指數退避的時間內禁止再次執行該函數邏輯。
注意 1:本文程式碼基於Kubernetes 1.24.10版本,包路徑kubernetes-1.24.10/pkg/util/goroutinemap/goroutinemap.go。
注意 2:概述中涉及的程式碼會在下文進行詳細解釋。
GoRoutineMap工具包介面定義:
type GoRoutineMap interface { // Run adds operation name to the list of running operations and spawns a // new go routine to execute the operation. // If an operation with the same operation name already exists, an // AlreadyExists or ExponentialBackoff error is returned. // Once the operation is complete, the go routine is terminated and the // operation name is removed from the list of executing operations allowing // a new operation to be started with the same operation name without error. Run(operationName string, operationFunc func() error) error // Wait blocks until operations map is empty. This is typically // necessary during tests - the test should wait until all operations finish // and evaluate results after that. Wait() // WaitForCompletion blocks until either all operations have successfully completed // or have failed but are not pending. The test should wait until operations are either // complete or have failed. WaitForCompletion() IsOperationPending(operationName string) bool }
goRoutineMap結構體實現GoRoutineMap介面,定義如下:
// goRoutineMap結構體實現GoRoutineMap介面, type goRoutineMap struct { // 用於記錄goRoutineMap維護協程的狀態 operations map[string]operation // 發生錯誤時是否指數級補償 exponentialBackOffOnError bool // 用在多個 Goroutine 等待,一個 Goroutine 通知(事件發生)的場景 cond *sync.Cond lock sync.RWMutex } // operation結構體物件維護單個goroutine的狀態。 type operation struct { // 是否操作掛起 operationPending bool // 單個goroutine執行邏輯報錯時,實現以指數退避方式 expBackoff exponentialbackoff.ExponentialBackoff }
ExponentialBackoff結構體包含最後一次出現的錯誤、最後一次出現錯誤的時間以及不允許重試的持續時間。
// ExponentialBackoff contains the last occurrence of an error and the duration // that retries are not permitted. type ExponentialBackoff struct { lastError error lastErrorTime time.Time durationBeforeRetry time.Duration }
通過goRoutineMap.NewGoRoutineMap方法建立GoRoutineMap結構體物件,用於管理goroutine 並跟蹤它們的狀態;
// NewGoRoutineMap returns a new instance of GoRoutineMap. func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap { g := &goRoutineMap{ operations: make(map[string]operation), exponentialBackOffOnError: exponentialBackOffOnError, } g.cond = sync.NewCond(&g.lock) return g }
呼叫GoRoutineMap結構體物件Run(operationName, operation)方法,其能夠防止建立具有相同名稱的多個goroutine,並使用協程的方式執行函數邏輯,如果函數成功執行,則退出該協程;如果函數執行報錯,在指數退避的時間內禁止再次執行該函數邏輯。
// Run函數是外部函數,是goRoutineMap核心方法,其能夠防止建立具有相同名稱的多個goroutine,並使用協程的方式執行函數邏輯 // 如果函數成功執行,則退出該協程;如果函數執行報錯,在指數退避的時間內禁止再次執行該函數邏輯。 func (grm *goRoutineMap) Run( operationName string, operationFunc func() error) error { grm.lock.Lock() defer grm.lock.Unlock() // 判斷grm.operations這個map中是否存在具有operationName名稱的協程 existingOp, exists := grm.operations[operationName] if exists { // 如果grm.operations這個map中已經存在operationName名稱的協程,並且existingOp.operationPending==true,說明grm.operations中operationName名稱這個協程正在執行函數邏輯,在這期間又有一個同名的 // operationName希望加入grm.operations這個map,此時加入map失敗並報AlreadyExistsError錯誤 if existingOp.operationPending { return NewAlreadyExistsError(operationName) } // 到這步說明名稱為operationName名稱的協程執行函數邏輯失敗,此時判斷此協程最後一次失敗時間 + 指數退避的時間 >= 當前時間,如果不符合條件的話禁止執行該協程函數邏輯。 if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil { return err } } // 如果grm.operations這個map中不存在operationName名稱的協程 或者 此協程最後一次失敗時間 + 指數退避的時間 < 當前時間,則在grm.operations這個map中重新維護此協程(注意,operationPending=true) grm.operations[operationName] = operation{ operationPending: true, expBackoff: existingOp.expBackoff, } // 以協程方式執行函數邏輯operationFunc() go func() (err error) { // 捕獲崩潰並記錄錯誤,預設不傳參的話,在程式傳送崩潰時,在控制檯列印一下崩潰紀錄檔後再崩潰,方便技術人員排查程式錯誤。 defer k8sRuntime.HandleCrash() // 如果執行operationFunc()函數邏輯不報錯或者grm.exponentialBackOffOnError=false的話,將從grm.operations這個map中移除此operationName名稱協程; // 如果執行operationFunc()函數邏輯報錯並且grm.exponentialBackOffOnError=true,則將產生指數級補償,到達補償時間後才能再呼叫此operationName名稱協程的函數邏輯 // Handle completion of and error, if any, from operationFunc() defer grm.operationComplete(operationName, &err) // 處理operationFunc()函數發生的panic錯誤,以便defer grm.operationComplete(operationName, &err)能執行 // Handle panic, if any, from operationFunc() defer k8sRuntime.RecoverFromPanic(&err) return operationFunc() }() return nil }
如果給定lastErrorTime的durationBeforeRetry週期尚未過期,則SafeToRetry返回錯誤。否則返回零。
// SafeToRetry returns an error if the durationBeforeRetry period for the given // lastErrorTime has not yet expired. Otherwise it returns nil. func (expBackoff *ExponentialBackoff) SafeToRetry(operationName string) error { if time.Since(expBackoff.lastErrorTime) <= expBackoff.durationBeforeRetry { return NewExponentialBackoffError(operationName, *expBackoff) } return nil }
operationComplete是一個內部函數,用於處理在goRoutineMap中已經執行完函數邏輯的協程。
// operationComplete是一個內部函數,用於處理在goRoutineMap中已經執行完函數邏輯的協程 // 如果執行operationFunc()函數邏輯不報錯或者grm.exponentialBackOffOnError=false的話,將從grm.operations這個map中移除此operationName名稱協程; // 如果執行operationFunc()函數邏輯報錯並且grm.exponentialBackOffOnError=true,則將產生指數級補償,到達補償時間後才能再呼叫此operationName名稱協程的函數邏輯 // operationComplete handles the completion of a goroutine run in the // goRoutineMap. func (grm *goRoutineMap) operationComplete( operationName string, err *error) { // Defer operations are executed in Last-In is First-Out order. In this case // the lock is acquired first when operationCompletes begins, and is // released when the method finishes, after the lock is released cond is // signaled to wake waiting goroutine. defer grm.cond.Signal() grm.lock.Lock() defer grm.lock.Unlock() if *err == nil || !grm.exponentialBackOffOnError { // 函數邏輯執行完成無錯誤或已禁用錯誤指數級補償,將從grm.operations這個map中移除此operationName名稱協程; // Operation completed without error, or exponentialBackOffOnError disabled delete(grm.operations, operationName) if *err != nil { // Log error klog.Errorf("operation for %q failed with: %v", operationName, *err) } } else { // 函數邏輯執行完成有錯誤則將產生指數級補償,到達補償時間後才能再呼叫此operationName名稱協程的函數邏輯(注意,指數補充的協程,operationPending=false) // Operation completed with error and exponentialBackOffOnError Enabled existingOp := grm.operations[operationName] existingOp.expBackoff.Update(err) existingOp.operationPending = false grm.operations[operationName] = existingOp // Log error klog.Errorf("%v", existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName)) } }
Update是一個外部函數,用於計算指數級別的退避時間。
const ( initialDurationBeforeRetry time.Duration = 500 * time.Millisecond maxDurationBeforeRetry time.Duration = 2*time.Minute + 2*time.Second ) func (expBackoff *ExponentialBackoff) Update(err *error) { if expBackoff.durationBeforeRetry == 0 { expBackoff.durationBeforeRetry = initialDurationBeforeRetry } else { expBackoff.durationBeforeRetry = 2 * expBackoff.durationBeforeRetry if expBackoff.durationBeforeRetry > maxDurationBeforeRetry { expBackoff.durationBeforeRetry = maxDurationBeforeRetry } } expBackoff.lastError = *err expBackoff.lastErrorTime = time.Now() }
本文對Kubernetes GoRoutineMap工具包程式碼進行了詳解,通過 GoRoutineMap工具包能夠防止建立具有相同名稱的多個goroutine,並使用協程的方式執行函數邏輯,如果函數成功執行,則退出該協程;如果函數執行報錯,在指數退避的時間內禁止再次執行該函數邏輯。使用Kubernetes GoRoutineMap包的好處包括以下幾點:
減輕負載:當出現錯誤時,使用指數退避時間可以避免過於頻繁地重新嘗試操作,從而減輕系統的負載。指數退避時間通過逐漸增加重試之間的等待時間,有效地減少了對系統資源的過度使用。
提高穩定性:通過逐漸增加重試之間的等待時間,指數退避時間可以幫助應對瞬時的故障或錯誤。這種策略使得系統能夠在短時間內自動恢復,並逐漸增加重試頻率,直到操作成功為止。這有助於提高應用程式的穩定性和可靠性。
降低網路擁塞:當網路出現擁塞時,頻繁地進行重試可能會加重擁塞問題並影響其他任務的正常執行。指數退避時間通過增加重試之間的等待時間,可以降低對網路的額外負載,有助於緩解網路擁塞問題。
避免過早放棄:某些錯誤可能是瞬時的或暫時性的,因此過早放棄重試可能會導致不必要的失敗。指數退避時間確保了在錯誤發生時進行適當的重試,以便系統有更多機會恢復併成功完成操作。
綜上所述,使用Kubernetes GoRoutineMap工具包以協程方式處理常式邏輯可以提高系統的可靠性、穩定性和效能,減輕負載並有效應對錯誤和故障情況。這是在Kubernetes中實施的一種常見的重試策略,常用於處理容器化應用程式中的操作錯誤。