Go語言網路爬蟲緩衝器工具的實現

2020-07-16 10:04:51
緩衝器的基本結構如下:
//集衝器介面的實現型別
type myBuffer struet {
    //存放資料的通道
    ch chan interface{}
    //緩衝器的關閉狀態:0-未關閉;2-已關閉
    closed uint32
    //為了消除因關閉緩衝器而產生的競態條件的讀寫鎖
    closingLock sync.RWMutex
}
顯然,緩衝器的實現就是對通道型別的簡單封裝,只不過增加了兩個欄位用於解決前面所說的那些問題。欄位 closed 用於標識緩衝器的狀態。緩衝器自建立之後只有兩種狀態:未關閉和已關閉。注意,我們需要用原子操作存取該欄位的值。

closingLock 欄位代表了讀寫鎖。如果你在程式中並行地進行向通道傳送值和關閉該通道的操作的話,會產生競態條件。通過在使用 go 命令(比如 go test)時加入標記 -race,可以檢測到這種競態條件。後面你會看到使用讀寫鎖消除它的正確方法。

NewBuffer 函數用於建立一個緩衝器:
// NewBuffer 用於建立一個緩.衝器。
// 引數size代表緩衝器的容量
func NewBuffer(size uint32) (Buffer, error) {
    if size == 0 {
        errMsg := fmt.Sprintf("illegal size for buffer: %d", size)
        return nil, errors.NewIllegalParameterError(errMsg)
    }
    return &myBuffer{
        ch: make(chan interFace{}, size),
    }, nil
}
它先檢驗引數值,然後構造一個 *myBuffer 型別的值並返回。顯然,在實現介面方法時,接收者的型別都是 *myBuffer。

注意,errors.NewIllegalParameterError 用於生成一個代表引數錯誤的錯誤值,其中 errors 代表的並不是標準庫中的 errors 包,而是程式碼包中的 gopcp.v2/chapter6/webcrawler/
errors 包。大家可以在我的網路硬碟中下載相關程式碼包(連結:https://pan.baidu.com/s/1yzWHnK1t2jLDIcTPFMLPCA 提取碼:slm5)。

Buffer 介面的 Cap 方法和 Len 方法實現起來都相當簡單,只需把內建函數 cap 或 len 應用在欄位 ch 上就好了。這裡也無需使用額外的保證並行安全的措施。

對於 Put 方法,需要注意的是對讀寫鎖的運用和對緩衝器狀態的判斷。在 Put 方法中,我們應該使用讀鎖。因為“向通道傳送值”的操作會受到“關閉通道”操作的影響。如果不關閉通道,根本無需在進行傳送操作時使用鎖。另外,如果在進行傳送操作前就已經發現通道關閉,就不應該再去嘗試傳送值了。下面來看 Put 方法的實現:
func (buf *myBuffer) Put(datum interface{}) (ok bool, err error) {
    buf.closingLock.RLock()
    defer buf.closingLock.RUnlock()
    if buf.Closed() {
        return false, ErrClosedBuffer
    }
    select {
    case buf.ch <- datum:
        ok = true
    default:
        ok = false
    }
    return
}
在寫鎖的保護下關閉通道。對應地,在 Put 方法的起始處鎖定讀鎖,然後再去做狀態判斷。如果反過來,那麼通道就有可能在狀態判斷之後且鎖定讀鎖之前關閉。這時,Put 方法會以為通道未關閉,然後在讀鎖的所謂保護下向通道傳送值,引發執行時恐慌。

接下來的 select 語句主要是為了讓 Put 方法永遠不會阻塞在傳送操作上。在 default 分支中把結果變數 ok 的值設定為 false,加之這時的結果變數 err 必為 nil,就可以告知呼叫方放入資料的操作未成功,且原因並不是緩衝器已關閉,而是緩衝器已滿。

Get 方法的實現要簡單一些。因為從通道接收值的操作可以絲毫不受到通道關閉的影響,所以無需加鎖。其實現如下:
func (buf *myBuffer) Get() (interface{}, error) {
    select {
        case datum, ok := <-buf.ch:
            if !ok {
                return nil, ErrClosedBuffer
            }
            retum datum, nil
        default:
        return nil, nil
    }
}
這裡同樣使用 select 語句讓它變成非阻塞的。順便提一句,ErrClosedBuffer 是一個變數,表示緩衝器已關閉的錯誤,它的宣告是這樣的:

//表示緩衝器已關閉的錯誤
var ErrClosedBuffer = errors.New("closed buffer")

這遵從了 Go語言程式中的慣用法。標準庫中的類似變數有 io.EOF、bufio.ErrBufferFull 等。

再來說 Close 方法。在關閉通道之前,先要避免重複操作。因為重複關閉一個通道也會引發執行時恐慌。避免措施就是先檢查 closed 欄位的值。當然,必須使用原子操作。下面是它的實現:
func (buf *myBuffer) Close() bool {
    if atomic.CompareAndSwapUint32(&buf.closed, 0, 1) {
        buf.closingLock.Lock()
        close(buf.ch)
        buf.closingLock.Unlock()
        return true
    }
    return false
}
最後,在 Closed 方法中讀取 closed 欄位的值時,也一定要使用原子操作:
func (buf *myBuffer) Closed() bool {
    if atomic.LoadUint32(&buf.closed) == 0 {
        return false
    }
    return true
}
千萬不要假設讀取共用資源就是並行安全的,除非資源本身做出了這種保證。