從原始碼深入理解讀寫鎖(golang-RWMutex)

2023-05-05 12:02:00

環境:go 1.19.8

在讀多寫少的情況下,即使一段時間內沒有寫操作,大量並行的讀存取也不得不在Mutex的保護下變成序列存取,這種情況下,使用Mutex,對效能影響比較大。
所以就要區分讀寫操作。如果某個讀操作的g持有了鎖,其他讀操作的g就不必等待了,可以並行的存取共用變數,這樣就可以將序列的讀變成並行的讀,提高讀操作的效能。可理解為共用鎖。
當寫操作的g持有鎖,它是一個排他鎖,不管其他的g是寫操作還是讀操作,都需要阻塞等待持有鎖的g釋放鎖。

什麼是RWMutex?

reader/writer互斥鎖,在某一時刻只能由任意數量的reader持有,或者是隻被單個writer持有。
RWMutex實現了5個方法:

  • Lock/Unlock:寫操作時呼叫。如果鎖已經被reader或者writer持有,那麼,Lock方法會一直阻塞,直到能獲取到鎖;Unlock是對應的釋放鎖方法
  • RLock/RUnlock:讀操作時呼叫。如果鎖已經被writer持有,RLock方法會一直阻塞,直到能獲取鎖,否則直接return;Rnlock是對應的釋放鎖方法
  • RLocker:這個方法的作用是為讀操作返回一個 Locker 介面的物件

案例:計數器,1writer n reader

使用場景

如果可以明確區分 reader 和 writer goroutine ,且有大量的並行讀,少量的並行寫,並且有強烈的效能要求,可以考慮使用讀寫鎖RWMutex替換Mutex

實現原理

RWMutex 是很常見的並行原語,很多程式語言的庫都提供了類似的並行型別。RWMutex
一般都是基於互斥鎖、條件變數(condition variables)或者號誌(semaphores)等
並行原語來實現。Go 標準庫中的 RWMutex 是基於 Mutex 實現的。
reader-writers 問題,一般有三類,基於對讀和寫操作的優先順序,讀寫鎖的設計和實現也分成三類

  • Read-Preferring:讀優先的設計可以提供很高的並行性。但在競爭激烈的情況下會導致寫飢餓
  • Write-Preferring:如果有一個writer在等待請求鎖,它會阻止新來請求鎖reader獲取到鎖,優先保障writer。當然,如果reader已經獲得鎖,新請求的writer也需要等待已持有鎖的reader釋放鎖。寫優先順序設計中的優先權是針對新來的請求而言的。這種設計主要避免了 writer 的飢餓問題。
  • 不指定優先順序:這種設計比較簡單,不區分 reader 和 writer 優先順序,某些場景下這種不指定優先順序的設計反而更有效,因為第一類優先順序會導致寫飢餓,第二類優先順序可能會導致讀飢餓,這種不指定優先順序的存取不再區分讀寫,大家都是同一個優先順序,解決了飢餓的問題。

Go 標準庫中的 RWMutex 設計是 Write-preferring 方案。一個正在阻塞的 Lock 呼叫
會排除新的 reader 請求到鎖。

原始碼解析

上鎖解鎖流程以及數值變化情況

rwmutexMaxReaders 的數量被初始化為1<<30,理想中,寫鎖不會持續很久,不會導致readerCount 自動從負值自動+1回到正值。

RLock/RUnlock實現

type RWMutex struct {
	w           sync.Mutex // hold if there are pending writers
	writerSem   uint32     // 寫 阻塞訊號
	readerSem   uint32     // 讀 阻塞訊號
	readerCount int32      // 正在讀的呼叫者數量/ 當為負數時 表示有write持有鎖
	readerWait  int32      // writer持有鎖之前正等待解鎖的數量
}

const rwmutexMaxReaders = 1 << 30

func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// 寫端 持有鎖, 讀端阻塞
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}

func (rw *RWMutex) RUnlock() {
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		rw.rUnlockSlow(r)
	}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		fatal("sync: RUnlock of unlocked RWMutex")
	}

	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// 無讀者等待,喚醒寫端等待者
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

RLock

第11行,上讀鎖,首先對readerCount進行原子加1,如果小於0則表示存在寫鎖,直接阻塞。為什麼readerCount會存在負值?這個要看readerCount除了在RLock中處理,還在哪裡被處理了。可以看到在獲取寫鎖時有響應程式碼。後面在解釋。如果原子加大於等於0,則表示獲取讀鎖成功。

RUnlock

第18行,讀解鎖,對readerCount進行原子減1,如果小於零,則表示存在活躍的reader(即當前獲得互斥鎖的寫鎖之前獲取到讀鎖許可權的讀者數量),readerWait 欄位就減 1,直到所有的活躍的 reader 都釋放了讀鎖,才會喚醒這個 write

Lock/Unlock

func (rw *RWMutex) Lock() {
	// 1. 先嚐試獲取互斥鎖
	rw.w.Lock()
	// 2. 看是否有其他正持有鎖的讀者,有則阻塞
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		// rc - rwmutexMaxReaders + rwmutexMaxReaders > 0說明還有等待者, 寫端阻塞
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}

func (rw *rwMutex) Unlock() {
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		fatal("sync: Unlock of unlocked RWMutex")
	}

	// 如果有等待的讀者,先喚醒
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}

	// 釋放互斥鎖
	rw.w.Unlock()
}

Lock

  1. 先獲取互斥鎖
  2. 成功獲取後,r=readerCount-rwmutexMaxReaders,得到的數值就是一個負數,在加上rwmutexReaders就表示寫鎖等待者的數量,此時,如果r不等於0,且readerWait+r!=0,則表示有讀等待者,寫鎖阻塞

我們知道,寫操作要等待讀操作結束後才可以獲得鎖,寫操作等待期間可能還有新的讀操作持續到來,如果寫操作等待所有讀操作結束,就會出現飢餓現象。然而,通過readerWait可完美解決這個問題。

寫操作到來時,會把readerCount值拷貝到readerWait中,用於標記排在寫操作之前到讀者個數。
當讀操作結束後,除了會遞減readerCount,還會遞減readerWait的值,當readerWait值變為0時會喚醒寫操作。

寫操作之後產生的讀操作會加入到readerCount中,阻塞知道寫鎖釋放。

Unlock

上面說過,寫鎖之後來的讀者會被阻塞,所以在寫鎖釋放之際,會看是否有需要喚醒的讀者,再釋放互斥鎖

場景討論

寫操作如何阻塞寫操作

讀寫鎖包含一個互斥鎖(Mutex),寫鎖必須先獲取該互斥鎖,如果互斥鎖已被協程A獲取,意味者其他協程只能阻塞等待互斥鎖釋放

寫操作是如何阻塞讀操作

readerCount是個整型值,用於表示讀者數量,不考慮寫操作的情況下,每次獲取讀鎖,將該值加1,每次解鎖將其減1,所以readerCount的取值為[0, N],最大可支援2^30個並行讀者。

當寫鎖定進行時,會先將readerCount -= rwmutextMaxReaders(2^30),此時 readerCount負數。這時再有讀者到了,檢測到readerCount為負值,則表示有寫操作正在進行,後來到讀者阻塞等待。等待者的數量即 reaerCount + 2^30

讀操作是如何阻止寫操作的

寫操作時,會把readerCount的值拷貝到readerWait中,用於標記在寫操作前面讀者的個數,前面的寫鎖釋放後,會遞減readerCount,readerWait,當readerWait值變為0時喚醒寫操作

3個踩坑點

不可複製

rwmutex是由一個互斥鎖和四個輔助欄位組成的,與互斥鎖一樣,讀寫鎖也是不能複製的。
一旦讀寫鎖被使用,它的欄位就會記錄它當前的一些狀態,如果此時去複製這把鎖,就會把它的狀態也複製過去。但原來的鎖在釋放的時候,並不會修改複製出來的讀寫鎖,會導致複製出來的讀寫鎖狀態異常,可能永遠無法釋放鎖。

重入導致死鎖

讀寫鎖重入,或者遞迴呼叫,導致的死鎖情況很多

  1. 讀寫鎖內部基於互斥鎖實現對writer並行控制,而互斥鎖本身就有重入問題,所以,writer重入呼叫Lock,會導致死鎖
func foo(l *sync.RWMutex) {
    fmt.Println("lock in foo")
    l.Lock()
    bar(l)
    l.Unlock()
}

func bar(l *sync.RWMutex) {
    fmt.Println("lock in bar")
    l.Lock()
    l.Unlock()
}

func main() {
    l := &sync.RWMutex{}
    foo(l)
}
  1. 當一個 writer 請求鎖的時候,如果已經有一些活躍的 reader,它會等待這些活躍的reader 完成,才有可能獲取到鎖,但是,如果之後活躍的 reader 再依賴新的 reader 的話,這些新的 reader 就會等待 writer 釋放鎖之後才能繼續執行,這就形成了一個環形依賴: writer 依賴活躍的 reader -> 活躍的 reader 依賴新來的 reader -> 新來的 reader依賴 writer。
func main() {
    var mu sync.RWMutex

    go func() {
        time.Sleep(200*time.Millisecond)
        mu.Lock()
        fmt.Println("Lock")
        time.Sleep(100*time.Millisend)
        mu.Unlock()
        fmt.Println("Unlock")
    }

    go func() {
        factorial(&mu, 10) // 計算10的階乘
    }

    select {}
}

// 
func factorial(m *sync.RWMutex, n int) {
    if n < 1 {
        return 0
    }
    
    fmt.Println("RLock")
    m.RLock()
    defer func() {
        fmt.Println("RUnlock")
        m.RUnlock()
    }

    time.Sleep(100*time.Millisecond)
    return factorial(m, n-1) * n
}

factorial 方法是一個遞迴計算階乘的方法,我們用它來模擬 reader。為了更容易地製造出死鎖場景,在這裡加上了 sleep 的呼叫,延緩邏輯的執行。這個方法會呼叫讀鎖(第 27
行),在第 33 行遞迴地呼叫此方法,每次呼叫都會產生一次讀鎖的呼叫,所以可以不斷地產生讀鎖的呼叫,而且必須等到新請求的讀鎖釋放,這個讀鎖才能釋放。同時,我們使用另一個 goroutine 去呼叫 Lock 方法,來實現 writer,這個 writer 會等待200 毫秒後才會呼叫 Lock,這樣在呼叫 Lock 的時候,factoria 方法還在執行中不斷呼叫
RLock。這兩個 goroutine 互相持有鎖並等待,誰也不會退讓一步,滿足了「writer 依賴活躍的reader -> 活躍的 reader 依賴新來的 reader -> 新來的 reader 依賴 writer」的死鎖條件,所以就導致了死鎖的產生。

釋放未加鎖的RWMutex

鎖都是成對出現的,Lock和RLock的多餘呼叫會導致鎖沒有被釋放,可能會出現死鎖。
而Unlock和RUnlock多餘呼叫會導致panic

參考

  1. go中sync.RWMutex原始碼解讀