用 Golang 從0到1實現一個高效能的 Worker Pool(一)

2023-07-18 15:00:32

1. 概述

話說,早幾天一口氣過了一遍 Golang 的並行機制:

這些機制是使用 Golang 並行程式設計的基礎知識,但是要使用 Golang 編寫出高效、易維護、高質量的程式碼,我們可以使用一些通用的「並行模式」。在 Golang 並行程式設計中大家經常用到的一些模式主要有:

  • Worker-Pool 模式
  • Producer-Consumer 模式
  • Pipeline 模式
  • Event-Driven 模式
  • Reactor 模式
  • Futures and Promises 模式
  • ……

Worker-Pool 模式開始聊吧,Worker-Pool 模式是一種常見的並行設計模式,主要用於控制並行任務的數量,提高系統效能,以及更有效地管理系統資源。

如果在 GitHub 上搜尋 Worker Pool 的開源實現,你會看到結果琳琅滿目,五花八門,各有千秋,眼花繚亂。咱乾脆試試看能不能讓 GPT-4 來實現一個 Worker Pool,然後對比下 Star 數較高的開源實現,看下 GPT-4 「信手拈來」的 Worker Pool 能不能在功能集、易用性、效能上相比各種千星萬星 GitHub 開源流行款更勝一籌。

下文通過 DevChat 和 GPT-4 互動,如果你對 DevChat 感興趣,可以跳轉下文瀏覽:

2. 設計

下面先來明確這個程式需要實現哪些功能,怎麼實現。

2.1 讓 GPT-4 給出功能點

先問下 DevChat 我應該實現哪些功能:

  • DevChat 的回答文字版:

在Golang中實現一個高效能、功能豐富且簡單易用的Worker Pool,你可能需要考慮以下功能點:

  1. 任務佇列:你需要一個佇列來儲存待處理的任務。這個佇列應該是執行緒安全的,以便多個goroutine可以並行地向佇列中新增任務和從佇列中取出任務。

  2. 工作執行緒:你需要一組工作執行緒(goroutine)來並行地執行任務。這些工作執行緒應該可以從任務佇列中取出任務> 並執行。

  3. 執行緒池大小控制:你需要一種機制來控制執行緒池的大小,即同時執行的工作執行緒的數量。這可以幫助你控制並行級別,防止過度消耗系統資源。

  4. 優雅的關閉:你需要一種機制來優雅地關閉執行緒池,即在關閉執行緒池之前,確保所有已經開始的任務都已經完成。

  5. 錯誤處理:你需要一種機制來處理任務執行過程中的錯誤。這可能包括記錄錯誤紀錄檔,或者將錯誤返回給任務的提交者。

  6. 任務超時控制:對於可能長時間執行的任務,你可能需要一種機制來控制任務的執行時間,如果任務執行時間超過了預設的超時時間,那麼應該中止任務的執行。

  7. 任務優先順序:如果你的應用場景中有不同優先順序的任務,你可能需要一種機制來控制任務的執行順序,確保高優先順序的任務先於低優先順序的任務執行。

  8. 任務結果獲取:對於需要返回結果的任務,你需要一種機制來獲取任務的執行結果。

以上就是實現一個Worker Pool可能需要考慮的功能點,具體的功能點可能會根據你的具體需求和應用場景有所不同。

酷,看起來清新脫俗,「花裡胡哨」。目測完全實現這些功能涉及到的程式碼量會超出目前 GPT-4 的上下文限制,外加細節太多,直接讓 GPT-4 上手寫程式碼大概率拿不到滿意的結果,還是繼續只把 AI 當一個「程式設計助手」吧。

2.2 自己總結需求,再給 GPT 派活

1. 給專案起一個名字(起名字是最難的,糾結死人)

就叫 GoPool 吧,對應程式碼庫就叫 gopool,放到 GitHub 上開源出去。

2. 用程式碼的方式總結需求

整個程式涉及到的幾個核心物件是 Task、Worker 和 Pool。Pool 中需要維護一個 workers 棧(或者佇列,不過棧更易引入「Worker 過期」等邏輯);Task 可以先用一個最簡單的函數型別;tasks 似乎由 Pool 來維護會導致過多的 workers(goroutines)上下文切換,影響效能…… 不行,細節太多了,文字不足以描述我想在第一個版本中包含哪些功能(肯定不是上來就實現一個GA版本),或者我先寫幾行物件、介面定義,然後讓 GPT 在此基礎上逐步實現相關邏輯吧。

於是,我寫了這些程式碼,有效行數不到50,看起來卻金光閃閃有沒有?

  • pool.go
type Pool struct {
 	// capacity represents the maximum number of workers the pool can have
 	capacity int
 	// running represents the number of workers that are currently running
 	running int
 	// workers represents a pool of workers that are executing tasks
 	workers WorkerStack
 	// lock represents a mutex to protect the workers slice
 	lock sync.Mutex
 }

 func (p *Pool) Submit(task Task) {
 	// TODO
 }

 func (p *Pool) Running() int {
 	return p.running
 }

 func (p *Pool) Cap() int {
 	return p.capacity
 }

 func (p *Pool) Release() {
 	// TODO
 }
  • worker.go
type Task func()

type Worker struct {
	// pool represents a reference to the pool that owns this worker
	pool *Pool
	// task represents the task that the worker will execute
	task chan Task
	// lastUsedTime represents the last time the worker was used
	lastUsedTime time.Time
}
type WorkerStack interface {
	Push(w *Worker)
	Pop() *Worker
	Len() int
}
type workerStack struct {
	workers []*Worker
}

這個骨架看著還算清晰吧,其實一開始只想定義幾個結構體和方法,但是結構體名字一出來,GitHub Copilot 就開始自動補全相應屬性和方法,包括註釋了(沒錯,我寫的為數不多的幾十行程式碼裡還有一大半是 Copilot 補全的)。於是結合 Copilot 的補全,加上部分的取捨,最終就有了這個初始程式碼段。

3. 實現

關門,放 GPT-4!

3.1 你先隨意發揮

  • 我對 GPT 說:

我要用 Golang 實現一個 worker pool 程式,專案名叫做 GoPool,程式碼庫是 gopool。在 pool.go 和 worker.go 中我已經放了一些相關程式碼,請在此基礎上實現 GoPool 程式。

這裡需要基於選中程式碼/檔案來聊天,DevChat 的這個小功能還是很酷:

  • GPT 回答:

首先,我們需要理解需求和已有程式碼:

  • 使用者需要在 Golang 中實現一個名為 GoPool 的 worker pool 程式。
  • 專案的程式碼庫名為 gopool,已有的程式碼檔案包括 pool.go 和 worker.go。
  • pool.go 檔案中定義了 Pool 結構體,包含了一些欄位如 capacity(池的最大工作數)、running(當前執行的工作數)、workers(執行任務的工作池)和 lock(保護 workers 切片的互斥鎖)。此外,還定義了一些方法,如 Submit(提交任務)、Running(獲取當前執行的工作數)、Cap(獲取池的最大工作數)和 Release(釋放池)。
  • worker.go 檔案中定義了 Task 型別(一個函數型別)、Worker 結構體(包含了一些欄位如 pool(擁有此工作的池的參照)、task(工作將執行的任務)和 lastUsedTime(工作最後使用的時間))和 WorkerStack 介面(包含了一些方法如 Push(推入工作)、Pop(彈出工作)和 Len(獲取工作數))。

基於以上資訊,我們需要實現以下功能:

  1. 在 Pool 結構體的 Submit 方法中,實現任務的提交。
  2. 在 Pool 結構體的 Release 方法中,實現池的釋放。
  3. 實現 WorkerStack 介面的方法。

下面是實現程式碼:

// pool.go
package gopool

import (
	"sync"
)

type Pool struct {
	capacity int
	running  int
	workers  WorkerStack
	lock     sync.Mutex
}

func (p *Pool) Submit(task Task) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.running < p.capacity {
		worker := p.workers.Pop()
		if worker == nil {
			worker = &Worker{
				pool: p,
				task: make(chan Task),
			}
		}
		worker.task <- task
		p.running++
	} else {
		// TODO: handle the case when the pool is full
	}
}

func (p *Pool) Running() int {
	return p.running
}

func (p *Pool) Cap() int {
	return p.capacity
}

func (p *Pool) Release() {
	p.lock.Lock()
	defer p.lock.Unlock()

	for p.workers.Len() > 0 {
		worker := p.workers.Pop()
		close(worker.task)
	}
	p.running = 0
}
// worker.go
package gopool

import (
	"time"
)

type Task func()

type Worker struct {
	pool         *Pool
	task         chan Task
	lastUsedTime time.Time
}

type WorkerStack interface {
	Push(w *Worker)
	Pop() *Worker
	Len() int
}

type workerStack struct {
	workers []*Worker
}

func (ws *workerStack) Push(w *Worker) {
	ws.workers = append(ws.workers, w)
}

func (ws *workerStack) Pop() *Worker {
	if len(ws.workers) == 0 {
		return nil
	}
	w := ws.workers[len(ws.workers)-1]
	ws.workers = ws.workers[:len(ws.workers)-1]
	return w
}

func (ws *workerStack) Len() int {
	return len(ws.workers)
}

注意:在 Submit 方法中,當池已滿時,我們需要處理這種情況。這裡我沒有實現,因為這取決於你想如何處理這種情況(例如,你可能想要阻塞直到有可用的工作,或者你可能想要返回一個錯誤)。

不管三七二十一,先把程式碼替換到我的原始檔中再說。(不過這裡實際操作的時候還是要注意,如下圖所示,多數時候「插入」是最有用的;有時候「替換全檔案」也不錯;寫到後面其實先看 Diff,然後選擇性接受 GPT 的程式碼還是非常實用)

這一輪聊天的成果是這個 Commit

3.2 你得讓 Worker 跑起來呀

  • 我對 GPT 說:

看來 Worker 還需要一個 run() 方法用來非同步執行 task。當這個 task 執行完成後,Worker 應該被放回 workStack。在 Submit() 方法中將 task 給到 worker 後,順帶呼叫一下這個 run() 方法吧。請繼續編碼。

GPT 給出了加上 run() 方法後的完整原始檔,貼出來有點長,直接看增加的 run() 部分吧:

func (w *Worker) run() {
 	go func() {
 		for task := range w.task {
 			if task == nil {
 				return
 			}
 			task()
 			w.pool.lock.Lock()
 			w.pool.workers.Push(w)
 			w.pool.running--
 			w.pool.lock.Unlock()
 		}
 	}()
 }

這一輪聊天的成果是這個 Commit

3.3 你說說 Pool 滿了怎麼辦

前面 GPT 給的程式碼中 Summit() 方法裡有一段 TODO:

func (p *Pool) Submit(task Task) {
	// ……
    else {
		// TODO: handle the case when the pool is full
	}
}

滿了怎麼辦?挺好辦的,滿了就等呀,空閒了再繼續。於是,我想到了條件變數 sync.Cond。那就不拐彎抹角,讓 GPT 自由發揮了,給點明確的提示吧。

  • 我對 GPT 說:

下面繼續來實現 handle the case when the pool is full 相關邏輯吧。這裡可以引入 sync.Cond 來實現,當 pool full 的時候就 Wait();同時 Worker 的 run() 方法中當 Worker 被放回到 Pool 中時,說明又有空閒的 Worker 可用了,這時候通過 Signal() 來喚醒 Wait() 的 goroutine。

這時候上下文越來越長了,GPT 的回答貼出來有點礙眼。直接看結果吧:

這一輪聊天的成果是這個 Commit

這時候 Submit() 方法就變成了這樣:

func (p *Pool) Submit(task Task) {
 	p.lock.Lock()
 	defer p.lock.Unlock()

 	for p.running == p.capacity {
 		p.cond.Wait()
 	}

 	worker := p.workers.Pop()
 	if worker == nil {
 		worker = &Worker{
 			pool: p,
 			task: make(chan Task),
 		}
 	}
 	worker.task <- task
 	worker.run()
 	p.running++
 }

看起來基本功能差不多了,繼續新增 UT 吧。

3.4 你得把 UT 給我加上

  • 我對 GPT 說:

接下來我想新增一個 pool_test.go 來放單元測試測一下 Submit() 的功能,請給出測試程式碼。

  • GPT 回答:

為了測試 Submit() 方法,我們可以建立一個 Pool 物件,然後提交一些任務,檢查是否所有的任務都被正確地執行了。以下是測試程式碼:

// pool_test.go
package gopool

import (
	"sync"
	"testing"
)

func TestSubmit(t *testing.T) {
	var wg sync.WaitGroup
	p := NewPool(10)

	for i := 0; i < 20; i++ {
		wg.Add(1)
		task := func() {
			defer wg.Done()
		}
		p.Submit(task)
	}

	wg.Wait()

	if p.Running() != 0 {
		t.Errorf("Expected running workers to be 0, but got %d", p.Running())
	}
}

在這個測試中,我們首先建立了一個新的 Pool 物件,然後提交了 20 個任務。每個任務都會在完成時呼叫 wg.Done()。我們使用 sync.WaitGroup 來確保所有的任務都被執行了。最後,我們檢查 Running() 方法的返回值是否為 0,如果不是,那麼說明有一些任務沒有被執行,測試就失敗了。

我決定手動加上一條紀錄檔,把 for 迴圈改成這樣:

	for i := 0; i < 20; i++ {
		wg.Add(1)
		taskNum := i // Capture the task number
		task := func() {
			t.Logf("Executing task %d\n", taskNum)
			defer wg.Done()
		}
		p.Submit(task)
	}

這時候執行下單元測試,慘慘慘,死鎖了。不過這時候我也不想去考 GPT 了,看到死鎖我已經第一時間想到了 Channel 阻塞問題,也就是這段程式碼:

func (p *Pool) Submit(task Task) {
	// ……
	if worker == nil {
		worker = &Worker{
			pool: p,
			task: make(chan Task),
		}
	}
    worker.task <- task
	// ……
}

這裡給 task 賦值了一個 make(chan Task),導致下面這行 worker.task <- task 直接阻塞了。隨手改一下吧:

func (p *Pool) Submit(task Task) {
	// ……
	if worker == nil {
		worker = &Worker{
			pool: p,
			task: make(chan Task, 1),
		}
	}
    worker.task <- task
	// ……
}

沒錯,把 task: make(chan Task) 改成 task: make(chan Task, 1) 就可以了。

執行一下吧:

$ go test . -v
=== RUN   TestSubmit
    pool_test.go:16: Executing task 9
    pool_test.go:16: Executing task 7
    pool_test.go:16: Executing task 8
    pool_test.go:16: Executing task 10
    pool_test.go:16: Executing task 11
    pool_test.go:16: Executing task 12
    pool_test.go:16: Executing task 13
    pool_test.go:16: Executing task 14
    pool_test.go:16: Executing task 15
    pool_test.go:16: Executing task 16
    pool_test.go:16: Executing task 17
    pool_test.go:16: Executing task 18
    pool_test.go:16: Executing task 19
    pool_test.go:16: Executing task 1
    pool_test.go:16: Executing task 0
    pool_test.go:16: Executing task 2
    pool_test.go:16: Executing task 4
    pool_test.go:16: Executing task 3
    pool_test.go:16: Executing task 6
    pool_test.go:16: Executing task 5
--- PASS: TestSubmit (0.00s)
PASS
ok      github.com/devchat-ai/gopool    0.753s

這一輪聊天的成果是這個 Commit

3.5 你再給我把檔案補上

  • 我對 GPT 說:

我已經初步實現了 GoPool 程式,請幫我完善 README.md 檔案內容。

今天最後一輪聊天了,我決定把所有檔案內容都丟給 GPT,讓它自由發揮一下。

這一輪聊天的成果是這個 Commit

4. 總結

經過一頓瞎聊,最終 GPT-4 寫了差不多200行,我寫了20行,Copilot 寫了20行,加一起,完成了一個初級的 Golang worker pool 程式 GoPool

本文釋出時對應的程式碼版本看這裡

接下來幾天我準備繼續和 GPT-4 瞎聊,讓它幫著把一個 worker pool 該有的功能都加上,並且不斷優化效能,看能不能打造一款炫酷的「開源 Golang Goroutine/Worker Pool 庫」。

當然,大夥有啥好想法,需求,或者 bug 反饋,歡迎直接提到 GitHub Issues

(關注不迷路,我的個人微信公眾號:「胡說雲原生」)

(關注不迷路,我的個人微信公眾號:「胡說雲原生」)

(關注不迷路,我的個人微信公眾號:「胡說雲原生」)