Go 互斥鎖Mutex

2022-10-18 21:03:24

Mutex是一個互斥鎖,可以建立為其他結構體的欄位;零值為解鎖狀態。Mutex型別的鎖和執行緒無關,可以由不同的執行緒加鎖和解鎖。
互斥鎖的作用是保證共用資源同一時刻只能被一個 Goroutine 佔用,一個 Goroutine 佔用了,其他的 Goroutine 則阻塞等待。

1、資料結構

type Mutex struct {
   state int32  // 表示當前互斥鎖的狀態
   sema  uint32  // 號誌變數,用來控制等待 goroutine 的阻塞休眠和喚醒
}

基於該資料結構,實現了兩種方法,加鎖、釋放鎖

type Locker interface {
   Lock()
   Unlock()
}

const (
    mutexLocked = 1 << iota // 表示鎖是否可用(0可用,1被別的goroutine佔用),001
    mutexWoken  // 表示mutex是否被喚醒,010
    mutexStarving  // 當前的互斥鎖進入飢餓狀態,100
    mutexWaiterShift = iota // 表示統計阻塞在該mutex上的goroutine數目需要移位的數值,1<<(32-3)個
)
// sema + 1,掛起 goroutine
// 1.不斷呼叫嘗試獲取鎖
// 2.休眠當前 goroutine
// 3.等待號誌,喚醒 goroutine
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// sema - 1,喚醒 sema 上等待的一個 goroutine
runtime_Semrelease(&m.sema, false, 1)

2、模式

2.1、正常模式

在正常模式下,等待的 goroutine 會按照先進先出的順序得到鎖。剛被喚醒的 goroutine 與新建立的 goroutine 競爭時,大概率無法獲得鎖,如 G1和 G2 競爭,此時 G1 已經佔著 CPU 了,所以大概率拿到鎖。

如果 goroutine 超過 1ms,沒有獲取鎖,就會將當前鎖切換為飢餓模式。

2.2、飢餓模式

避免 goroutine 被餓死,1.19 引入了飢餓模式

在飢餓模式下,互斥鎖會直接交給等到佇列最前面的 goroutine,新的 goroutine 在該狀態下不能獲取鎖,也不能進入自旋,只能在佇列末尾等待。

2.3、狀態切換

正常模式下,

如果佇列中只剩一個goroutine 獲得了互斥鎖或者它等待的時間少於 1ms,那麼就會切換到正常模式。

3、加鎖

1、Fast path

// 如果鎖沒被佔用,也不是飢餓狀態,也沒有喚醒goroutine,也沒有等待goroutine,加鎖成功
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    if race.Enabled {
       race.Acquire(unsafe.Pointer(m))
   }
    return
}

加鎖的時候先通過一次 CAS(Compare And Swap) 看能不能拿到鎖,如果拿到,直接返回。

// 先判斷引數addr指向的被操作值與引數old的值是否相等
// 如果相等,會用引數new代表的新值替換掉原先的舊值,否則 false
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

2、Slow path

如果狀態不是 0 ,就會嘗試通過自旋等方式等待鎖釋放,大致分為:

  1. 判斷當前 goroutine 能否進入自旋
  2. 通過自旋等待互斥鎖的釋放
  3. 計算互斥鎖的最新狀態
  4. 更新互斥鎖的狀態並獲取鎖
// 等待時間
var waitStartTime int64
// 飢餓標記
starving := false
// 喚醒標記
awoke := false
// 自旋次數
iter := 0
// 當前的鎖的狀態
old := m.state
for {
    // 步驟一
    // 如果鎖是正常狀態,鎖還沒被釋放,就自旋
    // 因為飢餓模式下,需要保證等到佇列中的 goroutine 能夠獲得鎖的的所有權,防止等待佇列餓死
    // 如果鎖在飢餓模式或已經解鎖,或不符合自旋條件就結束自旋
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        // 如果等待佇列有 goroutine ,鎖沒有設定喚醒狀態,就設定為喚醒
        // 用來,當鎖解鎖時,不會去喚醒已經阻塞的 goroutine,保證自己更大概率拿到鎖
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
        // 自旋
        runtime_doSpin()
        // 自旋次數加1
        iter++
        // 設定當前鎖的狀態
        old = m.state
        continue
    }
------------------------------------------------------------------------------>
    // 步驟二
    // 此時可能鎖變為飢餓狀態或者已經解鎖了,或者不符合自旋條件
    // 獲取鎖最新狀態
    new := old

    // 如果當前是正常模式,嘗試加鎖。
    // 飢餓狀態下要讓出競爭權利,不能加鎖
    if old&mutexStarving == 0 {
    	new |= mutexLocked
    }
    // 如果當前被鎖定或者處於飢餓模式,把自己放到等待佇列,waiter加一,表示等待一個等待計數
    // 這塊的狀態,goroutine 只能等著,飢餓狀態要讓出競爭權利
    if old&(mutexLocked|mutexStarving) != 0 {
    	new += 1 << mutexWaiterShift
    }
    
    // 如果已經是飢餓狀態,starving為真,並且old 的鎖是佔用情況,更新狀態改為飢餓狀態
    if starving && old&mutexLocked != 0 {
    	new |= mutexStarving
    }
    
    // 如果awoke在上面自旋時設定成功,那麼在這要消除標誌位
    // 因為該 goroutine 要麼獲得了鎖,要麼進入休眠,和喚醒狀態沒啥關係
    // 後續流程會導致當前執行緒被掛起,需要等待其他釋放鎖的 goroutine 喚醒,
    // 如果 unlock 是發現mutexWoken不是 0,就不會去喚醒
    if awoke { 
    	if new&mutexWoken == 0 {
    		throw("sync: inconsistent mutex state")
        }
        // 清除喚醒標誌位
        new &^= mutexWoken
    }
------------------------------------------------------------------------------>
    // 步驟三
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 1.如果原來狀態沒有上鎖,也沒有飢餓,那麼直接返回,表示獲取到鎖
        if old&(mutexLocked|mutexStarving) == 0 {
            break // locked the mutex with CAS
        }
        
        // 2.到這裡是沒有獲取到鎖,判斷一下等待時長是否不為0
        // 如果新的 goroutine 來搶佔鎖,會返回 false
    	// 如果不是新的,那麼加入到佇列頭部
        // 保證等待最久的 goroutine 優先拿到鎖
        queueLifo := waitStartTime != 0
        
        // 3.如果等待時間為0,那麼初始化等待時間
        if waitStartTime == 0 {
            waitStartTime = runtime_nanotime()
        }
        // 如果不等於,說明不是第一次來,是被喚醒後過來的,則加入佇列頭部,queueLifo=true
        
        // 4.阻塞等待,sema+1,並掛起 goroutine,
        // 如果後面 goroutine 被喚醒,就從該位置往下執行
        runtime_SemacquireMutex(&m.sema, queueLifo, 1)
        
        // 5.說明該 goroutine 被喚醒
        // 判斷該 goroutine 是否長時間沒有獲得鎖,如果是,就是飢餓的 goroutine
        starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
        // 被掛起的時間有點長,需要重新獲取一下當前鎖的狀態
        old = m.state
        
        // 6.判斷是否已經處於飢餓狀態,處於,直接獲得鎖,如果不處於直接跳出
        // 飢餓狀態下,被喚醒的協程直接獲得鎖。
        if old&mutexStarving != 0 {
            // 飢餓狀態下,被喚醒,發現鎖沒釋放,喚醒值是 1,等待列表沒有,報錯
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                throw("sync: inconsistent mutex state")
            }
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            
            // 7.如果喚醒等待佇列的 goroutine 不飢餓,或是等待佇列中的最後一個 goroutine
            if !starving || old>>mutexWaiterShift == 1 {
                // 就從飢餓模式切換會正常模式
                delta -= mutexStarving
            }
            
            // 9.設定狀態
            // 將鎖狀態設定為等待數量減1,同時設定為鎖定,加鎖成功
            atomic.AddInt32(&m.state, delta)
            break
        }
        // 當前 goroutine 是被系統喚醒的
        awoke = true
        // 重置自旋次數
        iter = 0
    } else {
        // 如果 CAS 失敗,重新開始
        old = m.state
    }
}
if race.Enabled {
    race.Acquire(unsafe.Pointer(m))
}

3、小結

4、自旋

自旋是一種多執行緒同步機制,當前的程序在進入自旋的過程中會一直保持 CPU 的佔用,持續檢查某個條件是否為真。

4.1、canSpin

runtime_canSpin(iter)
  • CPU核數要大於1,否則自旋沒有意義,因為此時不可能有其他協程釋放鎖
  • 當前Goroutine為了獲取該鎖進入自旋的次數 iter 小於四次
  • 當前機器上至少存在一個正在執行 Process
  • 處理的執行 G 佇列為空,否則會延遲排程

它的實現方法連結到了sync_runtime_canSpin

4.2、doSpin

runtime_doSpin()

func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

它的實現方法連結到了 sync_runtime_doSpin

會執行 30 次 PAUSE指令,每執行一次再檢查是否可以加鎖,迴圈進行。該過程中,程序仍是執行狀態

4.3、優勢

更充分的利用CPU,儘量避免 goroutine 切換。因為當前申請加鎖的 goroutine 擁有CPU,如果經過短時間的自旋可以獲得鎖,當前協程可以繼續執行,不必進入阻塞狀態。

對於新來程序一直進行自旋加鎖,排隊中的程序長時間無法拿到鎖,則設定飢餓狀態,該狀態下不允許自旋。

5、小結

  1. 上來先一個 CAS ,如果鎖正空閒,並且沒人搶,那麼加鎖成功;
  2. 否則,自旋幾次,如果成功,也不用加入佇列;
  3. 否則,加入佇列;
  4. 從佇列中被喚醒:
    1. 正常模式:和新來的一起搶鎖,大概率失敗
    2. 飢餓模式:肯定拿到鎖