Kubernetes GoRoutineMap工具包程式碼詳解

2023-05-30 06:00:55

1、概述

GoRoutineMap 定義了一種型別,可以執行具有名稱的 goroutine 並跟蹤它們的狀態。它防止建立具有相同名稱的多個goroutine,並且在上一個具有該名稱的 goroutine 完成後的一段退避時間內可能阻止重新建立 goroutine。

使用GoRoutineMap場景:

  • 使用協程的方式執行函數邏輯,如果函數成功執行,則退出該協程;如果函數執行報錯,在指數退避的時間內禁止再次執行該函數邏輯。

使用GoRoutineMap大體步驟如下:

1)通過goRoutineMap.NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {....}方法建立GoRoutineMap結構體物件,用於管理goroutine 並跟蹤它們的狀態;

2)呼叫GoRoutineMap結構體物件Run(operationName, operation)方法,其能夠防止建立具有相同名稱的多個goroutine,並使用協程的方式執行函數邏輯,如果函數成功執行,則退出該協程;如果函數執行報錯,在指數退避的時間內禁止再次執行該函數邏輯。

注意 1:本文程式碼基於Kubernetes 1.24.10版本,包路徑kubernetes-1.24.10/pkg/util/goroutinemap/goroutinemap.go。

注意 2:概述中涉及的程式碼會在下文進行詳細解釋。

2、goroutinemap工具包程式碼詳解

2.1 相關型別詳解

GoRoutineMap工具包介面定義:

type GoRoutineMap interface {
	// Run adds operation name to the list of running operations and spawns a
	// new go routine to execute the operation.
	// If an operation with the same operation name already exists, an
	// AlreadyExists or ExponentialBackoff error is returned.
	// Once the operation is complete, the go routine is terminated and the
	// operation name is removed from the list of executing operations allowing
	// a new operation to be started with the same operation name without error.
	Run(operationName string, operationFunc func() error) error

	// Wait blocks until operations map is empty. This is typically
	// necessary during tests - the test should wait until all operations finish
	// and evaluate results after that.
	Wait()

	// WaitForCompletion blocks until either all operations have successfully completed
	// or have failed but are not pending. The test should wait until operations are either
	// complete or have failed.
	WaitForCompletion()

	IsOperationPending(operationName string) bool
}

goRoutineMap結構體實現GoRoutineMap介面,定義如下:

// goRoutineMap結構體實現GoRoutineMap介面,
type goRoutineMap struct {
	// 用於記錄goRoutineMap維護協程的狀態
	operations                map[string]operation
	// 發生錯誤時是否指數級補償
	exponentialBackOffOnError bool
	// 用在多個 Goroutine 等待,一個 Goroutine 通知(事件發生)的場景
	cond                      *sync.Cond
	lock                      sync.RWMutex
}

// operation結構體物件維護單個goroutine的狀態。
type operation struct {
	// 是否操作掛起
	operationPending bool
	// 單個goroutine執行邏輯報錯時,實現以指數退避方式
	expBackoff       exponentialbackoff.ExponentialBackoff
}

ExponentialBackoff結構體包含最後一次出現的錯誤、最後一次出現錯誤的時間以及不允許重試的持續時間。

// ExponentialBackoff contains the last occurrence of an error and the duration
// that retries are not permitted.
type ExponentialBackoff struct {
	lastError           error
	lastErrorTime       time.Time
	durationBeforeRetry time.Duration
}

2.2 GoRoutineMap結構體物件初始化

通過goRoutineMap.NewGoRoutineMap方法建立GoRoutineMap結構體物件,用於管理goroutine 並跟蹤它們的狀態;  

// NewGoRoutineMap returns a new instance of GoRoutineMap.
func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
	g := &goRoutineMap{
		operations:                make(map[string]operation),
		exponentialBackOffOnError: exponentialBackOffOnError,
	}

	g.cond = sync.NewCond(&g.lock)
	return g
}

2.3  GoRoutineMap.Run方法程式碼詳解

呼叫GoRoutineMap結構體物件Run(operationName, operation)方法,其能夠防止建立具有相同名稱的多個goroutine,並使用協程的方式執行函數邏輯,如果函數成功執行,則退出該協程;如果函數執行報錯,在指數退避的時間內禁止再次執行該函數邏輯。

// Run函數是外部函數,是goRoutineMap核心方法,其能夠防止建立具有相同名稱的多個goroutine,並使用協程的方式執行函數邏輯
// 如果函數成功執行,則退出該協程;如果函數執行報錯,在指數退避的時間內禁止再次執行該函數邏輯。
func (grm *goRoutineMap) Run(
	operationName string,
	operationFunc func() error) error {
	grm.lock.Lock()
	defer grm.lock.Unlock()

	// 判斷grm.operations這個map中是否存在具有operationName名稱的協程
	existingOp, exists := grm.operations[operationName]
	if exists {
		// 如果grm.operations這個map中已經存在operationName名稱的協程,並且existingOp.operationPending==true,說明grm.operations中operationName名稱這個協程正在執行函數邏輯,在這期間又有一個同名的
		// operationName希望加入grm.operations這個map,此時加入map失敗並報AlreadyExistsError錯誤
		if existingOp.operationPending {
			return NewAlreadyExistsError(operationName)
		}

		// 到這步說明名稱為operationName名稱的協程執行函數邏輯失敗,此時判斷此協程最後一次失敗時間 + 指數退避的時間 >= 當前時間,如果不符合條件的話禁止執行該協程函數邏輯。
		if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
			return err
		}
	}

	// 如果grm.operations這個map中不存在operationName名稱的協程 或者 此協程最後一次失敗時間 + 指數退避的時間 < 當前時間,則在grm.operations這個map中重新維護此協程(注意,operationPending=true)
	grm.operations[operationName] = operation{
		operationPending: true,
		expBackoff:       existingOp.expBackoff,
	}

	// 以協程方式執行函數邏輯operationFunc()
	go func() (err error) {
		// 捕獲崩潰並記錄錯誤,預設不傳參的話,在程式傳送崩潰時,在控制檯列印一下崩潰紀錄檔後再崩潰,方便技術人員排查程式錯誤。
		defer k8sRuntime.HandleCrash()

		// 如果執行operationFunc()函數邏輯不報錯或者grm.exponentialBackOffOnError=false的話,將從grm.operations這個map中移除此operationName名稱協程;
		// 如果執行operationFunc()函數邏輯報錯並且grm.exponentialBackOffOnError=true,則將產生指數級補償,到達補償時間後才能再呼叫此operationName名稱協程的函數邏輯
		// Handle completion of and error, if any, from operationFunc()
		defer grm.operationComplete(operationName, &err)
		// 處理operationFunc()函數發生的panic錯誤,以便defer grm.operationComplete(operationName, &err)能執行
		// Handle panic, if any, from operationFunc()
		defer k8sRuntime.RecoverFromPanic(&err)
		return operationFunc()
	}()

	return nil
}

如果給定lastErrorTime的durationBeforeRetry週期尚未過期,則SafeToRetry返回錯誤。否則返回零。

// SafeToRetry returns an error if the durationBeforeRetry period for the given
// lastErrorTime has not yet expired. Otherwise it returns nil.
func (expBackoff *ExponentialBackoff) SafeToRetry(operationName string) error {
	if time.Since(expBackoff.lastErrorTime) <= expBackoff.durationBeforeRetry {
		return NewExponentialBackoffError(operationName, *expBackoff)
	}

	return nil
}

operationComplete是一個內部函數,用於處理在goRoutineMap中已經執行完函數邏輯的協程。

// operationComplete是一個內部函數,用於處理在goRoutineMap中已經執行完函數邏輯的協程
// 如果執行operationFunc()函數邏輯不報錯或者grm.exponentialBackOffOnError=false的話,將從grm.operations這個map中移除此operationName名稱協程;
// 如果執行operationFunc()函數邏輯報錯並且grm.exponentialBackOffOnError=true,則將產生指數級補償,到達補償時間後才能再呼叫此operationName名稱協程的函數邏輯
// operationComplete handles the completion of a goroutine run in the
// goRoutineMap.
func (grm *goRoutineMap) operationComplete(
	operationName string, err *error) {
	// Defer operations are executed in Last-In is First-Out order. In this case
	// the lock is acquired first when operationCompletes begins, and is
	// released when the method finishes, after the lock is released cond is
	// signaled to wake waiting goroutine.
	defer grm.cond.Signal()
	grm.lock.Lock()
	defer grm.lock.Unlock()

	if *err == nil || !grm.exponentialBackOffOnError {
		// 函數邏輯執行完成無錯誤或已禁用錯誤指數級補償,將從grm.operations這個map中移除此operationName名稱協程;
		// Operation completed without error, or exponentialBackOffOnError disabled
		delete(grm.operations, operationName)
		if *err != nil {
			// Log error
			klog.Errorf("operation for %q failed with: %v",
				operationName,
				*err)
		}
	} else {
		//  函數邏輯執行完成有錯誤則將產生指數級補償,到達補償時間後才能再呼叫此operationName名稱協程的函數邏輯(注意,指數補充的協程,operationPending=false)
		// Operation completed with error and exponentialBackOffOnError Enabled
		existingOp := grm.operations[operationName]
		existingOp.expBackoff.Update(err)
		existingOp.operationPending = false
		grm.operations[operationName] = existingOp

		// Log error
		klog.Errorf("%v",
			existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName))
	}
}

Update是一個外部函數,用於計算指數級別的退避時間。

const (
   initialDurationBeforeRetry time.Duration = 500 * time.Millisecond
   maxDurationBeforeRetry time.Duration = 2*time.Minute + 2*time.Second
)
func (expBackoff *ExponentialBackoff) Update(err *error) {
	if expBackoff.durationBeforeRetry == 0 {
		expBackoff.durationBeforeRetry = initialDurationBeforeRetry
	} else {
		expBackoff.durationBeforeRetry = 2 * expBackoff.durationBeforeRetry
		if expBackoff.durationBeforeRetry > maxDurationBeforeRetry {
			expBackoff.durationBeforeRetry = maxDurationBeforeRetry
		}
	}

	expBackoff.lastError = *err
	expBackoff.lastErrorTime = time.Now()
}

3、總結

本文對Kubernetes GoRoutineMap工具包程式碼進行了詳解,通過 GoRoutineMap工具包能夠防止建立具有相同名稱的多個goroutine,並使用協程的方式執行函數邏輯,如果函數成功執行,則退出該協程;如果函數執行報錯,在指數退避的時間內禁止再次執行該函數邏輯。使用Kubernetes GoRoutineMap包的好處包括以下幾點:

  1. 減輕負載:當出現錯誤時,使用指數退避時間可以避免過於頻繁地重新嘗試操作,從而減輕系統的負載。指數退避時間通過逐漸增加重試之間的等待時間,有效地減少了對系統資源的過度使用。

  2. 提高穩定性:通過逐漸增加重試之間的等待時間,指數退避時間可以幫助應對瞬時的故障或錯誤。這種策略使得系統能夠在短時間內自動恢復,並逐漸增加重試頻率,直到操作成功為止。這有助於提高應用程式的穩定性和可靠性。

  3. 降低網路擁塞:當網路出現擁塞時,頻繁地進行重試可能會加重擁塞問題並影響其他任務的正常執行。指數退避時間通過增加重試之間的等待時間,可以降低對網路的額外負載,有助於緩解網路擁塞問題。

  4. 避免過早放棄:某些錯誤可能是瞬時的或暫時性的,因此過早放棄重試可能會導致不必要的失敗。指數退避時間確保了在錯誤發生時進行適當的重試,以便系統有更多機會恢復併成功完成操作。

綜上所述,使用Kubernetes GoRoutineMap工具包以協程方式處理常式邏輯可以提高系統的可靠性、穩定性和效能,減輕負載並有效應對錯誤和故障情況。這是在Kubernetes中實施的一種常見的重試策略,常用於處理容器化應用程式中的操作錯誤。