理解ASP.NET Core

2023-07-18 12:02:08

注:本文隸屬於《理解ASP.NET Core》系列文章,請檢視置頂部落格或點選此處檢視全文目錄

概述

在微服務化的架構設計中,閘道器扮演著重要的看門人角色,它所提供的功能之一就是限流。而對於眾多非微服務化的系統來說,可能並不會部署閘道器(無論是因為成本還是複雜度),在這種場景下,為了實現限流,微軟在 .NET 7 中提供了官方的限流中介軟體。下面我們一起來看一下。

註冊限流策略

首先,確保你的應用依賴的 SDK 版本 >= 7,接著通過AddRateLimiter擴充套件方法註冊限流服務,並新增限流策略,然後通過UseRateLimiter啟用限流中介軟體,最後設定某個路由的請求使用限流策略:

builder.Services.AddRateLimiter(limiterOptions =>
{
    // 設定限流策略
});

app.UseRateLimiter();

app.MapGet("LimitTest", async () =>
{
    await Task.Delay(TimeSpan.FromSeconds(1));
    return Results.Ok($"Limiter");
}).RequireRateLimiting("my_policy");

微軟為我們提供了 4 種常用的限流演演算法:

  • FixedWindowLimiter:固定視窗限流器
  • SlidingWindowLimiter:滑動視窗限流器
  • TokenBucketLimiter:令牌桶限流器
  • ConcurrencyLimiter:並行限流器

我們通常會註冊一個命名限流策略,並在該策略內指定限流演演算法,以及其他限流邏輯。

另外,需要關注一下UseRateLimiter的呼叫位置。若限流行為作用於特定路由,則限流中介軟體必須放置在UseRouting之後。

FixedWindowLimiter

固定視窗限流器是一種簡單的限流方式:

  • 工作原理:使用固定的時間長度來限制請求數量。假設固定視窗長度為10s,則每10s就會切換(銷燬並建立)一個新的視窗,在每個單獨的視窗內,限制請求流量。
  • 特點:
    • 優點:實現簡單,佔用記憶體低
    • 缺點:
      • 當視窗中請求流量到達閾值時,流量會被瞬間切斷,不能平滑地處理突發流量(實際應用中理想效果是讓流量平滑地進入系統中)
      • 視窗切換時可能會出現 2 倍請求流量。比如視窗大小為 1s,閾值為100,視窗 1 在後 500ms 內處理了 100 個請求,視窗 2 在前 500ms 內也處理了 100 個請求,這樣就導致在 1s 內處理了 200 個請求
builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddFixedWindowLimiter(policyName: "fixed", fixedOptions =>
    {
        fixedOptions.PermitLimit = 4;
        fixedOptions.Window = TimeSpan.FromSeconds(60);
        fixedOptions.QueueLimit = 2;
        fixedOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
        fixedOptions.AutoReplenishment = true;
    });
});

public sealed class FixedWindowRateLimiterOptions
{
    public TimeSpan Window { get; set; } = TimeSpan.Zero;

    public bool AutoReplenishment { get; set; } = true;

    public int PermitLimit { get; set; }

    public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;

    public int QueueLimit { get; set; }
}

如上所示,我們通過AddFixedWindowLimiter新增了一個固定視窗限流策略,並指定策略名為fixed。它的含義是視窗時間長度為60s,在每個視窗時間範圍內,最多允許4個請求被處理。

各設定項含義如下:

  • PermitLimit:視窗閾值,即每個視窗時間範圍內,最多允許的請求個數。這裡指定為最多允許4個請求。該值必須 > 0
  • Window:視窗大小,即時間長度。這裡設定為 60 s。該值必須 > TimeSpan.Zero
  • QueueLimit
    • 當視窗請求數達到最大時,後續請求會進入排隊,用於設定佇列的大小(即允許幾個請求在裡面排隊等待)
    • 這裡設定為佇列中最多允許 2 個請求排隊,也就是說,在這個視窗內,可以最多有6個請求,4個會被處理,2個則在排隊,其他的則會在一定時間後拒絕返回 RejectionStatusCode
    • 該值必須 >= 0
  • QueueProcessingOrder:排隊請求的處理順序。這裡設定為優先處理先來的請求
  • AutoReplenishment:指示開啟新視窗時是否自動重置請求限制,該值預設為true。如果設定為false,則需要手動呼叫 FixedWindowRateLimiter.TryReplenish來重置

SlidingWindowLimiter

滑動視窗限流器是固定視窗限流器的升級版:

  • 工作原理:
    • 在固定視窗限流器的基礎上,它再將每個視窗劃分為多個段,每經過一個段的時間間隔(= 視窗時間 / 視窗段的個數),視窗就會向後滑動一段,所以稱為滑動視窗(視窗大小仍是固定的)。
    • 當視窗滑動後,會「吃進」一個段(稱為當前段),並「吐出」一個段(稱為過期段),過期段會被回收,回收的請求數可以用於當前段。
  • 特點:
    • 優點:按段滑動處理,相對於固定視窗來說,可以對流量進行更精準的控制,更平滑的處理突發流量,並且段劃分的越多,移動更平滑。
    • 缺點:對時間精度要求高,比固定視窗實現複雜,記憶體佔用更高
builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddSlidingWindowLimiter(policyName: "sliding", slidingOptions =>
    {
        slidingOptions.PermitLimit = 100;
        slidingOptions.Window = TimeSpan.FromSeconds(30);
        slidingOptions.QueueLimit = 2;
        slidingOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
        slidingOptions.AutoReplenishment = true;
        slidingOptions.SegmentsPerWindow = 3;
    });
});

public sealed class SlidingWindowRateLimiterOptions
{
    public TimeSpan Window { get; set; } = TimeSpan.Zero;

    public int SegmentsPerWindow { get; set; }

    public bool AutoReplenishment { get; set; } = true;

    public int PermitLimit { get; set; }

    public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;

    public int QueueLimit { get; set; }
}

如上所示,我們通過AddSlidingWindowLimiter新增了一個滑動視窗限流策略,並指定策略名為sliding。它的含義是視窗時間長度為30s,在每個視窗時間範圍內,最多允許100個請求,視窗段數為 3,每個段的時間間隔為 30s / 3 = 10s,即視窗每 10s 滑動一段。

各設定項含義如下:

  • PermitLimit:視窗閾值,即每個視窗時間範圍內,最多允許的請求個數。這裡指定為最多允許100個請求。該值必須 > 0
  • Window:視窗大小,即時間長度。這裡設定為 30 s。該值必須 > TimeSpan.Zero
  • QueueLimit
    • 當視窗請求數達到最大時,後續請求會進入排隊,用於設定佇列的大小(即允許幾個請求在裡面排隊等待)
    • 這裡設定為佇列中最多允許 2 個請求排隊
    • 該值必須 >= 0
  • QueueProcessingOrder:排隊請求的處理順序。這裡設定為優先處理先來的請求
  • AutoReplenishment:指示開啟新視窗時是否自動重置請求限制,該值預設為true。如果設定為false,則需要手動呼叫 SlidingWindowRateLimiter.TryReplenish來重置
  • SegmentsPerWindow:每個視窗的段的個數,通過它可以計算出每個段滑動的時間間隔。這裡設定段數為 3,時間間隔為 10s。該值必須 > 0

為了更好地理解滑動視窗限流器的工作原理,下面我會借用官方檔案提供的一張圖來詳細解釋一下:

假設:限制每個視窗的請求數為 100,視窗時間為 30s,每個視窗的段數為 3,那麼每個段的時間間隔就是 30s / 3 = 10s。
定義:當前段結存請求數 = 當前段可用請求數 - 處理請求數 + 回收請求數

限流器工作流程:

  1. 在第 1 個段時(0s~10s),當前段可用請求數為 100,處理了 20 個請求,回收請求 0 個,那麼結存請求數 = 100 - 20 + 0 = 80
  2. 在第 2 個段時(10s~20s),當前段可用請求數為 80,處理了 30 個請求,回收請求 0 個,那麼結存請求數 = 80 - 30 + 0 = 50
  3. 在第 3 個段時(20s~30s),當前段可用請求數為 50,處理了 40 個請求,回收請求 0 個,那麼結存請求數 = 50 - 40 + 0 = 10
  4. 在第 4 個段時(30s~40s),當前段可用請求數為 10,處理了 30 個請求,回收第 1 個段的請求 20 個,那麼結存請求數 = 10 - 30 + 20 = 0
  5. 在第 5 個段時(40s~50s),當前段可用請求數為 0,處理了 10 個請求,回收第 2 個段的請求 30 個,那麼結存請求數 = 0 - 10 + 30 = 20
  6. 在第 6 個段時(50s~60s),當前段可用請求數為 20,處理了 10 個請求,回收第 3 個段的請求 40 個,那麼結存請求數 = 20 - 10 + 40 = 50

TokenBucketLimiter

令牌桶限流器是一種限制資料平均傳輸速率的限流演演算法:

  • 工作原理:想象有一個桶,每個固定時間段會向桶內放入固定數量的令牌(token),當桶內令牌裝滿時,新的令牌將會被丟棄。當請求流量進入時,會先從桶內拿 1 個令牌,拿到了則該請求會被處理,沒拿到則會在佇列中等待,若佇列已滿,則會被限流拒絕處理。
  • 特點:可以限制資料的平均傳輸速率,還可以一次性耗盡令牌應對突發流量,並平滑地處理後續流量,是一種通用的演演算法

以下圖為例,桶內有 3 個令牌(token),進來了 5 個請求,前三個請求可以拿到令牌(token),它們會被處理,後面兩個就只能排隊或被限流拒絕。

builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddTokenBucketLimiter(policyName: "token_bucket", tokenBucketOptions =>
    {
        tokenBucketOptions.TokenLimit = 4;
        tokenBucketOptions.ReplenishmentPeriod = TimeSpan.FromSeconds(10);
        tokenBucketOptions.TokensPerPeriod = 2;
        tokenBucketOptions.QueueLimit = 2;
        tokenBucketOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
        tokenBucketOptions.AutoReplenishment = true;
    });
});

public sealed class TokenBucketRateLimiterOptions
{
    public TimeSpan ReplenishmentPeriod { get; set; } = TimeSpan.Zero;

    public int TokensPerPeriod { get; set; }

    public bool AutoReplenishment { get; set; } = true;

    public int TokenLimit { get; set; }

    public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;

    public int QueueLimit { get; set; }
}

如上所示,我們通過AddTokenBucketLimiter新增了一個令牌桶限流策略,並指定策略名為token_bucket。它的含義是桶最多可以裝 4 個令牌,每 10s 發放一次令牌,每次發放 2 個令牌,所以在一個發放週期內,最多可以處理 4 個請求,至少可以處理 2 個請求

各設定項含義如下:

  • TokenLimit:桶最多可以裝的令牌數,發放的多餘令牌會被丟棄。這裡設定為最多裝 4 個令牌。該值必須 > 0
  • ReplenishmentPeriod:令牌發放週期,即多長時間發放一次令牌。這裡設定為 10 s。該值必須 > TimeSpan.Zero
  • TokensPerPeriod:每個週期發放的令牌數,即每個週期向桶內放入的令牌數(若超過桶可裝令牌數的最大值,則會被丟棄)。這裡設定為 2 個。該值必須 > 0
  • QueueLimit
    • 當桶內的令牌全部被拿完(token 數為 0)時,後續請求會進入排隊,用於設定佇列的大小(即允許幾個請求在裡面排隊等待)
    • 這裡設定為佇列中最多允許 2 個請求排隊
    • 該值必須 >= 0
  • QueueProcessingOrder:排隊請求的處理順序。這裡設定為優先處理先來的請求
  • AutoReplenishment:指示當進入新的令牌發放週期時,是否自動發放令牌,該值預設為true。如果設定為false,則需要手動呼叫 TokenBucketRateLimiter.TryReplenish來發放

ConcurrencyLimiter

並行限流器不是限制一段時間內的最大請求數,而是限制並行數:

  • 工作原理:限制同一時刻並行請求的數量
  • 特點:可以充分利用伺服器效能,當出現突發流量時,伺服器負載可能會持續過高。
builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddConcurrencyLimiter(policyName: "concurrency", concurrencyOptions =>
    {
        concurrencyOptions.PermitLimit = 4;
        concurrencyOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
        concurrencyOptions.QueueLimit = 2;
    });
});

public sealed class ConcurrencyLimiterOptions
{
    public int PermitLimit { get; set; }

    public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;

    public int QueueLimit { get; set; }
}

如上所示,我們通過AddConcurrencyLimiter新增了一個並行限流策略,並指定策略名為concurrency。它的含義是最多可以並行4個請求被處理。

各設定項含義如下:

  • PermitLimit:最多並行的請求數。該值必須 > 0
  • QueueLimit
    • 當並行請求數達到最大時,後續請求會進入排隊,用於設定佇列的大小(即允許幾個請求在裡面排隊等待)
    • 這裡設定為佇列中最多允許 2 個請求排隊
    • 該值必須 >= 0
  • QueueProcessingOrder:排隊請求的處理順序。這裡設定為優先處理先來的請求

RateLimiterOptions

上面已經把常用的限流演演算法介紹完了,下面來看一下可以通過limiterOptions進行哪些設定:

public sealed class RateLimiterOptions
{
    // 僅保留了常用的設定項,其他相關程式碼均忽略

    // 全域性限流器
    public PartitionedRateLimiter<HttpContext>? GlobalLimiter { get; set; }

    // 當請求被限流拒絕時執行
    public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; set; }

    // 當期你去被限流拒絕時的 Http 響應狀態碼
    public int RejectionStatusCode { get; set; } = StatusCodes.Status503ServiceUnavailable;
}

GlobalLimiter

通過GlobalLimiter,我們可以設定全域性限流器,更準確的說法是全域性分割區限流器,該限流器會應用於所有請求。執行順序為先執行全域性限流器,再執行特定於路由終結點的限流器(如果存在的話)。

需要注意的是,相對於上面註冊的限流策略來說,GlobalLimiter已經是一個限流器範例了,所以需要分配給他一個分割區限流器範例,通過PartitionedRateLimiter.Create來建立。

builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, IPAddress>(context =>
    {
        IPAddress? remoteIpAddress = context.Connection.RemoteIpAddress;

        // 針對非迴環地址限流
        if (!IPAddress.IsLoopback(remoteIpAddress!))
        {
            return RateLimitPartition.GetTokenBucketLimiter
            (remoteIpAddress!, _ =>
                new TokenBucketRateLimiterOptions
                {
                    TokenLimit = 4,
                    QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
                    QueueLimit = 2,
                    ReplenishmentPeriod = TimeSpan.FromSeconds(10),
                    TokensPerPeriod = 10,
                    AutoReplenishment = true
                });
        }

        // 若為迴環地址,則不限流
        return RateLimitPartition.GetNoLimiter(IPAddress.Loopback);
    });
});

鏈式組合的限流器

它並不是一個新型別的限流器,而是可以將我們上面提到的分割區限流器進行組合而得到一個新的分割區限流器。

例如我可以將包含固定視窗限流邏輯的分割區限流器和將包含並行限流邏輯的分割區限流器組合進行組合,那麼應用該限流器的請求就會先被固定視窗限流器處理,再被並行限流器處理,任意一個被限流,就會被拒絕。

var chainedLimiter = PartitionedRateLimiter.CreateChained(
    PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
    {
        var userAgent = httpContext.Request.Headers.UserAgent.ToString();

        return RateLimitPartition.GetFixedWindowLimiter
        (userAgent, _ =>
            new FixedWindowRateLimiterOptions
            {
                AutoReplenishment = true,
                PermitLimit = 4,
                Window = TimeSpan.FromSeconds(2)
            });
    }),
    PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
    {
        var userAgent = httpContext.Request.Headers.UserAgent.ToString();

        return RateLimitPartition.GetConcurrencyLimiter
        (userAgent, _ =>
            new ConcurrencyLimiterOptions
            {
                PermitLimit = 4,
                QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
                QueueLimit = 2
            });
    })
);

目前鏈式組合的限流器只能用於全域性限流器,而不能用於終結點限流器。

RejectionStatusCode

通過RejectionStatusCode,我們可以設定請求被限流拒絕後,http預設的響應狀態碼。預設為 503 服務不可用,我們可以指定為 429 過多的請求。

builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.RejectionStatusCode = StatusCodes.Status429TooManyRequests;
});

另外,該狀態碼可以在OnRejected中被重寫,具體參見下小節。

OnRejected

當請求被限流時,會觸發回撥OnRejected,通過該委託我們可以針對 http 響應進行自定義設定:

  • RetryAfter:設定響應頭Retry-After,指示多長時間後重試請求。需要注意的是,並行限流器無法獲取到 RetryAfter,因為它不是時間段的限流,而是限制的並行數
builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.OnRejected = (context, cancellationToken) =>
    {
        if (context.Lease.TryGetMetadata(MetadataName.RetryAfter, out var retryAfter))
        {
            context.HttpContext.Response.Headers.RetryAfter =
                ((int)retryAfter.TotalSeconds).ToString(NumberFormatInfo.InvariantInfo);
        }

        // 可以重新設定響應狀態碼,會覆蓋掉上面設定的 limiterOptions.RejectionStatusCod
        context.HttpContext.Response.StatusCode = StatusCodes.Status429TooManyRequests;
        context.HttpContext.RequestServices.GetService<ILoggerFactory>()?
            .CreateLogger("Microsoft.AspNetCore.RateLimitingMiddleware")
            .LogWarning("OnRejected: {GetUserEndPoint}", GetUserEndPoint(context.HttpContext));

        return ValueTask.CompletedTask;
    };
});

自定義限流策略

上述提到的限流策略,並不能滿足我們所有的需求,所以瞭解如何自定義限流策略是我們的必修課。

在開始編碼之前,你需要了解以下內容:

  • 上述使用AddXXXLimiter新增的限流策略,內部實際上呼叫了AddPolicy(後面的部分會詳細介紹)
  • 上述使用AddXXXLimiter新增的限流策略,每種策略只有一個分割區,即使用了該限流策略的路由共用一個分割區。例如通過AddFixedWindowLimiter新增了限流策略「fixed」,視窗閾值為 10,並有 10 個路由使用了該策略,那麼在一個視窗內,這 10 個路由總的請求數達到 10,那這 10 個路由後續的請求都會被限流。

下面我們就藉助AddPolicy,分別使用兩種方式新增一個自定義策略「my_policy」:一個使用者一個分割區,匿名使用者共用一個分割區

通過委託建立自定義限流策略

builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddPolicy(policyName: "my_policy", httpcontext =>
    {
        var userId = "anonymous user";
        if (httpcontext.User.Identity?.IsAuthenticated is true)
        {
            userId = httpcontext.User.Claims.First(c => c.Type == "id").Value;
        }

        return RateLimitPartition.GetFixedWindowLimiter(partitionKey: userId, _ => new 
            FixedWindowRateLimiterOptions
            {
                PermitLimit = 3,
                Window = TimeSpan.FromSeconds(60),
                QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
                QueueLimit = 0
            });
    });
});

通過實現 IRateLimiterPolicy 建立自定義限流策略

public interface IRateLimiterPolicy<TPartitionKey>
{
    // 若不為空,則執行它(不會執行全域性的),如果它為空,則執行全域性的
    Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; }

    // 獲取限流分割區
    RateLimitPartition<TPartitionKey> GetPartition(HttpContext httpContext);
}

public class MyRateLimiterPolicy : IRateLimiterPolicy<string>
{
    // 可以通過依賴注入引數
    public MyRateLimiterPolicy(ILogger<MyRateLimiterPolicy> logger)
    {
        // 可以設定自己的限流拒絕回撥邏輯,而不使用上面全域性設定的 limiterOptions.OnRejected
        OnRejected = (ctx, token) =>
        {
            ctx.HttpContext.Response.StatusCode = StatusCodes.Status429TooManyRequests;

            logger.LogWarning($"Request rejected by {nameof(MyRateLimiterPolicy)}");

            return ValueTask.CompletedTask;
        };
    }

    public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; }

    public RateLimitPartition<string> GetPartition(HttpContext httpContext)
    {
        var userId = "anonymous user";
        if (httpContext.User.Identity?.IsAuthenticated is true)
        {
            userId = httpContext.User.Claims.First(c => c.Type == "id").Value;
        }

        return RateLimitPartition.GetFixedWindowLimiter(partitionKey: userId, _ => new 
            FixedWindowRateLimiterOptions
            {
                PermitLimit = 3,
                Window = TimeSpan.FromSeconds(60),
                QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
                QueueLimit = 0
            });
    }
}

// 記得註冊它
builder.Services.AddRateLimiter(limiterOptions =>
{
    limiterOptions.AddPolicy<string, MyRateLimiterPolicy>(policyName: "my_policy");
}

應用限流策略

RequireRateLimiting & DisableRateLimiting

可以一次性為所有 controller 應用限流策略

app.MapControllers().RequireRateLimiting("fixed");

也可以為指定路由應用限流策略

app.MapGet("LimitTest", () =>{ }).RequireRateLimiting("fixed");

實質上,RequireRateLimitingDisableRateLimiting是通過向終結點後設資料中EnableRateLimitingDisableRateLimiting兩個特性來實現的。

public static class RateLimiterEndpointConventionBuilderExtensions
{
    public static TBuilder RequireRateLimiting<TBuilder>(this TBuilder builder, string policyName) where TBuilder : IEndpointConventionBuilder
    {
        builder.Add(endpointBuilder => endpointBuilder.Metadata.Add(new EnableRateLimitingAttribute(policyName)));
        return builder;
    }

    public static TBuilder RequireRateLimiting<TBuilder, TPartitionKey>(this TBuilder builder, IRateLimiterPolicy<TPartitionKey> policy) where TBuilder : IEndpointConventionBuilder
    {
        builder.Add(endpointBuilder =>
        {
            endpointBuilder.Metadata.Add(new EnableRateLimitingAttribute(new 
                DefaultRateLimiterPolicy(
                    RateLimiterOptions.ConvertPartitioner<TPartitionKey>(null, policy.GetPartition), policy.OnRejected)));
        });
        return builder;
    }
        
    public static TBuilder DisableRateLimiting<TBuilder>(this TBuilder builder) where TBuilder : IEndpointConventionBuilder
    {
        builder.Add(endpointBuilder => endpointBuilder.Metadata.Add(DisableRateLimitingAttribute.Instance));
        return builder;
    }
}

EnableRateLimitingAttribute & DisableRateLimitingAttribute

Controller層面,我們可以方便的使用特性來標註使用或禁用限流策略。這兩個特性可以標註在Controller類上,也可以標註在類的方法上。

但需要注意的時,如果前面使用了RequireRateLimitingDisableRateLimiting擴充套件方法,由於它們在後設資料中新增特性比直接使用特性標註要晚,所以它們的優先順序很高,會覆蓋掉這裡使用的策略。建議不要針對所有 Controller 使用RequireRateLimitingDisableRateLimiting

下面是一個應用範例:

[EnableRateLimiting("fixed")]   // 針對整個 Controller 使用限流策略 fixed
public class WeatherForecastController : ControllerBase
{
    // 會使用 Controller 類上標註的 fixed 限流策略
    [HttpGet(Name = "GetWeatherForecast")]
    public string Get() => "Get";
    
    [HttpGet("Hello")]
    [EnableRateLimiting("my_policy")]   // 會使用 my_policy 限流策略,而不會使用 fixed
    public string Hello() => "Hello";
    
    [HttpGet("disable")]
    [DisableRateLimiting]   // 禁用任何限流策略
    public string Disable() => "Disable";
}

設計原理

為了方便理解接下來的內容,先明確幾個容易混淆的型別的概念:

  • RateLimitPartition:限流分割區,TKey表示分割區的 Key,被同一限流分割區作用的請求會互相影響,不同限流分割區則不影響。
  • RateLimitPartition:非泛型的,它只是個靜態類,用來快速建立限流分割區RateLimitPartition<TKey>
  • PartitionedRateLimiter:分割區限流器,即包含了限流分割區的限流器,內部會使用各個限流分割區對不同請求進行限流。TResource表示被限流的資源型別,比如 Http 請求型別為HttpContext。限流中介軟體就是通過它來進行限流操作的。
  • PartitionedRateLimiter:非泛型的,同樣只是個靜態類,用來快速建立分割區限流器PartitionedRateLimiter<TResource>

篇幅所限,下方範例列出的原始碼會忽略一部分非核心程式碼。

AddRateLimiter

AddRateLimiter很簡單,只是單純的進行選項設定:

public static class RateLimiterServiceCollectionExtensions
{
    public static IServiceCollection AddRateLimiter(this IServiceCollection services, Action<RateLimiterOptions> configureOptions)
    {
        services.Configure(configureOptions);
        return services;
    }
}

AddXXXLimiter

以下僅以AddFixedWindowLimiter為例進行講解,其他三個都是類似的。

public static class RateLimiterOptionsExtensions
{
    public static RateLimiterOptions AddFixedWindowLimiter(this RateLimiterOptions options, string policyName, Action<FixedWindowRateLimiterOptions> configureOptions)
    {
        var key = new PolicyNameKey() { PolicyName = policyName };
        var fixedWindowRateLimiterOptions = new FixedWindowRateLimiterOptions();
        configureOptions.Invoke(fixedWindowRateLimiterOptions);
        fixedWindowRateLimiterOptions.AutoReplenishment = false;
        return options.AddPolicy(policyName, context =>
        {
            return RateLimitPartition.GetFixedWindowLimiter(key,
                _ => fixedWindowRateLimiterOptions);
        });
    }
}

首先是設定選項,可以看到它把AutoReplenishment強制設定為了false,不對啊,如果這樣設定豈不是要我來手動呼叫TryReplenish來重置次數了。其實不然,我們一會看GetFixedWindowLimiter的實現就知道原因了。

接著就是呼叫AddPolicy,傳入策略名和一個委託來新增策略,該委託會返回一個限流分割區,分割區內可以通過工廠獲取限流器範例。可以看到該策略的分割區 key 是固定不變的,即該策略共用一個限流分割區。

public static class RateLimitPartition
{
    public static RateLimitPartition<TKey> GetFixedWindowLimiter<TKey>(
        TKey partitionKey,
        Func<TKey, FixedWindowRateLimiterOptions> factory)
    {
        return Get(partitionKey, key =>
        {
            FixedWindowRateLimiterOptions options = factory(key);
            if (options.AutoReplenishment is true)
            {
                options = new FixedWindowRateLimiterOptions
                {
                    PermitLimit = options.PermitLimit,
                    QueueProcessingOrder = options.QueueProcessingOrder,
                    QueueLimit = options.QueueLimit,
                    Window = options.Window,
                    AutoReplenishment = false
                };
            }
            return new FixedWindowRateLimiter(options);
        });
    }
    
    public static RateLimitPartition<TKey> Get<TKey>(
        TKey partitionKey,
        Func<TKey, RateLimiter> factory)
    => new RateLimitPartition<TKey>(partitionKey, factory);

可以看到,如果AutoReplenishmenttrue,會重新new一個新選項,這個新的選項僅僅是將AutoReplenishment設定為false。為什麼呢?這是因為如果它為true,那麼每一個FixedWindowRateLimiters範例(即限流分割區)都會有一個自己的定時器來定時補充許可,這無疑是很浪費的。所以將它設定為false,由分割區限流器中的的定時器來統一管理其下的所有分割區,降低資源消耗,不用擔心,微軟已經幫我們實現好了(具體在RateLimitingMiddleware小節中會介紹),不需要自己實現。

策略被儲存到RateLimiterOptionsPolicyMapUnactivatedPolicyMap中,其中:

  • PolicyMap是指已經建立了建立了策略範例的限流策略集
  • UnactivatedPolicyMap是指還未建立策略範例的限流策略集,它儲存的不是策略範例,而是建立策略的委託。這種一般是實現了IRateLimiterPolicy<TPartitionKey>介面的策略,我們需要在執行時向它的建構函式注入一些引數。
    它們倆都是用於提供限流策略,只不過前者已經構造好了範例,可以直接拿來用,後者則需要在執行時建立範例,然後才能用。

我們的固定視窗限流器策略顯然是存放到PolicyMap中,:

public sealed class RateLimiterOptions
{
    internal Dictionary<string, DefaultRateLimiterPolicy> PolicyMap { get; }
        = new Dictionary<string, DefaultRateLimiterPolicy>(StringComparer.Ordinal);

    internal Dictionary<string, Func<IServiceProvider, DefaultRateLimiterPolicy>> UnactivatedPolicyMap { get; }
        = new Dictionary<string, Func<IServiceProvider, DefaultRateLimiterPolicy>>(StringComparer.Ordinal);

    public RateLimiterOptions AddPolicy<TPartitionKey>(string policyName, Func<HttpContext, RateLimitPartition<TPartitionKey>> partitioner)
    {
        // 策略名不能重複
        if (PolicyMap.ContainsKey(policyName) || UnactivatedPolicyMap.ContainsKey(policyName))
        {
            throw ...;
        }

        PolicyMap.Add(policyName, new DefaultRateLimiterPolicy(ConvertPartitioner<TPartitionKey>(policyName, partitioner), null));

        return this;
    }
}

可以看到,承載策略的範例型別均為DefaultRateLimiterPolicy,即使你是註冊的IRateLimiterPolicy<TPartitionKey>型別的策略,最終也是會轉化為DefaultRateLimiterPolicy

RateLimiter

現在限流器範例的獲取方式已經知道了,那接下來詳細看一下FixedWindowRateLimiter的詳細設計吧。

首先,所有限流器均繼承自抽象類RateLimiter

public abstract class RateLimiter : IAsyncDisposable, IDisposable
{
    public abstract RateLimiterStatistics? GetStatistics();

    public abstract TimeSpan? IdleDuration { get; }

    public RateLimitLease AttemptAcquire(int permitCount = 1)
    => AttemptAcquireCore(permitCount);

    protected abstract RateLimitLease AttemptAcquireCore(int permitCount);

    public ValueTask<RateLimitLease> AcquireAsync(int permitCount = 1, CancellationToken cancellationToken = default)
    => AcquireAsyncCore(permitCount, cancellationToken);

    protected abstract ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken);
}

其中,我們需要重點關注以下成員:

  • IdleDuration:空閒週期,即該限流器有多長時間始終保持著最大可用許可數了。
    • 例如,一個限流器在時間點A,重新發放了許可,沒有一個請求來獲取許可,那麼它的空閒週期就是當前時間 - A,當有請求獲取許可時,空閒週期就會被置為null
    • 限流管理器會通過它來清理未使用的限流器
  • GetStatistics:獲取統計資料,主要包含當前有多少可用的許可、當前排隊的請求個數、許可出租成功的總次數以及許可出租失敗的總次數。
  • AttemptAcquire:嘗試獲取許可,當請求獲取到許可時,則會被處理,否則會被限流拒絕。
    • 它接收一個permitCount引數,表示想要獲取的許可數量,預設值為 1。它所允許的值範圍是 >= 0,當傳入 0 時,表示檢視是否還能獲取到許可(不會消耗許可數)。
    • 返回值型別RateLimitLease擁有一個bool IsAcquired屬性,表示許可是否獲取成功
  • AcquireAsync:非同步獲取許可,它會一直等待,直到成功獲取到許可,或者無法獲取足夠的許可(比如排隊佇列裝不下),才會返回結果。
    • 它接收一個permitCount引數,表示想要獲取的許可數量,預設值為 1。它所允許的值範圍是 >= 0,當傳入 0 時,它會一直等待,直到可以獲取到許可,或者再也不能獲取到許可了(不會消耗許可數)。
    • 同樣的,返回值型別RateLimitLease擁有一個bool IsAcquired屬性,表示許可是否獲取成功

接著,對於FixedWindowLimiterSlidingWindowLimiterTokenBucketLimiter來說,它們都是時間範圍的限流演演算法,都具備Replenish性質,所以又抽象出一層ReplenishingRateLimiter

public abstract class ReplenishingRateLimiter : RateLimiter
{
    // 許可發放週期
    public abstract TimeSpan ReplenishmentPeriod { get; }
    
    // 是否自動補充許可
    public abstract bool IsAutoReplenishing { get; }

    // 嘗試補充許可
    // 當 AutoReplenishment == true 時,不會執行補充許可的邏輯,因為它是自動的,不允許手動干預
    public abstract bool TryReplenish();
}

ConcurrencyLimiter 直接繼承自 RateLimiter

最後具體看一下FixedWindowRateLimiter的詳細實現,先來看建構函式以及一些常用屬性:


public sealed class FixedWindowRateLimiter : ReplenishingRateLimiter
{
    // 用於重新補充許可的定時器
    private readonly Timer? _renewTimer;
    // 選項,會 clone 一份建構函式傳進來的 options
    private readonly FixedWindowRateLimiterOptions _options;

    // 指示許可租賃成功的結果
    private static readonly RateLimitLease SuccessfulLease = new FixedWindowLease(true, null);
    // 指示許可租賃失敗的結果
    private static readonly RateLimitLease FailedLease = new FixedWindowLease(false, null);

    // 空閒週期
    public override TimeSpan? IdleDuration => ...;

    // 是否自動補充許可
    public override bool IsAutoReplenishing => _options.AutoReplenishment;

    // 許可發放週期,對於固定視窗來說,就是視窗大小
    public override TimeSpan ReplenishmentPeriod => _options.Window;

    public FixedWindowRateLimiter(FixedWindowRateLimiterOptions options)
    {
        // 省略部分程式碼...
        
        // 如果 AutoReplenishment == true,則會建立定時器,用於定時補充許可
        // 不過我們從前面可以得知,傳遞到這裡的是 false,所以定時器並不會被建立
        if (_options.AutoReplenishment)
        {
            _renewTimer = new Timer(Replenish, this, _options.Window, _options.Window);
        }
    }
}

接下來是補充許可TryReplenish的實現:

public override bool TryReplenish()
{
    // 當 AutoReplenishment == true 時,不會執行補充許可的邏輯,因為它是自動的,不允許手動干預
    if (_options.AutoReplenishment)
    {
        return false;
    }
    Replenish(this);
    return true;
}

private static void Replenish(object? state)
{
    FixedWindowRateLimiter limiter = (state as FixedWindowRateLimiter)!;

    // 獲取當前時間
    long nowTicks = Stopwatch.GetTimestamp();
    limiter!.ReplenishInternal(nowTicks);
}

private void ReplenishInternal(long nowTicks)
{
    // 如果當前時間距離上次許可發放時間還沒達到視窗大小,則直接返回
    if (((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.Window.Ticks && !_options.AutoReplenishment)
    {
        return;
    }

    int availablePermitCounters = _permitCount;
    if (availablePermitCounters >= _options.PermitLimit)
    {
        // 如果當前可用許可數 >= 限流器設定的最大許可數,則無須重新發放,直接返回
        return;
    }

    // 補充許可
    _permitCount = _options.PermitLimit;

    // 先處理排隊的請求
    while (_queue.Count > 0)
    {
        // 根據 QueueProcessingOrder 從佇列中找到(Peek)最老或最新的請求
        RequestRegistration nextPendingRequest =
              _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
              ? _queue.PeekHead()
              : _queue.PeekTail();

        // 若請求已完成處理,則只需要將它移出佇列(Dequeue),並釋放資源即可。
        // 請求已完成可能的原因如下:
        //  1. 已被取消
        //  2. 當 QueueProcessingOrder 設定為 NewestFirst 時,新來的請求把老的踢出了佇列
        if (nextPendingRequest.Tcs.Task.IsCompleted)
        {
            nextPendingRequest =
                _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
                ? _queue.DequeueHead()
                : _queue.DequeueTail();
            nextPendingRequest.CancellationTokenRegistration.Dispose();
        }
        // 若可用的許可數足夠,則從佇列中取出請求並處理
        else if (_permitCount >= nextPendingRequest.Count)
        {
            nextPendingRequest =
                _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
                ? _queue.DequeueHead()
                : _queue.DequeueTail();

            // 扣減
            _queueCount -= nextPendingRequest.Count;
            _permitCount -= nextPendingRequest.Count;

            // 向請求補充許可
            // 若發放失敗,這還原扣減
            if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease))
            {
                _permitCount += nextPendingRequest.Count;
                _queueCount += nextPendingRequest.Count;
            }
            
            // 釋放資源
            nextPendingRequest.CancellationTokenRegistration.Dispose();
        }
        else
        {
            // 請求無法被處理,直接跳出
            break;
        }
    }

    if (_permitCount == _options.PermitLimit)
    {
        // 當可用許可數等於設定的最大許可數,則開始計算空閒週期
        _idleSince = Stopwatch.GetTimestamp();
    }
}

下面一起看一下許可是如何租出去的。由於非同步的AcquireAsyncCore基本包含了同步的AttemptAcquireCore的處理邏輯,所以下面就只看AcquireAsyncCore。需要著重說一下的是,同步的AttemptAcquireCore是不會進行入隊操作的。

原始碼裡面其實有很多鎖,為了便於理解我都刪除了。

protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken = default)
{
    // 當申請的許可數 == 0,並且可用許可數 > 0 時,則直接返回 SuccessfulLease,表示限流器還有可用許可
    // 對於同步的 AttemptAcquireCore 方法來說,若此時可用許可數為 0,則會直接返回 FailedLease,表示限流器沒有可用許可
    if (permitCount == 0 && _permitCount > 0)
    {
        return new ValueTask<RateLimitLease>(SuccessfulLease);
    }

    // 嘗試租賃
    if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
    {
        return new ValueTask<RateLimitLease>(lease);
    }

    // 如果佇列裝不下要申請許可的所有請求
    if (_options.QueueLimit - _queueCount < permitCount)
    {
        // 如果優先處理新來的,並且要申請許可的請求數沒有超過佇列的大小限制,
        // 則將佇列中老的請求踢出佇列,直到為新來的請求留出足夠的空間,準備將新來的請求加進去
        if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)
        {
            do
            {
                RequestRegistration oldestRequest = _queue.DequeueHead();
                _queueCount -= oldestRequest.Count;
                // 設定老請求申請許可失敗
                if (!oldestRequest.Tcs.TrySetResult(FailedLease))
                {
                    _queueCount += oldestRequest.Count;
                }
                oldestRequest.CancellationTokenRegistration.Dispose();
            }
            while (_options.QueueLimit - _queueCount < permitCount);
        }
        else
        {   
            // 如果優先處理後來的,則只能返回 失敗
            return new ValueTask<RateLimitLease>(CreateFailedWindowLease(permitCount));
        }
    }

    // 這部分程式碼不用太在意
    CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);
    CancellationTokenRegistration ctr = default;
    if (cancellationToken.CanBeCanceled)
    {
        ctr = cancellationToken.Register(static obj =>
        {
            ((CancelQueueState)obj!).TrySetCanceled();
        }, tcs);
    }

    RequestRegistration registration = new RequestRegistration(permitCount, tcs, ctr);
    // 將新請求加入到隊尾
    _queue.EnqueueTail(registration);
    _queueCount += permitCount;

    // 非同步可等待,直到 Task 執行完成獲取到結果(可能是申請成功,也可能是失敗)
    return new ValueTask<RateLimitLease>(registration.Tcs.Task);
}

TryLeaseUnsynchronized具體邏輯如下:

private bool TryLeaseUnsynchronized(int permitCount, out RateLimitLease? lease)
{
    // 若可用的許可數足夠,且不為 0
    if (_permitCount >= permitCount && _permitCount != 0)
    {
        // 租賃的許可為0,則直接返回 成功
        if (permitCount == 0)
        {
            lease = SuccessfulLease;
            return true;
        }

        // 若:
        //  1. 沒有排隊的請求
        //  2. 或有排隊的請求,但是 QueueProcessingOrder 被設定為 NewestFirst
        // 則租賃成功,其他則租賃失敗(因為要先把排隊的處理完)
        if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
        {
            // 許可租賃出去了,也就表示該限流器不空閒了
            _idleSince = null;
            _permitCount -= permitCount;
            lease = SuccessfulLease;
            return true;
        }
    }

    // 租賃失敗
    lease = null;
    return false;
}

RateLimitingMiddleware

現在,我們已經掌握了限流器補充許可和租賃許可的細節邏輯了,並且也得知並沒有使用限流器內部的定時器去定時補充許可,那這是由誰補充的呢?又是由誰為請求申請的許可呢?

沒錯,這都是RateLimitingMiddleware負責的。

在構造方法中,我們需要重點關注下CreateEndpointLimiter,它建立了終結點分割區限流器,與全域性限流器一起提供限流服務。

internal sealed partial class RateLimitingMiddleware
{
    // 預設被限流拒絕回撥的委託,取自 options.OnRejected
    private readonly Func<OnRejectedContext, CancellationToken, ValueTask>? _defaultOnRejected;
    // 全域性限流器,取自 options.GlobalLimiter
    private readonly PartitionedRateLimiter<HttpContext>? _globalLimiter;
    // 終結點限流器
    private readonly PartitionedRateLimiter<HttpContext> _endpointLimiter;
    // 限流響應狀態碼,取自 options.RejectionStatusCode
    private readonly int _rejectionStatusCode;
    // 限流策略集,取自 options.PolicyMap 和 options.UnactivatedPolicyMap
    private readonly Dictionary<string, DefaultRateLimiterPolicy> _policyMap;
    
    public RateLimitingMiddleware(RequestDelegate next, ILogger<RateLimitingMiddleware> logger, IOptions<RateLimiterOptions> options, IServiceProvider serviceProvider, RateLimitingMetrics metrics)
    {
        // ...省略一堆程式碼
        
        _endpointLimiter = CreateEndpointLimiter();
    }
}

CreateEndpointLimiter方法中,建立了分割區限流器,裡面包含了各種各樣的限流分割區,用於不同終結點請求的限流。

private PartitionedRateLimiter<HttpContext> CreateEndpointLimiter()
{
    // 建立分割區限流器
    return PartitionedRateLimiter.Create<HttpContext, DefaultKeyType>(context =>
    {
        DefaultRateLimiterPolicy? policy;
        var enableRateLimitingAttribute = context.GetEndpoint()?.Metadata.GetMetadata<EnableRateLimitingAttribute>();
        // 如果不需要限流,則返回 NoLimiter
        if (enableRateLimitingAttribute is null)
        {
            return RateLimitPartition.GetNoLimiter<DefaultKeyType>(_defaultPolicyKey);
        }
        
        // 根據限流策略取限流分割區
        policy = enableRateLimitingAttribute.Policy;
        if (policy is not null)
        {
            return policy.GetPartition(context);
        }
        var name = enableRateLimitingAttribute.PolicyName;
        if (name is not null)
        {
            if (_policyMap.TryGetValue(name, out policy))
            {
                return policy.GetPartition(context);
            }
            else
            {
                throw new InvalidOperationException($"This endpoint requires a rate limiting policy with name {name}, but no such policy exists.");
            }
        }
        // 雖然策略名或策略不可能為空,但是加一下判斷更好
        else
        {
            throw new InvalidOperationException("This endpoint requested a rate limiting policy with a null name.");
        }
    }, new DefaultKeyTypeEqualityComparer());
}

咦?怎麼還是沒看到在哪自動補充的許可?實際上它就隱藏在PartitionedRateLimiter.Create中的DefaultPartitionedRateLimiter裡面,藏得太深了:

public static class PartitionedRateLimiter
{
    public static PartitionedRateLimiter<TResource> Create<TResource, TPartitionKey>(
        Func<TResource, RateLimitPartition<TPartitionKey>> partitioner,
        IEqualityComparer<TPartitionKey>? equalityComparer = null) where TPartitionKey : notnull
    {
        return new DefaultPartitionedRateLimiter<TResource, TPartitionKey>(partitioner, equalityComparer);
    }
}

下面是DefaultPartitionedRateLimiter啟動定時器執行心跳的核心程式碼:

internal sealed class DefaultPartitionedRateLimiter<TResource, TKey> : PartitionedRateLimiter<TResource> where TKey : notnull
{
    // 限流器集合
    private readonly Dictionary<TKey, Lazy<RateLimiter>> _limiters;
    // 限流分割區委託,可通過資源獲取到分割區
    private readonly Func<TResource, RateLimitPartition<TKey>> _partitioner;
    // 定時器,主要作用是每 100ms 進行一次心跳,即執行 Heartbeat 方法
    private readonly TimerAwaitable _timer;
    private readonly Task _timerTask;

    public DefaultPartitionedRateLimiter(Func<TResource, RateLimitPartition<TKey>> partitioner,
        IEqualityComparer<TKey>? equalityComparer = null)
    {
        _limiters = new Dictionary<TKey, Lazy<RateLimiter>>(equalityComparer);
        _partitioner = partitioner;

        var timerInterval = TimeSpan.FromMilliseconds(100);
        _timer = new TimerAwaitable(timerInterval, timerInterval);
        _timerTask = RunTimer();
    }
    
    private async Task RunTimer()
    {
        _timer.Start();
        // 只要 timer 不被停止,則一直返回 true,即 timer 仍在執行中
        while (await _timer)
        {
            try
            {
                await Heartbeat().ConfigureAwait(false);
            }
            catch { }
        }
        _timer.Dispose();
    }
}

TimerAwaitable是一個可非同步等待的型別(實現了GetAwaiterINotifyCompletionIsCompletedGetResult),內部設計非常有意思。在它內部,啟動了一個定時器,每 100ms(傳入的timerInterval) Tick 一次,每次 Tick 就會把 IsCompleted設定為true,將任務狀態切換為已完成。外部通過await獲取結果時(靜默呼叫GetResult),又會將IsCompleted設定為false,再將其轉換為未完成狀態。外部再配合while以達到定時執行的效果。

為什麼不直接用Timer而又弄出一個TimerAwaitable?我認為TimerAwaitable有以下優點:

  1. 優雅的書寫非同步程式碼
  2. 它的定時執行不會出現重入。即不會因為上一次定時任務執行耗時超過定時間隔還未完成,這一次又執行了定時任務,導致同時有兩個甚至多個執行緒在執行定時任務。

通過定時器,每 100ms 執行一次心跳,心跳過程中檢查各個限流器是否需要補充許可,如果需要,則補充,並回收空閒限流器等。以下是簡化的心跳邏輯:

private async Task Heartbeat()
{
    if (_cacheInvalid)
    {
        _cachedLimiters.Clear();
        _cachedLimiters.AddRange(_limiters);
    }

    // 遍歷所有快取的限流器
    foreach (KeyValuePair<TKey, Lazy<RateLimiter>> rateLimiter in _cachedLimiters)
    {
        // 如果限流器還未被範例化,則跳過
        if (!rateLimiter.Value.IsValueCreated) continue;
        
        // 如果限流器空閒週期超過了空閒時間限制(預設10s),則回
        if (rateLimiter.Value.Value.IdleDuration is TimeSpan idleDuration && idleDuration > s_idleTimeLimit)
        {
            lock (Lock)
            {
                // 雙重檢測,確保限流器確實是空閒的
                idleDuration = rateLimiter.Value.Value.IdleDuration ?? TimeSpan.Zero;
                if (idleDuration > s_idleTimeLimit)
                {
                    _cacheInvalid = true;
                    // 回收該限流器
                    _limiters.Remove(rateLimiter.Key);

                    // 儲存下來,後面一起釋放資源
                    _limitersToDispose.Add(rateLimiter.Value.Value);
                }
            }
        }
        // 如果限流器可補充許可,則嘗試補充
        else if (rateLimiter.Value.Value is ReplenishingRateLimiter replenishingRateLimiter)
        {
            try
            {
                replenishingRateLimiter.TryReplenish();
            }
            catch (Exception ex) { ... }
        }
    }

    // 釋放回收的限流器資源
    foreach (RateLimiter limiter in _limitersToDispose)
    {
        try
        {
            await limiter.DisposeAsync().ConfigureAwait(false);
        }
        catch (Exception ex) { ... }
    }
    _limitersToDispose.Clear();
}

好了,我們已經瞭解了限流器的管理,讓我們再次回到RateLimitingMiddleware,看看他是如何工作的吧:

public Task Invoke(HttpContext context)
{
    var endpoint = context.GetEndpoint();
    // 如果終結點包含禁用限流標記,則不限流
    if (endpoint?.Metadata.GetMetadata<DisableRateLimitingAttribute>() is not null)
    {
        return _next(context);
    }
    
    var enableRateLimitingAttribute = endpoint?.Metadata.GetMetadata<EnableRateLimitingAttribute>();
    // 如果終結點沒有啟用限流標記,並且全域性限流器也是空的,則同樣不限流
    if (enableRateLimitingAttribute is null && _globalLimiter is null)
    {
        return _next(context);
    }

    return InvokeInternal(context, enableRateLimitingAttribute);
}

private async Task InvokeInternal(HttpContext context, EnableRateLimitingAttribute? enableRateLimitingAttribute)
{
    var policyName = enableRateLimitingAttribute?.PolicyName;

    // 嘗試獲取許可
    using var leaseContext = await TryAcquireAsync(context);

    // 如果獲取到了許可,則處理請求
    if (leaseContext.Lease?.IsAcquired == true)
    {
        await _next(context);
    }
    // 沒有獲取到許可,則限流拒絕
    else
    {
        // 如果請求是被取消的,則不要執行 OnRejected 回撥,應該直接返回
        if (leaseContext.RequestRejectionReason == RequestRejectionReason.RequestCanceled)
        {
            return;
        }
        var thisRequestOnRejected = _defaultOnRejected;
        context.Response.StatusCode = _rejectionStatusCode;

        // 如果請求是被終結點限流器限流拒絕的
        if (leaseContext.RequestRejectionReason == RequestRejectionReason.EndpointLimiter)
        {
            // 若策略有自己的 OnRejected,則使用策略的,如果沒有,則使用 _defaultOnRejected
            
            // 這裡我感覺是個 bug,應該判斷 policy?.OnRejected is not null 才賦值
            DefaultRateLimiterPolicy policy = enableRateLimitingAttribute?.Policy;
            if (policy is not null)
            {
                thisRequestOnRejected = policy.OnRejected;
            }
            else
            {
                // 對於策略名,當 OnRejected 不為空時,才使用策略的 OnRejected
                if (policyName is not null && _policyMap.TryGetValue(policyName, out policy) && policy.OnRejected is not null)
                {
                    thisRequestOnRejected = policy.OnRejected;
                }
            }
        }
        
        // 執行回撥
        if (thisRequestOnRejected is not null)
        {
            await thisRequestOnRejected(new OnRejectedContext() { HttpContext = context, Lease = leaseContext.Lease! }, context.RequestAborted);
        }
    }
}

TryAcquireAsync會先從全域性限流器獲取許可,如果獲取到了,則會繼續在終結點限流器中獲取許可,如果獲取到了,請求才會被處理:

非同步的 CombinedWaitAsync 與同步的 CombinedAcquire 類似,只不過前面呼叫的是非同步方法,後面是同步,故下方僅列出 CombinedAcquire 簡化原始碼。

private async ValueTask<LeaseContext> TryAcquireAsync(HttpContext context, MetricsContext metricsContext)
{
    // 組合獲取,即按順序從全域性限流器和終結點限流器中獲取許可
    var leaseContext = CombinedAcquire(context);
    // 如果獲取到了,則直接返回
    if (leaseContext.Lease?.IsAcquired == true)
    {
        return leaseContext;
    }

    // 非同步等待再次獲取許可
    return await CombinedWaitAsync(context, context.RequestAborted);
}

private LeaseContext CombinedAcquire(HttpContext context)
{
    // 全域性限流器不為空,則先從其中獲取許可
    if (_globalLimiter is not null)
    {
        var globalLease = _globalLimiter.AttemptAcquire(context);
        // 未獲取許可,直接返回
        if (!globalLease.IsAcquired)
        {
            return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.GlobalLimiter, Lease = globalLease };
        }
    }
    
    // 從終結點限流器中獲取許可
    var endpointLease = _endpointLimiter.AttemptAcquire(context);
    // 未獲取許可,直接返回
    if (!endpointLease.IsAcquired)
    {
        globalLease?.Dispose();
        return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.EndpointLimiter, Lease = endpointLease };
    }

    return globalLease is null 
        ? new LeaseContext() { Lease = endpointLease } 
        : new LeaseContext() { Lease = new DefaultCombinedLease(globalLease, endpointLease) };
}

總結

  1. ASP.NET Core 為我們提供了限流功能,通過AddRateLimiter註冊限流服務,通過UseRateLimiter啟用限流功能。
  2. 預設提供了4種限流演演算法,分別是:
    • FixedWindowLimiter:固定視窗限流器
    • SlidingWindowLimiter:滑動視窗限流器
    • TokenBucketLimiter:令牌桶限流器
    • ConcurrencyLimiter:並行限流器
  3. 可以通過options.AddPolicy新增限流策略,作用於某些終結點,這些策略最終組成的分割區限流器稱為終結點限流器
    • 可以通過options.AddXXXLimiter的方式快捷新增限流策略
    • 也可以自定義限流策略,限流邏輯可以通過委託直接傳入,也可以通過實現介面IRateLimiterPolicy<TPartitionKey>
  4. 可以通過options.GlobalLimiter設定全域性限流器,當請求進入應用時,會先執行全域性限流器,再執行終結點限流器。
  5. 可以通過options.RejectionStatusCode設定限流拒絕的響應狀態碼,還可以通過OnRejected編寫更多的響應邏輯。
  6. 可以通過PartitionedRateLimiter.CreateChained將多個分割區限流器進行鏈式組合
  7. 目前 ASP.NET Core 提供的限流功能還不夠成熟,例如終結點限流器無法進行鏈式組合、無法為終結點設定多個限流策略等