某次會議上發表了error group包,一個g失敗,其他的g會同時失敗的錯誤言論(看了一下原始碼中的一句話The first call to return a non-nil error cancels the group
,沒進一步看其他原始碼,片面理解了)。
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext.
後面想想好像不對,就再次深入原始碼,並進行驗證!
官方介紹:包 errgroup 為處理公共任務的子任務的 goroutine 組提供同步、錯誤傳播和上下文取消。
Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.
如何理解官方介紹中國呢說的,同步,錯誤傳播,上下文取消?
errgroup原始碼就132行,短小精悍,下面就看下每個函數的功能。
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
package errgroup
import (
"context"
"fmt"
"sync"
)
type token struct{}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
cancel func() // 持有ctx cancel func
wg sync.WaitGroup
sem chan token // g數量限制的 chan
errOnce sync.Once // 單例儲存第一次出現的err
err error
}
func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}:
// Note: this allows barging iff channels in general allow barging.
default:
return false
}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
return true
}
// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}
Go(f func() error)
: 啟動一個goroutine,如果設定了g limit,則往chan中追加,當達到數量後會被阻塞。否則正常執行g,g中如果出現錯誤,則在單例中儲存錯誤資訊,如果有cancel func(使用) 則呼叫取消訊號。Wait
:等待所有啟動的g都完成,如果有cancel func(使用) 則呼叫取消訊號,返回第一次出現的錯誤,沒有則是nilWithContext(ctx context.Context) (*Group, context.Context)
:啟動一個帶有cancel訊號的errgroup。返回errgoup.Group範例和ctx。TryGo(f func() error) bool
:嘗試啟動一個g,如果g啟動了則返回true。在limit還未達到時,則可以啟動g SetLimit(n int)
:設定最大可啟動的g數量,如果n為負數,則無數量限制。有啟動的g,在重新設定n時則會panicdone
:私有方法,如果設定了limit,則從channel減少一個數量,然後呼叫底層的wg.Done官方介紹中說的,錯誤傳播,應該就是單例模式儲存第一次出現的錯我,在呼叫Wait後會返回。那麼如果理解上下文取消。一個g出錯會取消其他g嘛?用以下程式碼進行測試
func GetGoid() int64 {
var (
buf [64]byte
n = runtime.Stack(buf[:], false)
stk = strings.TrimPrefix(string(buf[:n]), "goroutine")
)
idField := strings.Fields(stk)[0]
id, err := strconv.Atoi(idField)
if err != nil {
panic(fmt.Errorf("can not get goroutine id: %v", err))
}
return int64(id)
}
var (
datarange = []string{"a", "b", "c"}
randIndex = rand.Int31n(10)
)
func calc(index int, val string) (string, error) {
if randIndex == int32(index) {
return "", errors.New("invalid index")
}
return val, nil
}
func TestErrGroupNoCtx(t *testing.T) {
var wg errgroup.Group
result := make(map[string]bool)
var mu sync.Mutex
for i, v := range datarange {
index, val := i, v
wg.Go(func() error {
gid := GetGoid()
data, err := calc(index, val)
if err != nil {
return fmt.Errorf("在g: %d報錯, %s\n", gid, err)
}
fmt.Printf("[%s] 執行: %d\n", data, gid)
mu.Lock()
result[data] = true
mu.Unlock()
fmt.Printf("正常退出: %d\n", GetGoid())
return nil
})
}
if err := wg.Wait(); err != nil {
fmt.Println(err)
}
fmt.Println("執行結束", result)
// first nil err
_, ok := result[datarange[randIndex]]
assert.Equal(t, ok, false)
}
執行後輸出
[a] 執行: 35
正常退出: 35
[c] 執行: 37
正常退出: 37
在g: 36報錯, invalid index
執行結束 map[a:true c:true]
PASS
可以看到,其中b報錯了,但是其他兩個還是正常執行了,所以說,一個g報錯,其他g並不會被殺死,Wait會等待所有g執行完在返回err。
在看WithContext,為什要用帶cancel的ctx呢,看包內Go也只是在某個錯誤中呼叫了取消訊號,怎麼會取消其他的呢?這裡返回到ctx有什麼用?
為什麼說可以取消其他g呢?其實就是要呼叫方來自己負責,每個各自的 goroutine 所有者必須處理取消訊號。
var (
datarange = []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
)
func TestErrGroupWithCtx(t *testing.T) {
wg, ctx := errgroup.WithContext(context.Background())
result := make(map[string]bool)
var mu sync.Mutex
for i, v := range datarange {
index, val := i, v
wg.Go(func() error {
gid := GetGoid()
select {
case <-ctx.Done():
fmt.Printf("goroutine: %d 未執行,msg: %s\n", gid, ctx.Err())
return nil
default:
}
data, err := calc(index, val)
if err != nil {
return fmt.Errorf("在g: %d報錯, %s\n", gid, err)
}
fmt.Printf("[%s] 執行: %d\n", data, gid)
mu.Lock()
result[data] = true
mu.Unlock()
fmt.Printf("正常退出: %d\n", gid)
return nil
})
}
if err := wg.Wait(); err != nil {
fmt.Println(err)
}
fmt.Println("執行結束", result)
// first nil err
_, ok := result[datarange[randIndex]]
assert.Equal(t, ok, false)
}
執行如下命令
go test -v errgroup_test.go --count=1 -run=TestErrGroupWithCtx
=== RUN TestErrGroupWithCtx
[a] 執行: 7
[j] 執行: 16
正常退出: 7
正常退出: 16
[f] 執行: 12
正常退出: 12
[g] 執行: 13
正常退出: 13
[h] 執行: 14
[i] 執行: 15
正常退出: 15
正常退出: 14
[c] 執行: 9
正常退出: 9
goroutine: 10 未執行,msg: context canceled
goroutine: 11 未執行,msg: context canceled
在g: 8報錯, invalid index
執行結束 map[a:true c:true f:true g:true h:true i:true j:true]
--- PASS: TestErrGroupWithCtx (0.00s)
PASS
ok command-line-arguments 0.262s
可以看到,在出錯前,啟動了的g都能正常執行,在出現錯誤後,呼叫Go啟動的g就無法正常執行了,關鍵還是看使用方如何處理cancel這個訊號。
**針對同步,官方有個案例,可以看 ** example-Group-Parallel
可以理解為用邏輯實現的並行的獲取同步資料
The first call to return a non-nil error cancels the group
意思是 cancel group 持有的 ctx,而對於啟動 g 所有者來說,要自己處理 ctx 的取消訊號,並非errgoup會殺死其他 g。其實我們使用errgroup主要就是因為, Group
代替 sync.WaitGroup
簡化了 goroutine 的計數和錯誤處理。
errgroup for迴圈裡千萬別忘了 i, x := i, x,以前用 sync.Waitgroup 的時候都是 go func 手動給閉包傳參解決這個問題的,errgroup 的.Go沒法這麼幹,需要重新宣告變數,獲取實際的值
多工時,才需要考慮,同步、非同步、並行、並行
並行、並行,是邏輯結構的設計模式。
同步、非同步,是邏輯呼叫方式。序列是同步的一種實現,就是沒有並行,所有任務一個一個執行完成。
並行、並行是非同步的 2 種實現方式