注:本文隸屬於《理解ASP.NET Core》系列文章,請檢視置頂部落格或點選此處檢視全文目錄
在微服務化的架構設計中,閘道器扮演著重要的看門人角色,它所提供的功能之一就是限流。而對於眾多非微服務化的系統來說,可能並不會部署閘道器(無論是因為成本還是複雜度),在這種場景下,為了實現限流,微軟在 .NET 7 中提供了官方的限流中介軟體。下面我們一起來看一下。
首先,確保你的應用依賴的 SDK 版本 >= 7,接著通過AddRateLimiter
擴充套件方法註冊限流服務,並新增限流策略,然後通過UseRateLimiter
啟用限流中介軟體,最後設定某個路由的請求使用限流策略:
builder.Services.AddRateLimiter(limiterOptions =>
{
// 設定限流策略
});
app.UseRateLimiter();
app.MapGet("LimitTest", async () =>
{
await Task.Delay(TimeSpan.FromSeconds(1));
return Results.Ok($"Limiter");
}).RequireRateLimiting("my_policy");
微軟為我們提供了 4 種常用的限流演演算法:
我們通常會註冊一個命名限流策略,並在該策略內指定限流演演算法,以及其他限流邏輯。
另外,需要關注一下UseRateLimiter
的呼叫位置。若限流行為作用於特定路由,則限流中介軟體必須放置在UseRouting
之後。
固定視窗限流器是一種簡單的限流方式:
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddFixedWindowLimiter(policyName: "fixed", fixedOptions =>
{
fixedOptions.PermitLimit = 4;
fixedOptions.Window = TimeSpan.FromSeconds(60);
fixedOptions.QueueLimit = 2;
fixedOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
fixedOptions.AutoReplenishment = true;
});
});
public sealed class FixedWindowRateLimiterOptions
{
public TimeSpan Window { get; set; } = TimeSpan.Zero;
public bool AutoReplenishment { get; set; } = true;
public int PermitLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我們通過AddFixedWindowLimiter
新增了一個固定視窗限流策略,並指定策略名為fixed
。它的含義是視窗時間長度為60s,在每個視窗時間範圍內,最多允許4個請求被處理。
各設定項含義如下:
TimeSpan.Zero
RejectionStatusCode
true
。如果設定為false
,則需要手動呼叫 FixedWindowRateLimiter.TryReplenish
來重置滑動視窗限流器是固定視窗限流器的升級版:
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddSlidingWindowLimiter(policyName: "sliding", slidingOptions =>
{
slidingOptions.PermitLimit = 100;
slidingOptions.Window = TimeSpan.FromSeconds(30);
slidingOptions.QueueLimit = 2;
slidingOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
slidingOptions.AutoReplenishment = true;
slidingOptions.SegmentsPerWindow = 3;
});
});
public sealed class SlidingWindowRateLimiterOptions
{
public TimeSpan Window { get; set; } = TimeSpan.Zero;
public int SegmentsPerWindow { get; set; }
public bool AutoReplenishment { get; set; } = true;
public int PermitLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我們通過AddSlidingWindowLimiter
新增了一個滑動視窗限流策略,並指定策略名為sliding
。它的含義是視窗時間長度為30s,在每個視窗時間範圍內,最多允許100個請求,視窗段數為 3,每個段的時間間隔為 30s / 3 = 10s,即視窗每 10s 滑動一段。
各設定項含義如下:
TimeSpan.Zero
true
。如果設定為false
,則需要手動呼叫 SlidingWindowRateLimiter.TryReplenish
來重置為了更好地理解滑動視窗限流器的工作原理,下面我會借用官方檔案提供的一張圖來詳細解釋一下:
假設:限制每個視窗的請求數為 100,視窗時間為 30s,每個視窗的段數為 3,那麼每個段的時間間隔就是 30s / 3 = 10s。
定義:當前段結存請求數 = 當前段可用請求數 - 處理請求數 + 回收請求數
限流器工作流程:
令牌桶限流器是一種限制資料平均傳輸速率的限流演演算法:
以下圖為例,桶內有 3 個令牌(token),進來了 5 個請求,前三個請求可以拿到令牌(token),它們會被處理,後面兩個就只能排隊或被限流拒絕。
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddTokenBucketLimiter(policyName: "token_bucket", tokenBucketOptions =>
{
tokenBucketOptions.TokenLimit = 4;
tokenBucketOptions.ReplenishmentPeriod = TimeSpan.FromSeconds(10);
tokenBucketOptions.TokensPerPeriod = 2;
tokenBucketOptions.QueueLimit = 2;
tokenBucketOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
tokenBucketOptions.AutoReplenishment = true;
});
});
public sealed class TokenBucketRateLimiterOptions
{
public TimeSpan ReplenishmentPeriod { get; set; } = TimeSpan.Zero;
public int TokensPerPeriod { get; set; }
public bool AutoReplenishment { get; set; } = true;
public int TokenLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我們通過AddTokenBucketLimiter
新增了一個令牌桶限流策略,並指定策略名為token_bucket
。它的含義是桶最多可以裝 4 個令牌,每 10s 發放一次令牌,每次發放 2 個令牌,所以在一個發放週期內,最多可以處理 4 個請求,至少可以處理 2 個請求
各設定項含義如下:
TimeSpan.Zero
true
。如果設定為false
,則需要手動呼叫 TokenBucketRateLimiter.TryReplenish
來發放並行限流器不是限制一段時間內的最大請求數,而是限制並行數:
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddConcurrencyLimiter(policyName: "concurrency", concurrencyOptions =>
{
concurrencyOptions.PermitLimit = 4;
concurrencyOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
concurrencyOptions.QueueLimit = 2;
});
});
public sealed class ConcurrencyLimiterOptions
{
public int PermitLimit { get; set; }
public QueueProcessingOrder QueueProcessingOrder { get; set; } = QueueProcessingOrder.OldestFirst;
public int QueueLimit { get; set; }
}
如上所示,我們通過AddConcurrencyLimiter
新增了一個並行限流策略,並指定策略名為concurrency
。它的含義是最多可以並行4個請求被處理。
各設定項含義如下:
上面已經把常用的限流演演算法介紹完了,下面來看一下可以通過limiterOptions
進行哪些設定:
public sealed class RateLimiterOptions
{
// 僅保留了常用的設定項,其他相關程式碼均忽略
// 全域性限流器
public PartitionedRateLimiter<HttpContext>? GlobalLimiter { get; set; }
// 當請求被限流拒絕時執行
public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; set; }
// 當期你去被限流拒絕時的 Http 響應狀態碼
public int RejectionStatusCode { get; set; } = StatusCodes.Status503ServiceUnavailable;
}
通過GlobalLimiter
,我們可以設定全域性限流器,更準確的說法是全域性分割區限流器,該限流器會應用於所有請求。執行順序為先執行全域性限流器,再執行特定於路由終結點的限流器(如果存在的話)。
需要注意的是,相對於上面註冊的限流策略來說,GlobalLimiter
已經是一個限流器範例了,所以需要分配給他一個分割區限流器範例,通過PartitionedRateLimiter.Create
來建立。
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, IPAddress>(context =>
{
IPAddress? remoteIpAddress = context.Connection.RemoteIpAddress;
// 針對非迴環地址限流
if (!IPAddress.IsLoopback(remoteIpAddress!))
{
return RateLimitPartition.GetTokenBucketLimiter
(remoteIpAddress!, _ =>
new TokenBucketRateLimiterOptions
{
TokenLimit = 4,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 2,
ReplenishmentPeriod = TimeSpan.FromSeconds(10),
TokensPerPeriod = 10,
AutoReplenishment = true
});
}
// 若為迴環地址,則不限流
return RateLimitPartition.GetNoLimiter(IPAddress.Loopback);
});
});
它並不是一個新型別的限流器,而是可以將我們上面提到的分割區限流器進行組合而得到一個新的分割區限流器。
例如我可以將包含固定視窗限流邏輯的分割區限流器和將包含並行限流邏輯的分割區限流器組合進行組合,那麼應用該限流器的請求就會先被固定視窗限流器處理,再被並行限流器處理,任意一個被限流,就會被拒絕。
var chainedLimiter = PartitionedRateLimiter.CreateChained(
PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
{
var userAgent = httpContext.Request.Headers.UserAgent.ToString();
return RateLimitPartition.GetFixedWindowLimiter
(userAgent, _ =>
new FixedWindowRateLimiterOptions
{
AutoReplenishment = true,
PermitLimit = 4,
Window = TimeSpan.FromSeconds(2)
});
}),
PartitionedRateLimiter.Create<HttpContext, string>(httpContext =>
{
var userAgent = httpContext.Request.Headers.UserAgent.ToString();
return RateLimitPartition.GetConcurrencyLimiter
(userAgent, _ =>
new ConcurrencyLimiterOptions
{
PermitLimit = 4,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 2
});
})
);
目前鏈式組合的限流器只能用於全域性限流器,而不能用於終結點限流器。
通過RejectionStatusCode
,我們可以設定請求被限流拒絕後,http預設的響應狀態碼。預設為 503 服務不可用,我們可以指定為 429 過多的請求。
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.RejectionStatusCode = StatusCodes.Status429TooManyRequests;
});
另外,該狀態碼可以在OnRejected
中被重寫,具體參見下小節。
當請求被限流時,會觸發回撥OnRejected
,通過該委託我們可以針對 http 響應進行自定義設定:
Retry-After
,指示多長時間後重試請求。需要注意的是,並行限流器無法獲取到 RetryAfter,因為它不是時間段的限流,而是限制的並行數builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.OnRejected = (context, cancellationToken) =>
{
if (context.Lease.TryGetMetadata(MetadataName.RetryAfter, out var retryAfter))
{
context.HttpContext.Response.Headers.RetryAfter =
((int)retryAfter.TotalSeconds).ToString(NumberFormatInfo.InvariantInfo);
}
// 可以重新設定響應狀態碼,會覆蓋掉上面設定的 limiterOptions.RejectionStatusCod
context.HttpContext.Response.StatusCode = StatusCodes.Status429TooManyRequests;
context.HttpContext.RequestServices.GetService<ILoggerFactory>()?
.CreateLogger("Microsoft.AspNetCore.RateLimitingMiddleware")
.LogWarning("OnRejected: {GetUserEndPoint}", GetUserEndPoint(context.HttpContext));
return ValueTask.CompletedTask;
};
});
上述提到的限流策略,並不能滿足我們所有的需求,所以瞭解如何自定義限流策略是我們的必修課。
在開始編碼之前,你需要了解以下內容:
AddXXXLimiter
新增的限流策略,內部實際上呼叫了AddPolicy
(後面的部分會詳細介紹)AddXXXLimiter
新增的限流策略,每種策略只有一個分割區,即使用了該限流策略的路由共用一個分割區。例如通過AddFixedWindowLimiter
新增了限流策略「fixed」,視窗閾值為 10,並有 10 個路由使用了該策略,那麼在一個視窗內,這 10 個路由總的請求數達到 10,那這 10 個路由後續的請求都會被限流。下面我們就藉助AddPolicy
,分別使用兩種方式新增一個自定義策略「my_policy」:一個使用者一個分割區,匿名使用者共用一個分割區
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddPolicy(policyName: "my_policy", httpcontext =>
{
var userId = "anonymous user";
if (httpcontext.User.Identity?.IsAuthenticated is true)
{
userId = httpcontext.User.Claims.First(c => c.Type == "id").Value;
}
return RateLimitPartition.GetFixedWindowLimiter(partitionKey: userId, _ => new
FixedWindowRateLimiterOptions
{
PermitLimit = 3,
Window = TimeSpan.FromSeconds(60),
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 0
});
});
});
public interface IRateLimiterPolicy<TPartitionKey>
{
// 若不為空,則執行它(不會執行全域性的),如果它為空,則執行全域性的
Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; }
// 獲取限流分割區
RateLimitPartition<TPartitionKey> GetPartition(HttpContext httpContext);
}
public class MyRateLimiterPolicy : IRateLimiterPolicy<string>
{
// 可以通過依賴注入引數
public MyRateLimiterPolicy(ILogger<MyRateLimiterPolicy> logger)
{
// 可以設定自己的限流拒絕回撥邏輯,而不使用上面全域性設定的 limiterOptions.OnRejected
OnRejected = (ctx, token) =>
{
ctx.HttpContext.Response.StatusCode = StatusCodes.Status429TooManyRequests;
logger.LogWarning($"Request rejected by {nameof(MyRateLimiterPolicy)}");
return ValueTask.CompletedTask;
};
}
public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected { get; }
public RateLimitPartition<string> GetPartition(HttpContext httpContext)
{
var userId = "anonymous user";
if (httpContext.User.Identity?.IsAuthenticated is true)
{
userId = httpContext.User.Claims.First(c => c.Type == "id").Value;
}
return RateLimitPartition.GetFixedWindowLimiter(partitionKey: userId, _ => new
FixedWindowRateLimiterOptions
{
PermitLimit = 3,
Window = TimeSpan.FromSeconds(60),
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 0
});
}
}
// 記得註冊它
builder.Services.AddRateLimiter(limiterOptions =>
{
limiterOptions.AddPolicy<string, MyRateLimiterPolicy>(policyName: "my_policy");
}
可以一次性為所有 controller 應用限流策略
app.MapControllers().RequireRateLimiting("fixed");
也可以為指定路由應用限流策略
app.MapGet("LimitTest", () =>{ }).RequireRateLimiting("fixed");
實質上,RequireRateLimiting
和DisableRateLimiting
是通過向終結點後設資料中EnableRateLimiting
和DisableRateLimiting
兩個特性來實現的。
public static class RateLimiterEndpointConventionBuilderExtensions
{
public static TBuilder RequireRateLimiting<TBuilder>(this TBuilder builder, string policyName) where TBuilder : IEndpointConventionBuilder
{
builder.Add(endpointBuilder => endpointBuilder.Metadata.Add(new EnableRateLimitingAttribute(policyName)));
return builder;
}
public static TBuilder RequireRateLimiting<TBuilder, TPartitionKey>(this TBuilder builder, IRateLimiterPolicy<TPartitionKey> policy) where TBuilder : IEndpointConventionBuilder
{
builder.Add(endpointBuilder =>
{
endpointBuilder.Metadata.Add(new EnableRateLimitingAttribute(new
DefaultRateLimiterPolicy(
RateLimiterOptions.ConvertPartitioner<TPartitionKey>(null, policy.GetPartition), policy.OnRejected)));
});
return builder;
}
public static TBuilder DisableRateLimiting<TBuilder>(this TBuilder builder) where TBuilder : IEndpointConventionBuilder
{
builder.Add(endpointBuilder => endpointBuilder.Metadata.Add(DisableRateLimitingAttribute.Instance));
return builder;
}
}
在Controller
層面,我們可以方便的使用特性來標註使用或禁用限流策略。這兩個特性可以標註在Controller
類上,也可以標註在類的方法上。
但需要注意的時,如果前面使用了RequireRateLimiting
或DisableRateLimiting
擴充套件方法,由於它們在後設資料中新增特性比直接使用特性標註要晚,所以它們的優先順序很高,會覆蓋掉這裡使用的策略。建議不要針對所有 Controller 使用RequireRateLimiting
或DisableRateLimiting
。
下面是一個應用範例:
[EnableRateLimiting("fixed")] // 針對整個 Controller 使用限流策略 fixed
public class WeatherForecastController : ControllerBase
{
// 會使用 Controller 類上標註的 fixed 限流策略
[HttpGet(Name = "GetWeatherForecast")]
public string Get() => "Get";
[HttpGet("Hello")]
[EnableRateLimiting("my_policy")] // 會使用 my_policy 限流策略,而不會使用 fixed
public string Hello() => "Hello";
[HttpGet("disable")]
[DisableRateLimiting] // 禁用任何限流策略
public string Disable() => "Disable";
}
為了方便理解接下來的內容,先明確幾個容易混淆的型別的概念:
TKey
表示分割區的 Key,被同一限流分割區作用的請求會互相影響,不同限流分割區則不影響。RateLimitPartition<TKey>
TResource
表示被限流的資源型別,比如 Http 請求型別為HttpContext
。限流中介軟體就是通過它來進行限流操作的。PartitionedRateLimiter<TResource>
篇幅所限,下方範例列出的原始碼會忽略一部分非核心程式碼。
AddRateLimiter
很簡單,只是單純的進行選項設定:
public static class RateLimiterServiceCollectionExtensions
{
public static IServiceCollection AddRateLimiter(this IServiceCollection services, Action<RateLimiterOptions> configureOptions)
{
services.Configure(configureOptions);
return services;
}
}
以下僅以AddFixedWindowLimiter
為例進行講解,其他三個都是類似的。
public static class RateLimiterOptionsExtensions
{
public static RateLimiterOptions AddFixedWindowLimiter(this RateLimiterOptions options, string policyName, Action<FixedWindowRateLimiterOptions> configureOptions)
{
var key = new PolicyNameKey() { PolicyName = policyName };
var fixedWindowRateLimiterOptions = new FixedWindowRateLimiterOptions();
configureOptions.Invoke(fixedWindowRateLimiterOptions);
fixedWindowRateLimiterOptions.AutoReplenishment = false;
return options.AddPolicy(policyName, context =>
{
return RateLimitPartition.GetFixedWindowLimiter(key,
_ => fixedWindowRateLimiterOptions);
});
}
}
首先是設定選項,可以看到它把AutoReplenishment
強制設定為了false
,不對啊,如果這樣設定豈不是要我來手動呼叫TryReplenish
來重置次數了。其實不然,我們一會看GetFixedWindowLimiter
的實現就知道原因了。
接著就是呼叫AddPolicy
,傳入策略名和一個委託來新增策略,該委託會返回一個限流分割區,分割區內可以通過工廠獲取限流器範例。可以看到該策略的分割區 key 是固定不變的,即該策略共用一個限流分割區。
public static class RateLimitPartition
{
public static RateLimitPartition<TKey> GetFixedWindowLimiter<TKey>(
TKey partitionKey,
Func<TKey, FixedWindowRateLimiterOptions> factory)
{
return Get(partitionKey, key =>
{
FixedWindowRateLimiterOptions options = factory(key);
if (options.AutoReplenishment is true)
{
options = new FixedWindowRateLimiterOptions
{
PermitLimit = options.PermitLimit,
QueueProcessingOrder = options.QueueProcessingOrder,
QueueLimit = options.QueueLimit,
Window = options.Window,
AutoReplenishment = false
};
}
return new FixedWindowRateLimiter(options);
});
}
public static RateLimitPartition<TKey> Get<TKey>(
TKey partitionKey,
Func<TKey, RateLimiter> factory)
=> new RateLimitPartition<TKey>(partitionKey, factory);
可以看到,如果AutoReplenishment
為true
,會重新new
一個新選項,這個新的選項僅僅是將AutoReplenishment
設定為false
。為什麼呢?這是因為如果它為true
,那麼每一個FixedWindowRateLimiters
範例(即限流分割區)都會有一個自己的定時器來定時補充許可,這無疑是很浪費的。所以將它設定為false
,由分割區限流器中的的定時器來統一管理其下的所有分割區,降低資源消耗,不用擔心,微軟已經幫我們實現好了(具體在RateLimitingMiddleware
小節中會介紹),不需要自己實現。
策略被儲存到RateLimiterOptions
的PolicyMap
和UnactivatedPolicyMap
中,其中:
PolicyMap
是指已經建立了建立了策略範例的限流策略集UnactivatedPolicyMap
是指還未建立策略範例的限流策略集,它儲存的不是策略範例,而是建立策略的委託。這種一般是實現了IRateLimiterPolicy<TPartitionKey>
介面的策略,我們需要在執行時向它的建構函式注入一些引數。我們的固定視窗限流器策略顯然是存放到PolicyMap
中,:
public sealed class RateLimiterOptions
{
internal Dictionary<string, DefaultRateLimiterPolicy> PolicyMap { get; }
= new Dictionary<string, DefaultRateLimiterPolicy>(StringComparer.Ordinal);
internal Dictionary<string, Func<IServiceProvider, DefaultRateLimiterPolicy>> UnactivatedPolicyMap { get; }
= new Dictionary<string, Func<IServiceProvider, DefaultRateLimiterPolicy>>(StringComparer.Ordinal);
public RateLimiterOptions AddPolicy<TPartitionKey>(string policyName, Func<HttpContext, RateLimitPartition<TPartitionKey>> partitioner)
{
// 策略名不能重複
if (PolicyMap.ContainsKey(policyName) || UnactivatedPolicyMap.ContainsKey(policyName))
{
throw ...;
}
PolicyMap.Add(policyName, new DefaultRateLimiterPolicy(ConvertPartitioner<TPartitionKey>(policyName, partitioner), null));
return this;
}
}
可以看到,承載策略的範例型別均為DefaultRateLimiterPolicy
,即使你是註冊的IRateLimiterPolicy<TPartitionKey>
型別的策略,最終也是會轉化為DefaultRateLimiterPolicy
。
現在限流器範例的獲取方式已經知道了,那接下來詳細看一下FixedWindowRateLimiter
的詳細設計吧。
首先,所有限流器均繼承自抽象類RateLimiter
:
public abstract class RateLimiter : IAsyncDisposable, IDisposable
{
public abstract RateLimiterStatistics? GetStatistics();
public abstract TimeSpan? IdleDuration { get; }
public RateLimitLease AttemptAcquire(int permitCount = 1)
=> AttemptAcquireCore(permitCount);
protected abstract RateLimitLease AttemptAcquireCore(int permitCount);
public ValueTask<RateLimitLease> AcquireAsync(int permitCount = 1, CancellationToken cancellationToken = default)
=> AcquireAsyncCore(permitCount, cancellationToken);
protected abstract ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken);
}
其中,我們需要重點關注以下成員:
當前時間 - A
,當有請求獲取許可時,空閒週期就會被置為null
permitCount
引數,表示想要獲取的許可數量,預設值為 1。它所允許的值範圍是 >= 0,當傳入 0 時,表示檢視是否還能獲取到許可(不會消耗許可數)。RateLimitLease
擁有一個bool IsAcquired
屬性,表示許可是否獲取成功permitCount
引數,表示想要獲取的許可數量,預設值為 1。它所允許的值範圍是 >= 0,當傳入 0 時,它會一直等待,直到可以獲取到許可,或者再也不能獲取到許可了(不會消耗許可數)。RateLimitLease
擁有一個bool IsAcquired
屬性,表示許可是否獲取成功接著,對於FixedWindowLimiter
、SlidingWindowLimiter
和TokenBucketLimiter
來說,它們都是時間範圍的限流演演算法,都具備Replenish
性質,所以又抽象出一層ReplenishingRateLimiter
:
public abstract class ReplenishingRateLimiter : RateLimiter
{
// 許可發放週期
public abstract TimeSpan ReplenishmentPeriod { get; }
// 是否自動補充許可
public abstract bool IsAutoReplenishing { get; }
// 嘗試補充許可
// 當 AutoReplenishment == true 時,不會執行補充許可的邏輯,因為它是自動的,不允許手動干預
public abstract bool TryReplenish();
}
ConcurrencyLimiter
直接繼承自RateLimiter
最後具體看一下FixedWindowRateLimiter
的詳細實現,先來看建構函式以及一些常用屬性:
public sealed class FixedWindowRateLimiter : ReplenishingRateLimiter
{
// 用於重新補充許可的定時器
private readonly Timer? _renewTimer;
// 選項,會 clone 一份建構函式傳進來的 options
private readonly FixedWindowRateLimiterOptions _options;
// 指示許可租賃成功的結果
private static readonly RateLimitLease SuccessfulLease = new FixedWindowLease(true, null);
// 指示許可租賃失敗的結果
private static readonly RateLimitLease FailedLease = new FixedWindowLease(false, null);
// 空閒週期
public override TimeSpan? IdleDuration => ...;
// 是否自動補充許可
public override bool IsAutoReplenishing => _options.AutoReplenishment;
// 許可發放週期,對於固定視窗來說,就是視窗大小
public override TimeSpan ReplenishmentPeriod => _options.Window;
public FixedWindowRateLimiter(FixedWindowRateLimiterOptions options)
{
// 省略部分程式碼...
// 如果 AutoReplenishment == true,則會建立定時器,用於定時補充許可
// 不過我們從前面可以得知,傳遞到這裡的是 false,所以定時器並不會被建立
if (_options.AutoReplenishment)
{
_renewTimer = new Timer(Replenish, this, _options.Window, _options.Window);
}
}
}
接下來是補充許可TryReplenish
的實現:
public override bool TryReplenish()
{
// 當 AutoReplenishment == true 時,不會執行補充許可的邏輯,因為它是自動的,不允許手動干預
if (_options.AutoReplenishment)
{
return false;
}
Replenish(this);
return true;
}
private static void Replenish(object? state)
{
FixedWindowRateLimiter limiter = (state as FixedWindowRateLimiter)!;
// 獲取當前時間
long nowTicks = Stopwatch.GetTimestamp();
limiter!.ReplenishInternal(nowTicks);
}
private void ReplenishInternal(long nowTicks)
{
// 如果當前時間距離上次許可發放時間還沒達到視窗大小,則直接返回
if (((nowTicks - _lastReplenishmentTick) * TickFrequency) < _options.Window.Ticks && !_options.AutoReplenishment)
{
return;
}
int availablePermitCounters = _permitCount;
if (availablePermitCounters >= _options.PermitLimit)
{
// 如果當前可用許可數 >= 限流器設定的最大許可數,則無須重新發放,直接返回
return;
}
// 補充許可
_permitCount = _options.PermitLimit;
// 先處理排隊的請求
while (_queue.Count > 0)
{
// 根據 QueueProcessingOrder 從佇列中找到(Peek)最老或最新的請求
RequestRegistration nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.PeekHead()
: _queue.PeekTail();
// 若請求已完成處理,則只需要將它移出佇列(Dequeue),並釋放資源即可。
// 請求已完成可能的原因如下:
// 1. 已被取消
// 2. 當 QueueProcessingOrder 設定為 NewestFirst 時,新來的請求把老的踢出了佇列
if (nextPendingRequest.Tcs.Task.IsCompleted)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
// 若可用的許可數足夠,則從佇列中取出請求並處理
else if (_permitCount >= nextPendingRequest.Count)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
// 扣減
_queueCount -= nextPendingRequest.Count;
_permitCount -= nextPendingRequest.Count;
// 向請求補充許可
// 若發放失敗,這還原扣減
if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease))
{
_permitCount += nextPendingRequest.Count;
_queueCount += nextPendingRequest.Count;
}
// 釋放資源
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
else
{
// 請求無法被處理,直接跳出
break;
}
}
if (_permitCount == _options.PermitLimit)
{
// 當可用許可數等於設定的最大許可數,則開始計算空閒週期
_idleSince = Stopwatch.GetTimestamp();
}
}
下面一起看一下許可是如何租出去的。由於非同步的AcquireAsyncCore
基本包含了同步的AttemptAcquireCore
的處理邏輯,所以下面就只看AcquireAsyncCore
。需要著重說一下的是,同步的AttemptAcquireCore
是不會進行入隊操作的。
原始碼裡面其實有很多鎖,為了便於理解我都刪除了。
protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken = default)
{
// 當申請的許可數 == 0,並且可用許可數 > 0 時,則直接返回 SuccessfulLease,表示限流器還有可用許可
// 對於同步的 AttemptAcquireCore 方法來說,若此時可用許可數為 0,則會直接返回 FailedLease,表示限流器沒有可用許可
if (permitCount == 0 && _permitCount > 0)
{
return new ValueTask<RateLimitLease>(SuccessfulLease);
}
// 嘗試租賃
if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
{
return new ValueTask<RateLimitLease>(lease);
}
// 如果佇列裝不下要申請許可的所有請求
if (_options.QueueLimit - _queueCount < permitCount)
{
// 如果優先處理新來的,並且要申請許可的請求數沒有超過佇列的大小限制,
// 則將佇列中老的請求踢出佇列,直到為新來的請求留出足夠的空間,準備將新來的請求加進去
if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)
{
do
{
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
// 設定老請求申請許可失敗
if (!oldestRequest.Tcs.TrySetResult(FailedLease))
{
_queueCount += oldestRequest.Count;
}
oldestRequest.CancellationTokenRegistration.Dispose();
}
while (_options.QueueLimit - _queueCount < permitCount);
}
else
{
// 如果優先處理後來的,則只能返回 失敗
return new ValueTask<RateLimitLease>(CreateFailedWindowLease(permitCount));
}
}
// 這部分程式碼不用太在意
CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);
CancellationTokenRegistration ctr = default;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(static obj =>
{
((CancelQueueState)obj!).TrySetCanceled();
}, tcs);
}
RequestRegistration registration = new RequestRegistration(permitCount, tcs, ctr);
// 將新請求加入到隊尾
_queue.EnqueueTail(registration);
_queueCount += permitCount;
// 非同步可等待,直到 Task 執行完成獲取到結果(可能是申請成功,也可能是失敗)
return new ValueTask<RateLimitLease>(registration.Tcs.Task);
}
TryLeaseUnsynchronized
具體邏輯如下:
private bool TryLeaseUnsynchronized(int permitCount, out RateLimitLease? lease)
{
// 若可用的許可數足夠,且不為 0
if (_permitCount >= permitCount && _permitCount != 0)
{
// 租賃的許可為0,則直接返回 成功
if (permitCount == 0)
{
lease = SuccessfulLease;
return true;
}
// 若:
// 1. 沒有排隊的請求
// 2. 或有排隊的請求,但是 QueueProcessingOrder 被設定為 NewestFirst
// 則租賃成功,其他則租賃失敗(因為要先把排隊的處理完)
if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
{
// 許可租賃出去了,也就表示該限流器不空閒了
_idleSince = null;
_permitCount -= permitCount;
lease = SuccessfulLease;
return true;
}
}
// 租賃失敗
lease = null;
return false;
}
現在,我們已經掌握了限流器補充許可和租賃許可的細節邏輯了,並且也得知並沒有使用限流器內部的定時器去定時補充許可,那這是由誰補充的呢?又是由誰為請求申請的許可呢?
沒錯,這都是RateLimitingMiddleware
負責的。
在構造方法中,我們需要重點關注下CreateEndpointLimiter
,它建立了終結點分割區限流器,與全域性限流器一起提供限流服務。
internal sealed partial class RateLimitingMiddleware
{
// 預設被限流拒絕回撥的委託,取自 options.OnRejected
private readonly Func<OnRejectedContext, CancellationToken, ValueTask>? _defaultOnRejected;
// 全域性限流器,取自 options.GlobalLimiter
private readonly PartitionedRateLimiter<HttpContext>? _globalLimiter;
// 終結點限流器
private readonly PartitionedRateLimiter<HttpContext> _endpointLimiter;
// 限流響應狀態碼,取自 options.RejectionStatusCode
private readonly int _rejectionStatusCode;
// 限流策略集,取自 options.PolicyMap 和 options.UnactivatedPolicyMap
private readonly Dictionary<string, DefaultRateLimiterPolicy> _policyMap;
public RateLimitingMiddleware(RequestDelegate next, ILogger<RateLimitingMiddleware> logger, IOptions<RateLimiterOptions> options, IServiceProvider serviceProvider, RateLimitingMetrics metrics)
{
// ...省略一堆程式碼
_endpointLimiter = CreateEndpointLimiter();
}
}
在CreateEndpointLimiter
方法中,建立了分割區限流器,裡面包含了各種各樣的限流分割區,用於不同終結點請求的限流。
private PartitionedRateLimiter<HttpContext> CreateEndpointLimiter()
{
// 建立分割區限流器
return PartitionedRateLimiter.Create<HttpContext, DefaultKeyType>(context =>
{
DefaultRateLimiterPolicy? policy;
var enableRateLimitingAttribute = context.GetEndpoint()?.Metadata.GetMetadata<EnableRateLimitingAttribute>();
// 如果不需要限流,則返回 NoLimiter
if (enableRateLimitingAttribute is null)
{
return RateLimitPartition.GetNoLimiter<DefaultKeyType>(_defaultPolicyKey);
}
// 根據限流策略取限流分割區
policy = enableRateLimitingAttribute.Policy;
if (policy is not null)
{
return policy.GetPartition(context);
}
var name = enableRateLimitingAttribute.PolicyName;
if (name is not null)
{
if (_policyMap.TryGetValue(name, out policy))
{
return policy.GetPartition(context);
}
else
{
throw new InvalidOperationException($"This endpoint requires a rate limiting policy with name {name}, but no such policy exists.");
}
}
// 雖然策略名或策略不可能為空,但是加一下判斷更好
else
{
throw new InvalidOperationException("This endpoint requested a rate limiting policy with a null name.");
}
}, new DefaultKeyTypeEqualityComparer());
}
咦?怎麼還是沒看到在哪自動補充的許可?實際上它就隱藏在PartitionedRateLimiter.Create
中的DefaultPartitionedRateLimiter
裡面,藏得太深了:
public static class PartitionedRateLimiter
{
public static PartitionedRateLimiter<TResource> Create<TResource, TPartitionKey>(
Func<TResource, RateLimitPartition<TPartitionKey>> partitioner,
IEqualityComparer<TPartitionKey>? equalityComparer = null) where TPartitionKey : notnull
{
return new DefaultPartitionedRateLimiter<TResource, TPartitionKey>(partitioner, equalityComparer);
}
}
下面是DefaultPartitionedRateLimiter
啟動定時器執行心跳的核心程式碼:
internal sealed class DefaultPartitionedRateLimiter<TResource, TKey> : PartitionedRateLimiter<TResource> where TKey : notnull
{
// 限流器集合
private readonly Dictionary<TKey, Lazy<RateLimiter>> _limiters;
// 限流分割區委託,可通過資源獲取到分割區
private readonly Func<TResource, RateLimitPartition<TKey>> _partitioner;
// 定時器,主要作用是每 100ms 進行一次心跳,即執行 Heartbeat 方法
private readonly TimerAwaitable _timer;
private readonly Task _timerTask;
public DefaultPartitionedRateLimiter(Func<TResource, RateLimitPartition<TKey>> partitioner,
IEqualityComparer<TKey>? equalityComparer = null)
{
_limiters = new Dictionary<TKey, Lazy<RateLimiter>>(equalityComparer);
_partitioner = partitioner;
var timerInterval = TimeSpan.FromMilliseconds(100);
_timer = new TimerAwaitable(timerInterval, timerInterval);
_timerTask = RunTimer();
}
private async Task RunTimer()
{
_timer.Start();
// 只要 timer 不被停止,則一直返回 true,即 timer 仍在執行中
while (await _timer)
{
try
{
await Heartbeat().ConfigureAwait(false);
}
catch { }
}
_timer.Dispose();
}
}
TimerAwaitable
是一個可非同步等待的型別(實現了GetAwaiter
、INotifyCompletion
、IsCompleted
和GetResult
),內部設計非常有意思。在它內部,啟動了一個定時器,每 100ms(傳入的timerInterval) Tick 一次,每次 Tick 就會把 IsCompleted
設定為true
,將任務狀態切換為已完成。外部通過await
獲取結果時(靜默呼叫GetResult
),又會將IsCompleted
設定為false
,再將其轉換為未完成狀態。外部再配合while
以達到定時執行的效果。
為什麼不直接用Timer
而又弄出一個TimerAwaitable
?我認為TimerAwaitable
有以下優點:
通過定時器,每 100ms 執行一次心跳,心跳過程中檢查各個限流器是否需要補充許可,如果需要,則補充,並回收空閒限流器等。以下是簡化的心跳邏輯:
private async Task Heartbeat()
{
if (_cacheInvalid)
{
_cachedLimiters.Clear();
_cachedLimiters.AddRange(_limiters);
}
// 遍歷所有快取的限流器
foreach (KeyValuePair<TKey, Lazy<RateLimiter>> rateLimiter in _cachedLimiters)
{
// 如果限流器還未被範例化,則跳過
if (!rateLimiter.Value.IsValueCreated) continue;
// 如果限流器空閒週期超過了空閒時間限制(預設10s),則回
if (rateLimiter.Value.Value.IdleDuration is TimeSpan idleDuration && idleDuration > s_idleTimeLimit)
{
lock (Lock)
{
// 雙重檢測,確保限流器確實是空閒的
idleDuration = rateLimiter.Value.Value.IdleDuration ?? TimeSpan.Zero;
if (idleDuration > s_idleTimeLimit)
{
_cacheInvalid = true;
// 回收該限流器
_limiters.Remove(rateLimiter.Key);
// 儲存下來,後面一起釋放資源
_limitersToDispose.Add(rateLimiter.Value.Value);
}
}
}
// 如果限流器可補充許可,則嘗試補充
else if (rateLimiter.Value.Value is ReplenishingRateLimiter replenishingRateLimiter)
{
try
{
replenishingRateLimiter.TryReplenish();
}
catch (Exception ex) { ... }
}
}
// 釋放回收的限流器資源
foreach (RateLimiter limiter in _limitersToDispose)
{
try
{
await limiter.DisposeAsync().ConfigureAwait(false);
}
catch (Exception ex) { ... }
}
_limitersToDispose.Clear();
}
好了,我們已經瞭解了限流器的管理,讓我們再次回到RateLimitingMiddleware
,看看他是如何工作的吧:
public Task Invoke(HttpContext context)
{
var endpoint = context.GetEndpoint();
// 如果終結點包含禁用限流標記,則不限流
if (endpoint?.Metadata.GetMetadata<DisableRateLimitingAttribute>() is not null)
{
return _next(context);
}
var enableRateLimitingAttribute = endpoint?.Metadata.GetMetadata<EnableRateLimitingAttribute>();
// 如果終結點沒有啟用限流標記,並且全域性限流器也是空的,則同樣不限流
if (enableRateLimitingAttribute is null && _globalLimiter is null)
{
return _next(context);
}
return InvokeInternal(context, enableRateLimitingAttribute);
}
private async Task InvokeInternal(HttpContext context, EnableRateLimitingAttribute? enableRateLimitingAttribute)
{
var policyName = enableRateLimitingAttribute?.PolicyName;
// 嘗試獲取許可
using var leaseContext = await TryAcquireAsync(context);
// 如果獲取到了許可,則處理請求
if (leaseContext.Lease?.IsAcquired == true)
{
await _next(context);
}
// 沒有獲取到許可,則限流拒絕
else
{
// 如果請求是被取消的,則不要執行 OnRejected 回撥,應該直接返回
if (leaseContext.RequestRejectionReason == RequestRejectionReason.RequestCanceled)
{
return;
}
var thisRequestOnRejected = _defaultOnRejected;
context.Response.StatusCode = _rejectionStatusCode;
// 如果請求是被終結點限流器限流拒絕的
if (leaseContext.RequestRejectionReason == RequestRejectionReason.EndpointLimiter)
{
// 若策略有自己的 OnRejected,則使用策略的,如果沒有,則使用 _defaultOnRejected
// 這裡我感覺是個 bug,應該判斷 policy?.OnRejected is not null 才賦值
DefaultRateLimiterPolicy policy = enableRateLimitingAttribute?.Policy;
if (policy is not null)
{
thisRequestOnRejected = policy.OnRejected;
}
else
{
// 對於策略名,當 OnRejected 不為空時,才使用策略的 OnRejected
if (policyName is not null && _policyMap.TryGetValue(policyName, out policy) && policy.OnRejected is not null)
{
thisRequestOnRejected = policy.OnRejected;
}
}
}
// 執行回撥
if (thisRequestOnRejected is not null)
{
await thisRequestOnRejected(new OnRejectedContext() { HttpContext = context, Lease = leaseContext.Lease! }, context.RequestAborted);
}
}
}
TryAcquireAsync
會先從全域性限流器獲取許可,如果獲取到了,則會繼續在終結點限流器中獲取許可,如果獲取到了,請求才會被處理:
非同步的
CombinedWaitAsync
與同步的CombinedAcquire
類似,只不過前面呼叫的是非同步方法,後面是同步,故下方僅列出CombinedAcquire
簡化原始碼。
private async ValueTask<LeaseContext> TryAcquireAsync(HttpContext context, MetricsContext metricsContext)
{
// 組合獲取,即按順序從全域性限流器和終結點限流器中獲取許可
var leaseContext = CombinedAcquire(context);
// 如果獲取到了,則直接返回
if (leaseContext.Lease?.IsAcquired == true)
{
return leaseContext;
}
// 非同步等待再次獲取許可
return await CombinedWaitAsync(context, context.RequestAborted);
}
private LeaseContext CombinedAcquire(HttpContext context)
{
// 全域性限流器不為空,則先從其中獲取許可
if (_globalLimiter is not null)
{
var globalLease = _globalLimiter.AttemptAcquire(context);
// 未獲取許可,直接返回
if (!globalLease.IsAcquired)
{
return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.GlobalLimiter, Lease = globalLease };
}
}
// 從終結點限流器中獲取許可
var endpointLease = _endpointLimiter.AttemptAcquire(context);
// 未獲取許可,直接返回
if (!endpointLease.IsAcquired)
{
globalLease?.Dispose();
return new LeaseContext() { RequestRejectionReason = RequestRejectionReason.EndpointLimiter, Lease = endpointLease };
}
return globalLease is null
? new LeaseContext() { Lease = endpointLease }
: new LeaseContext() { Lease = new DefaultCombinedLease(globalLease, endpointLease) };
}
AddRateLimiter
註冊限流服務,通過UseRateLimiter
啟用限流功能。options.AddPolicy
新增限流策略,作用於某些終結點,這些策略最終組成的分割區限流器稱為終結點限流器
options.AddXXXLimiter
的方式快捷新增限流策略IRateLimiterPolicy<TPartitionKey>
options.GlobalLimiter
設定全域性限流器,當請求進入應用時,會先執行全域性限流器,再執行終結點限流器。options.RejectionStatusCode
設定限流拒絕的響應狀態碼,還可以通過OnRejected
編寫更多的響應邏輯。PartitionedRateLimiter.CreateChained
將多個分割區限流器進行鏈式組合