兩種基於時間視窗的限流器的簡單實現

2023-11-01 12:01:29

之前開發的一款基於OpenTelemetry的Tracing元件需要使用基於速率限制(Rate Limiting)的跟蹤取樣策略,本想使用現有的解決方案,比如System.Threading.RateLimiting名稱空間下的RateLimiter。大體看了RateLimiter的三種實現(固定視窗、滑動視窗和令牌桶),覺得過於相對複雜了點,程式碼還涉及到鎖,而且提供的功能我也不太需要,於是嘗試實現一種簡單且無鎖解決方案。

一、滑動時間視窗

我為RateLimiter定義瞭如下這個簡單的IRateLimiter介面,唯一的無參方法TryAcquire利用返回的布林值確定當前是否超出設定的速率限制。我只提供的兩種基於時間視窗的實現,如下所示的基於「滑動時間視窗」的實現型別SliddingWindowRateLimiter,我們在構造的時候指定時間視窗和閾值。SliddingWindowRateLimiter採用一種「討巧」的實現,它直接利用了BoundedChannel<DateTimeOffset>物件,我們將指定的閾值作為它的最大容量。

public interface IRateLimiter
{
    bool TryAcquire();
}

public sealed class SliddingWindowRateLimiter: IRateLimiter
{
    private readonly TimeSpan _window;
    private readonly ChannelReader<DateTimeOffset> _reader;
    private readonly ChannelWriter<DateTimeOffset> _writer;
    public SliddingWindowRateLimiter(TimeSpan window, int permit)
    {
        _window = window;
        var options = new BoundedChannelOptions (permit)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = false,
            SingleWriter = true
        };
        var channel = Channel.CreateBounded<DateTimeOffset>(options);
        _reader = channel.Reader;
        _writer = channel.Writer;
        Task.Factory.StartNew(Trim,TaskCreationOptions.LongRunning);
    }

    public bool TryAcquire() => _writer.TryWrite(DateTimeOffset.UtcNow);
    private void Trim()
    {
        if (!_reader.TryPeek(out var timestamp))
        {
            Task.Delay(_window).Wait();
            Trim();
        }
        else
        {
            var delay = _window - (DateTimeOffset.UtcNow - timestamp);
            if (delay > TimeSpan.Zero)
            {
                Task.Delay(delay).Wait();
                Trim();
            }
            else
            {
                var valueTask = _reader.ReadAsync();
                if (!valueTask.IsCompleted) _ = valueTask.Result;
                Trim();
            }
        }
    }
}

在實現的TryAcquire方法中,我們試著將當前時間戳寫入這個Channel,並將寫入的結果(成功或者失敗)作為返回值。為了讓Channel中只包含指定時間視窗的時間戳,我們利用一個LongRuning的Task執行Trim方法對過期的時間戳進行「裁剪」。Trim會呼叫ChannelReader的TRyPeek方法,如果返回False,意味著Channel為空,此時會等待一段視窗時間再進行「裁剪」。如果提取出來時間戳在Now-Window與當前時間之間,意味著Channel裡面的時間戳均在設定的視窗內,此時同樣需要等待,等待時間為Window - (Now - Timestamp);只有在提取的時間超出視窗範圍,我們才需要將其從Channel中移除。

var limiter = new SliddingWindowRateLimiter(TimeSpan.FromSeconds(2),2);

var index = 0; await Task.WhenAll( Enumerable.Range(1, 100).Select(_ => Task.Run(() => { while (true) { if (limiter.TryAcquire()) { Console.WriteLine($"[{DateTimeOffset.Now}]{Interlocked.Increment(ref index)}"); } } })));

我們在上面的演示程式中使用這個SliddingWindowRateLimiter,設定的限速規則為 2/2s。我們建立了100個Task並行地呼叫這個SliddingWindowRateLimiter,並將它返回True時的時間戳顯示出來,具體輸出如下所示。

image

二、固定時間視窗

如下這個FixedWindowRateLimiter型別是針對「固定視窗」的實現,欄位_windowTicks和_permit同樣表示時間視窗的時長(這裡我們使用Int64型別的Ticks屬性)和閾值。 _nextWindowStartTimeTicks表示下一次固定視窗的起始時間,這個需要動態調整,為了確保只有一個執行緒能夠修改它,我們定義了_windowReseting這個「號誌」。_count是一個計數器,我們使用它確定是否「超速」。

public sealed class FixedWindowRateLimiter : IRateLimiter
{
    private readonly long _windowTicks;
    private readonly int _permit;
    private long _nextWindowStartTimeTicks;
    private volatile int _count = 0;

    public FixedWindowRateLimiter(TimeSpan window, int permit)
    {
        _windowTicks = window.Ticks;
        _permit = permit;
        _nextWindowStartTimeTicks = DateTimeOffset.UtcNow.Add(window).Ticks;
    }

    public bool TryAcquire()
    {
        // 超出時間視窗,重置計數器,並調整下一個時間視窗的開始時間
        var now = DateTimeOffset.UtcNow.Ticks;
        var nextWindowStartTimeTicks = nextWindowStartTimeTicks;
        if (now >= nextWindowStartTimeTicks && Interlocked.CompareExchange(ref _nextWindowStartTimeTicks, now + _windowTicks, nextWindowStartTimeTicks) == nextWindowStartTimeTicks)
        {
            Interlocked.Exchange(ref _count, 1);
            return true;
        }
        return _count < _permit && Interlocked.Increment(ref _count) <= _permit;
    }
}

在實現的TryAcquire方法中,我們先確定當前時間是否超過了設定的「下一個視窗開始時間」,如果是則呼叫Interlocked.CompareExchange方法修改__nextWindowStartTimeTicks欄位。成功修改__nextWindowStartTimeTicks的執行緒會調整視窗開始時間,並重置計數器_count為1,並返回True。如果計數器大於等於設定閾值,方法返回False。否則我們讓計數器+1,如果該值<=閾值,返回True,否則返回False。

IRateLimiter limiter = new FixedWindowRateLimiter(window: TimeSpan.FromSeconds(2), permit: 2); var index = 0; await Task.WhenAll( Enumerable.Range(1, 100).Select(_ => Task.Run(() => { while (true) { if (limiter.TryAcquire()) { Console.WriteLine($"[{DateTimeOffset.Now}]{Interlocked.Increment(ref index)}"); } } })));

將FixedWindowRateLimiter應用到上面的演示程式,依然能得到我們希望的輸出結果。

image