話說,早幾天一口氣過了一遍 Golang 的並行機制:
這些機制是使用 Golang 並行程式設計的基礎知識,但是要使用 Golang 編寫出高效、易維護、高質量的程式碼,我們可以使用一些通用的「並行模式」。在 Golang 並行程式設計中大家經常用到的一些模式主要有:
從 Worker-Pool 模式開始聊吧,Worker-Pool 模式是一種常見的並行設計模式,主要用於控制並行任務的數量,提高系統效能,以及更有效地管理系統資源。
如果在 GitHub 上搜尋 Worker Pool 的開源實現,你會看到結果琳琅滿目,五花八門,各有千秋,眼花繚亂。咱乾脆試試看能不能讓 GPT-4 來實現一個 Worker Pool,然後對比下 Star 數較高的開源實現,看下 GPT-4 「信手拈來」的 Worker Pool 能不能在功能集、易用性、效能上相比各種千星萬星 GitHub 開源流行款更勝一籌。
下文通過 DevChat 和 GPT-4 互動,如果你對 DevChat 感興趣,可以跳轉下文瀏覽:
下面先來明確這個程式需要實現哪些功能,怎麼實現。
先問下 DevChat 我應該實現哪些功能:
在Golang中實現一個高效能、功能豐富且簡單易用的Worker Pool,你可能需要考慮以下功能點:
任務佇列:你需要一個佇列來儲存待處理的任務。這個佇列應該是執行緒安全的,以便多個goroutine可以並行地向佇列中新增任務和從佇列中取出任務。
工作執行緒:你需要一組工作執行緒(goroutine)來並行地執行任務。這些工作執行緒應該可以從任務佇列中取出任務> 並執行。
執行緒池大小控制:你需要一種機制來控制執行緒池的大小,即同時執行的工作執行緒的數量。這可以幫助你控制並行級別,防止過度消耗系統資源。
優雅的關閉:你需要一種機制來優雅地關閉執行緒池,即在關閉執行緒池之前,確保所有已經開始的任務都已經完成。
錯誤處理:你需要一種機制來處理任務執行過程中的錯誤。這可能包括記錄錯誤紀錄檔,或者將錯誤返回給任務的提交者。
任務超時控制:對於可能長時間執行的任務,你可能需要一種機制來控制任務的執行時間,如果任務執行時間超過了預設的超時時間,那麼應該中止任務的執行。
任務優先順序:如果你的應用場景中有不同優先順序的任務,你可能需要一種機制來控制任務的執行順序,確保高優先順序的任務先於低優先順序的任務執行。
任務結果獲取:對於需要返回結果的任務,你需要一種機制來獲取任務的執行結果。
以上就是實現一個Worker Pool可能需要考慮的功能點,具體的功能點可能會根據你的具體需求和應用場景有所不同。
酷,看起來清新脫俗,「花裡胡哨」。目測完全實現這些功能涉及到的程式碼量會超出目前 GPT-4 的上下文限制,外加細節太多,直接讓 GPT-4 上手寫程式碼大概率拿不到滿意的結果,還是繼續只把 AI 當一個「程式設計助手」吧。
1. 給專案起一個名字(起名字是最難的,糾結死人)
就叫 GoPool 吧,對應程式碼庫就叫 gopool,放到 GitHub 上開源出去。
2. 用程式碼的方式總結需求
整個程式涉及到的幾個核心物件是 Task、Worker 和 Pool。Pool 中需要維護一個 workers 棧(或者佇列,不過棧更易引入「Worker 過期」等邏輯);Task 可以先用一個最簡單的函數型別;tasks 似乎由 Pool 來維護會導致過多的 workers(goroutines)上下文切換,影響效能…… 不行,細節太多了,文字不足以描述我想在第一個版本中包含哪些功能(肯定不是上來就實現一個GA版本),或者我先寫幾行物件、介面定義,然後讓 GPT 在此基礎上逐步實現相關邏輯吧。
於是,我寫了這些程式碼,有效行數不到50,看起來卻金光閃閃有沒有?
pool.go
type Pool struct {
// capacity represents the maximum number of workers the pool can have
capacity int
// running represents the number of workers that are currently running
running int
// workers represents a pool of workers that are executing tasks
workers WorkerStack
// lock represents a mutex to protect the workers slice
lock sync.Mutex
}
func (p *Pool) Submit(task Task) {
// TODO
}
func (p *Pool) Running() int {
return p.running
}
func (p *Pool) Cap() int {
return p.capacity
}
func (p *Pool) Release() {
// TODO
}
worker.go
type Task func()
type Worker struct {
// pool represents a reference to the pool that owns this worker
pool *Pool
// task represents the task that the worker will execute
task chan Task
// lastUsedTime represents the last time the worker was used
lastUsedTime time.Time
}
type WorkerStack interface {
Push(w *Worker)
Pop() *Worker
Len() int
}
type workerStack struct {
workers []*Worker
}
這個骨架看著還算清晰吧,其實一開始只想定義幾個結構體和方法,但是結構體名字一出來,GitHub Copilot 就開始自動補全相應屬性和方法,包括註釋了(沒錯,我寫的為數不多的幾十行程式碼裡還有一大半是 Copilot 補全的)。於是結合 Copilot 的補全,加上部分的取捨,最終就有了這個初始程式碼段。
關門,放 GPT-4!
我要用 Golang 實現一個 worker pool 程式,專案名叫做 GoPool,程式碼庫是 gopool。在 pool.go 和 worker.go 中我已經放了一些相關程式碼,請在此基礎上實現 GoPool 程式。
這裡需要基於選中程式碼/檔案來聊天,DevChat 的這個小功能還是很酷:
首先,我們需要理解需求和已有程式碼:
- 使用者需要在 Golang 中實現一個名為 GoPool 的 worker pool 程式。
- 專案的程式碼庫名為 gopool,已有的程式碼檔案包括 pool.go 和 worker.go。
- pool.go 檔案中定義了 Pool 結構體,包含了一些欄位如 capacity(池的最大工作數)、running(當前執行的工作數)、workers(執行任務的工作池)和 lock(保護 workers 切片的互斥鎖)。此外,還定義了一些方法,如 Submit(提交任務)、Running(獲取當前執行的工作數)、Cap(獲取池的最大工作數)和 Release(釋放池)。
- worker.go 檔案中定義了 Task 型別(一個函數型別)、Worker 結構體(包含了一些欄位如 pool(擁有此工作的池的參照)、task(工作將執行的任務)和 lastUsedTime(工作最後使用的時間))和 WorkerStack 介面(包含了一些方法如 Push(推入工作)、Pop(彈出工作)和 Len(獲取工作數))。
基於以上資訊,我們需要實現以下功能:
- 在 Pool 結構體的 Submit 方法中,實現任務的提交。
- 在 Pool 結構體的 Release 方法中,實現池的釋放。
- 實現 WorkerStack 介面的方法。
下面是實現程式碼:
// pool.go package gopool import ( "sync" ) type Pool struct { capacity int running int workers WorkerStack lock sync.Mutex } func (p *Pool) Submit(task Task) { p.lock.Lock() defer p.lock.Unlock() if p.running < p.capacity { worker := p.workers.Pop() if worker == nil { worker = &Worker{ pool: p, task: make(chan Task), } } worker.task <- task p.running++ } else { // TODO: handle the case when the pool is full } } func (p *Pool) Running() int { return p.running } func (p *Pool) Cap() int { return p.capacity } func (p *Pool) Release() { p.lock.Lock() defer p.lock.Unlock() for p.workers.Len() > 0 { worker := p.workers.Pop() close(worker.task) } p.running = 0 }
// worker.go package gopool import ( "time" ) type Task func() type Worker struct { pool *Pool task chan Task lastUsedTime time.Time } type WorkerStack interface { Push(w *Worker) Pop() *Worker Len() int } type workerStack struct { workers []*Worker } func (ws *workerStack) Push(w *Worker) { ws.workers = append(ws.workers, w) } func (ws *workerStack) Pop() *Worker { if len(ws.workers) == 0 { return nil } w := ws.workers[len(ws.workers)-1] ws.workers = ws.workers[:len(ws.workers)-1] return w } func (ws *workerStack) Len() int { return len(ws.workers) }
注意:在 Submit 方法中,當池已滿時,我們需要處理這種情況。這裡我沒有實現,因為這取決於你想如何處理這種情況(例如,你可能想要阻塞直到有可用的工作,或者你可能想要返回一個錯誤)。
不管三七二十一,先把程式碼替換到我的原始檔中再說。(不過這裡實際操作的時候還是要注意,如下圖所示,多數時候「插入」是最有用的;有時候「替換全檔案」也不錯;寫到後面其實先看 Diff,然後選擇性接受 GPT 的程式碼還是非常實用)
這一輪聊天的成果是這個 Commit。
看來 Worker 還需要一個 run() 方法用來非同步執行 task。當這個 task 執行完成後,Worker 應該被放回 workStack。在 Submit() 方法中將 task 給到 worker 後,順帶呼叫一下這個 run() 方法吧。請繼續編碼。
GPT 給出了加上 run()
方法後的完整原始檔,貼出來有點長,直接看增加的 run()
部分吧:
func (w *Worker) run() {
go func() {
for task := range w.task {
if task == nil {
return
}
task()
w.pool.lock.Lock()
w.pool.workers.Push(w)
w.pool.running--
w.pool.lock.Unlock()
}
}()
}
這一輪聊天的成果是這個 Commit。
前面 GPT 給的程式碼中 Summit() 方法裡有一段 TODO:
func (p *Pool) Submit(task Task) {
// ……
else {
// TODO: handle the case when the pool is full
}
}
滿了怎麼辦?挺好辦的,滿了就等呀,空閒了再繼續。於是,我想到了條件變數 sync.Cond
。那就不拐彎抹角,讓 GPT 自由發揮了,給點明確的提示吧。
下面繼續來實現 handle the case when the pool is full 相關邏輯吧。這裡可以引入 sync.Cond 來實現,當 pool full 的時候就 Wait();同時 Worker 的 run() 方法中當 Worker 被放回到 Pool 中時,說明又有空閒的 Worker 可用了,這時候通過 Signal() 來喚醒 Wait() 的 goroutine。
這時候上下文越來越長了,GPT 的回答貼出來有點礙眼。直接看結果吧:
這一輪聊天的成果是這個 Commit。
這時候 Submit()
方法就變成了這樣:
func (p *Pool) Submit(task Task) {
p.lock.Lock()
defer p.lock.Unlock()
for p.running == p.capacity {
p.cond.Wait()
}
worker := p.workers.Pop()
if worker == nil {
worker = &Worker{
pool: p,
task: make(chan Task),
}
}
worker.task <- task
worker.run()
p.running++
}
看起來基本功能差不多了,繼續新增 UT 吧。
接下來我想新增一個 pool_test.go 來放單元測試測一下 Submit() 的功能,請給出測試程式碼。
為了測試
Submit()
方法,我們可以建立一個Pool
物件,然後提交一些任務,檢查是否所有的任務都被正確地執行了。以下是測試程式碼:// pool_test.go package gopool import ( "sync" "testing" ) func TestSubmit(t *testing.T) { var wg sync.WaitGroup p := NewPool(10) for i := 0; i < 20; i++ { wg.Add(1) task := func() { defer wg.Done() } p.Submit(task) } wg.Wait() if p.Running() != 0 { t.Errorf("Expected running workers to be 0, but got %d", p.Running()) } }
在這個測試中,我們首先建立了一個新的
Pool
物件,然後提交了 20 個任務。每個任務都會在完成時呼叫wg.Done()
。我們使用sync.WaitGroup
來確保所有的任務都被執行了。最後,我們檢查Running()
方法的返回值是否為 0,如果不是,那麼說明有一些任務沒有被執行,測試就失敗了。
我決定手動加上一條紀錄檔,把 for 迴圈改成這樣:
for i := 0; i < 20; i++ {
wg.Add(1)
taskNum := i // Capture the task number
task := func() {
t.Logf("Executing task %d\n", taskNum)
defer wg.Done()
}
p.Submit(task)
}
這時候執行下單元測試,慘慘慘,死鎖了。不過這時候我也不想去考 GPT 了,看到死鎖我已經第一時間想到了 Channel 阻塞問題,也就是這段程式碼:
func (p *Pool) Submit(task Task) {
// ……
if worker == nil {
worker = &Worker{
pool: p,
task: make(chan Task),
}
}
worker.task <- task
// ……
}
這裡給 task
賦值了一個 make(chan Task)
,導致下面這行 worker.task <- task
直接阻塞了。隨手改一下吧:
func (p *Pool) Submit(task Task) {
// ……
if worker == nil {
worker = &Worker{
pool: p,
task: make(chan Task, 1),
}
}
worker.task <- task
// ……
}
沒錯,把 task: make(chan Task)
改成 task: make(chan Task, 1)
就可以了。
執行一下吧:
$ go test . -v
=== RUN TestSubmit
pool_test.go:16: Executing task 9
pool_test.go:16: Executing task 7
pool_test.go:16: Executing task 8
pool_test.go:16: Executing task 10
pool_test.go:16: Executing task 11
pool_test.go:16: Executing task 12
pool_test.go:16: Executing task 13
pool_test.go:16: Executing task 14
pool_test.go:16: Executing task 15
pool_test.go:16: Executing task 16
pool_test.go:16: Executing task 17
pool_test.go:16: Executing task 18
pool_test.go:16: Executing task 19
pool_test.go:16: Executing task 1
pool_test.go:16: Executing task 0
pool_test.go:16: Executing task 2
pool_test.go:16: Executing task 4
pool_test.go:16: Executing task 3
pool_test.go:16: Executing task 6
pool_test.go:16: Executing task 5
--- PASS: TestSubmit (0.00s)
PASS
ok github.com/devchat-ai/gopool 0.753s
這一輪聊天的成果是這個 Commit。
我已經初步實現了 GoPool 程式,請幫我完善 README.md 檔案內容。
今天最後一輪聊天了,我決定把所有檔案內容都丟給 GPT,讓它自由發揮一下。
這一輪聊天的成果是這個 Commit。
經過一頓瞎聊,最終 GPT-4 寫了差不多200行,我寫了20行,Copilot 寫了20行,加一起,完成了一個初級的 Golang worker pool 程式 GoPool。
本文釋出時對應的程式碼版本看這裡。
接下來幾天我準備繼續和 GPT-4 瞎聊,讓它幫著把一個 worker pool 該有的功能都加上,並且不斷優化效能,看能不能打造一款炫酷的「開源 Golang Goroutine/Worker Pool 庫」。
當然,大夥有啥好想法,需求,或者 bug 反饋,歡迎直接提到 GitHub Issues
(關注不迷路,我的個人微信公眾號:「胡說雲原生」)
(關注不迷路,我的個人微信公眾號:「胡說雲原生」)
(關注不迷路,我的個人微信公眾號:「胡說雲原生」)