goroutine(Go語言並行)如何使用才更加高效?

2020-07-16 10:05:21
Go語言原生支援並行是被眾人津津樂道的特性。goroutine 早期是 Inferno 作業系統的一個試驗性特性,而現在這個特性與作業系統一起,將開發變得越來越簡單。

很多剛開始使用Go語言開發的人都很喜歡使用並行特性,而沒有考慮併行是否真正能解決他們的問題。

了解 goroutine 的生命期時再建立 goroutine

在Go語言中,開發者習慣將並行內容與 goroutine 一一對應地建立 goroutine。開發者很少會考慮 goroutine 在什麼時候能退出和控制 goroutine 生命期,這就會造成 goroutine 失控的情況。下面來看一段程式碼。

失控的 goroutine:
package main

import (
    "fmt"
    "runtime"
)

// 一段耗時的計算函數
func consumer(ch chan int) {

    // 無限獲取資料的迴圈
    for {

        // 從通道獲取資料
        data := <-ch

        // 列印資料
        fmt.Println(data)
    }

}

func main() {

    // 建立一個傳遞資料用的通道
    ch := make(chan int)

    for {

        // 空變數, 什麼也不做
        var dummy string

        // 獲取輸入, 模擬進程持續執行
        fmt.Scan(&dummy)

        // 啟動並行執行consumer()函數
        go consumer(ch)

        // 輸出現在的goroutine數量
        fmt.Println("goroutines:", runtime.NumGoroutine())
    }

}
程式碼說明如下:
  • 第 9 行,consumer() 函數模擬平時業務中放到 goroutine 中執行的耗時操作。該函數從其他 goroutine 中獲取和接收資料或者指令,處理後返回結果。
  • 第 12 行,需要通過無限迴圈不停地獲取資料。
  • 第 15 行,每次從通道中獲取資料。
  • 第 18 行,模擬處理完資料後的返回資料。
  • 第 26 行,建立一個整型通道。
  • 第 34 行,使用 fmt.Scan() 函數接收資料時,需要提供變數地址。如果輸入匹配的變數型別,將會成功賦值給變數。
  • 第 37 行,啟動並行執行 consumer() 函數,並傳入 ch 通道。
  • 第 40 行,每啟動一個 goroutine,使用 runtime.NumGoroutine 檢查進程建立的 goroutine 數量總數。

執行程式,每輸入一個字串+迴車,將會建立一個 goroutine,結果如下:

a
goroutines: 2
b
goroutines: 3
c
goroutines: 4

注意,結果中 a、b、c 為通過鍵盤輸入的字元,其他為列印字元。

這個程式實際在模擬一個進程根據需要建立 goroutine 的情況。執行後,問題已經被暴露出來:隨著輸入的字串越來越多,goroutine 將會無限制地被建立,但並不會結束。這種情況如果發生在生產環境中,將會造成記憶體大量分配,最終使進程崩潰。現實的情況也許比這段程式碼更加隱蔽:也許你設定了一個退出的條件,但是條件永遠不會被滿足或者觸發。

為了避免這種情況,在這個例子中,需要為 consumer() 函數新增合理的退出條件,修改程式碼後如下:
package main

import (
    "fmt"
    "runtime"
)

// 一段耗時的計算函數
func consumer(ch chan int) {

    // 無限獲取資料的迴圈
    for {

        // 從通道獲取資料
        data := <-ch

        if data == 0 {
            break
        }

        // 列印資料
        fmt.Println(data)
    }

    fmt.Println("goroutine exit")
}

func main() {

    // 傳遞資料用的通道
    ch := make(chan int)

    for {

        // 空變數, 什麼也不做
        var dummy string

        // 獲取輸入, 模擬進程持續執行
        fmt.Scan(&dummy)

        if dummy == "quit" {

            for i := 0; i < runtime.NumGoroutine()-1; i++ {
                ch <- 0
            }

            continue
        }

        // 啟動並行執行consumer()函數
        go consumer(ch)

        // 輸出現在的goroutine數量
        fmt.Println("goroutines:", runtime.NumGoroutine())
    }
}
程式碼中加粗部分是新新增的程式碼,具體說明如下:
  • 第 17 行,為無限迴圈設定退出條件,這裡設定 0 為退出。
  • 第 41 行,當命令列輸入 quit 時,進入退出處理的流程。
  • 第 43 行,runtime.NumGoroutine 返回一個進程的所有 goroutine 數,main() 的 goroutine 也被算在裡面。因此需要扣除 main() 的 goroutine 數。剩下的 goroutine 為實際建立的 goroutine 數,對這些 goroutine 進行遍歷。
  • 第 44 行,並行開啟的 goroutine 都在競爭獲取通道中的資料,因此只要知道有多少個 goroutine 需要退出,就給通道裡發多少個 0。

修改程式並執行,結果如下:

a
goroutines: 2
b
goroutines: 3
quit
goroutine exit
goroutine exit
c
goroutines: 2

避免在不必要的地方使用通道

通道(channel)和 map、切片一樣,也是由 Go 原始碼編寫而成。為了保證兩個 goroutine 並行存取的安全性,通道也需要做一些鎖操作,因此通道其實並不比鎖高效。

下面的例子展示通訊端的接收和並行管理。對於 TCP 來說,一般是接收過程建立 goroutine 並行處理。當通訊端結束時,就要正常退出這些 goroutine。

本例完整程式碼請參考./src/chapter12/exitnotify/exitnotify.go
本套教學所有原始碼下載地址:https://pan.baidu.com/s/1ORFVTOLEYYqDhRzeq0zIiQ    提取密碼:hfyf
下面是對各個部分的詳細分析。

1) 通訊端接收部分

通訊端在連線後,就需要不停地接收資料,程式碼如下:
// 通訊端接收過程
func socketRecv(conn net.Conn, exitChan chan string) {

// 建立一個接收的緩衝
    buff := make([]byte, 1024)

    // 不停地接收資料
    for {

        // 從通訊端中讀取資料
        _, err := conn.Read(buff)

        // 需要結束接收, 退出迴圈
        if err != nil {
            break
        }

    }

    // 函數已經結束, 傳送通知
    exitChan <- "recv exit"
}
程式碼說明如下:
  • 第 2 行傳入的 net.Conn 是通訊端的介面,exitChan 為退出傳送同步通道。
  • 第 5 行為通訊端的接收資料建立一個緩衝。
  • 第 8 行構建一個接收的迴圈,不停地接收資料。
  • 第 11 行,從通訊端中取出資料。這個例子中,不關注具體接收到的資料,只是關注錯誤,這裡將接收到的位元組數做匿名處理。
  • 第 14 行,當通訊端呼叫了 Close 方法時,會觸發錯誤,這時需要結束接收迴圈。
  • 第 21 行,結束函數時,與函數系結的 goroutine 會同時結束,此時需要通知 main() 的 goroutine。

2) 連線、關閉、同步 goroutine 主流程部分

下面程式碼中嘗試使用通訊端的 TCP 協定連線一個網址,連線上後,進行資料接收,等待一段時間後主動關閉通訊端,等待通訊端所在的 goroutine 自然結束,程式碼如下:
func main() {

    // 連線一個地址
    conn, err := net.Dial("tcp", "www.163.com:80")

    // 發生錯誤時列印錯誤退出
    if err != nil {
        fmt.Println(err)
        return
    }

    // 建立退出通道
    exit := make(chan string)

    // 並行執行通訊端接收
    go socketRecv(conn, exit)

    // 在接收時, 等待1秒
    time.Sleep(time.Second)

    // 主動關閉通訊端
    conn.Close()

    // 等待goroutine退出完畢
    fmt.Println(<-exit)
}
程式碼說明如下:
  • 第 4 行,使用 net.Dial 發起 TCP 協定的連線,呼叫函數就會傳送阻塞直到連線超時或者連線完成。
  • 第 7 行,如果連線發生錯誤,將會列印錯誤並退出。
  • 第 13 行,建立一個通道用於退出信號同步,這個通道會在接收用的 goroutine 中使用。
  • 第 16 行,並行執行接收函數,傳入通訊端和用於退出通知的通道。
  • 第 19 行,接收需要一個過程,使用 time.Sleep() 等待一段時間。
  • 第 22 行,主動關閉通訊端,此時會觸發通訊端接收錯誤。
  • 第 25 行,從 exit 通道接收退出資料,也就是等待接收 goroutine 結束。

在這個例子中,goroutine 退出使用通道來通知,這種做法可以解決問題,但是實際上通道中的資料並沒有完全使用。

3) 優化:使用等待組替代通道簡化同步

通道的內部實現程式碼在Go語言開發包的 src/runtime/chan.go 中,經過分析後大概了解到通道也是用常見的互斥量等進行同步。因此通道雖然是一個語言級特性,但也不是被神化的特性,通道的執行和使用都要比傳統互斥量、等待組(sync.WaitGroup)有一定的消耗。

所以在這個例子中,更建議使用等待組來實現同步,調整後的程式碼如下:
package main

import (
    "fmt"
    "net"
    "sync"
    "time"
)

// 通訊端接收過程
func socketRecv(conn net.Conn, wg *sync.WaitGroup) {

    // 建立一個接收的緩衝
    buff := make([]byte, 1024)

    // 不停地接收資料
    for {

        // 從通訊端中讀取資料
        _, err := conn.Read(buff)

        // 需要結束接收, 退出迴圈
        if err != nil {
            break
        }

    }

    // 函數已經結束, 傳送通知
    wg.Done()
}

func main() {

    // 連線一個地址
    conn, err := net.Dial("tcp", "www.163.com:80")

    // 發生錯誤時列印錯誤退出
    if err != nil {
        fmt.Println(err)
        return
    }

    // 退出通道
    var wg sync.WaitGroup
   
    // 新增一個任務
    wg.Add(1)

    // 並行執行接收通訊端
    go socketRecv(conn, &wg)

    // 在接收時, 等待1秒
    time.Sleep(time.Second)

    // 主動關閉通訊端
    conn.Close()

    // 等待goroutine退出完畢
    wg.Wait()
    fmt.Println("recv done")
}
調整後的程式碼說明如下:
  • 第 45 行,宣告退出同步用的等待組。
  • 第 48 行,為等待組的計數器加 1,表示需要完成一個任務。
  • 第 51 行,將等待組的指標傳入接收函數。
  • 第 60 行,等待等待組的完成,完成後列印提示。
  • 第 30 行,接收完成後,使用 wg.Done() 方法將等待組計數器減一。