在kubernetes中,使用go的channel無法滿足kubernetes的應用場景,如延遲、限速等;在kubernetes中存在三種佇列通用佇列 common queue
,延遲佇列 delaying queue
,和限速佇列 rate limiters queue
Interface作為所有佇列的一個抽象定義
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
type Type struct { // 一個work queue
queue []t // queue用slice做儲存
dirty set // 髒位,定義了需要處理的元素,類似於作業系統,表示已修改但為寫入
processing set // 當前正在處理的元素集合
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
type empty struct{}
type t interface{} // t queue中的元素
type set map[t]empty // dirty 和 processing中的元素
可以看到其中核心屬性就是 queue
, dirty
, processing
在研究優先順序佇列前,需要對 Heap
有一定的瞭解,因為delay queue使用了 heap
做延遲佇列
Heap
是基於樹屬性的特殊資料結構;heap是一種完全二元樹型別,具有兩種型別:
二元堆積的儲存規則:
那麼下列圖片中,那個是堆
heap的實現
步驟一:將新元素放入堆中的第一個可用位置。這將使結構保持為完整的二元樹,但它可能不再是堆,因為新元素可能具有比其父元素更大的值。
步驟二:如果新元素的值大於父元素,將新元素與父元素交換,直到達到新元素到根,或者新元素大於等於其父元素的值時將停止
這種過程被稱為 向上調整 (reheapification upward
)
步驟一:將根元素複製到用於返回值的變數中,將最深層的最後一個元素複製到根,然後將最後一個節點從樹中取出。該元素稱為 out-of-place
。
步驟二:而將異位元素與其最大值的子元素交換,並返回在步驟1中儲存的值。
這個過程被稱為向下調整 (reheapification downward
)
優先順序佇列的行為:
如何實現的:
在優先順序佇列中,heap的每個節點都包含一個元素以及元素的優先順序,並且維護樹以便它遵循使用元素的優先順序來比較節點的堆儲存規則:
實現的程式碼:golang priorityQueue
Reference
在Kubernetes中對 delaying queue
的設計非常精美,通過使用 heap
實現的延遲佇列,加上kubernetes中的通過佇列,完成了延遲佇列的功能。
// 註釋中給了一個hot-loop熱迴圈,通過這個loop實現了delaying
type DelayingInterface interface {
Interface // 繼承了workqueue的功能
AddAfter(item interface{}, duration time.Duration) // 在time後將內容新增到工作佇列中
}
具體實現了 DelayingInterface
的範例
type delayingType struct {
Interface // 通用的queue
clock clock.Clock // 對比的時間 ,包含一些定時器的功能
type Clock interface {
PassiveClock
type PassiveClock interface {
Now() time.Time
Since(time.Time) time.Duration
}
After(time.Duration) <-chan time.Time
NewTimer(time.Duration) Timer
Sleep(time.Duration)
NewTicker(time.Duration) Ticker
}
stopCh chan struct{} // 停止loop
stopOnce sync.Once // 保證退出只會觸發一次
heartbeat clock.Ticker // 一個定時器,保證了loop的最大空事件等待時間
waitingForAddCh chan *waitFor // 普通的chan,用來接收資料插入到延遲佇列中
metrics retryMetrics // 重試的指數
}
那麼延遲佇列的整個資料結構如下圖所示
而上面部分也說到了,這個延遲佇列的核心就是一個優先順序佇列,而優先順序佇列又需要滿足:
而 waitFor
就是這個優先順序佇列的資料結構
type waitFor struct {
data t // 資料
readyAt time.Time // 加入工作佇列的時間
index int // 優先順序佇列中的索引
}
而 waitForPriorityQueue
是對 container/heap/heap.go.Inferface
的實現,其資料結構就是使最小 readyAt
位於Root 的一個 MinHeap
type Interface interface {
sort.Interface
Push(x interface{}) // add x as element Len()
Pop() interface{} // remove and return element Len() - 1.
}
而這個的實現是 waitForPriorityQueue
type waitForPriorityQueue []*waitFor
func (pq waitForPriorityQueue) Len() int {
return len(pq)
}
// 這個也是最重要的一個,就是哪個屬性是排序的關鍵,也是heap.down和heap.up中使用的
func (pq waitForPriorityQueue) Less(i, j int) bool {
return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
// push 和pop 必須使用heap.push 和heap.pop
func (pq *waitForPriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*waitFor)
item.index = n
*pq = append(*pq, item)
}
func (pq *waitForPriorityQueue) Pop() interface{} {
n := len(*pq)
item := (*pq)[n-1]
item.index = -1
*pq = (*pq)[0:(n - 1)]
return item
}
// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} {
return pq[0]
}
而整個延遲佇列的核心就是 waitingLoop
,作為了延遲佇列的主要邏輯,檢查 waitingForAddCh
有沒有要延遲的內容,取出延遲的內容放置到 Heap
中;以及保證最大的阻塞週期
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
never := make(<-chan time.Time) // 作為預留位置
var nextReadyAtTimer clock.Timer // 最近一個任務要執行的定時器
waitingForQueue := &waitForPriorityQueue{} // 優先順序佇列,heap
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{} // 檢查是否反覆新增
for {
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
break // 時間沒到則不處理
}
entry = heap.Pop(waitingForQueue).(*waitFor) // 從優先順序佇列中取出一個
q.Add(entry.data) // 新增到延遲佇列中
delete(waitingEntryByData, entry.data) // 刪除map表中的資料
}
// 如果存在資料則設定最近一個內容要執行的定時器
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor) // 窺視[0]和值
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) // 建立一個定時器
nextReadyAt = nextReadyAtTimer.C()
}
select {
case <-q.stopCh: // 退出
return
case <-q.heartbeat.C(): // 多久沒有任何動作時重新一次迴圈
case <-nextReadyAt: // 如果有元素時間到了,則繼續執行迴圈,處理上面新增的操作
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) { // 時間沒到,是用readyAt和now對比time.Now
// 新增到延遲佇列中,有兩個 waitingEntryByData waitingForQueue
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
drained := false // 保證可以取完q.waitingForAddCh // addafter
for !drained {
select {
// 這裡是一個有buffer的佇列,需要保障這個佇列讀完
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default: // 保證可以退出,但限制於上一個分支的0~n的讀取
// 如果上一個分支阻塞,則為沒有資料就是取盡了,走到這個分支
// 如果上個分支不阻塞則讀取到上個分支阻塞為止,代表阻塞,則走default退出
drained = true
}
}
}
}
}
限速佇列 RateLimiting
是在優先順序佇列是在延遲佇列的基礎上進行擴充套件的一個佇列
type RateLimitingInterface interface {
DelayingInterface // 繼承延遲佇列
// 在限速器準備完成後(即合規後)新增條目到佇列中
AddRateLimited(item interface{})
// drop掉條目,無論成功或失敗
Forget(item interface{})
// 被重新放入佇列中的次數
NumRequeues(item interface{}) int
}
可以看到一個限速佇列的抽象對應只要滿足了 AddRateLimited()
, Forget()
, NumRequeues()
的延遲佇列都是限速佇列。看了解規則之後,需要對具體的實現進行分析。
type rateLimitingType struct {
DelayingInterface
rateLimiter RateLimiter
}
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}
rateLimitingType
則是對抽象規範 RateLimitingInterface
的實現,可以看出是在延遲佇列的基礎上增加了一個限速器 RateLimiter
type RateLimiter interface {
// when決定等待多長時間
When(item interface{}) time.Duration
// drop掉item
// or for success, we'll stop tracking it
Forget(item interface{})
// 重新加入佇列中的次數
NumRequeues(item interface{}) int
}
抽象限速器的實現,有 BucketRateLimiter
, ItemBucketRateLimiter
, ItemExponentialFailureRateLimiter
, ItemFastSlowRateLimiter
, MaxOfRateLimiter
,下面對這些限速器進行分析
BucketRateLimiter
是實現 rate.Limiter
與 抽象 RateLimiter
的一個令牌桶,初始化時通過 workqueue.DefaultControllerRateLimiter()
進行初始化。
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
ItemBucketRateLimiter
是作為列表儲存每個令牌桶的實現,每個key都是單獨的限速器
type ItemBucketRateLimiter struct {
r rate.Limit
burst int
limitersLock sync.Mutex
limiters map[interface{}]*rate.Limiter
}
func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
return &ItemBucketRateLimiter{
r: r,
burst: burst,
limiters: make(map[interface{}]*rate.Limiter),
}
}
如名所知 ItemExponentialFailureRateLimiter
限速器是一個錯誤指數限速器,根據錯誤的次數,將指數用於delay的時長,指數的計算公式為:\(baseDelay\times2^{<num-failures>}\)。 可以看出When絕定了流量整形的delay時間,根據錯誤次數為指數進行延長重試時間
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int // 失敗的次數
baseDelay time.Duration // 延遲基數
maxDelay time.Duration // 最大延遲
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
ItemFastSlowRateLimiter
,限速器先快速重試一定次數,然後慢速重試
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
maxFastAttempts int // 最大嘗試次數
fastDelay time.Duration // 快的速度
slowDelay time.Duration // 慢的速度
}
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
return &ItemFastSlowRateLimiter{
failures: map[interface{}]int{},
fastDelay: fastDelay,
slowDelay: slowDelay,
maxFastAttempts: maxFastAttempts,
}
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
r.failures[item] = r.failures[item] + 1
// 當錯誤次數沒超過快速的閾值使用快速,否則使用慢速
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
MaxOfRateLimiter
是返回限速器列表中,延遲最大的那個限速器
type MaxOfRateLimiter struct {
limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
return &MaxOfRateLimiter{limiters: limiters}
}
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
ret := 0
// 找到列表內所有的NumRequeues(失敗的次數),以最多次的為主。
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) Forget(item interface{}) {
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}
基於流量管制的限速佇列範例,可以大量突發,但是需要進行整形,新增操作會根據 When()
中設計的需要等待的時間進行新增。根據不同的佇列實現不同方式的延遲
package main
import (
"fmt"
"log"
"strconv"
"time"
"k8s.io/client-go/util/workqueue"
)
func main() {
stopCh := make(chan string)
timeLayout := "2006-01-02:15:04:05.0000"
limiter := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
length := 20 // 一共請求20次
chs := make([]chan string, length)
for i := 0; i < length; i++ {
chs[i] = make(chan string, 1)
go func(taskId string, ch chan string) {
item := "Task-" + taskId + time.Now().Format(timeLayout)
log.Println(item + " Added.")
limiter.AddRateLimited(item) // 新增會根據When() 延遲新增到工作佇列中
}(strconv.FormatInt(int64(i), 10), chs[i])
go func() {
for {
key, quit := limiter.Get()
if quit {
return
}
log.Println(fmt.Sprintf("%s process done", key))
defer limiter.Done(key)
}
}()
}
<-stopCh
}
因為預設的限速器不支援初始化 QPS,修改原始碼內的為 \(BT(1, 5)\) ,執行結果可以看出,大突發流量時,超過桶內token數時,會根據token生成的速度進行放行。
圖中,任務的新增是突發性的,紀錄檔列印的是同時新增,但是在新增前輸出的紀錄檔,消費端可以看到實際是被延遲了。設定的是每秒一個token,實際上放行流量也是每秒一個token。