環境:go 1.19.8
在讀多寫少的情況下,即使一段時間內沒有寫操作,大量並行的讀存取也不得不在Mutex的保護下變成序列存取,這種情況下,使用Mutex,對效能影響比較大。
所以就要區分讀寫操作。如果某個讀操作的g持有了鎖,其他讀操作的g就不必等待了,可以並行的存取共用變數,這樣就可以將序列的讀變成並行的讀,提高讀操作的效能。可理解為共用鎖。
當寫操作的g持有鎖,它是一個排他鎖,不管其他的g是寫操作還是讀操作,都需要阻塞等待持有鎖的g釋放鎖。
reader/writer互斥鎖,在某一時刻只能由任意數量的reader持有,或者是隻被單個writer持有。
RWMutex實現了5個方法:
案例:計數器,1writer n reader
如果可以明確區分 reader 和 writer goroutine ,且有大量的並行讀,少量的並行寫,並且有強烈的效能要求,可以考慮使用讀寫鎖RWMutex替換Mutex
RWMutex 是很常見的並行原語,很多程式語言的庫都提供了類似的並行型別。RWMutex
一般都是基於互斥鎖、條件變數(condition variables)或者號誌(semaphores)等
並行原語來實現。Go 標準庫中的 RWMutex 是基於 Mutex 實現的。
reader-writers 問題,一般有三類,基於對讀和寫操作的優先順序,讀寫鎖的設計和實現也分成三類
Go 標準庫中的 RWMutex 設計是 Write-preferring 方案。一個正在阻塞的 Lock 呼叫
會排除新的 reader 請求到鎖。
上鎖解鎖流程以及數值變化情況
rwmutexMaxReaders 的數量被初始化為1<<30
,理想中,寫鎖不會持續很久,不會導致readerCount 自動從負值自動+1回到正值。
type RWMutex struct {
w sync.Mutex // hold if there are pending writers
writerSem uint32 // 寫 阻塞訊號
readerSem uint32 // 讀 阻塞訊號
readerCount int32 // 正在讀的呼叫者數量/ 當為負數時 表示有write持有鎖
readerWait int32 // writer持有鎖之前正等待解鎖的數量
}
const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 寫端 持有鎖, 讀端阻塞
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 無讀者等待,喚醒寫端等待者
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
第11行,上讀鎖,首先對readerCount進行原子加1,如果小於0則表示存在寫鎖,直接阻塞。為什麼readerCount會存在負值?這個要看readerCount除了在RLock中處理,還在哪裡被處理了。可以看到在獲取寫鎖時有響應程式碼。後面在解釋。如果原子加大於等於0,則表示獲取讀鎖成功。
第18行,讀解鎖,對readerCount進行原子減1,如果小於零,則表示存在活躍的reader(即當前獲得互斥鎖的寫鎖之前獲取到讀鎖許可權的讀者數量),readerWait 欄位就減 1,直到所有的活躍的 reader 都釋放了讀鎖,才會喚醒這個 write
func (rw *RWMutex) Lock() {
// 1. 先嚐試獲取互斥鎖
rw.w.Lock()
// 2. 看是否有其他正持有鎖的讀者,有則阻塞
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// rc - rwmutexMaxReaders + rwmutexMaxReaders > 0說明還有等待者, 寫端阻塞
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
func (rw *rwMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}
// 如果有等待的讀者,先喚醒
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 釋放互斥鎖
rw.w.Unlock()
}
我們知道,寫操作要等待讀操作結束後才可以獲得鎖,寫操作等待期間可能還有新的讀操作持續到來,如果寫操作等待所有讀操作結束,就會出現飢餓現象。然而,通過readerWait
可完美解決這個問題。
寫操作到來時,會把readerCount
值拷貝到readerWait
中,用於標記排在寫操作之前到讀者個數。
當讀操作結束後,除了會遞減readerCount
,還會遞減readerWait
的值,當readerWait
值變為0時會喚醒寫操作。
寫操作之後產生的讀操作會加入到readerCount
中,阻塞知道寫鎖釋放。
上面說過,寫鎖之後來的讀者會被阻塞,所以在寫鎖釋放之際,會看是否有需要喚醒的讀者,再釋放互斥鎖
讀寫鎖包含一個互斥鎖(Mutex),寫鎖必須先獲取該互斥鎖,如果互斥鎖已被協程A獲取,意味者其他協程只能阻塞等待互斥鎖釋放
readerCount
是個整型值,用於表示讀者數量,不考慮寫操作的情況下,每次獲取讀鎖,將該值加1,每次解鎖將其減1,所以readerCount
的取值為[0, N]
,最大可支援2^30
個並行讀者。
當寫鎖定進行時,會先將readerCount -= rwmutextMaxReaders(2^30)
,此時 readerCount
負數。這時再有讀者到了,檢測到readerCount
為負值,則表示有寫操作正在進行,後來到讀者阻塞等待。等待者的數量即 reaerCount + 2^30
寫操作時,會把readerCount
的值拷貝到readerWait
中,用於標記在寫操作前面讀者的個數,前面的寫鎖釋放後,會遞減readerCount,readerWait
,當readerWait
值變為0時喚醒寫操作
rwmutex是由一個互斥鎖和四個輔助欄位組成的,與互斥鎖一樣,讀寫鎖也是不能複製的。
一旦讀寫鎖被使用,它的欄位就會記錄它當前的一些狀態,如果此時去複製這把鎖,就會把它的狀態也複製過去。但原來的鎖在釋放的時候,並不會修改複製出來的讀寫鎖,會導致複製出來的讀寫鎖狀態異常,可能永遠無法釋放鎖。
讀寫鎖重入,或者遞迴呼叫,導致的死鎖情況很多
func foo(l *sync.RWMutex) {
fmt.Println("lock in foo")
l.Lock()
bar(l)
l.Unlock()
}
func bar(l *sync.RWMutex) {
fmt.Println("lock in bar")
l.Lock()
l.Unlock()
}
func main() {
l := &sync.RWMutex{}
foo(l)
}
func main() {
var mu sync.RWMutex
go func() {
time.Sleep(200*time.Millisecond)
mu.Lock()
fmt.Println("Lock")
time.Sleep(100*time.Millisend)
mu.Unlock()
fmt.Println("Unlock")
}
go func() {
factorial(&mu, 10) // 計算10的階乘
}
select {}
}
//
func factorial(m *sync.RWMutex, n int) {
if n < 1 {
return 0
}
fmt.Println("RLock")
m.RLock()
defer func() {
fmt.Println("RUnlock")
m.RUnlock()
}
time.Sleep(100*time.Millisecond)
return factorial(m, n-1) * n
}
factorial 方法是一個遞迴計算階乘的方法,我們用它來模擬 reader。為了更容易地製造出死鎖場景,在這裡加上了 sleep 的呼叫,延緩邏輯的執行。這個方法會呼叫讀鎖(第 27
行),在第 33 行遞迴地呼叫此方法,每次呼叫都會產生一次讀鎖的呼叫,所以可以不斷地產生讀鎖的呼叫,而且必須等到新請求的讀鎖釋放,這個讀鎖才能釋放。同時,我們使用另一個 goroutine 去呼叫 Lock 方法,來實現 writer,這個 writer 會等待200 毫秒後才會呼叫 Lock,這樣在呼叫 Lock 的時候,factoria 方法還在執行中不斷呼叫
RLock。這兩個 goroutine 互相持有鎖並等待,誰也不會退讓一步,滿足了「writer 依賴活躍的reader -> 活躍的 reader 依賴新來的 reader -> 新來的 reader 依賴 writer」的死鎖條件,所以就導致了死鎖的產生。
鎖都是成對出現的,Lock和RLock的多餘呼叫會導致鎖沒有被釋放,可能會出現死鎖。
而Unlock和RUnlock多餘呼叫會導致panic