//集衝器介面的實現型別 type myBuffer struet { //存放資料的通道 ch chan interface{} //緩衝器的關閉狀態:0-未關閉;2-已關閉 closed uint32 //為了消除因關閉緩衝器而產生的競態條件的讀寫鎖 closingLock sync.RWMutex }顯然,緩衝器的實現就是對通道型別的簡單封裝,只不過增加了兩個欄位用於解決前面所說的那些問題。欄位 closed 用於標識緩衝器的狀態。緩衝器自建立之後只有兩種狀態:未關閉和已關閉。注意,我們需要用原子操作存取該欄位的值。
// NewBuffer 用於建立一個緩.衝器。 // 引數size代表緩衝器的容量 func NewBuffer(size uint32) (Buffer, error) { if size == 0 { errMsg := fmt.Sprintf("illegal size for buffer: %d", size) return nil, errors.NewIllegalParameterError(errMsg) } return &myBuffer{ ch: make(chan interFace{}, size), }, nil }它先檢驗引數值,然後構造一個 *myBuffer 型別的值並返回。顯然,在實現介面方法時,接收者的型別都是 *myBuffer。
func (buf *myBuffer) Put(datum interface{}) (ok bool, err error) { buf.closingLock.RLock() defer buf.closingLock.RUnlock() if buf.Closed() { return false, ErrClosedBuffer } select { case buf.ch <- datum: ok = true default: ok = false } return }在寫鎖的保護下關閉通道。對應地,在 Put 方法的起始處鎖定讀鎖,然後再去做狀態判斷。如果反過來,那麼通道就有可能在狀態判斷之後且鎖定讀鎖之前關閉。這時,Put 方法會以為通道未關閉,然後在讀鎖的所謂保護下向通道傳送值,引發執行時恐慌。
func (buf *myBuffer) Get() (interface{}, error) { select { case datum, ok := <-buf.ch: if !ok { return nil, ErrClosedBuffer } retum datum, nil default: return nil, nil } }這裡同樣使用 select 語句讓它變成非阻塞的。順便提一句,ErrClosedBuffer 是一個變數,表示緩衝器已關閉的錯誤,它的宣告是這樣的:
//表示緩衝器已關閉的錯誤
var ErrClosedBuffer = errors.New("closed buffer")
func (buf *myBuffer) Close() bool { if atomic.CompareAndSwapUint32(&buf.closed, 0, 1) { buf.closingLock.Lock() close(buf.ch) buf.closingLock.Unlock() return true } return false }最後,在 Closed 方法中讀取 closed 欄位的值時,也一定要使用原子操作:
func (buf *myBuffer) Closed() bool { if atomic.LoadUint32(&buf.closed) == 0 { return false } return true }千萬不要假設讀取共用資源就是並行安全的,除非資源本身做出了這種保證。