一文詳解Go中的並行【20 張動圖演示】

2022-09-08 14:01:06
golang中各種並行模式看起來是怎樣的?下面本篇文章就通過20 張動圖為你演示 Go 並行,希望對大家有所幫助!

千萬級資料並行如何處理?進入學習

如果你更喜歡通過視訊瞭解本文,請點選觀看我在GopherCon上的演講 www.youtube.com/watch?v=KyuFeiG3Y6...

Go語言最強大的特性之一就是基於 Tony Hoare’s CSP 這篇論文實現的內建並行. Go在設計時就考慮了並行並允許我們構建複雜的並行管道。那你有沒有想過,各種並行模式看起來是怎樣的?

你一定想過。 我們多數情況下都會通過想象來思考問題. 如果我問你一個關於「1到100的數位」的問題,你腦子裡就會下意識的出現一系列畫面。例如,我會把它想象成一條從我開始的直線,從數位1到20然後右轉90度一直到1000+。我記得我很小的時候,在我們的幼兒園裡,衣帽間裡有很多數位,寫在牆上,數位20恰好在拐角處。你可能有你自己的關於數位的畫面。另一個常見的例子是一年四季的視覺展現——有人將之想象成一個盒子,有人將之想象成一個圈。

無論如何, 我想用Go和WebGL把我對於常見的並行模式的具象化嘗試展現給大家.這多多少少代表了我對於並行程式的理解。如果能聽到我和大家腦海中的畫面有什麼不同,一定會非常有趣。 我特別想知道 Rob Pike 或者 Sameer Ajmani 腦子裡是怎麼描繪並行影象的. 我打賭我會很感興趣的。【相關推薦:Go視訊教學

那麼,讓我們從一個很基礎的「Hello,Concurrent World」例子開始我們今天的主題。

Hello, Concurrent world

程式碼很簡單——單個通道,單個goroutine,單次寫入,單次讀取。

package main

func main() {
    // 建立一個int型別的通道
    ch := make(chan int)

    // 開啟一個匿名 goroutine
    go func() {
        // 向通道傳送數位42
        ch <- 42
    }()
    // 從通道中讀取
    <-ch
}

轉到互動式 WebGL 動畫 Hello, World

藍色線代表隨時間執行的goroutine. 連線‘main’和‘go #19’的藍色細線用來標記goroutine的開始和結束同時展示了父子關係,最後,紅線代表傳送/接收動作. 雖然這是兩個獨立的動作,我還是嘗試用「從 A 傳送到 B」的動畫將他們表示成一個動作. goroutine 名稱中的「#19」 是 goroutine 真實的內部ID, 其獲取方法參考了 Scott Mansfield 的 「Goroutine IDs」 這篇文章。

Timers

實際上,你可以通過以下方法構建一個簡單的計時器——建立一個通道, 開啟一個 goroutine 讓其在指定的時間間隔後向通道中寫入資料,然後將這個通道返回給呼叫者。於是呼叫函數就會在讀取通道時阻塞,直到之前設定的時間間隔過去。接下來我們呼叫24次計時器然後嘗試具象化呼叫過程。

package main

import "time"

func timer(d time.Duration) <-chan int {
    c := make(chan int)
    go func() {
        time.Sleep(d)
        c <- 1
    }()
    return c
}

func main() {
    for i := 0; i < 24; i++ {
        c := timer(1 * time.Second)
        <-c
    }
}

轉到互動式 WebGL 動畫 Recurrent Timers

很整潔,對嗎? 我們繼續。

Ping-pong

這個並行例子取自谷歌員工 Sameer Ajmani 「Advanced Go Concurrency Patterns」 演講。當然,這個模式不算非常高階,但是對於那些只熟悉Go的並行機制的人來說它看起來可能非常新鮮有趣。

這裡我們用一個通道代表乒乓球檯. 一個整型變數代表球, 然後用兩個goroutine代表玩家,玩家通過增加整型變數的值(點選計數器)模擬擊球動作。

package main

import "time"

func main() {
    var Ball int
    table := make(chan int)
    go player(table)
    go player(table)

    table <- Ball
    time.Sleep(1 * time.Second)
    <-table
}

func player(table chan int) {
    for {
        ball := <-table
        ball++
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}

轉到互動式 WebGL 動畫 Ping-Pong

這裡我建議你點選 連結 進入互動式 WebGL 動畫操作一下. 你可以放慢或者加速動畫,從不同的角度觀察。

現在,我們新增三個玩家看看。

    go player(table)
    go player(table)
    go player(table)

轉到互動式 WebGL 動畫 Ping-Pong 3 我們可以看到每個玩家都按照次序輪流操作,你可能會想為什麼會這樣。為什麼多個玩家(goroutine)會按照嚴格的順序接到「球」呢。

答案是 Go 執行時環境維護了一個 接收者 FIFO 佇列 (儲存需要從某一通道上接收資料的goroutine),在我們的例子裡,每個玩家在剛發出球后就做好了接球準備。我們來看一下更復雜的情況,加入100個玩家。

for i := 0; i < 100; i++ {
    go player(table)
}

轉到互動式 WebGL 動畫 Ping-Pong 100

先進先出順序很明顯了,是吧? 我們可以建立一百萬個goroutine,因為它們很輕量,但是對於實現我們的目的來說沒有必要。我們來想想其他可以玩的。 例如, 常見的訊息傳遞模式。

Fan-In

並行世界中流行的模式之一是所謂的 fan-in 模式。這與 fan-out 模式相反,稍後我們將介紹。簡而言之,fan-in 是一項功能,可以從多個輸入中讀取資料並將其全部多路複用到單個通道中。

舉例來說:

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration) {
    var i int
    for {
        ch <- i
        i++
        time.Sleep(d)
    }
}

func reader(out chan int) {
    for x := range out {
        fmt.Println(x)
    }
}

func main() {
    ch := make(chan int)
    out := make(chan int)
    go producer(ch, 100*time.Millisecond)
    go producer(ch, 250*time.Millisecond)
    go reader(out)
    for i := range ch {
        out <- i
    }
}

Go to interactive WebGL animation Fan-In Pattern

如我們所見,第一個 producer 每100毫秒生成一次值,第二個每250毫秒生成一次值,但是 reader 會立即從這兩個生產者那裡接受值。實際上,多路複用發生在 main 的range迴圈中。

Workers

fan-in 相反的模式是 fan-out 或者worker 模式。多個 goroutine 可以從單個通道讀取,從而在CPU核心之間分配大量的工作量,因此是 worker 的名稱。在Go中,此模式易於實現-只需以通道為引數啟動多個goroutine,然後將值傳送至該通道-Go執行時會自動地進行分配和複用 :)

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-tasksCh
        if !ok {
            return
        }
        d := time.Duration(task) * time.Millisecond
        time.Sleep(d)
        fmt.Println("processing task", task)
    }
}

func pool(wg *sync.WaitGroup, workers, tasks int) {
    tasksCh := make(chan int)

    for i := 0; i < workers; i++ {
        go worker(tasksCh, wg)
    }

    for i := 0; i < tasks; i++ {
        tasksCh <- i
    }

    close(tasksCh)
}

func main() {
    var wg sync.WaitGroup
    wg.Add(36)
    go pool(&wg, 36, 50)
    wg.Wait()
}

Go

這裡值得一提的是:並行性。如您所見,所有goroutine並行’執行‘,等待通道給予它們’工作‘。鑑於上面的動畫,很容易發現goroutine幾乎立即接連地收到它們的工作。不幸的是,該動畫在goroutine確實在處理工作還是僅僅是在等待輸入的地方沒有用顏色顯示出來,但是此動畫是在GOMAXPROCS=4的情況下錄製的,因此只有4個goroutine有效地並行執行。我們將很快討論這個主題。

現在,讓我們做一些更復雜的事情,並啟動一些有自己workers(subworkers)的workers。

package main

import (
    "fmt"
    "sync"
    "time"
)

const (
    WORKERS    = 5
    SUBWORKERS = 3
    TASKS      = 20
    SUBTASKS   = 10
)

func subworker(subtasks chan int) {
    for {
        task, ok := <-subtasks
        if !ok {
            return
        }
        time.Sleep(time.Duration(task) * time.Millisecond)
        fmt.Println(task)
    }
}

func worker(tasks <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-tasks
        if !ok {
            return
        }

        subtasks := make(chan int)
        for i := 0; i < SUBWORKERS; i++ {
            go subworker(subtasks)
        }
        for i := 0; i < SUBTASKS; i++ {
            task1 := task * i
            subtasks <- task1
        }
        close(subtasks)
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(WORKERS)
    tasks := make(chan int)

    for i := 0; i < WORKERS; i++ {
        go worker(tasks, &wg)
    }

    for i := 0; i < TASKS; i++ {
        tasks <- i
    }

    close(tasks)
    wg.Wait()
}

Go to interactive WebGL animation Workers of workers 很好。當然,我們可以將worker和subworker的數量設定為更高的值,但是我試圖使動畫清晰易懂。

更酷的 fan-out 模式確實存在,例如動態數量的worker/subworker,通過通道傳送通道,但是 fan-out 的想法現在應該很清楚了。

伺服器

下一個常見的模式類似於扇出,但是會在很短的時間內生成goroutine,只是為了完成某些任務。它通常用於實現伺服器-建立偵聽器,迴圈執行accept()併為每個接受的連線啟動goroutine。它非常具有表現力,可以實現儘可能簡單的伺服器處理程式。看一個簡單的例子:

package main

import "net"

func handler(c net.Conn) {
    c.Write([]byte("ok"))
    c.Close()
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c)
    }
}

Go to 互動式WebGL動畫 Servers

這不是很有趣-似乎並行方面沒有發生任何事情。當然,在引擎蓋下有很多複雜性,這是我們特意隱藏的。 「簡單性很複雜」.

但是,讓我們回到並行性並向我們的伺服器新增一些互動。假設每個處理程式都希望非同步寫入記錄器。在我們的範例中,記錄器本身是一個單獨的goroutine,它可以完成此任務。

package main

import (
    "fmt"
    "net"
    "time"
)

func handler(c net.Conn, ch chan string) {
    ch <- c.RemoteAddr().String()
    c.Write([]byte("ok"))
    c.Close()
}

func logger(ch chan string) {
    for {
        fmt.Println(<-ch)
    }
}

func server(l net.Listener, ch chan string) {
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c, ch)
    }
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    ch := make(chan string)
    go logger(ch)
    go server(l, ch)
    time.Sleep(10 * time.Second)
}

Go to 互動式WebGL動畫 Servers 2

不是嗎?但是很容易看到,如果請求數量增加並且紀錄檔記錄操作花費一些時間(例如,準備和編碼資料),我們的* logger * goroutine很快就會成為瓶頸。我們可以使用一個已知的扇出模式。我們開始做吧。

伺服器+工作者

帶工作程式的伺服器範例是記錄器的高階版本。它不僅可以完成一些工作,而且還可以通過* results *通道將其工作結果傳送回池中。沒什麼大不了的,但是它將我們的記錄器範例擴充套件到了更實際的範例。

讓我們看一下程式碼和動畫:

package main

import (
    "net"
    "time"
)

func handler(c net.Conn, ch chan string) {
    addr := c.RemoteAddr().String()
    ch <- addr
    time.Sleep(100 * time.Millisecond)
    c.Write([]byte("ok"))
    c.Close()
}

func logger(wch chan int, results chan int) {
    for {
        data := <-wch
        data++
        results <- data
    }
}

func parse(results chan int) {
    for {
        <-results
    }
}

func pool(ch chan string, n int) {
    wch := make(chan int)
    results := make(chan int)
    for i := 0; i < n; i++ {
        go logger(wch, results)
    }
    go parse(results)
    for {
        addr := <-ch
        l := len(addr)
        wch <- l
    }
}

func server(l net.Listener, ch chan string) {
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c, ch)
    }
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    ch := make(chan string)
    go pool(ch, 4)
    go server(l, ch)
    time.Sleep(10 * time.Second)
}

Go to 互動式WebGL動畫 Server + Worker 我們在4個goroutine之間分配了工作,有效地提高了記錄器的吞吐量,但是從此動畫中,我們可以看到記錄器仍然可能是問題的根源。成千上萬的連線在分配之前會匯聚在一個通道中,這可能導致記錄器再次成為瓶頸。但是,當然,它會在更高的負載下發生。

並行素篩(素篩指素數篩法)

足夠的扇入/扇出樂趣。讓我們看看更復雜的並行演演算法。我最喜歡的例子之一是Concurrent Prime Sieve,可以在[Go Concurrency Patterns]對話中找到。素數篩,或[Eratosthenes篩)是一種古老的演演算法,用於查詢達到給定限制的素數。它通過按順序消除所有質數的倍數來工作。天真的演演算法並不是真正有效的演演算法,尤其是在多核計算機上。

該演演算法的並行變體使用goroutine過濾數位-每個發現的素數一個goroutine,以及用於將數位從生成器傳送到過濾器的通道。找到質數後,它將通過通道傳送到* main *以進行輸出。當然,該演演算法也不是很有效,特別是如果您想找到大質數並尋找最低的Big O複雜度,但是我發現它非常優雅。

// 並行的主篩
package main

import "fmt"

// 將序列2、3、4,...傳送到頻道「 ch」。
func Generate(ch chan<- int) {
    for i := 2; ; i++ {
        ch <- i // Send 'i' to channel 'ch'.
    }
}

//將值從通道「 in」複製到通道「 out」,
//刪除可被「素數」整除的那些。
func Filter(in <-chan int, out chan<- int, prime int) {
    for {
        i := <-in // Receive value from 'in'.
        if i%prime != 0 {
            out <- i // Send 'i' to 'out'.
        }
    }
}

//主篩:菊花鏈過濾器過程。
func main() {
    ch := make(chan int) // Create a new channel.
    go Generate(ch)      // Launch Generate goroutine.
    for i := 0; i < 10; i++ {
        prime := <-ch
        fmt.Println(prime)
        ch1 := make(chan int)
        go Filter(ch, ch1, prime)
        ch = ch1
    }
}

轉到互動式WebGL動畫

PrimeSieve

,請以互動模式隨意播放此動畫。我喜歡它的說明性-它確實可以幫助您更好地理解該演演算法。 * generate * goroutine發出從2開始的每個整數,每個新的goroutine僅過濾特定的質數倍數-2、3、5、7 …,將第一個找到的質數傳送給* main *。如果旋轉它從頂部看,您會看到從goroutine傳送到main的所有數位都是質數。漂亮的演演算法,尤其是在3D中。

GOMAXPROCS(調整並行的執行效能)

現在,讓我們回到我們的工作人員範例。還記得我告訴過它以GOMAXPROCS = 4執行嗎?那是因為所有這些動畫都不是藝術品,它們是真實程式的真實痕跡。

讓我們回顧一下GOMAXPROCS是什麼。

GOMAXPROCS設定可以同時執行的最大CPU數量。

當然,CPU是指邏輯CPU。我修改了一些範例,以使他們真正地工作(而不僅僅是睡覺)並使用實際的CPU時間。然後,我執行了程式碼,沒有進行任何修改,只是設定了不同的GOMAXPROCS值。 Linux機上盒有2個CPU,每個CPU具有12個核心,因此有24個核心。

因此,第一次執行演示了該程式在1個核心上執行,而第二次-使用了所有24個核心的功能。

WebGL動畫-1| WebGL動畫-24GOMAXPROCS1

GOMAXPROCS24

這些動畫中的時間速度是不同的(我希望所有動畫都適合同一時間/ height),因此區別很明顯。當GOMAXPROCS = 1時,下一個工作人員只有在上一個工作完成後才能開始實際工作。在GOMAXPROCS = 24的情況下,加速非常大,而複用的開銷可以忽略不計。

不過,重要的是要了解,增加GOMAXPROCS並不總是可以提高效能,在某些情況下實際上會使它變得更糟。

Goroutines leak

我們可以從Go中的並行時間中證明什麼呢?我想到的一件事情是goroutine洩漏。例如,如果您啟動goroutine,但超出範圍,可能會發生洩漏。或者,您只是忘記新增結束條件,而執行了for{}迴圈。

第一次在程式碼中遇到goroutine洩漏時,我的腦海中出現了可怕的影象,並且在下個週末我寫了 expvarmon。現在,我可以使用WebGL視覺化該恐怖影象。

看一看:

Go

僅僅是看到此,我都會感到痛苦:) 所有這些行都浪費了資源,並且是您程式的定時炸彈。

Parallelism is not Concurrency

我要說明的最後一件事是並行性與並行性之間的區別。這個話題涵蓋了 很多 ,Rob Pike在這個話題上做了一個精彩的演講。確實是#必須觀看的視訊之一。

簡而言之,

並行是簡單的並行執行事物。

並行是一種構造程式的方法。

因此,並行程式可能是並行的,也可能不是並行的,這些概念在某種程度上是正交的。我們在演示 GOMAXPROCS 設定效果時已經看到了這一點。

我可以重複所有這些連結的文章和談話,但是一張圖片相當於說了一千個字。我在這裡能做的是視覺化這個差異。因此,這是並行。許多事情並行執行。

轉到互動式WebGL動畫 並行 1

這也是並行性:

轉到互動式WebGL動畫

Go

但這是並行的:

PrimeSieve

還有這個:

Workers of workers

這也是並行的:

Go

How it was made

為了建立這些動畫,我編寫了兩個程式:gotracergothree.js 庫。首先,gotracer執行以下操作:

  • 解析Go程式的AST樹(Abstract Syntax Tree,抽象語法樹),並在與並行相關的事件上插入帶有輸出的特殊命令-啟動/停止goroutine,建立通道,向/從通道傳送/接收。
  • 執行生成的程式
  • 分析此特殊輸出,並生成帶有事件和時間戳描述的JSON。

生成的JSON範例:JSON sample

接下來,gothree.js使用令人驚歎的 Three.js 庫的功能來使用WebGL繪製3D線和物件。我做了一些很小的包裝使其適合單個html頁面-就是這樣。

但是,這種方法非常有侷限。我必須準確地選擇範例,重新命名通道和goroutine,以使得或多或少複雜的程式碼產生正確的跟蹤。使用這種方法,如果goroutine具有不同的名稱,就沒有簡便的方法來關聯goroutine之間的通道。更不用說通過chan型別的通道傳送的通道。時序方面也存在很大的問題-輸出到stdout可能比傳送值花費更多的時間,因此在某些情況下,我必須放置time.Sleep(一些毫秒)以獲取正確的動畫。

基本上,這就是為什麼我還沒有開原始碼的原因。我正在玩 Dmitry Vyukov 的 執行跟蹤器,它似乎提供了事件的詳細資訊,但是沒有包含傳送值得資訊。也許有更好的方法可以實現預期的目標。如果您有想法,請在twitter給我寫信,或在評論中寫給我。如果能將這個為期兩週的工具擴充套件為任何Go程式的真正的偵錯/跟蹤工具,那將是非常棒的。

我也很樂意看到此處未列出的更有趣的並行演演算法和模式。歡迎隨時在評論中寫一個。

Happy coding!

UPD: 可在 github.com/pan/gotrace 上使用此工具,並使用Go Execution Tracer和打了修補程式的執行時生成跟蹤。

另外,我願意接受新工作,因此,如果您公司/團隊對我感興趣,有難題需要使用Go解決,我可以遠端工作(或者您在巴塞羅那)並在招聘,請告訴我:)

英文原文地址:https://divan.dev/posts/go_concurrency_visualize/

更多程式設計相關知識,請存取:!!

以上就是一文詳解Go中的並行【20 張動圖演示】的詳細內容,更多請關注TW511.COM其它相關文章!