本文將介紹 Go 語言中的 Weighted
並行原語,包括 Weighted
的基本使用方法、實現原理、使用注意事項等內容。能夠更好地理解和應用 Weighted
來實現資源的管理,從而提高程式的穩定性。
在微服務架構中,我們的服務節點負責接收其他節點的請求,並提供相應的功能和資料。比如賬戶服務,其他服務需要獲取賬戶資訊,都會通過rpc請求向賬戶服務發起請求。
這些服務節點通常以叢集的方式部署在伺服器上,用於處理大量的並行請求。每個伺服器都有其處理能力的上限,超過該上限可能導致效能下降甚至崩潰。
在部署服務時,通常會評估服務的並行量,併為其分配適當的資源以處理預期的請求負載。然而,在微服務架構中,存在著上游服務請求下游服務的場景。如果上游服務在某些情況下沒有正確考慮並行量,或者由於某些異常情況導致大量請求傳送給下游服務,那麼下游服務可能面臨超過其處理能力的問題。這可能導致下游服務的響應時間增加,甚至無法正常處理請求,進而影響整個系統的穩定性和可用性。下面用一個簡單的程式碼來說明一下:
package main
import (
"fmt"
"net/http"
"sync"
)
func main() {
// 啟動下游服務,用於處理請求
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// 模擬下游服務的處理邏輯
// ...
// 完成請求處理後,從等待組中刪除一個等待
wg.Done()
})
// 啟動下游服務的 HTTP 伺服器
http.ListenAndServe(":8080", nil)
}
這裡啟動一個簡單的HTTP伺服器,由其來模擬下游服務,來接收上游服務的請求。下面我們啟動一個簡單的程式,由其來模擬上游服務傳送請求:
func main() {
// 建立一個等待組,用於等待所有請求完成
var wg sync.WaitGroup
// 模擬上游服務傳送大量請求給下游服務
go func() {
for i := 0; i < 1000000; i++ {
wg.Add(1)
go sendRequest(&wg)
}
}()
// 等待所有請求完成
wg.Wait()
}
func sendRequest(wg *sync.WaitGroup) {
// 模擬上游服務傳送請求給下游服務
resp, err := http.Get("http://localhost:8080/")
if err != nil {
fmt.Println("請求失敗:", err)
} else {
fmt.Println("請求成功:", resp.Status)
}
// 請求完成後,通知等待組
wg.Done()
}
這裡,我們同時啟動了1000000個協程同時往HTTP伺服器傳送請求,如果伺服器設定不夠高,亦或者是請求量更多的情況下,已經超過了伺服器的處理上限,伺服器沒有主夠的資源去處理這些請求,此時將有可能直接將伺服器打掛掉,服務直接不可用。在這種情況下,如果由於上游服務的問題,導致下游服務,甚至整個鏈路的系統都直接崩潰,這個是不合理的,此時需要有一些手段保護下游服務由於異常流量導致整個系統的崩潰。
這裡對上面的場景進行分析,可以發現,此時是由於上游服務大量請求的過來,而當前服務並沒有足夠的資源去處理這些請求,但是並沒有對其加以限制,而是繼續處理,最終導致了整個系統的不可用。那麼此時就應該進行限流,對並行請求量進行控制,對伺服器能夠處理的並行數進行合理評估,當並行請求數超過了限制,此時應該直接拒絕其存取,避免整個系統的不可用。
那問題來了,go語言中,有什麼方法能夠實現資源的管理,如果沒有足夠的資源,此時將直接返回,不對請求進行處理呢?其實go語言中有Weighted
型別,在這種場景還挺合適的。下面我們將對其進行介紹。
Weighted
是 Go 語言中 golang.org/x/sync
包中的一種型別,用於限制並行存取某個資源的數量。它提供了一種機制,允許呼叫者以不同的權重請求存取資源,並在資源可用時進行授予。
Weighted
的定義如下,提供了Acquire
,TryAcquire
,Release
三個方法:
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
func (s *Weighted) Acquire(ctx context.Context, n int64) error{}
func (s *Weighted) TryAcquire(n int64) bool{}
func (s *Weighted) Release(n int64) {}
Acquire
: 以權重 n
請求獲取資源,阻塞直到資源可用或上下文 ctx
結束。TryAcquire
: 嘗試以權重 n
獲取號誌,如果成功則返回 true
,否則返回 false
,並保持號誌不變。Release
:釋放具有權重 n
的號誌。有時候,不同請求對資源的消耗是不同的。通過設定權重,你可以更好地控制不同請求對資源的使用情況。例如,某些請求可能需要更多的計算資源或更長的處理時間,你可以設定較高的權重來確保它們能夠獲取到足夠的資源。
其次就是權重大隻是代表著請求需要使用到的資源多,對於優先順序並不會有作用。在Weighted
中,資源的許可是以先進先出(FIFO)的順序分配的,而不是根據權重來決定獲取的優先順序。當有多個請求同時等待獲取資源時,它們會按照先後順序依次獲取資源的許可。
假設先請求權重為 1 的資源,然後再請求權重為 2 的資源。如果當前可用的資源許可足夠滿足兩個請求的總權重,那麼先請求的權重為 1 的資源會先獲取到許可,然後是後續請求的權重為 2 的資源。
w.Acquire(context.Background(), 1) // 權重為 1 的請求先獲取到資源許可
w.Acquire(context.Background(), 2) // 權重為 2 的請求在權重為 1 的請求之後獲取到資源許可
當使用Weighted
來控制資源的並行存取時,通常需要以下幾個步驟:
Weighted
範例,定義好最大資源數Acquire
方法佔據資源Release
方法釋放資源下面是一個簡單的程式碼的範例,展示瞭如何使用Weighted
實現資源控制:
func main() {
// 1. 建立一個號誌範例,設定最大並行數
sem := semaphore.NewWeighted(10)
// 具體處理請求的函數
handleRequest := func(id int) {
// 2. 呼叫Acquire嘗試獲取資源
err := sem.Acquire(context.Background(), 1)
if err != nil {
fmt.Printf("Goroutine %d failed to acquire resource\n", id)
}
// 3. 成功獲取資源,使用defer,在任務執行完之後,自動釋放資源
defer sem.Release(1)
// 執行業務邏輯
return
}
// 模擬並行請求
for i := 0; i < 20; i++ {
go handleRequest(i)
}
time.Sleep(20 * time.Second)
}
首先,呼叫NewWeighted
方法建立一個號誌範例,設定最大並行數為10。然後在每次請求處理前呼叫Acquire
方法嘗試獲取資源,成功獲取資源後,使用defer
關鍵字,在任務執行完後自動釋放資源,呼叫Release
方法釋放一個資源。
保證最多同時有10個協程獲取資源。如果有更多的協程嘗試獲取資源,它們會等待其他協程釋放資源後再進行獲取。
Weighted
型別的設計初衷是為了在並行環境中實現對資源的控制和限制。它提供了一種簡單而有效的機制,允許在同一時間內只有一定數量的並行操作可以存取或使用特定的資源。
Weighted
型別的基本實現原理是基於計數號誌的概念。計數號誌是一種用於控制並行存取的同步原語,它維護一個可用資源的計數器。在Weighted
中,該計數器表示可用的資源數量。
當一個任務需要獲取資源時,它會呼叫Acquire
方法。該方法首先會檢查當前可用資源的數量,如果大於零,則表示有可用資源,並將計數器減一,任務獲取到資源,並繼續執行。如果當前可用資源的數量為零,則任務會被阻塞,直到有其他任務釋放資源。
當一個任務完成對資源的使用後,它會呼叫Release
方法來釋放資源。該方法會將計數器加一,表示資源已經可用,其他被阻塞的任務可以繼續獲取資源並執行。
通過這種方式,Weighted
實現了對資源的限制和控制。它確保在同一時間內只有一定數量的並行任務可以存取資源,超過限制的任務會被阻塞,直到有其他任務釋放資源。這樣可以有效地避免資源過度使用和競爭,保證系統的穩定性和效能。
Weighted
的結構體定義如下:
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
size
:表示資源的總數量,即可以同時獲取的最大資源數量。cur
:表示當前已經被獲取的資源數量。mu
:用於保護Weighted
型別的互斥鎖,確保並行安全性。waiters
:使用雙向連結串列來儲存等待獲取資源的任務。Acquire
方法將獲取指定數量的資源。如果當前可用資源數量不足,呼叫此方法的任務將被阻塞,並加入到等待佇列中。
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
// 1. 使用互斥鎖s.mu對Weighted型別進行加鎖,確保並行安全性。
s.mu.Lock()
// size - cur 代表剩餘可用資源數,如果大於請求資源數n, 此時代表剩餘可用資源 大於 需要的資源數
// 其次,Weighted資源分配的順序是FIFO,如果等待佇列不為空,當前請求就需要自動放到佇列最後面
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// s.size 代表最大資源數,如果需要的資源數 大於 最大資源數,此時直接返回錯誤
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
// 這裡代表著當前暫時獲取不到資源,此時將建立一個waiter物件放到等待佇列最後
ready := make(chan struct{})
// waiter物件中包含需要獲取的資源數量n和通知通道ready。
w := waiter{n: n, ready: ready}
// 將waiter物件放到佇列最後
elem := s.waiters.PushBack(w)
// 釋放鎖,讓其他請求進來
s.mu.Unlock()
select {
// 如果ctx.Done()通道被關閉,表示上下文已取消,任務需要返回錯誤。
case <-ctx.Done():
err := ctx.Err()
// 新獲取鎖,檢查是否已經成功獲取資源。如果成功獲取資源,將錯誤置為nil,表示獲取成功;
s.mu.Lock()
select {
// 通過判斷ready channel是否接收到訊號,從而來判斷是否成功獲取資源
case <-ready:
err = nil
default:
// 判斷是否是等待佇列中第一個元素
isFront := s.waiters.Front() == elem
// 將該請求從等待佇列中移除
s.waiters.Remove(elem)
// 如果是第一個等待物件,同時還有剩餘資源,喚醒後面的waiter。說不定後面的waiter剛好符合條件
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
// ready通道接收到資料,代表此時已經成功佔據到資源了
case <-ready:
return nil
}
}
Weighted
物件用來控制可用資源的數量。它有兩個重要的欄位,cur和size,分別表示當前可用的資源數量和總共可用的資源數量。
當一個請求通過Acquire
方法請求資源時,首先會檢查剩餘資源數量是否足夠,並且等待佇列中沒有其他請求在等待資源。如果滿足這兩個條件,請求就可以成功獲取到資源。
如果剩餘資源數量不足以滿足請求,那麼一個waiter
的物件會被建立並放入等待佇列中。waiter
物件包含了請求需要的資源數量n和一個用於通知的通道ready。當其他請求呼叫Release
方法釋放資源時,它們會檢查等待佇列中的waiter
物件是否滿足資源需求,如果滿足,就會將資源分配給該waiter
物件,並通過ready
通道來通知它可以執行業務邏輯了。
即使剩餘資源數量大於請求所需數量,如果等待佇列中存在等待的請求,新的請求也會被放入等待佇列中,而不管資源是否足夠。這可能導致一些請求長時間等待資源,導致資源的浪費和延遲。因此,在使用Weighted
進行資源控制時,需要謹慎評估資源配額,並避免資源飢餓的情況發生,以免影響系統的效能和響應能力。
Release
方法將釋放指定數量的資源。當資源被釋放時,會檢查等待佇列中的任務。它從隊頭開始逐個檢查等待的元素,並嘗試為它們分配資源,直到最後一個不滿足資源條件的元素為止。
func (s *Weighted) Release(n int64) {
// 1. 使用互斥鎖s.mu對Weighted型別進行加鎖,確保並行安全性。
s.mu.Lock()
// 2. 釋放資源
s.cur -= n
// 3. 異常情況處理
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
// 4. 喚醒等待任務
s.notifyWaiters()
s.mu.Unlock()
}
可以看到,Release
方法實現相對比較簡單,釋放資源後,便直接呼叫notifyWaiters
方法喚醒處於等待狀態的任務。下面來看看notifyWaiters
方法的具體實現:
func (s *Weighted) notifyWaiters() {
for {
// 獲取隊頭元素
next := s.waiters.Front()
// 已經沒有處於等待狀態的協程,此時直接返回
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
// 如果資源不滿足要求 當前waiter的要求,此時直接返回
if s.size-s.cur < w.n {
break
}
// 否則佔據waiter需要的資源數
s.cur += w.n
// 移除等待元素
s.waiters.Remove(next)
// 喚醒處於等待狀態的任務,Acquire方法會 <- ready 來等待訊號的到來
close(w.ready)
}
}
notifyWaiters
方法會從隊頭開始獲取元素,判斷當前資源的剩餘數,是否滿足waiter
的要求,如果滿足的話,此時先佔據該waiter
需要的資源,之後再將其從等待佇列中移除,最後呼叫close
方法,喚醒處於等待狀態的任務。 之後,再繼續佇列中取出元素,判斷是否滿足條件,迴圈反覆,直到不滿足waiter
的條件為止。
TryAcquire
方法將嘗試獲取指定數量的資源,但不會阻塞。如果可用資源不足,它會立即返回一個錯誤,而不是阻塞等待。實現比較簡單,只是簡單檢查當前資源數是否滿足要求而已,具體如下:
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
當使用Weighted
來管理資源時,確保在使用完資源後,及時呼叫Release
方法釋放資源。如果不這樣做,將會導致資源洩漏,最終導致所有的請求都將無法被處理。下面展示一個簡單的程式碼說明:
package main
import (
"fmt"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
func main() {
sem := semaphore.NewWeighted(5) // 建立一個最大並行數為5的Weighted範例
// 模擬使用資源的任務
task := func(id int) {
//1. 成功獲取資源
if err := sem.Acquire(context.Background(), 1); err != nil {
fmt.Printf("Task %d failed to acquire resource: %s\n", id, err)
return
}
// 2. 任務處理完成之後,資源沒有被釋放
// defer sem.Release(1) // 使用defer確保在任務完成後釋放資源
}
// 啟動多個任務並行執行
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
task(id)
}(i)
}
wg.Wait() // 等待所有任務完成
}
在上面的程式碼中,我們使用Weighted
來控制最大並行數為5。我們在任務中沒有呼叫sem.Release(1)
釋放資源,這些資源將一直被佔用,後面啟動的5個任務將永遠無法獲取到資源,此時將永遠不會繼續執行下去。因此,務必在使用完資源後及時呼叫Release
方法釋放資源,以確保資源的正確回收和釋放,保證系統的穩定性和效能。
而且這裡最好使用defer
語句來實現資源的釋放,避免Release
函數在某些異常場景下無法被執行到。
Weighted
只是提供了一種管理資源的手段,具體的並行數還需要開發人員自行根據系統的實際需求和資源限制,合理設定Weighted
範例的最大並行數。過大的並行數可能導致資源過度競爭,而過小的並行數可能限制了系統的吞吐量。
具體操作可以到線上預釋出環境,不斷調整觀察,獲取到一個最合適的並行數。
Weighted
型別可以用於限制並行存取資源的數量,但它也存在一些潛在的缺點,需要根據具體的應用場景和需求權衡利弊。
首先是記憶體開銷,Weighted
型別使用一個 sync.Mutex
以及一個 list.List
來管理等待佇列,這可能會佔用一定的記憶體開銷。對於大規模的並行處理,特別是在限制極高的情況下,可能會影響系統的記憶體消耗。
其次是Weighted
型別一旦初始化,最大並行數是固定的,無法在執行時動態調整。如果你的應用程式需要根據負載情況動態調整並行限制,可能需要使用其他機制或實現。
而且Weighted
是嚴格按照FIFO請求順序來分配資源的,當某些請求的權重過大時,可能會導致其他請求飢餓,即長時間等待資源。
最後,則是由於 Weighted
型別使用了互斥鎖來保護共用狀態,因此在高並行情況下,爭奪鎖可能成為效能瓶頸,影響系統的吞吐量。
因此,在使用 Weighted
型別時,需要根據具體的應用場景和需求權衡利弊,從而來決定是否使用Weighted
來實現資源的管理控制。
本文介紹了一種解決系統中資源管理問題的解決方案Weighted
。本文從問題引出,詳細介紹了Weighted
的特點和使用方法。通過了解Weighted
的設計初衷和實現原理,讀者可以更好地理解其工作原理。
同時,文章提供了使用Weighted
時需要注意的事項,如及時釋放資源、合理設定並行數等,從而幫助讀者避免潛在的問題,以及能夠在比較合適的場景下使用到Weighted
型別實現資源管理。基於此,我們完成了對Weighted
的介紹,希望對你有所幫助。你的點贊和收藏將是我最大的動力,比心~