樂觀鎖和悲觀鎖在kubernetes中的應用

2022-07-17 06:00:39

資料競爭和競態條件

Go並行中有兩個重要的概念:資料競爭(data race)和競爭條件(race condition)。在並行程式中,競爭問題可能是程式面臨的最難也是最不容易發現的錯誤之一。

當有兩個或多個協程同時存取同一個記憶體地址,並且至少有一個是寫時,就會發生資料競爭,它造成的影響就是讀取變數的值將變得不可知。

資料競爭產生的原因是對於同一個變數的存取不是原子性的。

避免資料競爭可以使用以下三種方式:

  • 使用原子操作
  • 使用mutex對同一區域進行互斥操作
  • 使用管道 (channel) 進行通訊以保證僅且只有一個協程在進行寫操作

相比於資料競爭,競爭條件也稱為資源競爭,受各協程的執行順序和時機的影響,程式的執行結果產生變化。

競態條件產生的原因很多是對於同一個資源的一系列連續操作並不是原子性的,也就是說有可能在執行的中途被其他執行緒搶佔,同時這個「其他執行緒」剛好也要存取這個資源。解決方法通常是:將這一系列操作作為一個critical section(臨界區)

i := 0
mutex := sync.Mutex{}
go func() {
    mutex.Lock()
    defer mutex.Unlock()
    i = 1
}()

go func() {
    mutex.Lock()
    defer mutex.Unlock()
    i = 2
}()

這裡雖然使用了鎖來避免了資料競爭,但輸出結果仍然是不可控的,因為變數的結果依賴於協程執行的順序。這就是競爭條件。

樂觀鎖和悲觀鎖

概念

樂觀鎖和悲觀鎖是兩種思想,用於解決並行場景下的資料競爭問題。

  • 樂觀鎖

    樂觀鎖在運算元據時非常樂觀,認為別人不會同時修改資料。因此樂觀鎖不會上鎖,只是在執行更新的時候判斷一下在此期間別人是否修改了資料:如果別人沒有修改資料,執行更新操作。如果資料已經被更新過了,根據不同的實現方式執行不同的操作:重試(重新讀取更新然後比較)或報異常。

  • 悲觀鎖

    悲觀鎖在運算元據時比較悲觀,認為別人會同時修改資料。因此運算元據時直接把資料鎖住,直到操作完成後才會釋放鎖;上鎖期間其他人不能修改資料。

應用場景:

當競爭不激烈時,即出現並行衝突的概率小。樂觀鎖更有優勢,因為悲觀鎖加鎖和釋放鎖的操作需要耗費額外的資源;當競爭激烈的時候,悲觀鎖有優勢,因為樂觀鎖在執行失敗的時候需要不斷重試,浪費CPU資源。針對這個問題的一個思路是引入退出機制,如果重試次數超過一定閾值後失敗推出。當然,應該避免在高競爭環境下使用樂觀鎖。

實現方式

悲觀鎖的實現方式是加鎖,加鎖既可以是對程式碼塊加鎖(如Java的synchronized關鍵字),也可以是對資料加鎖(如MySQL中的排它鎖)。

樂觀鎖的實現方式主要有兩種:CAS機制和版本號機制,

  • CAS

    CAS機制就是Compare And Swap。他的操作邏輯是:如果記憶體位置V的值等於預期的A值,則將該位置更新為新值B,否則不進行任何操作。許多CAS的操作是自旋的:如果操作不成功,會一直重試,直到操作成功為止。即CAS在更新之前先比較一下,然後決定是否要更新。

    這裡的比較和更新是兩個操作,其原子性是由CPU支援的,在硬體層面上進行保證。

    CAS有個缺點,就是ABA問題:

    假設有兩個執行緒——執行緒1和執行緒2,兩個執行緒按照順序進行以下操作:

    (1)執行緒1讀取記憶體中資料為A;

    (2)執行緒2將該資料修改為B;

    (3)執行緒2將該資料修改為A;

    (4)執行緒1對資料進行CAS操作

    在第(4)步中,由於記憶體中資料仍然為A,因此CAS操作成功,但實際上該資料已經被執行緒2修改過了。這就是ABA問題。

    在AtomicInteger的例子中,ABA似乎沒有什麼危害。但是在某些場景下,ABA卻會帶來隱患,例如棧頂問題:一個棧的棧頂經過兩次(或多次)變化又恢復了原值,雖然棧頂不變,但是棧的結構可能已發生了變化。

    對於ABA問題,比較有效的方案是引入版本號,記憶體中的值每發生一次變化,版本號都+1;在進行CAS操作時,不僅比較記憶體中的值,也會比較版本號,只有當二者都沒有變化時,CAS才能執行成功。

  • 版本號機制

    版本號機制的基本思路是在資料中增加一個欄位version,表示該資料的版本號,每當資料被修改,版本號加1。當某個執行緒查詢資料時,將該資料的版本號一起查出來;當該執行緒更新資料時,判斷當前版本號與之前讀取的版本號是否一致,如果一致才進行操作。需要注意的是,這裡使用了版本號作為判斷資料變化的標記,實際上可以根據實際情況選用其他能夠標記資料版本的欄位,如時間戳等。

與悲觀鎖相比,樂觀鎖功能有很多限制,比如CAS操作只能保證單個變數的原子性。

kubernetes中的樂觀並行

更新資源時

k8s中的資源都有一個metadata.ResourceVersion欄位,當api-server執行update操作的時候通常會先執行get操作,server會比較該欄位,如果相同則更新成功,並修改該欄位,如果不同,則更新失敗。

Leader Election

在kubernetes中,通常kube-scheduler和kube-controller-manager都是多副本進行部署來保證高可用的,這裡利用的就是leader election機制。

leader election 指的是一個程式為了高可用會有多個副本,但是每一個時候只允許一個程序在工作。k8s基於樂觀並行控制實現了leader election,使得一些控制面元件有多個副本,但只有一個副本在工作,從而達到高可用。

$ kubectl get ep -n kube-system kube-scheduler -o yaml
apiVersion: v1
kind: Endpoints
metadata:
  annotations:
    control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"kube-master-1_ad5220de-2442-11e9-91f6-52540025e0cf","leaseDurationSeconds":15,"acquireTime":"2019-02-22T02:09:15Z","renewTime":"2019-03-14T07:37:19Z","leaderTransitions":1}'
  name: kube-scheduler
  namespace: kube-system

holderIdentity:表示當前那個副本在工作

leaseDurationSeconds:租期的時間

acquireTime:獲得鎖的時間

renewTime:重新整理鎖的時間

leaderTransitions:leader 交換的次數

實現原理

其原理就是利用kubernetes中的configmap、endpoints、lease這三種鎖資源實現了一個分散式鎖。推薦使用Lease,因為Lease 物件本身就是用來協調租約物件的,其Spec 定義與Leader 選舉機制需要操控的屬性是一致的。使用Configmap 和Endpoints 物件更多是為了向後相容,伴隨著一定的負面影響。以Endpoints 為例,Leader 每隔固定週期就要續約,這使得Endpoints 物件處於不斷的變化中。Endpoints 物件會被每個節點的kube-proxy 等監聽,任何Endpoints 物件的變更都會推播給所有節點的kube-proxy,這為叢集引入了不必要的網路流量。

  1. 大致邏輯就是多個副本會同時更新某個資源annotation中的holderIdentity欄位,寫入欄位值的操作被稱為獲取鎖資源。由於k8s時樂觀並行,只有一個會更新成功,更新成功的這個副本就會成為leader。
  2. 在 leader 被選舉成功之後,leader 為了保住自己的位置,需要定時去更新這個 Lease 資源的狀態,即一個時間戳資訊,表明自己有在一直工作沒有出現故障,這一操作稱為續約。
  3. 其他 candidate 也不是完全閒著,而是也會定期嘗試獲取這個資源,檢查資源的資訊,時間戳有沒有太久沒更新,否則認為原來的 leader 故障失聯無法正常工作,並更新此資源的 holder 為自己,成為 leader 開始工作並同時定期續約。

使用舉例

程式碼路徑:client-go/examples/leader-election/main.go

		// leader election uses the Kubernetes API by writing to a
    // lock object, which can be a LeaseLock object (preferred),
    // a ConfigMap, or an Endpoints (deprecated) object.
    // Conflicting writes are detected and each client handles those actions
    // independently.
    config, err := buildConfig(kubeconfig)
    if err != nil {
        klog.Fatal(err)
    }
    client := clientset.NewForConfigOrDie(config)

    run := func(ctx context.Context) {
        // complete your controller loop here
        klog.Info("Controller loop...")

        select {}
    }

    // use a Go context so we can tell the leaderelection code when we
    // want to step down
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // listen for interrupts or the Linux SIGTERM signal and cancel
    // our context, which the leader election code will observe and
    // step down
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-ch
        klog.Info("Received termination, signaling shutdown")
        cancel()
    }()

    // we use the Lease lock type since edits to Leases are less common
    // and fewer objects in the cluster watch "all Leases".
    // 指定鎖的資源物件,這裡使用了Lease資源,還支援configmap,endpoint,或者multilock(即多種配合使用)
    lock := &resourcelock.LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Name:      leaseLockName,
            Namespace: leaseLockNamespace,
        },
        Client: client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: id,
        },
    }

    // start the leader election code loop
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock: lock,
        // IMPORTANT: you MUST ensure that any code you have that
        // is protected by the lease must terminate **before**
        // you call cancel. Otherwise, you could have a background
        // loop still running and another process could
        // get elected before your background loop finished, violating
        // the stated goal of the lease.
        ReleaseOnCancel: true,
        LeaseDuration:   60 * time.Second,//租約時間
        RenewDeadline:   15 * time.Second,//更新租約的
        RetryPeriod:     5 * time.Second,//非leader節點重試時間
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                //變為leader執行的業務程式碼
                // we're notified when we start - this is where you would
                // usually put your code
                run(ctx)
            },
            OnStoppedLeading: func() {
                 // 程序退出
                // we can do cleanup here
                klog.Infof("leader lost: %s", id)
                os.Exit(0)
            },
            OnNewLeader: func(identity string) {
                //當產生新的leader後執行的方法
                // we're notified when new leader elected
                if identity == id {
                    // I just got the lock
                    return
                }
                klog.Infof("new leader elected: %s", identity)
            },
        },
    })