用 mutex lock 搭配 for 迴圈,在 for 迴圈中又向帶緩衝的 Channel 寫資料時,千萬要小心死鎖!
有一個作者:https://wavded.com/post/golang-deadlockish/ 分享了一個類似的問題。下面是參照的部分正文內容。
func (r *Room) Broadcast(msg string) { r.membersMx.RLock() defer r.membersMx.RUnlock() for _, m := range r.members { if err := s.Send(msg); err != nil { // ❶ log.Printf("Broadcast: %v: %v", r.instance, err) } } }
請注意,我們等待❶,直到每個成員收到訊息,然後再繼續下一個成員。這很快就會成為問題。
func (r *Room) Add(s sockjs.Session) { r.membersMx.Lock() // ❶ r.members = append(r.members, s) r.membersMx.Unlock() }
我們無法獲得鎖❶,因為我們的 Broadcast 函數仍在使用它來傳送訊息。
// SendMsg forwards a message to this device's connection.
// NOTE(review): `err` and `data` are not declared in this excerpt —
// the article trims the serialization step that produces them.
func (ud *UserDevice) SendMsg(ctx context.Context, msg *InternalWebsocketMessage) {
	// Note: this is not the raw websocket Write.
	if err = ud.Conn.Write(data); err != nil {
		ud.L.Debug("Write error", zap.Error(err))
	}
}

// Write wraps data into a MsgData and enqueues it on the writer channel.
func (c *connectionImpl) Write(data []byte) (err error) {
	wsMsgData := &MsgData{
		MessageType: websocket.BinaryMessage,
		Data:        data,
	}
	// writer is a buffered channel (currently capacity 10);
	// if the buffer is full, this send blocks.
	c.writer <- wsMsgData
	return
}
// BroadcastMsgToRoom fans msg out to every connected user while
// holding the manager's read lock for the whole loop — the pattern
// this article identifies as the deadlock trigger.
func (m *userManager) BroadcastMsgToRoom(ctx context.Context, msg *InternalWebsocketMessage, roomId []int64) {
	// The read lock guards iteration over the users map.
	m.RLock()
	defer m.RUnlock()
	// m.users is a map[int64]User.
	for _, user := range m.users {
		user.SendMsg(ctx, msg) // ❶ blocks with the lock held if a user's write buffer is full
	}
}
func (m *userManager) Add(device UserDeviceInterface) (User, int) { uid := device.UID() m.Lock() // ❶ defer m.Unlock() user, ok := m.users[uid] if !ok { user = NewUser(uid, device.GetLogger()) m.users[uid] = user } remain := user.AddDevice(device) return user, remain }
// onWSUpgrade is the gin handler for the websocket upgrade. The part
// relevant to the deadlock (shown; the rest is trimmed by the article)
// is that it registers the device via DefaultUserManager.Add, which
// needs the manager's write lock.
func onWSUpgrade(ginCtx *gin.Context) {
	// ...
	utils.GoSafe(ctx, func(ctx context.Context) {
		// ...
		userDevice.User, remain = biz.DefaultUserManager.Add(userDevice)
	}, logger)
}
但是 c.writer <- wsMsgData 為什麼會滿了呢?再繼續跟程式碼,發現這裡原來有個超時邏輯:
func (c *connectionImpl) ExecuteLogic(ctx context.Context, device UserDeviceInterface) { go func() { for { select { case msg, ok := <-c.writer: if !ok { return } // 寫超時5秒 _ = c.conn.SetWriteDeadline(time.Now().Add(types.KWriteWaitTime)) if err := c.conn.WriteMessage(msg.MessageType, msg.Data); err != nil { c.conn.Close() c.onWriteError(err, device.UserId(), device.UserId()) return } } } }() }
這下就能解釋得通了!
// Push server push message.
// Non-blocking enqueue: if the signal buffer is full, the message is
// dropped and ErrSignalFullMsgDropped is returned instead of blocking.
func (c *Channel) Push(p *protocol.Proto) (err error) {
	select {
	case c.signal <- p:
		// queued successfully
	default:
		err = errors.ErrSignalFullMsgDropped
	}
	return
}
有一個 select,發現了嗎?如果 c.signal 緩衝區已滿,向它傳送本來會阻塞,但 select 會落入 default 分支直接回傳錯誤,因此呼叫方在迴圈中呼叫 Push 時也不會被 block。
// Write queues data for sending. If the writer buffer is full, the
// message is dropped and ErrWriteChannelFullMsgDropped is returned
// immediately instead of blocking the caller.
func (c *connectionImpl) Write(data []byte) (err error) {
	wsMsgData := &MsgData{
		MessageType: websocket.BinaryMessage,
		Data:        data,
	}
	// if buffer is full, return an error immediately
	select {
	case c.writer <- wsMsgData:
	default:
		err = ErrWriteChannelFullMsgDropped
	}
	return
}
// main demonstrates the blocking-send failure: the channel has
// capacity 2 and no receiver, so the third send blocks forever and
// the runtime reports "all goroutines are asleep - deadlock!".
//
// Fix: the original used a CJK closing quote (」) instead of " in the
// second Println, which does not compile; the captured output on the
// next line confirms the intended string is "write 2".
func main() {
	w := make(chan string, 2)
	w <- "1"
	fmt.Println("write 1")
	w <- "2"
	fmt.Println("write 2")
	w <- "3" // buffer full: blocks forever → fatal deadlock
}
write 1 write 2 fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]: main.main() /Users/xu/repo/github/01_struct_mutex/main.go:133 +0xdc exit status 2
// main demonstrates the non-blocking alternative: with the buffer
// (capacity 2) already full, the select falls into the default case
// instead of blocking on the third send.
func main() {
	w := make(chan string, 2)
	w <- "1"
	fmt.Println("write 1")
	w <- "2"
	fmt.Println("write 2")
	select {
	case w <- "3":
		fmt.Println("write 3")
	default:
		fmt.Println("msg flll") // sic: typo kept — it matches the captured output below
	}
}
write 1 write 2 msg flll
用 mutex lock 搭配 for 迴圈,在 for 迴圈中又向帶緩衝的 Channel 寫資料時,千萬要小心死鎖!
func (r *Room) Broadcast(msg string) { r.mu.RLock() defer r.mu.RUnlock() for _, m := range r.members { r.writer <- msg // Bad } }
func (r *Room) Broadcast(msg string) { r.mu.RLock() defer r.mu.RUnlock() for _, m := range r.members { // Good