Go語言原生支援並行是被眾人津津樂道的特性。goroutine 早期是 Inferno 作業系統的一個試驗性特性,而現在這個特性與作業系統一起,將開發變得越來越簡單。
很多剛開始使用Go語言開發的人都很喜歡使用並行特性,而沒有考慮併行是否真正能解決他們的問題。
了解 goroutine 的生命期時再建立 goroutine
在Go語言中,開發者習慣將並行內容與 goroutine 一一對應地建立 goroutine。開發者很少會考慮 goroutine 在什麼時候能退出和控制 goroutine 生命期,這就會造成 goroutine 失控的情況。下面來看一段程式碼。
失控的 goroutine:
package main
import (
"fmt"
"runtime"
)
// 一段耗時的計算函數
func consumer(ch chan int) {
// 無限獲取資料的迴圈
for {
// 從通道獲取資料
data := <-ch
// 列印資料
fmt.Println(data)
}
}
func main() {
// 建立一個傳遞資料用的通道
ch := make(chan int)
for {
// 空變數, 什麼也不做
var dummy string
// 獲取輸入, 模擬進程持續執行
fmt.Scan(&dummy)
// 啟動並行執行consumer()函數
go consumer(ch)
// 輸出現在的goroutine數量
fmt.Println("goroutines:", runtime.NumGoroutine())
}
}
程式碼說明如下:
-
第 9 行,consumer() 函數模擬平時業務中放到 goroutine 中執行的耗時操作。該函數從其他 goroutine 中獲取和接收資料或者指令,處理後返回結果。
-
第 12 行,需要通過無限迴圈不停地獲取資料。
-
第 15 行,每次從通道中獲取資料。
-
第 18 行,模擬處理完資料後的返回資料。
-
第 26 行,建立一個整型通道。
-
第 34 行,使用 fmt.Scan() 函數接收資料時,需要提供變數地址。如果輸入匹配的變數型別,將會成功賦值給變數。
-
第 37 行,啟動並行執行 consumer() 函數,並傳入 ch 通道。
-
第 40 行,每啟動一個 goroutine,使用 runtime.NumGoroutine 檢查進程建立的 goroutine 數量總數。
執行程式,每輸入一個字串+迴車,將會建立一個 goroutine,結果如下:
a
goroutines: 2
b
goroutines: 3
c
goroutines: 4
注意,結果中 a、b、c 為通過鍵盤輸入的字元,其他為列印字元。
這個程式實際在模擬一個進程根據需要建立 goroutine 的情況。執行後,問題已經被暴露出來:隨著輸入的字串越來越多,goroutine 將會無限制地被建立,但並不會結束。這種情況如果發生在生產環境中,將會造成記憶體大量分配,最終使進程崩潰。現實的情況也許比這段程式碼更加隱蔽:也許你設定了一個退出的條件,但是條件永遠不會被滿足或者觸發。
為了避免這種情況,在這個例子中,需要為 consumer() 函數新增合理的退出條件,修改程式碼後如下:
package main
import (
"fmt"
"runtime"
)
// 一段耗時的計算函數
func consumer(ch chan int) {
// 無限獲取資料的迴圈
for {
// 從通道獲取資料
data := <-ch
if data == 0 {
break
}
// 列印資料
fmt.Println(data)
}
fmt.Println("goroutine exit")
}
func main() {
// 傳遞資料用的通道
ch := make(chan int)
for {
// 空變數, 什麼也不做
var dummy string
// 獲取輸入, 模擬進程持續執行
fmt.Scan(&dummy)
if dummy == "quit" {
for i := 0; i < runtime.NumGoroutine()-1; i++ {
ch <- 0
}
continue
}
// 啟動並行執行consumer()函數
go consumer(ch)
// 輸出現在的goroutine數量
fmt.Println("goroutines:", runtime.NumGoroutine())
}
}
程式碼中加粗部分是新新增的程式碼,具體說明如下:
-
第 17 行,為無限迴圈設定退出條件,這裡設定 0 為退出。
-
第 41 行,當命令列輸入 quit 時,進入退出處理的流程。
-
第 43 行,runtime.NumGoroutine 返回一個進程的所有 goroutine 數,main() 的 goroutine 也被算在裡面。因此需要扣除 main() 的 goroutine 數。剩下的 goroutine 為實際建立的 goroutine 數,對這些 goroutine 進行遍歷。
-
第 44 行,並行開啟的 goroutine 都在競爭獲取通道中的資料,因此只要知道有多少個 goroutine 需要退出,就給通道裡發多少個 0。
修改程式並執行,結果如下:
a
goroutines: 2
b
goroutines: 3
quit
goroutine exit
goroutine exit
c
goroutines: 2
避免在不必要的地方使用通道
通道(channel)和 map、切片一樣,也是由 Go 原始碼編寫而成。為了保證兩個 goroutine 並行存取的安全性,通道也需要做一些鎖操作,因此通道其實並不比鎖高效。
下面的例子展示通訊端的接收和並行管理。對於 TCP 來說,一般是接收過程建立 goroutine 並行處理。當通訊端結束時,就要正常退出這些 goroutine。
本例完整程式碼請參考
./src/chapter12/exitnotify/exitnotify.go
。
本套教學所有原始碼下載地址:https://pan.baidu.com/s/1ORFVTOLEYYqDhRzeq0zIiQ 提取密碼:hfyf
下面是對各個部分的詳細分析。
1) 通訊端接收部分
通訊端在連線後,就需要不停地接收資料,程式碼如下:
// 通訊端接收過程
func socketRecv(conn net.Conn, exitChan chan string) {
// 建立一個接收的緩衝
buff := make([]byte, 1024)
// 不停地接收資料
for {
// 從通訊端中讀取資料
_, err := conn.Read(buff)
// 需要結束接收, 退出迴圈
if err != nil {
break
}
}
// 函數已經結束, 傳送通知
exitChan <- "recv exit"
}
程式碼說明如下:
-
第 2 行傳入的 net.Conn 是通訊端的介面,exitChan 為退出傳送同步通道。
-
第 5 行為通訊端的接收資料建立一個緩衝。
-
第 8 行構建一個接收的迴圈,不停地接收資料。
-
第 11 行,從通訊端中取出資料。這個例子中,不關注具體接收到的資料,只是關注錯誤,這裡將接收到的位元組數做匿名處理。
-
第 14 行,當通訊端呼叫了 Close 方法時,會觸發錯誤,這時需要結束接收迴圈。
-
第 21 行,結束函數時,與函數系結的 goroutine 會同時結束,此時需要通知 main() 的 goroutine。
2) 連線、關閉、同步 goroutine 主流程部分
下面程式碼中嘗試使用通訊端的 TCP 協定連線一個網址,連線上後,進行資料接收,等待一段時間後主動關閉通訊端,等待通訊端所在的 goroutine 自然結束,程式碼如下:
func main() {
// 連線一個地址
conn, err := net.Dial("tcp", "www.163.com:80")
// 發生錯誤時列印錯誤退出
if err != nil {
fmt.Println(err)
return
}
// 建立退出通道
exit := make(chan string)
// 並行執行通訊端接收
go socketRecv(conn, exit)
// 在接收時, 等待1秒
time.Sleep(time.Second)
// 主動關閉通訊端
conn.Close()
// 等待goroutine退出完畢
fmt.Println(<-exit)
}
程式碼說明如下:
-
第 4 行,使用 net.Dial 發起 TCP 協定的連線,呼叫函數就會傳送阻塞直到連線超時或者連線完成。
-
第 7 行,如果連線發生錯誤,將會列印錯誤並退出。
-
第 13 行,建立一個通道用於退出信號同步,這個通道會在接收用的 goroutine 中使用。
-
第 16 行,並行執行接收函數,傳入通訊端和用於退出通知的通道。
-
第 19 行,接收需要一個過程,使用 time.Sleep() 等待一段時間。
-
第 22 行,主動關閉通訊端,此時會觸發通訊端接收錯誤。
-
第 25 行,從 exit 通道接收退出資料,也就是等待接收 goroutine 結束。
在這個例子中,goroutine 退出使用通道來通知,這種做法可以解決問題,但是實際上通道中的資料並沒有完全使用。
3) 優化:使用等待組替代通道簡化同步
通道的內部實現程式碼在Go語言開發包的 src/runtime/chan.go 中,經過分析後大概了解到通道也是用常見的互斥量等進行同步。因此通道雖然是一個語言級特性,但也不是被神化的特性,通道的執行和使用都要比傳統互斥量、等待組(sync.WaitGroup)有一定的消耗。
所以在這個例子中,更建議使用等待組來實現同步,調整後的程式碼如下:
package main
import (
"fmt"
"net"
"sync"
"time"
)
// 通訊端接收過程
func socketRecv(conn net.Conn, wg *sync.WaitGroup) {
// 建立一個接收的緩衝
buff := make([]byte, 1024)
// 不停地接收資料
for {
// 從通訊端中讀取資料
_, err := conn.Read(buff)
// 需要結束接收, 退出迴圈
if err != nil {
break
}
}
// 函數已經結束, 傳送通知
wg.Done()
}
func main() {
// 連線一個地址
conn, err := net.Dial("tcp", "www.163.com:80")
// 發生錯誤時列印錯誤退出
if err != nil {
fmt.Println(err)
return
}
// 退出通道
var wg sync.WaitGroup
// 新增一個任務
wg.Add(1)
// 並行執行接收通訊端
go socketRecv(conn, &wg)
// 在接收時, 等待1秒
time.Sleep(time.Second)
// 主動關閉通訊端
conn.Close()
// 等待goroutine退出完畢
wg.Wait()
fmt.Println("recv done")
}
調整後的程式碼說明如下:
-
第 45 行,宣告退出同步用的等待組。
-
第 48 行,為等待組的計數器加 1,表示需要完成一個任務。
-
第 51 行,將等待組的指標傳入接收函數。
-
第 60 行,等待等待組的完成,完成後列印提示。
-
第 30 行,接收完成後,使用 wg.Done() 方法將等待組計數器減一。