並行和並行是有區別的,並行不等於並行。
兩個或多個事件在同一時間不同時間間隔發生。對應在Go中,就是指多個 goroutine 在單個CPU上的交替執行。
兩個或者多個事件在同一時刻發生。對應在Go中,就是指多個 goroutine 在多個CPU上同時執行。
goroutine 是 Go 中一種輕量級執行緒。也稱為使用者態執行緒。由 Go 的 runtime 進行管理。Go 的程式會智慧地將 goroutine 中的任務合理地分配給每個 CPU。
在程式中,我們只要使用 go 關鍵字,就可以輕易開啟一個 goroutine
在使用 goroutine 時,以下兩個建議可以有效避免 goroutine 洩露。
來看一個洩露的例子
func leak() {
ch := make(chan int)
go func() {
<-ch//leak 函數阻塞在接受 ch
fmt.Println("receive a value")
}()
}
func main() {
leak(ch)//函數返回,
}
這個channel將無法被關閉,leak 函數裡開啟的 goroutine 也永遠無法返回,當然,這個例子中 leak 函數返回了,main 函數結束,leak 函數裡開啟的 goroutine 也就返回了。
1.呼叫者不清楚什麼時候結束,也無法控制 goroutine 的生命週期。只能被動等待 channel 接受訊號,然後執行函數邏輯,如你所見,造成的後果便是容易產生 goroutine 洩露。
來看下面一個例子
type Worker struct {
wg sync.WaitGroup
}
func (w *Worker) Do() {
w.wg.Add(1)
go func() {
defer w.wg.Done()
//do someting
time.Sleep(800 * time.Millisecond)
fmt.Println("finish")
}()
}
func (w *Worker) Shutdown(ctx context.Context) error {
ch := make(chan struct{})
go func() {
w.wg.Wait()
close(ch)
}()
select {
case <-ch:
return nil
case <-ctx.Done():
// time out
// close(ch)
return errors.New("time out")
}
}
func main() {
worker := &Worker{
wg: sync.WaitGroup{},
}
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1000*time.Millisecond))
defer cancel()
worker.Do()
if err := worker.Shutdown(ctx); err != nil {
fmt.Println(err)
}
}
有一個 worker 物件,這個物件會做一些耗時操作。我們在 Do() 方法中使用 goroutine 來處理具體邏輯,在開啟goroutine 之前呼叫 wg.Add(1), 然後在 goroutine 的 defer 函數中 呼叫 wg.Done(),在 Shutdown() 方法中使用 wg.Wait() 來等待 Do() 方法執行結束。在 Shutdown() 方法中,如果 goroutine 執行結束了,就會往 ch channel 中傳送訊息,底下 select {} 中收到 ch channel 訊息後,Shutdown 方法就可以正常返回,函數到此執行結束。如果 Do() 方法執行太長超出了 ctx 的最長時間。Shutdown 會返回 "time out" 異常。返回之前可以進行資源的處理。
在這個例子中呼叫者可以通過控制上下文控制來控制 Worker 物件的生命週期。
Go 的 sync 包提供了 mutex、RwMutex,分別是互斥鎖與讀寫鎖。
在需要共用記憶體的地方,如果有多個物件同時對這個地方進行讀寫操作,就會產生競態條件。我們需要使用程式語言提供的同步原語對讀寫操作進行保護。互斥鎖就是同一時刻一段程式碼只能被一個執行緒/協程執行。Mutex 在大量並行的情況下,會造成鎖等待,對效能的影響比較大。在讀多寫少的場景下可以使用讀寫鎖。讀寫鎖主要遵循以下原則:
下面是一個互斥鎖簡單範例。在需要存取共用資源的地方使用 Lock 和 Unlock 方法。表示這部分操作屬於「原子操作」。使用時需要注意鎖粒度。我們要儘可能的減小鎖粒度。鎖粒度小了,鎖競爭就少。對程式的效能影響就小。
var l sync.Mutex
var a string
func f() {
a = "hello, world"
l.Unlock()
}
func main() {
l.Lock()
go f()
l.Lock()
print(a)
}
sync/atomic 提供了用於實現同步演演算法的底層原子記憶體原語
copy-on-write 思路在微服務降級或者 local cache 經常使用。我們可以使用 atomic 來實現。atmic 依賴於原子 CPU 指令而不是依賴外部鎖,效能不俗。
type NumberArray struct {
array []int
}
func main() {
var atomic atomic.Value
go func() {
var i int
for {
i++
numArray := &NumberArray{
array: []int{i, i + 1, i + 2, i + 3},
}
atomic.Store(numArray)
time.Sleep(100 * time.Millisecond)
}
}()
time.Sleep(500 * time.Millisecond) //先讓資料更新
var wg sync.WaitGroup
for n := 0; n < 100000; n++ {
wg.Add(1)
time.Sleep(100 * time.Millisecond)
go func() {
numArray := atomic.Load()
fmt.Println(numArray)
wg.Done()
}()
}
wg.Wait()
}
errgroup 為處理公共任務的子任務的 goroutine 組提供同步、錯誤傳播和上下文取消。
https://github.com/go-kratos/kratos/blob/main/app.go
func (a *App) Run() error {
instance, err := a.buildInstance()
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(NewContext(a.ctx, a))
wg := sync.WaitGroup{}
for _, srv := range a.opts.servers {
srv := srv
eg.Go(func() error {
<-ctx.Done() // wait for stop signal
stopCtx, cancel := context.WithTimeout(NewContext(a.opts.ctx, a), a.opts.stopTimeout)
defer cancel()
return srv.Stop(stopCtx)
})
wg.Add(1)
eg.Go(func() error {
wg.Done()
return srv.Start(NewContext(a.opts.ctx, a))
})
}
wg.Wait()
if a.opts.registrar != nil {
rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout)
defer rcancel()
if err := a.opts.registrar.Register(rctx, instance); err != nil {
return err
}
a.lk.Lock()
a.instance = instance
a.lk.Unlock()
}
c := make(chan os.Signal, 1)
signal.Notify(c, a.opts.sigs...)
eg.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-c:
if err := a.Stop(); err != nil {
a.opts.logger.Errorf("failed to stop app: %v", err)
return err
}
}
}
})
if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
}
channel 是 Go 語言中一種型別安全的訊息佇列,充當兩個 goroutine 之間的通道,通過它可以進行任意資源的的交換。同時通過 channel 實現 Go 的同步機制。
當建立的 channel 沒有緩衝時,稱為無緩衝通道。無緩衝管道必須讀寫同時操作才會有效果,如果只進行讀或者只進行寫那麼會被阻塞,等待另外一方的操作。
建立的 channel 具有緩衝時,稱為緩衝通道。緩衝通道是固定容量的先進先出(FIFO)佇列。容量在佇列建立的時候就已經固定,執行是無法更改。消費者從佇列中取出元素並處理它們。如果佇列為空並且消費者無事可做,就會發生阻塞,直到生產者放入一個元素。如果佇列已滿,並且消費者未開始消費,則會發生阻塞,知道消費者消費一個元素。
不論是無緩衝通道還是緩衝通道,都不能往一個已關閉的 channel 傳送訊息,否則程式會直接 panic ,因此,最好是由傳送端進行關閉 channel。
func main() {
ch := make(chan int)
close(ch)
fmt.Println(<-ch)//0
//close(ch) //panic: close of closed channel
//ch <- 2 //panic: send on closed channel
chs := make(chan int, 2)
chs <- 1
chs <- 3
close(chs)
fmt.Println(<-chs)
fmt.Println(<-chs)
fmt.Println(<-chs)//0
// chs <- 2 //panic: send on closed channel
}
關於channel 還可以檢視這篇文章 polarisxu:無緩衝和有緩衝通道