Mutex是一個互斥鎖,可以建立為其他結構體的欄位;零值為解鎖狀態。Mutex型別的鎖和執行緒無關,可以由不同的執行緒加鎖和解鎖。
互斥鎖的作用是保證共用資源同一時刻只能被一個 Goroutine 佔用,一個 Goroutine 佔用了,其他的 Goroutine 則阻塞等待。
type Mutex struct {
state int32 // 表示當前互斥鎖的狀態
sema uint32 // 號誌變數,用來控制等待 goroutine 的阻塞休眠和喚醒
}
基於該資料結構,實現了兩種方法,加鎖、釋放鎖
type Locker interface {
Lock()
Unlock()
}
const (
mutexLocked = 1 << iota // 表示鎖是否可用(0可用,1被別的goroutine佔用),001
mutexWoken // 表示mutex是否被喚醒,010
mutexStarving // 當前的互斥鎖進入飢餓狀態,100
mutexWaiterShift = iota // 表示統計阻塞在該mutex上的goroutine數目需要移位的數值,1<<(32-3)個
)
// sema + 1,掛起 goroutine
// 1.不斷呼叫嘗試獲取鎖
// 2.休眠當前 goroutine
// 3.等待號誌,喚醒 goroutine
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// sema - 1,喚醒 sema 上等待的一個 goroutine
runtime_Semrelease(&m.sema, false, 1)
在正常模式下,等待的 goroutine 會按照先進先出的順序得到鎖。剛被喚醒的 goroutine 與新建立的 goroutine 競爭時,大概率無法獲得鎖,如 G1和 G2 競爭,此時 G1 已經佔著 CPU 了,所以大概率拿到鎖。
如果 goroutine 超過 1ms,沒有獲取鎖,就會將當前鎖切換為飢餓模式。
避免 goroutine 被餓死,1.19 引入了飢餓模式
在飢餓模式下,互斥鎖會直接交給等到佇列最前面的 goroutine,新的 goroutine 在該狀態下不能獲取鎖,也不能進入自旋,只能在佇列末尾等待。
正常模式下,
如果佇列中只剩一個goroutine 獲得了互斥鎖或者它等待的時間少於 1ms,那麼就會切換到正常模式。
// 如果鎖沒被佔用,也不是飢餓狀態,也沒有喚醒goroutine,也沒有等待goroutine,加鎖成功
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
加鎖的時候先通過一次 CAS(Compare And Swap) 看能不能拿到鎖,如果拿到,直接返回。
// 先判斷引數addr指向的被操作值與引數old的值是否相等
// 如果相等,會用引數new代表的新值替換掉原先的舊值,否則 false
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
如果狀態不是 0 ,就會嘗試通過自旋等方式等待鎖釋放,大致分為:
// 等待時間
var waitStartTime int64
// 飢餓標記
starving := false
// 喚醒標記
awoke := false
// 自旋次數
iter := 0
// 當前的鎖的狀態
old := m.state
for {
// 步驟一
// 如果鎖是正常狀態,鎖還沒被釋放,就自旋
// 因為飢餓模式下,需要保證等到佇列中的 goroutine 能夠獲得鎖的的所有權,防止等待佇列餓死
// 如果鎖在飢餓模式或已經解鎖,或不符合自旋條件就結束自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 如果等待佇列有 goroutine ,鎖沒有設定喚醒狀態,就設定為喚醒
// 用來,當鎖解鎖時,不會去喚醒已經阻塞的 goroutine,保證自己更大概率拿到鎖
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 自旋
runtime_doSpin()
// 自旋次數加1
iter++
// 設定當前鎖的狀態
old = m.state
continue
}
------------------------------------------------------------------------------>
// 步驟二
// 此時可能鎖變為飢餓狀態或者已經解鎖了,或者不符合自旋條件
// 獲取鎖最新狀態
new := old
// 如果當前是正常模式,嘗試加鎖。
// 飢餓狀態下要讓出競爭權利,不能加鎖
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果當前被鎖定或者處於飢餓模式,把自己放到等待佇列,waiter加一,表示等待一個等待計數
// 這塊的狀態,goroutine 只能等著,飢餓狀態要讓出競爭權利
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果已經是飢餓狀態,starving為真,並且old 的鎖是佔用情況,更新狀態改為飢餓狀態
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果awoke在上面自旋時設定成功,那麼在這要消除標誌位
// 因為該 goroutine 要麼獲得了鎖,要麼進入休眠,和喚醒狀態沒啥關係
// 後續流程會導致當前執行緒被掛起,需要等待其他釋放鎖的 goroutine 喚醒,
// 如果 unlock 是發現mutexWoken不是 0,就不會去喚醒
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除喚醒標誌位
new &^= mutexWoken
}
------------------------------------------------------------------------------>
// 步驟三
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 1.如果原來狀態沒有上鎖,也沒有飢餓,那麼直接返回,表示獲取到鎖
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 2.到這裡是沒有獲取到鎖,判斷一下等待時長是否不為0
// 如果新的 goroutine 來搶佔鎖,會返回 false
// 如果不是新的,那麼加入到佇列頭部
// 保證等待最久的 goroutine 優先拿到鎖
queueLifo := waitStartTime != 0
// 3.如果等待時間為0,那麼初始化等待時間
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 如果不等於,說明不是第一次來,是被喚醒後過來的,則加入佇列頭部,queueLifo=true
// 4.阻塞等待,sema+1,並掛起 goroutine,
// 如果後面 goroutine 被喚醒,就從該位置往下執行
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 5.說明該 goroutine 被喚醒
// 判斷該 goroutine 是否長時間沒有獲得鎖,如果是,就是飢餓的 goroutine
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 被掛起的時間有點長,需要重新獲取一下當前鎖的狀態
old = m.state
// 6.判斷是否已經處於飢餓狀態,處於,直接獲得鎖,如果不處於直接跳出
// 飢餓狀態下,被喚醒的協程直接獲得鎖。
if old&mutexStarving != 0 {
// 飢餓狀態下,被喚醒,發現鎖沒釋放,喚醒值是 1,等待列表沒有,報錯
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 7.如果喚醒等待佇列的 goroutine 不飢餓,或是等待佇列中的最後一個 goroutine
if !starving || old>>mutexWaiterShift == 1 {
// 就從飢餓模式切換會正常模式
delta -= mutexStarving
}
// 9.設定狀態
// 將鎖狀態設定為等待數量減1,同時設定為鎖定,加鎖成功
atomic.AddInt32(&m.state, delta)
break
}
// 當前 goroutine 是被系統喚醒的
awoke = true
// 重置自旋次數
iter = 0
} else {
// 如果 CAS 失敗,重新開始
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
自旋是一種多執行緒同步機制,當前的程序在進入自旋的過程中會一直保持 CPU 的佔用,持續檢查某個條件是否為真。
runtime_canSpin(iter)
它的實現方法連結到了sync_runtime_canSpin
runtime_doSpin()
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
它的實現方法連結到了 sync_runtime_doSpin
會執行 30 次 PAUSE
指令,每執行一次再檢查是否可以加鎖,迴圈進行。該過程中,程序仍是執行狀態
更充分的利用CPU,儘量避免 goroutine 切換。因為當前申請加鎖的 goroutine 擁有CPU,如果經過短時間的自旋可以獲得鎖,當前協程可以繼續執行,不必進入阻塞狀態。
對於新來程序一直進行自旋加鎖,排隊中的程序長時間無法拿到鎖,則設定飢餓狀態,該狀態下不允許自旋。