緩衝池的基本結構如下:
//資料緩衝池介面的實現型別
type myPool struct {
//緩衝器的統一容量
bufferCap uint32
//緩衝器的最大數量
maxBufferNumber uint32
//緩衝器的實際數量
bufferNumber uint32
//池中資料的總數
total uint64
//存放緩衝器的通道
bufCh chan Buffer
//緩衝池的關閉狀態:0-未關閉;1-已關閉
closed uint32
//保護內部共用資源的讀寫鎖
rwlock sync.RWMutex
}
前兩個欄位用於記錄建立緩衝池時的引數,它們在緩衝池執行期間用到。bufferNumber 和 total 欄位用於記錄緩衝資料的實時狀況。
注意,bufCh 欄位的型別是 chan Buffer,一個元素型別為 Buffer 的通道型別。這與緩衝器中同樣是通道型別的 ch 欄位聯合起來看,就是一個雙層通道的設計。在放入或獲取資料時,我會先從 bufCh 拿到一個緩衝器,再向該緩衝器放入資料或從該緩衝器獲取資料,然後再把它傳送回 bufCh。這樣的設計有如下幾點好處。
1) bufCh 中的每個緩衝器一次只會被一個 goroutine 中的程式(以下簡稱並行程式)拿到。並且,在放回 bufCh 之前,它對其他並行程式都是不可見的。一個緩衝器每次只會被並行程式放入或取走一個資料。即使同一個程式連續呼叫多次 Put 方法或 Get 方法,也會這樣。緩衝器不至於一下被填滿或取空。
2) 更進一步看,bufCh 是 FIFO 的。當把先前拿出的緩衝器歸還給 bufCh 時,該緩衝器總會被放在隊尾。也就是說,池中緩衝器的操作頻率可以降到最低,這也有利於池中資料的均勻分布。
3) 在從 bufCh 拿到緩衝器後,我可以判斷是否需要縮減緩衝器的數量。如果需要並且該緩衝器已空,就可以直接把它關掉,並且不還給 bufCh。另一方面,如果在放入資料時發現所有緩衝器都已滿並且在一段時間內都沒有空位,就可以新建一個緩衝器並放入 bufCh。總之,這讓緩衝池自動伸縮功能的實現變得簡單了。
4) 最後也最重要的是,bufCh 本身就提供了對並行安全的保障。
大家可能會想到,基於標準庫的 container 包中的 List 或 Ring 型別也可以編寫出並行安全的緩衝器佇列。確實可以。不過,用它們來實現會讓你不得不編寫更多的程式碼,因為原本一些現成的操作和功能都需要我們自己去實現,尤其是在保證並行安全性方面。並且,這樣的緩衝器佇列的執行效率可不一定高。
注意,上述設計會導致緩衝池中的資料不是 FIFO 的。不過,對於網路爬蟲框架以及排程器來說,這並不會造成問題。
再看最後一個欄位 rwlock。之所以不叫它 closingLock,是因為它不僅僅為了消除緩衝器中的那個與關閉通道有關的競態條件而存在。大家可以思考一下,怎樣並行地向 bufCh 放入新的緩衝器,同時避免池中的緩衝器數量超過最大值。
NewPool 函數用於新建一個緩衝池。它會先檢查引數的有效性,再建立並初始化一個 *myPool 型別的值並返回。在為它的 bufCh 欄位賦值後,我們需要先向該值放入一個緩衝器。這算是對緩衝池的預熱。關於該函數的具體實現,你可以直接檢視範例專案中的對應程式碼。
對於 Pool 介面的 BufferCap> MaxBufferNumber> BufferNumber 和 Total 方法的實現,我也不多說了。myPool 型別中都有相對應的欄位。不過需要注意的是,對 bufferNumber 和 total 欄位的存取需要使用原子操作。
Put 方法有兩個主要的功能。第一個功能顯然是向緩衝池放入資料。第二個功能是,在發現所有的緩衝器都已滿一段時間後,新建一個緩衝器並將其放入緩衝池。當然,如果當前緩衝池持有的緩衝器已達最大數量,就不能這麼做了。
所以,這裡我們首先需要建立一個發現和觸發追加緩衝器操作的機制。我規定當對池中所有緩衝器的操作的失敗次數都達到 5 次時,就追加一個緩衝器入池。其實這方面的控制可以做得很細,也可以新增引數並把問題拋給使用方。不過這裡先用這個簡易版本。如果你覺得這確實有必要,可以自己編寫一個改進的版本。
以下是我編寫的 Put 方法的實現:
func (pool *myPool) Put(datum interface{}) (err error) {
if pool.Closed() {
return ErrClosedBufferPool
}
var count uint32
maxCount := pool.BufferNumber() * 5
var ok bool
for buf := range pool.bufCh {
ok, err = pool.putData(buf, datum, &count, maxCount)
if ok || err != nil {
break
}
}
return
}
實際上,放入操作的核心邏輯在 myPool 型別的 putData 方法中。Put 方法本身做的主要是不斷地取岀池中的緩衝器,並持有一個統一的“已滿”計數。請注意 count 和 maxCount 變數的初始值,並體會它們的關係。
下面來看 putData 方法,其宣告如下:
//用於向給定的緩衝器放入資料,並在必要時把緩衝器歸還給池
func (pool *myPool) putData(
buf Buffer, datum interface{}, count *uint32, maxCount uint32) (ok bool, err error) {
//省略部分程式碼
}
由於這個方法比較長,所以會分段講解。第一段,putData 為了及時響應緩衝池的關閉,需要在一開始就檢查緩衝池的狀態。並且在方法執行結束前還要檢查一次,以便及時釋放資源。程式碼如下所示:
if pool.Closed() {
return false, ErrClosedBufferPool
}
defer func() {
pool.rwlock.RLock()
if pool.Closed() {
atomic.AddUint32(&pool.bufferNumber, ^uint32(O))
err = ErrClosedBufferPool
} else {
pool.bufCh <- buf
}
pool.rwlock.RUnlock()
}()
在 defer 語句中,我用到了 rwlock 的讀鎖,因為這其中包含了向 bufCh 傳送值的操作。如果在方法即將執行結束時,發現緩衝池已關閉,那麼就不會歸還拿到的緩衝器,同時把對應的錯誤值賦給結果變數 err。注意,不歸還緩衝器時,一定要遞減 bufferNumber 欄位的值。
第二段,執行向拿到的緩衝器放入資料的操作,並在必要時增加“已滿”計數:
ok, err = buf.Put(datum)
if ok {
atomic.AddUint64(&pool.total, 1)
return
}
if err != nil {
return
}
//若因緩衝器已滿而未放入資料,就遞增計數
(*count)++
請注意那兩條 return 語句以及最後的 (*count)++。在試圖向緩衝器放入資料後,我們需要立即判斷操作結果。如果 ok 的值是 true,就說明放入成功,此時就可以在遞增 total 欄位的值後直接返回。
如果 err 的值不為 nil,就是說緩衝器已關閉,這時就不需要再執行後面的語句了。除了這兩種情況,我們就需要遞增 count 的值。因為這時說明緩衝器已滿。如果你忘了 myBuffer 的 Put 方式是怎樣實現的,可以現在回顧一下。
這裡的 count 值遞增操作與第三段程式碼息息相關,這涉及對追加緩衝器的操作的觸發。下面是第三段程式碼:
//如果嘗試向緩衝器放入資料的失敗次數達到閾值,
//並且池中緩衝器的數量未達到最大值,
//那麼就嘗試建立一個新的緩衝器,先放入資料再把它放入池
if *count >= maxCount && pool.BufferNumber() < pool.MaxBufferNumber() {
pool.rwlock.Lock()
if pool.BufferNumber() < pool.MaxBufferNumber() {
if pool.Closed() {
pool.rwlock.Uniock()
return
}
newBuf, _ := NewBuffer(pool.bufferCap)
newBuf.Put(datum)
pool.bufCh <- newBuf
atomic.AddUint32(&pool.bufferNumber, 1)
atomic.AddUint64(&pool.total, 1)
ok = true
}
pool.rwlock.Uniock()
*count = 0
}
return
在這段程式碼中,我用到了雙檢鎖。如果第一次條件判斷通過,就會立即再做一次條件判斷。不過這之前,我會先鎖定 rwlock 的寫鎖。這有兩個作用:
-
第一:防止向已關閉的緩衝池追加緩衝器。
-
第二:防止緩衝器的數量超過最大值。在確保這兩種情況不會發生後,把一個已放入那個資料的緩衝器追加到緩衝池中。
同樣,及時更新計數也很重要。一旦第一次條件判斷通過,即使最後沒有追加緩衝器也應該清零 count 的值。及時清零“已滿”計數可以有效減少不必要的操作和資源消耗。另外,一旦追加緩衝器成功,就一定要遞增 bufferNumber 和 total 的值。
Get 方法的總體流程與 Put 方法基本一致:
func (pool *myPool) Get() (datum interfaced, err error) {
if pool.Closed() {
return nil, ErrClosedBufferPool
}
var count uint32
maxCount := pool.BufferNumber() * 10
for buf := range pool.bufCh {
datum, err = pool.getData(buf, &count, maxCount)
if datum != nil || ^rr != nil {
break
}
}
return
}
把“已空”計數的上限 maxCount 設為緩衝器數量的 10 倍。也就是說,若在遍歷所有緩衝器 10 次之後仍無法獲取到資料,Get 方法就會從緩衝池中去掉一個空的緩衝器。getData 方法的宣告如下:
//用於從給定的緩衝器獲取資料,並在必要時把緩衝器歸還給池
func (pool *myPool) getData(
buf Buffer, count *uint32, maxCount uint32) (datum interface!}, err error) {
//省略部分程式碼
}
getData 方法的實現稍微簡單一些,可分為兩段。第一段程式碼的關鍵仍然是狀態檢查和資源釋放:
if pool.Closed() {
return rdl, ErrClosedBufferPool
}
defer func() {
//如果嘗試從緩衝器獲取資料的失敗次數達到閾值,
//同時當前緩衝器已空且池中緩衝器的數量大於1,
//那麼就直接關掉當前緩衝器,並不歸還給池
if *count >= maxCount &&
buf.Len() == 0 &&
pool.BufferNumber() > 1 {
buf.Close()
atomic.AddUint32(&pool.bufferNumber, ^uint32(0))
*count = 0
return
}
pool.rwlock.RLock()
if pool.Closed() {
atomic.AddUint32(Spool.bufferNumber, ^uint32(0))
err = ErrClosedBufferPool
} else {
pool.bufCh <- buf
}
pool.rwlock.RUnlock()
}()
defer 語句中第一條 if 語句的作用是,當不歸還當前緩衝器的所有條件都已滿足時,我們就在關掉當前緩衝器和更新計數後直接返回。只有條件不滿足時,才在確認緩衝池未關閉之後再把它歸還給緩衝池。注意,這時候需要鎖定 rwlock 的讀鎖,以避免向已關閉的 bufCh 傳送值。
第二段程式碼的作用是試圖從當前緩衝器獲取資料。在成功取出資料時,必須遞減 total 欄位的值。同時,如果取出失敗且沒有發現錯誤,就會遞增“已空”計數。相關程式碼如下:
datum, err = buf.Get()
if datum != nil {
atomic.AddUint64(&pool.total, ^uint64(0))
return
}
if err != nil {
return
}
//若因緩衝器已空未取出資料,就遞增計數
(*count)++
return
putData 和 getData 方法中對 rwlock 的讀鎖或寫鎖的鎖定就是為了預防關閉 bufCh 可能引發的執行時恐慌。顯然,這些操作能夠起作用的前提是 Close 方法對 rwlock 的合理使用,該方法的程式碼如下:
func (pool *myPool) Close() bool {
if !atomic.CompareAndSwapUint32(&pool.closed, 0, 1) {
return false
}
pool.rwlock.Lock()
defer pool.rwlock.Unlock()
close(pool.bufCh)
for buf := range pool.bufCh {
buf.Close()
}
return true
}
以上就是對緩衝池實現主要部分的展示和說明。