程序是一次程式在作業系統執行的過程,需要消耗一定的CPU、時間、記憶體、IO等。每個程序都擁有著獨立的記憶體空間和系統資源。程序之間的記憶體是不共用的。通常需要使用 IPC 機制進行資料傳輸。程序是直接掛在作業系統上執行的,是作業系統分配硬體資源的最小單位。
執行緒是程序的一個執行實體,一個程序可以包含若干個執行緒。執行緒共用程序的記憶體空間和系統資源。執行緒是 CPU 排程的最小單位。因為執行緒之間是共用記憶體的,所以它的建立、切換、銷燬會比程序所消耗的系統資源更少。
舉一個形象的例子:一個作業系統就相當於一支師級編制的軍隊作戰,一個師會把手上的作戰資源獨立的分配各個團。而一個團級的編制就相當於一個程序,團級內部會根據具體的作戰需求編制出若干的營連級,營連級會共用這些作戰資源,它就相當於是計算機中的執行緒。
協程是一種更輕量的執行緒,被稱為使用者態執行緒。它不是由作業系統分配的,對於作業系統來說,協程是透明的。協程是程式設計師根據具體的業務需求創立出來的。一個執行緒可以跑多個協程。協程之間切換不會阻塞執行緒,而且非常的輕量,更容易實現並行程式。Golang 中使用 goroutine 來實現協程。
可以用一句話來形容:多執行緒程式執行在單核CPU上,稱為並行;多執行緒程式執行在多核CPU上,稱為並行。
Golang 中開啟協程的語法非常簡單,使用 Go 關鍵詞即可:
func main() {
go hello()
fmt.Println("主執行緒結束")
}
func hello() {
fmt.Println("hello world")
}
// 結果
主執行緒結束
程式列印結果並非是我們想象的先列印出 「hello world」,再列印出 「主執行緒結束」。這是因為協程是非同步執行的,當協程還沒有來得及列印,主執行緒就已經結束了。我們只需要在主執行緒中暫停一秒就可以列印出想要的結果。
func main() {
go hello()
time.Sleep(1 * time.Second) // 暫停一秒
fmt.Println("主執行緒結束")
}
// 結果
hello world
主執行緒結束
這裡的一次程式執行其實是執行了一個程序,只不過這個程序就只有一個執行緒。
在 Golang 中開啟一個協程是非常方便的,但我們要知道並行程式設計充滿了複雜性與危險性,需要小心翼翼的使用,以防出現了不可預料的問題。
用來檢測各個站點是否能響應:
func main() {
start := time.Now()
apis := []string{
"https://management.azure.com",
"https://dev.azure.com",
"https://api.github.com",
"https://outlook.office.com/",
"https://api.somewhereintheinternet.com/",
"https://graph.microsoft.com",
}
for _, api := range apis {
_, err := http.Get(api)
if err != nil {
fmt.Printf("響應錯誤: %s\n", api)
continue
}
fmt.Printf("成功響應: %s\n", api)
}
elapsed := time.Since(start) // 用來記錄當前程序執行所消耗的時間
fmt.Printf("主執行緒執行結束,消耗 %v 秒!\n", elapsed.Seconds())
}
// 結果
成功響應: https://management.azure.com
成功響應: https://dev.azure.com
成功響應: https://api.github.com
成功響應: https://outlook.office.com/
響應錯誤: https://api.somewhereintheinternet.com/
成功響應: https://graph.microsoft.com
主執行緒執行結束,消耗 5.4122892 秒!
我們檢測六個站點一個消耗了5秒的時間,假設現在需要對一百個站點進行檢測,那麼這個過程就會耗費大量的時間,這些時間都被消耗到了 http.Get(api)
這裡。
在 http.get(api)
還沒有獲取到結果時,主執行緒會等待請求的響應,會阻塞在這裡。這時候我們就可以使用協程來優化這段程式碼,將各個網路請求的檢測變成非同步執行,從而減少程式響應的總時間。
func main() {
...
for _, api := range apis {
go checkApi(api)
}
time.Sleep(3 * time.Second) // 等待三秒,不然主執行緒會瞬間結束,導致協程被殺死
...
}
func checkApi(api string) {
_, err := http.Get(api)
if err != nil {
fmt.Printf("響應錯誤: %s\n", api)
return
}
fmt.Printf("成功響應: %s\n", api)
}
// 結果
響應錯誤: https://api.somewhereintheinternet.com/
成功響應: https://api.github.com
成功響應: https://graph.microsoft.com
成功響應: https://management.azure.com
成功響應: https://dev.azure.com
成功響應: https://outlook.office.com/
主執行緒執行結束,消耗 3.0013905 秒!
可以看到,使用 goroutine 後,除去等待的三秒鐘,程式的響應時間產生了質的變化。但美中不足的是,我們只能在原地傻傻的等待三秒。那麼有沒有一種方法可以感知協程的執行狀態,當監聽到協程執行結束時再優雅的關閉主執行緒呢?
sync.waitgroup 可以完成我們的」優雅小目標「。 sync.waitgroup 是 goroutine 的一個「計數工具」,通常用來等待一組 goroutine 的執行完成。當我們需要監聽協程是否執行完成就可以使用該工具。sync.waitgroup 提供了三種方法:
Add(n int):新增 n 個goroutine 到 WaitGroup 中,表示需要等待 n 個 goroutine 執行完成。
Done():每個 goroutine 執行完成時呼叫 Done 方法,表示該 goroutine 已完成執行,相當於把計數器 -1。
Wait():主執行緒呼叫 Wait 方法來等待所有 goroutine 執行完成,會阻塞到所有的 goroutine 執行完成。
我們來使用 sync.waitgroup 來優雅的結束程式:
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
func main() {
var (
start = time.Now()
apis = []string{
"https://management.azure.com",
"https://dev.azure.com",
"https://api.github.com",
"https://outlook.office.com/",
"https://api.somewhereintheinternet.com/",
"https://graph.microsoft.com",
}
wg = sync.WaitGroup{} // 初始化WaitGroup
)
wg.Add(len(apis)) // 表示需要等待六個協程請求
for _, api := range apis {
go checkApi(api, &wg)
}
wg.Wait() // 阻塞主執行緒,等待 WaitGroup 歸零後再繼續
elapsed := time.Since(start) // 用來記錄當前程序執行所消耗的時間
fmt.Printf("執行緒執行結束,消耗 %v 秒!\n", elapsed.Seconds())
}
func checkApi(api string, wg *sync.WaitGroup) {
defer wg.Done() // 標記當前協程執行完成,計數器-1
_, err := http.Get(api)
if err != nil {
fmt.Printf("響應錯誤: %s\n", api)
return
}
fmt.Printf("成功響應: %s\n", api)
}
// 結果
響應錯誤: https://api.somewhereintheinternet.com/
成功響應: https://api.github.com
成功響應: https://management.azure.com
成功響應: https://graph.microsoft.com
成功響應: https://dev.azure.com
成功響應: https://outlook.office.com/
執行緒執行結束,消耗 0.9718695 秒!
可以看到,我們優雅了監聽了所有協程是否執行完畢,且大幅度縮短了程式執行時間。但同時我們的列印響應資訊也是無序的了,這代表了我們的協程確確實實非同步的請求了所有的站點。
channel 也可以完成我們的」優雅小目標「。 channel 的中文名字被稱為「通道」,是 goroutine 的通訊機制。當需要將值從一個 goroutine 傳送到另一個時,可以使用通道。Golang 的並行理念是:「通過通訊共用記憶體,而不是通過共用記憶體通訊」。channel 是並行程式設計中的一個重要概念,遵循著資料先進先出,後進後出的原則。
宣告通道需要使用內建的 make()
函數:
ch := make(chan <type>) // type 代表資料型別,如 string、int
建立好 channle 後可以使用 <-
來傳送/接受資料:
func main() {
ch := make(chan int)
go func() {
ch <- 1 // 傳送
}()
a := <-ch // 接收
fmt.Println(a)
close(ch) // 關閉通道
}
// 結果
1
每個傳送資料都必須有正確的接受方式,否則會編譯錯誤。編譯錯誤比誤用 channel 更好!
接收 channel 中的發來的資料時, a := <-ch
這裡是處於阻塞狀態的。我們可以利用這點來監聽協程是否執行完成,還是上文的例子,但這次我們不使用 sync.waitgroup:
func main() {
var (
start = time.Now()
apis = []string{
"https://management.azure.com",
"https://dev.azure.com",
"https://api.github.com",
"https://outlook.office.com/",
"https://api.somewhereintheinternet.com/",
"https://graph.microsoft.com",
}
ch = make(chan string)
)
for _, api := range apis {
go checkApi(api, ch)
}
// 因為我們一共有六個請求,所以我們要接收六次
for i := 0; i < 6; i++ {
fmt.Println(<-ch)
}
elapsed := time.Since(start) // 用來記錄當前程序執行所消耗的時間
fmt.Printf("執行緒執行結束,消耗 %v 秒!\n", elapsed.Seconds())
}
func checkApi(api string, ch chan string) {
_, err := http.Get(api)
if err != nil {
ch <- fmt.Sprintf("響應錯誤: %s", api)
return
}
ch <- fmt.Sprintf("成功響應: %s", api)
}
// 結果
成功響應: https://api.github.com
成功響應: https://management.azure.com
成功響應: https://graph.microsoft.com
成功響應: https://outlook.office.com/
成功響應: https://dev.azure.com
執行緒執行結束,消耗 0.9013927 秒!
可以看到,我們利用通道接收資料的阻塞特性,達到了和使用 sync.waitgroup
一樣的效果。
預設情況下,我們建立的 channel 是無緩衝的,意味著有接收資料,就一定要有對應的傳送資料,否則就會永久阻塞程式。有緩衝的 channel 則可以避免這種限制。建立一個有緩衝的 channel:
ch := make(chan <type>, <num>) // num 代表有緩衝通道的大小
有緩衝 channel 有點類似與佇列,它不限制傳送資料和接收資料,實現了接發 channel 的解耦。每次向 channel 中傳送資料不用管有沒有接收方,直接放入這個「佇列」中。當有接收方從佇列中取走資料時,就會從「佇列」中刪除這個值。當 channel 滿時,傳送資料會被阻塞,直到 channel 有空;當 channel 為空時,接收資料會被阻塞,直到 channel 有資料過來。
func main() {
ch := make(chan int, 3)
fmt.Printf("當前通道長度:%d\n", len(ch))
send(ch, 1)
send(ch, 2)
send(ch, 3)
fmt.Println("所有資料已經已經放入通道")
fmt.Printf("當前通道長度:%d\n", len(ch))
for i := 0; i < 3; i++ {
fmt.Println(<-ch)
}
fmt.Println("主執行緒結束")
}
// 結果
當前通道長度:0
所有資料已經已經放入通道
當前通道長度:3
1
2
3
主執行緒結束
這裡並沒有什麼不同的操作,程式也在正常執行,但我們把通道大小改成比 3 更小時,編譯就會出錯,提示 fatal error: all goroutines are asleep - deadlock!
這是因為我們在主執行緒中連續的執行send,最終超出了通道的限制。
func main() {
ch := make(chan int, 2) // 把通道改小
...
}
// 結果
當前通道長度:0
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.send(...)
E:/project/gotest/main.go:8
main.main()
E:/project/gotest/main.go:16 +0xfe
嘗試使用協程來執行 send
函數:
func main() {
...
go send(ch, 1)
go send(ch, 2)
go send(ch, 3)
...
}
// 結果
當前通道長度:0
所有資料已經已經放入通道
當前通道長度:2
1
2
3
主執行緒結束
channel 與 goroutine 有著千絲萬縷的關係,在使用 channel 時一定要使用 goroutine。
channel 有一個很有意思的功能,當通道作為函數的引數時,可以限制該通道是傳送資料還是接收資料。當程式變的複雜後,這可以有效的記住每個通道的意圖。一個簡單的例子:
func send(ch chan<- string, message string) {
fmt.Printf("傳送: %#v\n", message)
ch <- message
}
func read(ch <-chan string) {
fmt.Printf("接收: %#v\n", <-ch)
}
func main() {
ch := make(chan string, 1)
send(ch, "Hello World!")
read(ch)
}
// 結果
傳送: "Hello World!"
接收: "Hello World!"
當錯誤使用通道時,編譯會不通過:
# command-line-arguments
.\main.go:11:32: invalid operation: cannot receive from send-only channel ch (variable of type chan<- string)
在實際的業務場景中,有時候需要根據不同的資料來處理不同的結果。舉一個白話例子:
例如現在你有一個物流中心,這個物流中心具備一個這樣的功能:從青島、北海、舟山等城市接收海鮮,從朔州、大同等城市接受煤礦,從廣州、蘇州等城市接受制造業產出的生活用品。接收到這些物資後,根據物資種類傳送到各個需要這些物資的地方,比如海鮮發往成渝做火鍋,煤礦發往長三角發電,生活用品發往陝甘供生活使用。
Golang 提供 select
關鍵詞來實現多路複用, select
就類似於這個物流中心,它用來接收多個 channel 中的資料,並做出不同的處理。select
的語法類似與 switch
,都具備 case 和 default 但是 select 只適用於 channel。
func seafood(ch chan<- string) {
time.Sleep(2 * time.Second)
ch <- "海鮮已經送達"
}
func coal(ch chan<- string) {
time.Sleep(5 * time.Second)
ch <- "煤礦已經送達"
}
func goods(ch chan<- string) {
time.Sleep(8 * time.Second)
ch <- "生活用品已經送達"
}
func main() {
var (
text string
seafoodCh = make(chan string)
coalCh = make(chan string)
goodsCh = make(chan string)
tick = time.NewTicker(1 * time.Second)
)
go seafood(seafoodCh)
go coal(coalCh)
go goods(goodsCh)
for _ = range tick.C {
select {
case text = <-seafoodCh:
fmt.Println(text)
case text = <-coalCh:
fmt.Println(text)
case <-goodsCh:
fmt.Println("檢測到有生活用品,但是不做處理")
default:
fmt.Println("什麼都沒來")
}
}
}
// 結果
什麼都沒來
海鮮已經送達
什麼都沒來
什麼都沒來
煤礦已經送達
檢測到有生活用品,但是不做處理
什麼都沒來
什麼都沒來
什麼都沒來
select 在執行時,如果所有 case 都不滿足,就會選擇 default 執行,如果沒有 default ,select 會一直阻塞等待,直到至少有一個 case 滿足被執行。如果同時有多個 case 被滿足,則 select 會隨機選擇一個執行。
Golang 沒有直接提供 channel 的超時機制,但是我們可以使用多路複用來實現:
func goods(ch chan<- string) {
time.Sleep(8 * time.Second)
ch <- "生活用品已經送達"
}
func main() {
var (
goodsCh = make(chan string)
)
go goods(goodsCh)
fmt.Println("開始等待")
select {
case <-goodsCh:
fmt.Println("生活用品送到了")
case t := <-time.After(3 * time.Second): // 三秒後 channel 發出當前時間
fmt.Println("沒有等到生活用品")
fmt.Println(t.Format("2006-01-02 15:04:05"))
break
}
fmt.Println("主執行緒執行結束")
}
// 結果
開始等待
沒有等到生活用品
2023-04-08 14:51:36
主執行緒執行結束
本系列文章: