承載ASP.NET應用的伺服器資源總是有限的,短時間內湧入過多的請求可能會瞬間耗盡可用資源並導致宕機。為了解決這個問題,我們需要在伺服器端設定一個閥門將並行處理的請求數量限制在一個可控的範圍,即使會導致請求的延遲響應,在極端的情況會還不得不放棄一些請求。ASP.NET應用的流量限制是通過ConcurrencyLimiterMiddleware中介軟體實現的。(本文提供的範例演示已經同步到《ASP.NET Core 6框架揭祕-範例演示版》)
[S2601]設定並行和等待請求閾值 (原始碼)
[S2602]基於佇列的限流策略(原始碼)
[S2603]基於棧的限流策略(原始碼)
[S2604]處理被拒絕的請求(原始碼)
由於各種Web伺服器、反向代理和負載均衡器都提供了限流的能力,我們很少會在應用層面進行流量控制。ConcurrencyLimiterMiddleware中介軟體由「Microsoft.AspNetCore.ConcurrencyLimiter」這個NuGet包提供,ASP.NET應用採用的SDK(「Microsoft.NET.Sdk.Web」)並沒有將該包作為預設的參照,所以我們需要手工新增該NuGet包的參照。
當請求並行量超過設定的閾值,ConcurrencyLimiterMiddleware中介軟體會將請求放到等待佇列中,整個限流工作都是圍繞這個這個佇列進行的,採用怎樣的策略管理這個等待佇列是整個限流模型的核心。不論採用何種策略,我們都需要設定兩個閾值,一個是當前允許的最大並行請求量,另一個是等待佇列的最大容量。如程式碼片段所示,我們通過呼叫IServiceCollection介面的AddQueuePolicy擴充套件方法註冊了一個基於佇列(「Queue」)的策略,並將上述的兩個閾值設定為2。
using App;
var builder = WebApplication.CreateBuilder(args);
builder.Logging.ClearProviders();
builder.Services
.AddHostedService<ConsumerHostedService>()
.AddQueuePolicy(options =>
{
options.MaxConcurrentRequests = 2;
options.RequestQueueLimit = 2;
});
var app = builder.Build();
app
.UseConcurrencyLimiter()
.Run(httpContext => Task.Delay(1000).ContinueWith(_ => httpContext.Response.StatusCode = 200));
app.Run();
public class ConsumerHostedService : BackgroundService { private readonly HttpClient[] _httpClients; public ConsumerHostedService(IConfiguration configuration) { var concurrency = configuration.GetValue<int>("Concurrency"); _httpClients = Enumerable .Range(1, concurrency) .Select(_ => new HttpClient()) .ToArray(); } protected override Task ExecuteAsync(CancellationToken stoppingToken) { var tasks = _httpClients.Select(async client => { while (true) { var start = DateTimeOffset.UtcNow; var response = await client.GetAsync("http://localhost:5000"); var duration = DateTimeOffset.UtcNow - start; var status = $"{(int)response.StatusCode},{response.StatusCode}"; Console.WriteLine($"{status} [{(int)duration.TotalSeconds}s]"); if (!response.IsSuccessStatusCode) { await Task.Delay(1000); } } }); return Task.WhenAll(tasks); } public override Task StopAsync(CancellationToken cancellationToken) { Array.ForEach(_httpClients, it => it.Dispose()); return Task.CompletedTask; } }
對於傳送的每個請求,ConsumerHostedService都會在控制檯上記錄下響應的狀態和耗時。為了避免控制檯「刷屏」,我們在接收到錯誤響應後模擬一秒鐘的等待。由於並行量是由設定系統提供的,所以我們可以利用命令列引數(「Concurrency」)的方式來對並行量進行設定。如圖1所示,我們以命令列的方式啟動了程式,並通過命令列引數將並行量設定為2。由於並行量並沒有超出閾值,所以每個請求均得到正常的響應。
圖1 並行量未超出閾值
由於並行量的閾值和等待佇列的容量均設定為2,從外部來看,我們的演示程式所能承受的最大並行量為4。所以當我們以此並行量啟動程式之後,並行的請求能夠接收到成功的響應,但是除了前兩個請求能夠得到及時處理之外,後續請求都會在等待佇列中呆上一段時間,所以整個耗時會延長。如果將並行量提升到5,這顯然超出了伺服器端的極限,所以部分請求會得到狀態碼為「503, Service Unavailable」的響應。
圖2 並行量超出閾值
ASP.NET應用的並行處理的請求量可以通過dotnet-counters工具提供的效能計數器進行檢視。具體的效能計數器名稱為「Microsoft.AspNetCore.Hosting」,我們現在通過這種方式來看看應用程式真正的並行處理指標是否和我們的預期一致。我們還是以並行量為5啟動演示程式,然後以圖26-3所示的方式執行「dotnet-coutners ps」命令檢視演示程式的程序,並針對程序ID執行「dotnet-counters monitor」命令檢視名為「Microsoft.AspNetCore.Hosting」的效能指標。
圖3 使用dotnet-counters monitor檢視並行量
如圖3所示,dotnet-counters顯示的並行請求為4,這和我們的設定是吻合的,因為對於應用的中介軟體管道來說,並行處理的請求包含ConcurrencyLimiterMiddleware中介軟體的等待佇列的兩個和後續中介軟體真正處理的兩個。我們還看到了每秒處理的請求數量為3,並有約1/3的請求失敗率,這些指標和我們的設定都是吻合的。
通過前面的範例演示我們知道,當ConcurrencyLimiterMiddleware中介軟體維護的等待佇列被填滿並且後續中介軟體管道正在「滿負荷執行(並行處理的請求達到設定的閾值)」的情況下,如果此時接收到一個新的請求,它只能放棄某個待處理的請求。具體來說,它具有兩種選擇,一種是放棄剛剛接收的請求,另一種就是將等待佇列中的某個請求扔掉,其位置由新接收的請求佔據。
前面演示範例採用的等待佇列處理策略是通過呼叫IServiceCollection介面的AddQueuePolicy擴充套件方法註冊的,這樣一種基於「佇列」的策略。我們知道佇列的特點就是先進先出(FIFO),講究「先來後到」,如果採用這種策略就會放棄剛剛接收到的請求。我們可以通過簡單的範例證實這一點。如下面的演示程式所示,我們在ConcurrencyLimiterMiddleware中介軟體之前註冊了一個通過DiagnosticMiddleware方法表示的中介軟體,它會對每個請求按照它接收到的時間順序進行編號,我們利用它列印出每個請求對應的響應狀態就知道ConcurrencyLimiterMiddleware中介軟體最終放棄的是那個請求了。
using App; var requestId = 1; var @lock = new object(); var builder = WebApplication.CreateBuilder(); builder.Logging.ClearProviders(); builder.Services .AddHostedService<ConsumerHostedService>() .AddQueuePolicy(options => { options.MaxConcurrentRequests = 2; options.RequestQueueLimit = 2; }); var app = builder.Build(); app .Use(InstrumentAsync) .UseConcurrencyLimiter() .Run(httpContext => Task.Delay(1000).ContinueWith(_ => httpContext.Response.StatusCode = 200)); await app.StartAsync(); var tasks = Enumerable.Range(1, 5) .Select(_ => new HttpClient().GetAsync("http://localhost:5000")); await Task.WhenAll(tasks); Console.Read(); async Task InstrumentAsync(HttpContext httpContext, RequestDelegate next) { Task task; int id; lock (@lock!) { id = requestId++; task = next(httpContext); } await task; Console.WriteLine($"Request {id}: {httpContext.Response.StatusCode}"); }
我們在 IServiceCollection介面的AddQueuePolicy擴充套件方法中提供的設定不變(最大並行量和等待佇列大小都是2)。在應用啟動之後,我們同時傳送了5個請求,此時控制檯上會呈現出如圖4所示的輸出結果,可以看出ConcurrencyLimiterMiddleware中介軟體在接收到第5個請求並不得不作出取捨的時候,它放棄的就是當前接收到的請求。
圖4 基於佇列的處理策略
當ConcurrencyLimiterMiddleware中介軟體在接收到某個請求並需要決定放棄某個待處理請求時,它還可以採用另一種基於「棧」的策略。如果採用這種策略,它會先保全當前接收到的請求,並用它替換掉儲存在等待佇列時間最長的那個。也就是說它不再講究先來後到,而主張後來居上。對於前面演示的程式來說,我們只需要按照如下的方式將針對AddQueuePolicy擴充套件方法的呼叫替換成AddStackPolicy方法就可以切換到這種策略。
... var builder = WebApplication.CreateBuilder(); builder.Logging.ClearProviders(); builder.Services .AddHostedService<ConsumerHostedService>() .AddStackPolicy(options => { options.MaxConcurrentRequests = 2; options.RequestQueueLimit = 2; }); var app = builder.Build(); ...
重新啟動改動後的演示程式,我們將在控制檯上得到如圖5所示的輸出結果。可以看出這次ConcurrencyLimiterMiddleware中介軟體在接收到第5個請求並不得不做出取捨的時候,它放棄的就是最先儲存到等待佇列的第3個請求。
圖5 基於棧處理策略
從ConcurrencyLimiterMiddleware中介軟體的實現可以看出,在預設情況下因超出限流閾值而被拒絕處理的請求來說,應用最終會給與一個狀態碼為「503 Service Available」的響應。如果我們對這個預設的處理方式不滿意,可以通過對設定選項ConcurrencyLimiterOptions的設定來提供一個自定義的處理器。舉個典型的場景,叢集部署的多臺機器可能負載不均,所以如果將被某臺機器拒絕的請求分發給另一臺機器是可能被正常處理的。為了確保請求能夠儘可能地被處理,我們可以針對相同的URL發起一個使用者端重定向,具體的實現體現在如下所示的演示程式中。
using Microsoft.AspNetCore.ConcurrencyLimiter; using Microsoft.AspNetCore.Http.Extensions; var builder = WebApplication.CreateBuilder(args); builder.Logging.ClearProviders(); builder.Services .Configure<ConcurrencyLimiterOptions>(options => options.OnRejected = RejectAsync) .AddStackPolicy(options => { options.MaxConcurrentRequests = 2; options.RequestQueueLimit = 2; }); var app = builder.Build(); app .UseConcurrencyLimiter() .Run(httpContext => Task.Delay(1000).ContinueWith(_ => httpContext.Response.StatusCode = 200)); app.Run(); static Task RejectAsync(HttpContext httpContext) { var request = httpContext.Request; if (!request.Query.ContainsKey("reject")) { var response = httpContext.Response; response.StatusCode = 307; var queryString = request.QueryString.Add("reject", "true"); var newUrl = UriHelper.BuildAbsolute(request.Scheme, request.Host, request.PathBase, request.Path, queryString); response.Headers.Location = newUrl; } return Task.CompletedTask; }
如上面的程式碼片段所示,我們呼叫IServiceCollection介面的Configure<TOptions>擴充套件方法對ConcurrencyLimiterOptions進行了設定。具體來說,我們將RejectAsync方法表示的RequestDelegate委託作為拒絕請求處理器賦值給了ConcurrencyLimiterOptions設定選項的OnRejected屬性。在RejectAsync方法中,我們針對當前請求的URL返回了一個狀態碼為307的臨時重定向響應。為了避免重複的重定向操作,我們為重定向地址新增了一個名為「reject」的查詢字串來識別重定向請求。