之前開發的一款基於OpenTelemetry的Tracing元件需要使用基於速率限制(Rate Limiting)的跟蹤取樣策略,本想使用現有的解決方案,比如System.Threading.RateLimiting名稱空間下的RateLimiter。大體看了RateLimiter的三種實現(固定視窗、滑動視窗和令牌桶),覺得過於相對複雜了點,程式碼還涉及到鎖,而且提供的功能我也不太需要,於是嘗試實現一種簡單且無鎖解決方案。
我為RateLimiter定義瞭如下這個簡單的IRateLimiter介面,唯一的無參方法TryAcquire利用返回的布林值確定當前是否超出設定的速率限制。我只提供的兩種基於時間視窗的實現,如下所示的基於「滑動時間視窗」的實現型別SliddingWindowRateLimiter,我們在構造的時候指定時間視窗和閾值。SliddingWindowRateLimiter採用一種「討巧」的實現,它直接利用了BoundedChannel<DateTimeOffset>物件,我們將指定的閾值作為它的最大容量。
public interface IRateLimiter { bool TryAcquire(); } public sealed class SliddingWindowRateLimiter: IRateLimiter { private readonly TimeSpan _window; private readonly ChannelReader<DateTimeOffset> _reader; private readonly ChannelWriter<DateTimeOffset> _writer; public SliddingWindowRateLimiter(TimeSpan window, int permit) { _window = window; var options = new BoundedChannelOptions (permit) { FullMode = BoundedChannelFullMode.Wait, SingleReader = false, SingleWriter = true }; var channel = Channel.CreateBounded<DateTimeOffset>(options); _reader = channel.Reader; _writer = channel.Writer; Task.Factory.StartNew(Trim,TaskCreationOptions.LongRunning); } public bool TryAcquire() => _writer.TryWrite(DateTimeOffset.UtcNow); private void Trim() { if (!_reader.TryPeek(out var timestamp)) { Task.Delay(_window).Wait(); Trim(); } else { var delay = _window - (DateTimeOffset.UtcNow - timestamp); if (delay > TimeSpan.Zero) { Task.Delay(delay).Wait(); Trim(); } else { var valueTask = _reader.ReadAsync(); if (!valueTask.IsCompleted) _ = valueTask.Result; Trim(); } } } }
在實現的TryAcquire方法中,我們試著將當前時間戳寫入這個Channel,並將寫入的結果(成功或者失敗)作為返回值。為了讓Channel中只包含指定時間視窗的時間戳,我們利用一個LongRuning的Task執行Trim方法對過期的時間戳進行「裁剪」。Trim會呼叫ChannelReader的TRyPeek方法,如果返回False,意味著Channel為空,此時會等待一段視窗時間再進行「裁剪」。如果提取出來時間戳在Now-Window與當前時間之間,意味著Channel裡面的時間戳均在設定的視窗內,此時同樣需要等待,等待時間為Window - (Now - Timestamp);只有在提取的時間超出視窗範圍,我們才需要將其從Channel中移除。
var limiter = new SliddingWindowRateLimiter(TimeSpan.FromSeconds(2),2);
var index = 0; await Task.WhenAll( Enumerable.Range(1, 100).Select(_ => Task.Run(() => { while (true) { if (limiter.TryAcquire()) { Console.WriteLine($"[{DateTimeOffset.Now}]{Interlocked.Increment(ref index)}"); } } })));
我們在上面的演示程式中使用這個SliddingWindowRateLimiter,設定的限速規則為 2/2s。我們建立了100個Task並行地呼叫這個SliddingWindowRateLimiter,並將它返回True時的時間戳顯示出來,具體輸出如下所示。
如下這個FixedWindowRateLimiter型別是針對「固定視窗」的實現,欄位_windowTicks和_permit同樣表示時間視窗的時長(這裡我們使用Int64型別的Ticks屬性)和閾值。 _nextWindowStartTimeTicks表示下一次固定視窗的起始時間,這個需要動態調整,為了確保只有一個執行緒能夠修改它,我們定義了_windowReseting這個「號誌」。_count是一個計數器,我們使用它確定是否「超速」。
public sealed class FixedWindowRateLimiter : IRateLimiter { private readonly long _windowTicks; private readonly int _permit; private long _nextWindowStartTimeTicks; private volatile int _count = 0; public FixedWindowRateLimiter(TimeSpan window, int permit) { _windowTicks = window.Ticks; _permit = permit; _nextWindowStartTimeTicks = DateTimeOffset.UtcNow.Add(window).Ticks; } public bool TryAcquire() { // 超出時間視窗,重置計數器,並調整下一個時間視窗的開始時間 var now = DateTimeOffset.UtcNow.Ticks; var nextWindowStartTimeTicks = nextWindowStartTimeTicks; if (now >= nextWindowStartTimeTicks && Interlocked.CompareExchange(ref _nextWindowStartTimeTicks, now + _windowTicks, nextWindowStartTimeTicks) == nextWindowStartTimeTicks) { Interlocked.Exchange(ref _count, 1); return true; } return _count < _permit && Interlocked.Increment(ref _count) <= _permit; } }
在實現的TryAcquire方法中,我們先確定當前時間是否超過了設定的「下一個視窗開始時間」,如果是則呼叫Interlocked.CompareExchange方法修改__nextWindowStartTimeTicks欄位。成功修改__nextWindowStartTimeTicks的執行緒會調整視窗開始時間,並重置計數器_count為1,並返回True。如果計數器大於等於設定閾值,方法返回False。否則我們讓計數器+1,如果該值<=閾值,返回True,否則返回False。
IRateLimiter limiter = new FixedWindowRateLimiter(window: TimeSpan.FromSeconds(2), permit: 2); var index = 0; await Task.WhenAll( Enumerable.Range(1, 100).Select(_ => Task.Run(() => { while (true) { if (limiter.TryAcquire()) { Console.WriteLine($"[{DateTimeOffset.Now}]{Interlocked.Increment(ref index)}"); } } })));
將FixedWindowRateLimiter應用到上面的演示程式,依然能得到我們希望的輸出結果。