Go 並行程式設計

2023-10-31 09:00:37

基礎概念

程序與執行緒

程序是一次程式在作業系統執行的過程,需要消耗一定的CPU、時間、記憶體、IO等。每個程序都擁有著獨立的記憶體空間和系統資源。程序之間的記憶體是不共用的。通常需要使用 IPC 機制進行資料傳輸。程序是直接掛在作業系統上執行的,是作業系統分配硬體資源的最小單位。

執行緒是程序的一個執行實體,一個程序可以包含若干個執行緒。執行緒共用程序的記憶體空間和系統資源。執行緒是 CPU 排程的最小單位。因為執行緒之間是共用記憶體的,所以它的建立、切換、銷燬會比程序所消耗的系統資源更少。

舉一個形象的例子:一個作業系統就相當於一支師級編制的軍隊作戰,一個師會把手上的作戰資源獨立的分配各個團。而一個團級的編制就相當於一個程序,團級內部會根據具體的作戰需求編制出若干的營連級,營連級會共用這些作戰資源,它就相當於是計算機中的執行緒。

什麼是協程

協程是一種更輕量的執行緒,被稱為使用者態執行緒。它不是由作業系統分配的,對於作業系統來說,協程是透明的。協程是程式設計師根據具體的業務需求創立出來的。一個執行緒可以跑多個協程。協程之間切換不會阻塞執行緒,而且非常的輕量,更容易實現並行程式。Golang 中使用 goroutine 來實現協程。

並行與並行

可以用一句話來形容:多執行緒程式執行在單核CPU上,稱為並行;多執行緒程式執行在多核CPU上,稱為並行。

Goroutine

Golang 中開啟協程的語法非常簡單,使用 Go 關鍵詞即可:

func main() {
	go hello()
	fmt.Println("主執行緒結束")
}

func hello() {
	fmt.Println("hello world")
}

// 結果
主執行緒結束

程式列印結果並非是我們想象的先列印出 「hello world」,再列印出 「主執行緒結束」。這是因為協程是非同步執行的,當協程還沒有來得及列印,主執行緒就已經結束了。我們只需要在主執行緒中暫停一秒就可以列印出想要的結果。

func main() {
	go hello()
	time.Sleep(1 * time.Second) // 暫停一秒
	fmt.Println("主執行緒結束")
}

// 結果
hello world
主執行緒結束

這裡的一次程式執行其實是執行了一個程序,只不過這個程序就只有一個執行緒。

在 Golang 中開啟一個協程是非常方便的,但我們要知道並行程式設計充滿了複雜性與危險性,需要小心翼翼的使用,以防出現了不可預料的問題。

編寫一個簡單的並行程式

用來檢測各個站點是否能響應:

func main() {
	start := time.Now()

	apis := []string{
		"https://management.azure.com",
		"https://dev.azure.com",
		"https://api.github.com",
		"https://outlook.office.com/",
		"https://api.somewhereintheinternet.com/",
		"https://graph.microsoft.com",
	}

	for _, api := range apis {
		_, err := http.Get(api)
		if err != nil {
			fmt.Printf("響應錯誤: %s\n", api)
			continue
		}

		fmt.Printf("成功響應: %s\n", api)
	}

	elapsed := time.Since(start) // 用來記錄當前程序執行所消耗的時間
	fmt.Printf("主執行緒執行結束,消耗 %v 秒!\n", elapsed.Seconds())
}

// 結果
成功響應: https://management.azure.com
成功響應: https://dev.azure.com
成功響應: https://api.github.com
成功響應: https://outlook.office.com/
響應錯誤: https://api.somewhereintheinternet.com/
成功響應: https://graph.microsoft.com
主執行緒執行結束,消耗 5.4122892 秒!

我們檢測六個站點一個消耗了5秒的時間,假設現在需要對一百個站點進行檢測,那麼這個過程就會耗費大量的時間,這些時間都被消耗到了 http.Get(api) 這裡。

http.get(api) 還沒有獲取到結果時,主執行緒會等待請求的響應,會阻塞在這裡。這時候我們就可以使用協程來優化這段程式碼,將各個網路請求的檢測變成非同步執行,從而減少程式響應的總時間。

func main() {
	...

	for _, api := range apis {
		go checkApi(api)
	}

	time.Sleep(3 * time.Second)  // 等待三秒,不然主執行緒會瞬間結束,導致協程被殺死
	...
}

func checkApi(api string) {
	_, err := http.Get(api)
	if err != nil {
		fmt.Printf("響應錯誤: %s\n", api)
		return
	}

	fmt.Printf("成功響應: %s\n", api)
}

// 結果
響應錯誤: https://api.somewhereintheinternet.com/
成功響應: https://api.github.com
成功響應: https://graph.microsoft.com
成功響應: https://management.azure.com
成功響應: https://dev.azure.com
成功響應: https://outlook.office.com/
主執行緒執行結束,消耗 3.0013905 秒!

可以看到,使用 goroutine 後,除去等待的三秒鐘,程式的響應時間產生了質的變化。但美中不足的是,我們只能在原地傻傻的等待三秒。那麼有沒有一種方法可以感知協程的執行狀態,當監聽到協程執行結束時再優雅的關閉主執行緒呢?

sync.waitgroup

sync.waitgroup 可以完成我們的」優雅小目標「。 sync.waitgroup 是 goroutine 的一個「計數工具」,通常用來等待一組 goroutine 的執行完成。當我們需要監聽協程是否執行完成就可以使用該工具。sync.waitgroup 提供了三種方法:

  1. Add(n int):新增 n 個goroutine 到 WaitGroup 中,表示需要等待 n 個 goroutine 執行完成。

  2. Done():每個 goroutine 執行完成時呼叫 Done 方法,表示該 goroutine 已完成執行,相當於把計數器 -1。

  3. Wait():主執行緒呼叫 Wait 方法來等待所有 goroutine 執行完成,會阻塞到所有的 goroutine 執行完成。

我們來使用 sync.waitgroup 來優雅的結束程式:

package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"
)

func main() {
	var (
		start = time.Now()
		apis  = []string{
			"https://management.azure.com",
			"https://dev.azure.com",
			"https://api.github.com",
			"https://outlook.office.com/",
			"https://api.somewhereintheinternet.com/",
			"https://graph.microsoft.com",
		}
		wg = sync.WaitGroup{} // 初始化WaitGroup
	)

	wg.Add(len(apis)) // 表示需要等待六個協程請求

	for _, api := range apis {
		go checkApi(api, &wg)
	}

	wg.Wait()                    // 阻塞主執行緒,等待 WaitGroup 歸零後再繼續
	elapsed := time.Since(start) // 用來記錄當前程序執行所消耗的時間
	fmt.Printf("執行緒執行結束,消耗 %v 秒!\n", elapsed.Seconds())
}

func checkApi(api string, wg *sync.WaitGroup) {
	defer wg.Done() // 標記當前協程執行完成,計數器-1
	_, err := http.Get(api)
	if err != nil {
		fmt.Printf("響應錯誤: %s\n", api)
		return
	}

	fmt.Printf("成功響應: %s\n", api)
}

// 結果
響應錯誤: https://api.somewhereintheinternet.com/
成功響應: https://api.github.com
成功響應: https://management.azure.com
成功響應: https://graph.microsoft.com
成功響應: https://dev.azure.com
成功響應: https://outlook.office.com/
執行緒執行結束,消耗 0.9718695 秒!

可以看到,我們優雅了監聽了所有協程是否執行完畢,且大幅度縮短了程式執行時間。但同時我們的列印響應資訊也是無序的了,這代表了我們的協程確確實實非同步的請求了所有的站點。

Channel

channel 也可以完成我們的」優雅小目標「。 channel 的中文名字被稱為「通道」,是 goroutine 的通訊機制。當需要將值從一個 goroutine 傳送到另一個時,可以使用通道。Golang 的並行理念是:「通過通訊共用記憶體,而不是通過共用記憶體通訊」。channel 是並行程式設計中的一個重要概念,遵循著資料先進先出,後進後出的原則。

宣告 Channel 

宣告通道需要使用內建的 make() 函數:

ch := make(chan <type>) // type 代表資料型別,如 string、int

Channel 傳送資料和接收資料

建立好 channle 後可以使用 <- 來傳送/接受資料:

func main() {
	ch := make(chan int)
	go func() {
		ch <- 1 // 傳送
	}()
	a := <-ch // 接收
	fmt.Println(a)
	close(ch) // 關閉通道
}

// 結果
1

每個傳送資料都必須有正確的接受方式,否則會編譯錯誤。編譯錯誤比誤用 channel 更好!

接收 channel 中的發來的資料時, a := <-ch 這裡是處於阻塞狀態的。我們可以利用這點來監聽協程是否執行完成,還是上文的例子,但這次我們不使用 sync.waitgroup:

func main() {
	var (
		start = time.Now()
		apis  = []string{
			"https://management.azure.com",
			"https://dev.azure.com",
			"https://api.github.com",
			"https://outlook.office.com/",
			"https://api.somewhereintheinternet.com/",
			"https://graph.microsoft.com",
		}
		ch = make(chan string)
	)

	for _, api := range apis {
		go checkApi(api, ch)
	}

	// 因為我們一共有六個請求,所以我們要接收六次
	for i := 0; i < 6; i++ {
		fmt.Println(<-ch)
	}
	elapsed := time.Since(start) // 用來記錄當前程序執行所消耗的時間
	fmt.Printf("執行緒執行結束,消耗 %v 秒!\n", elapsed.Seconds())
}

func checkApi(api string, ch chan string) {
	_, err := http.Get(api)
	if err != nil {
		ch <- fmt.Sprintf("響應錯誤: %s", api)
		return
	}
	ch <- fmt.Sprintf("成功響應: %s", api)
}

// 結果
成功響應: https://api.github.com
成功響應: https://management.azure.com
成功響應: https://graph.microsoft.com
成功響應: https://outlook.office.com/
成功響應: https://dev.azure.com
執行緒執行結束,消耗 0.9013927 秒!

可以看到,我們利用通道接收資料的阻塞特性,達到了和使用 sync.waitgroup 一樣的效果。

有緩衝 Channel

預設情況下,我們建立的 channel 是無緩衝的,意味著有接收資料,就一定要有對應的傳送資料,否則就會永久阻塞程式。有緩衝的 channel 則可以避免這種限制。建立一個有緩衝的 channel:

ch := make(chan <type>, <num>) // num 代表有緩衝通道的大小

有緩衝 channel 有點類似與佇列,它不限制傳送資料和接收資料,實現了接發 channel 的解耦。每次向 channel 中傳送資料不用管有沒有接收方,直接放入這個「佇列」中。當有接收方從佇列中取走資料時,就會從「佇列」中刪除這個值。當 channel 滿時,傳送資料會被阻塞,直到 channel 有空;當 channel 為空時,接收資料會被阻塞,直到 channel 有資料過來。

func main() {
	ch := make(chan int, 3)
	fmt.Printf("當前通道長度:%d\n", len(ch))
	send(ch, 1)
	send(ch, 2)
	send(ch, 3)
	fmt.Println("所有資料已經已經放入通道")
	fmt.Printf("當前通道長度:%d\n", len(ch))

	for i := 0; i < 3; i++ {
		fmt.Println(<-ch)
	}

	fmt.Println("主執行緒結束")
}

// 結果
當前通道長度:0
所有資料已經已經放入通道
當前通道長度:3
1
2
3
主執行緒結束

這裡並沒有什麼不同的操作,程式也在正常執行,但我們把通道大小改成比 3 更小時,編譯就會出錯,提示 fatal error: all goroutines are asleep - deadlock!這是因為我們在主執行緒中連續的執行send,最終超出了通道的限制。

func main() {
	ch := make(chan int, 2) // 把通道改小
	...
}

// 結果
當前通道長度:0
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.send(...)
        E:/project/gotest/main.go:8
main.main()
        E:/project/gotest/main.go:16 +0xfe

嘗試使用協程來執行 send 函數:

func main() {
	...
	go send(ch, 1)
	go send(ch, 2)
	go send(ch, 3)
	...
}

// 結果
當前通道長度:0
所有資料已經已經放入通道
當前通道長度:2
1
2
3
主執行緒結束

channel 與 goroutine 有著千絲萬縷的關係,在使用 channel 時一定要使用 goroutine。

 

Channel 方向

channel 有一個很有意思的功能,當通道作為函數的引數時,可以限制該通道是傳送資料還是接收資料。當程式變的複雜後,這可以有效的記住每個通道的意圖。一個簡單的例子:

func send(ch chan<- string, message string) {
	fmt.Printf("傳送: %#v\n", message)
	ch <- message
}

func read(ch <-chan string) {
	fmt.Printf("接收: %#v\n", <-ch)
}

func main() {
	ch := make(chan string, 1)
	send(ch, "Hello World!")
	read(ch)
}

// 結果
傳送: "Hello World!"
接收: "Hello World!"

當錯誤使用通道時,編譯會不通過:

# command-line-arguments
.\main.go:11:32: invalid operation: cannot receive from send-only channel ch (variable of type chan<- string)

多路複用

什麼是多路複用

在實際的業務場景中,有時候需要根據不同的資料來處理不同的結果。舉一個白話例子:

例如現在你有一個物流中心,這個物流中心具備一個這樣的功能:從青島、北海、舟山等城市接收海鮮,從朔州、大同等城市接受煤礦,從廣州、蘇州等城市接受制造業產出的生活用品。接收到這些物資後,根據物資種類傳送到各個需要這些物資的地方,比如海鮮發往成渝做火鍋,煤礦發往長三角發電,生活用品發往陝甘供生活使用。

Golang 提供 select 關鍵詞來實現多路複用, select 就類似於這個物流中心,它用來接收多個 channel 中的資料,並做出不同的處理。select 的語法類似與 switch,都具備 case 和 default 但是 select 只適用於 channel。

func seafood(ch chan<- string) {
	time.Sleep(2 * time.Second)
	ch <- "海鮮已經送達"
}

func coal(ch chan<- string) {
	time.Sleep(5 * time.Second)
	ch <- "煤礦已經送達"
}

func goods(ch chan<- string) {
	time.Sleep(8 * time.Second)
	ch <- "生活用品已經送達"
}

func main() {
	var (
		text      string
		seafoodCh = make(chan string)
		coalCh    = make(chan string)
		goodsCh   = make(chan string)
		tick      = time.NewTicker(1 * time.Second)
	)
	go seafood(seafoodCh)
	go coal(coalCh)
	go goods(goodsCh)
	for _ = range tick.C {
		select {
		case text = <-seafoodCh:
			fmt.Println(text)
		case text = <-coalCh:
			fmt.Println(text)
		case <-goodsCh:
			fmt.Println("檢測到有生活用品,但是不做處理")
		default:
			fmt.Println("什麼都沒來")
		}
	}
}

// 結果
什麼都沒來
海鮮已經送達
什麼都沒來
什麼都沒來
煤礦已經送達
檢測到有生活用品,但是不做處理
什麼都沒來
什麼都沒來
什麼都沒來

 select 在執行時,如果所有 case 都不滿足,就會選擇 default 執行,如果沒有 default ,select 會一直阻塞等待,直到至少有一個 case 滿足被執行。如果同時有多個 case 被滿足,則 select 會隨機選擇一個執行。

使用多路複用來實現 channel 超時

Golang 沒有直接提供 channel 的超時機制,但是我們可以使用多路複用來實現:

func goods(ch chan<- string) {
	time.Sleep(8 * time.Second)
	ch <- "生活用品已經送達"
}

func main() {
	var (
		goodsCh = make(chan string)
	)
	go goods(goodsCh)
	fmt.Println("開始等待")
	select {
	case <-goodsCh:
		fmt.Println("生活用品送到了")
	case t := <-time.After(3 * time.Second): // 三秒後 channel 發出當前時間
		fmt.Println("沒有等到生活用品")
		fmt.Println(t.Format("2006-01-02 15:04:05"))
		break
	}
	fmt.Println("主執行緒執行結束")
}

// 結果
開始等待
沒有等到生活用品
2023-04-08 14:51:36
主執行緒執行結束

本系列文章:

  1. Go 並行程式設計 - Goroutine 基礎 (一)
  2. Go 並行程式設計 - 並行安全(二)
  3. Go 並行程式設計 - runtime 協程排程(三)