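I once worked on a project that calculated airline ticket prices, and I designed its basic processing flow. The calculation is complex and changes very often, so day-to-day changes, maintenance, and upgrades kept getting harder. When I took the project over again later, I could no longer follow the calculation logic. To fix this, I borrowed the idea of a "workflow" and tried to model the whole calculation as one. I did not want to bring in a standalone workflow engine, so I wrote a framework called Pipelines. As the name suggests, Pipelines implements a processing flow by building a Pipeline. The processing logic is split into a number of Pipes, and these Pipes are assembled in a specified order into a complete Pipeline. A Pipeline is essentially a simple sequential workflow: it just executes its registered Pipes in order. The Pipelines framework is available here. I won't cover its design and implementation, only show briefly how to use it; if you are interested, you can read the source code.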
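1. Building and executing a pipeline
2. "Internal interruption" of a Pipeline
3. "External interruption" of a Pipeline
4. Processing hierarchical data structures
5. Making Pipeline construction more concise with extension methods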
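1. Building and executing a pipeline

Pipelines provides a sequential workflow, or pipeline (hereafter simply Pipeline), for processing data. A Pipeline runs within a strongly typed context. Its Pipes read the data they need from this context and store their results, including intermediate results, back into it. The following example shows how to use the Pipelines framework to process population statistics. The two types below represent the population data and the processing context. The context type derives from the base class ContextBase.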

    public class PopulationData
    {
        public object Statistics { get; set; } = default!;
    }

    // The strongly typed context that carries the data through the Pipeline
    public sealed class PopulationContext : ContextBase
    {
        public PopulationContext(PopulationData data) => Data = data;
        public PopulationData Data { get; }
    }

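A Pipeline consists of a series of Pipe objects, executed in the order in which they were registered. By deriving from the base class PipeBase<PopulationContext>, we define three Pipe classes. Each one performs a basic processing task on the population data.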

    public sealed class FooPopulationPipe : PipeBase<PopulationContext>
    {
        public override string Description => "Global PopulationProcessor Foo";
        protected override void Invoke(PopulationContext context) => Console.WriteLine($"{nameof(FooPopulationPipe)} is invoked.");
    }

    public sealed class BarPopulationPipe : PipeBase<PopulationContext>
    {
        public override string Description => "Global PopulationProcessor Bar";
        protected override void Invoke(PopulationContext context) => Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked.");
    }

    public sealed class BazPopulationPipe : PipeBase<PopulationContext>
    {
        public override string Description => "Global PopulationProcessor Baz";
        protected override void Invoke(PopulationContext context) => Console.WriteLine($"{nameof(BazPopulationPipe)} is invoked.");
    }

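I designed Pipelines so that every participant, including non-technical people, can clearly see the current processing flow while the code goes through frequent iterations. For this reason, I export the processing flow of every Pipeline the application builds. To support this, each Pipe type must describe its processing logic in text through its Description property. A Pipe's actual processing logic goes in the overridden Invoke method. If asynchronous processing is involved, derive from the higher-level base class Pipe<TContext> (the base class of PipeBase<TContext>) instead, and override the asynchronous InvokeAsync method.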
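The minimal, self-contained sketch below shows how a synchronous Invoke and an asynchronous InvokeAsync can relate. MiniContextBase, MiniPipe<TContext>, and MiniPipeBase<TContext> are hypothetical stand-ins for ContextBase, Pipe<TContext>, and PipeBase<TContext>. The sketch assumes the synchronous base simply runs Invoke and then continues with the next Pipe. That is a plausible design, not the framework's actual source.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for ContextBase: a strongly typed context that also collects a trace.
public abstract class MiniContextBase
{
    public List<string> Trace { get; } = new();
}

// Hypothetical stand-in for Pipe<TContext>: asynchronous, and explicitly handed the rest of the pipeline.
public abstract class MiniPipe<TContext> where TContext : MiniContextBase
{
    public abstract string Description { get; }
    public abstract ValueTask InvokeAsync(TContext context, Func<TContext, ValueTask> next);
}

// Hypothetical stand-in for PipeBase<TContext>: runs the synchronous Invoke, then always continues.
public abstract class MiniPipeBase<TContext> : MiniPipe<TContext> where TContext : MiniContextBase
{
    protected abstract void Invoke(TContext context);

    public sealed override ValueTask InvokeAsync(TContext context, Func<TContext, ValueTask> next)
    {
        Invoke(context);
        return next(context);
    }
}

public sealed class DemoContext : MiniContextBase { }

// Synchronous logic: only Invoke needs to be written.
public sealed class SyncPipe : MiniPipeBase<DemoContext>
{
    public override string Description => "Synchronous step";
    protected override void Invoke(DemoContext context) => context.Trace.Add("sync");
}

// Asynchronous logic: override InvokeAsync and call next explicitly.
public sealed class AsyncPipe : MiniPipe<DemoContext>
{
    public override string Description => "Asynchronous step";

    public override async ValueTask InvokeAsync(DemoContext context, Func<DemoContext, ValueTask> next)
    {
        await Task.Yield(); // stand-in for real asynchronous work
        context.Trace.Add("async");
        await next(context);
    }
}

public static class PipeDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var context = new DemoContext();
        MiniPipe<DemoContext> first = new SyncPipe();
        MiniPipe<DemoContext> second = new AsyncPipe();
        // Wired by hand: first's next invokes second, whose next ends the chain.
        await first.InvokeAsync(context, ctx => second.InvokeAsync(ctx, _ => ValueTask.CompletedTask));
        return context.Trace;
    }

    public static async Task Main() => Console.WriteLine(string.Join(", ", await RunAsync()));
}
```

In this model the synchronous pipe never sees the next delegate. A Pipe that needs to await something, or to decide whether to continue, therefore has to derive from the asynchronous base.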
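The Pipelines are built in the BuildPipelines method shown below. There we use the supplied IPipelineProvider object to register a Pipeline named "PopulationProcessor", by calling its AddPipeline<TContext> method. The first argument is the Pipeline's registration name. The second is a delegate of type Action<IPipelineBuilder<TContext>>, which uses the supplied IPipelineBuilder<TContext> object to register the three Pipes defined above.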

    using App;
    using Artech.Pipelines;

    var builder = WebApplication.CreateBuilder(args);
    // Register the Pipelines services; BuildPipelines defines the Pipelines themselves
    builder.Services.AddPipelines(BuildPipelines);
    var app = builder.Build();

    app.MapGet("/test", async (IPipelineProvider provider, HttpResponse response) =>
    {
        Console.WriteLine("Execute PopulationProcessor pipeline");
        var context = new PopulationContext(new PopulationData());
        // Resolve the Pipeline by its registration name and run it against the context
        var pipeline = provider.GetPipeline<PopulationContext>("PopulationProcessor");
        await pipeline.ProcessAsync(context);
        return Results.Ok();
    });

    app.Run();

    static void BuildPipelines(IPipelineProvider pipelineProvider)
    {
        // Pipes execute in the order in which they are registered
        pipelineProvider.AddPipeline<PopulationContext>(
            name: "PopulationProcessor",
            setup: builder => builder
                .Use<PopulationContext, FooPopulationPipe>()
                .Use<PopulationContext, BarPopulationPipe>()
                .Use<PopulationContext, BazPopulationPipe>());
    }

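The services the Pipelines framework needs are registered through the AddPipelines method on IServiceCollection, and the BuildPipelines method is passed to it as a delegate. To demonstrate running the pipeline, we register a route endpoint for the path "/test". As the code shows, the injected IPipelineProvider retrieves the Pipeline by its registration name. We then create a PopulationContext and pass it as the argument to run the Pipeline. Once the application is running, requesting the path "/pipelines" returns a list of Pipelines. Clicking a link displays the flow of the corresponding Pipeline.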
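A small model can illustrate registration-order execution and the exported flow. MiniPipelineBuilder<TContext>, MiniPipelineProvider, and IMiniPipe<TContext> are hypothetical names, not the Pipelines API. Use records pipes in order. Build composes them back to front, so that each pipe's next delegate invokes the one registered after it. Describe lists the descriptions in execution order, roughly the kind of information the "/pipelines" page presents.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical pipe contract (not the Pipelines API).
public interface IMiniPipe<TContext>
{
    string Description { get; }
    ValueTask InvokeAsync(TContext context, Func<TContext, ValueTask> next);
}

public sealed class MiniPipelineBuilder<TContext>
{
    private readonly List<IMiniPipe<TContext>> _pipes = new();

    // Registration order is execution order.
    public MiniPipelineBuilder<TContext> Use<TPipe>() where TPipe : IMiniPipe<TContext>, new()
    {
        _pipes.Add(new TPipe());
        return this;
    }

    // A textual "export" of the flow: one description per pipe, in execution order.
    public IReadOnlyList<string> Describe() => _pipes.Select(p => p.Description).ToList();

    // Compose back to front so that each pipe's next delegate invokes the pipe registered after it.
    public Func<TContext, ValueTask> Build()
    {
        Func<TContext, ValueTask> next = _ => ValueTask.CompletedTask;
        for (int i = _pipes.Count - 1; i >= 0; i--)
        {
            var pipe = _pipes[i];
            var downstream = next;
            next = context => pipe.InvokeAsync(context, downstream);
        }
        return next;
    }
}

// Hypothetical name-based registry, similar in spirit to IPipelineProvider.
public sealed class MiniPipelineProvider
{
    private readonly Dictionary<string, object> _builders = new();

    public void AddPipeline<TContext>(string name, Action<MiniPipelineBuilder<TContext>> setup)
    {
        var builder = new MiniPipelineBuilder<TContext>();
        setup(builder);
        _builders[name] = builder;
    }

    public MiniPipelineBuilder<TContext> GetBuilder<TContext>(string name)
        => (MiniPipelineBuilder<TContext>)_builders[name];
}

public sealed class Log
{
    public List<string> Lines { get; } = new();
}

// Each demo pipe records its name and continues.
public abstract class NamedPipe : IMiniPipe<Log>
{
    protected abstract string Name { get; }
    public string Description => $"Step {Name}";

    public ValueTask InvokeAsync(Log log, Func<Log, ValueTask> next)
    {
        log.Lines.Add(Name);
        return next(log);
    }
}

public sealed class FooPipe : NamedPipe { protected override string Name => "Foo"; }
public sealed class BarPipe : NamedPipe { protected override string Name => "Bar"; }
public sealed class BazPipe : NamedPipe { protected override string Name => "Baz"; }

public static class BuilderDemo
{
    public static async Task<(IReadOnlyList<string> Flow, List<string> Executed)> RunAsync()
    {
        var provider = new MiniPipelineProvider();
        provider.AddPipeline<Log>("Demo", b => b.Use<FooPipe>().Use<BarPipe>().Use<BazPipe>());

        var builder = provider.GetBuilder<Log>("Demo");
        var log = new Log();
        await builder.Build()(log);
        return (builder.Describe(), log.Lines);
    }

    public static async Task Main()
    {
        var (flow, executed) = await RunAsync();
        Console.WriteLine("Flow:     " + string.Join(" -> ", flow));
        Console.WriteLine("Executed: " + string.Join(" -> ", executed));
    }
}
```

Here Use is generic only in TPipe, with TContext fixed by the builder's own type. That keeps the call site short. The Pipelines calls shown above pass both TContext and TPipe as method type arguments instead.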
If we request the path "/test" to run the pipeline, its execution trace appears in the console output.
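2. "Internal interruption" of a Pipeline

Any Pipe in a Pipeline can stop the pipeline immediately if its processing logic requires it. In the rewritten Invoke method of BarPopulationPipe below, if the generated random number is even, the Pipe calls the context object's Abort method to terminate the Pipeline at once.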

    public sealed class BarPopulationPipe : PipeBase<PopulationContext>
    {
        private readonly Random _random = new();
        public override string Description => "Global PopulationProcessor Bar";

        protected override void Invoke(PopulationContext context)
        {
            Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked.");
            // On an even random number, terminate the Pipeline; BazPopulationPipe will not run
            if (_random.Next() % 2 == 0)
            {
                context.Abort();
            }
        }
    }

As a result, BazPopulationPipe is skipped in about half of the Pipeline's executions, and the console output shows this.
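A Pipe type derived from Pipe<TContext> can stop the current Pipeline inside its InvokeAsync implementation, as shown below. The delegate passed as the next parameter invokes the subsequent Pipes, so if the Pipe never calls it, the Pipeline ends there.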

    public sealed class BarPopulationPipe : Pipe<PopulationContext>
    {
        private readonly Random _random = new();
        public override string Description => "Global PopulationProcessor Bar";

        public override ValueTask InvokeAsync(PopulationContext context, Func<PopulationContext, ValueTask> next)
        {
            Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked.");
            // Continue with the subsequent Pipes only on an odd random number
            if (_random.Next() % 2 != 0)
            {
                return next(context);
            }
            // Not calling next ends the Pipeline here
            return ValueTask.CompletedTask;
        }
    }

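Both interruption styles described in this section fit in a few lines of C#. In this sketch (all type names are hypothetical), a synchronous step calls next only if the context has not been aborted. An asynchronous step stops the pipeline simply by not calling next. A deterministic AbortingBar replaces the random condition so the outcome is predictable. Whether the Pipelines framework checks the abort flag in exactly this place is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical context with an abort flag (a stand-in for ContextBase.Abort()).
public sealed class AbortableContext
{
    public bool IsAborted { get; private set; }
    public void Abort() => IsAborted = true;
    public List<string> Trace { get; } = new();
}

public abstract class Step
{
    public abstract ValueTask InvokeAsync(AbortableContext context, Func<AbortableContext, ValueTask> next);
}

// Synchronous step: runs Invoke, then continues only if Abort() was not called.
public abstract class SyncStep : Step
{
    protected abstract void Invoke(AbortableContext context);

    public sealed override ValueTask InvokeAsync(AbortableContext context, Func<AbortableContext, ValueTask> next)
    {
        Invoke(context);
        return context.IsAborted ? ValueTask.CompletedTask : next(context);
    }
}

public sealed class RecordingStep : SyncStep
{
    private readonly string _name;
    public RecordingStep(string name) => _name = name;
    protected override void Invoke(AbortableContext context) => context.Trace.Add(_name);
}

// Interruption through the abort flag.
public sealed class AbortingBar : SyncStep
{
    protected override void Invoke(AbortableContext context)
    {
        context.Trace.Add("Bar");
        context.Abort();
    }
}

// Interruption by simply not invoking next.
public sealed class SkippingBar : Step
{
    public override ValueTask InvokeAsync(AbortableContext context, Func<AbortableContext, ValueTask> next)
    {
        context.Trace.Add("Bar");
        return ValueTask.CompletedTask; // next is deliberately not called
    }
}

public static class AbortDemo
{
    // Runs Foo -> bar -> Baz and returns the names of the steps that actually executed.
    public static async Task<List<string>> RunAsync(Step bar)
    {
        var steps = new Step[] { new RecordingStep("Foo"), bar, new RecordingStep("Baz") };
        Func<AbortableContext, ValueTask> pipeline = _ => ValueTask.CompletedTask;
        for (int i = steps.Length - 1; i >= 0; i--)
        {
            var step = steps[i];
            var next = pipeline;
            pipeline = context => step.InvokeAsync(context, next);
        }

        var ctx = new AbortableContext();
        await pipeline(ctx);
        return ctx.Trace;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync(new RecordingStep("Bar")))); // Foo, Bar, Baz
        Console.WriteLine(string.Join(", ", await RunAsync(new AbortingBar())));        // Foo, Bar
        Console.WriteLine(string.Join(", ", await RunAsync(new SkippingBar())));        // Foo, Bar
    }
}
```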
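3. "External interruption" of a Pipeline

When calling a Pipeline, we can also stop it through the CancellationToken provided by the execution context. The following snippet rewrites BarPopulationPipe once more. Instead of calling Abort, it waits 2 seconds before continuing with the subsequent Pipes in about half of the executions.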

    public sealed class BarPopulationPipe : Pipe<PopulationContext>
    {
        private readonly Random _random = new();
        public override string Description => "Global PopulationProcessor Bar";

        public override async ValueTask InvokeAsync(PopulationContext context, Func<PopulationContext, ValueTask> next)
        {
            Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked.");
            // On an odd random number, wait 2 seconds (longer than the 1-second timeout used below)
            if (_random.Next() % 2 != 0)
            {
                await Task.Delay(2000);
            }
            await next(context);
        }
    }

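Next, we override the CancellationToken property of PopulationContext as shown below. The constructor gets two new parameters: the current HttpContext and a timeout. The CancellationToken is a linked token built from both, so it is canceled when the request is aborted or when the timeout elapses. The pipeline therefore has a preset timeout and can also be stopped by the HTTP caller.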

    public sealed class PopulationContext : ContextBase
    {
        public PopulationContext(PopulationData data, HttpContext httpContext, TimeSpan timeout)
        {
            Data = data;
            // Canceled when either the HTTP request is aborted or the timeout elapses
            CancellationToken = CancellationTokenSource.CreateLinkedTokenSource(
                httpContext.RequestAborted,
                new CancellationTokenSource(timeout).Token).Token;
        }

        public PopulationData Data { get; }
        public override CancellationToken CancellationToken { get; }
    }

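The linked-token construction in PopulationContext uses only standard .NET APIs, so we can check its behavior in isolation. In this sketch a plain CancellationTokenSource stands in for HttpContext.RequestAborted. In the first case the timeout cancels the linked token; in the second case the "caller" cancels it.

```csharp
using System;
using System.Threading;

public static class LinkedTokenDemo
{
    // Mirrors the PopulationContext constructor: cancel when EITHER the caller aborts OR the timeout elapses.
    public static (bool ByTimeout, bool ByCaller) Run()
    {
        // Case 1: the caller never aborts, so the timeout is what cancels the linked token.
        using var requestAborted1 = new CancellationTokenSource(); // stand-in for HttpContext.RequestAborted
        using var timeout1 = new CancellationTokenSource(TimeSpan.FromMilliseconds(50));
        using var linked1 = CancellationTokenSource.CreateLinkedTokenSource(requestAborted1.Token, timeout1.Token);
        bool byTimeout = linked1.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(5));

        // Case 2: the caller aborts long before the one-minute timeout.
        using var requestAborted2 = new CancellationTokenSource();
        using var timeout2 = new CancellationTokenSource(TimeSpan.FromMinutes(1));
        using var linked2 = CancellationTokenSource.CreateLinkedTokenSource(requestAborted2.Token, timeout2.Token);
        requestAborted2.Cancel(); // propagates synchronously to the linked source
        bool byCaller = linked2.Token.IsCancellationRequested && !timeout2.IsCancellationRequested;

        return (byTimeout, byCaller);
    }

    public static void Main()
    {
        var (byTimeout, byCaller) = Run();
        Console.WriteLine($"canceled by timeout: {byTimeout}, canceled by caller: {byCaller}");
    }
}
```

Unlike the constructor above, the sketch disposes every CancellationTokenSource through using declarations. The constructor creates its timeout source inline and disposes neither that source nor the linked one.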
In the registered endpoint handler, we set the timeout of the PopulationContext to 1 second before running the Pipeline with it.

    var builder = WebApplication.CreateBuilder(args);
    builder.Services.AddPipelines(BuildPipelines);
    var app = builder.Build();

    app.MapGet("/test", async (HttpContext httpContext, IPipelineProvider provider, HttpResponse response) =>
    {
        Console.WriteLine("Execute PopulationProcessor pipeline");
        // The Pipeline is canceled after 1 second or when the request is aborted
        var context = new PopulationContext(new PopulationData(), httpContext, TimeSpan.FromSeconds(1));
        var pipeline = provider.GetPipeline<PopulationContext>("PopulationProcessor");
        await pipeline.ProcessAsync(context);
        return Results.Ok();
    });

    app.Run();

Given BarPopulationPipe's logic, the Pipeline times out in about half of its executions. When it does, an OperationCanceledException is thrown.
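The Task.Delay(2000) call in BarPopulationPipe does not receive the token, so the delay itself is not interrupted. Cancellation surfaces the next time something observes the token. This article does not show where the Pipelines framework observes it. The sketch below simply assumes a runner that calls ThrowIfCancellationRequested before each step, with a 300 ms delay and a 100 ms timeout standing in for the article's values.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical runner: checks the token before every step (an assumption, not the framework's code).
public static class CancellationDemo
{
    public static async Task<string> RunAsync(TimeSpan timeout, bool slowBar)
    {
        using var cts = new CancellationTokenSource(timeout);
        var steps = new List<Func<Task>>
        {
            () => Task.CompletedTask,                             // Foo
            () => slowBar ? Task.Delay(300) : Task.CompletedTask, // Bar: like the sample, the delay ignores the token
            () => Task.CompletedTask,                             // Baz
        };

        try
        {
            foreach (var step in steps)
            {
                // Throws OperationCanceledException once the timeout has elapsed
                cts.Token.ThrowIfCancellationRequested();
                await step();
            }
            return "completed";
        }
        catch (OperationCanceledException)
        {
            return "canceled";
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync(TimeSpan.FromSeconds(5), slowBar: false));
        Console.WriteLine(await RunAsync(TimeSpan.FromMilliseconds(100), slowBar: true));
    }
}
```

You could instead pass the token to the delay, for example Task.Delay(2000, context.CancellationToken). The wait would then end as soon as the timeout fires, and Task.Delay would throw TaskCanceledException, which derives from OperationCanceledException.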
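4. Processing hierarchical data structures

Pipelines was designed mainly to process hierarchical data structures, which involves sub-Pipelines. So far, the population data has been a simple data type. Let's make it more complex. Suppose we need to process population data at three levels: country, province, and city. StatePopulationData holds the population data of the whole country, and its Provinces property holds the data for each province. ProvincePopulationData holds the population data of a single province, and its Cities property holds the population data for each city.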

    public class PopulationData
    {
        public object Statistics { get; set; } = default!;
    }

    // Country level: keyed by province name
    public class StatePopulationData
    {
        public IDictionary<string, ProvincePopulationData> Provinces { get; set; } = default!;
    }

    // Province level: keyed by city name
    public class ProvincePopulationData
    {
        public IDictionary<string, PopulationData> Cities { get; set; } = default!;
    }

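Now we need to build a Pipeline that processes the country-wide population data represented by StatePopulationData. The processing flow is:

- Process the national data with two steps (Foo and Bar).
- For each province, process the province's data with two steps (Foo and Bar). Then, for each city in that province, process the city's data with three steps (Foo, Bar, and Baz). Finally, process the province's data with one more step (Baz).
- Process the national data with a final step (Baz).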
For this we need to define nine Pipe types and three execution contexts. The three execution context types are defined as follows:

    // Root context: the national population data
    public sealed class StatePopulationContext : ContextBase
    {
        public StatePopulationData PopulationData { get; }
        public StatePopulationContext(StatePopulationData populationData) => PopulationData = populationData;
    }

    // Sub-context created for each province (an item of StatePopulationData.Provinces)
    public sealed class ProvincePopulationContext : SubContextBase<StatePopulationContext, KeyValuePair<string, ProvincePopulationData>>
    {
        public string Province { get; private set; } = default!;
        public IDictionary<string, PopulationData> Cities { get; private set; } = default!;

        public override void Initialize(StatePopulationContext parent, KeyValuePair<string, ProvincePopulationData> item)
        {
            Province = item.Key;
            Cities = item.Value.Cities;
            base.Initialize(parent, item);
        }
    }

    // Sub-context created for each city (an item of ProvincePopulationContext.Cities)
    public sealed class CityPopulationContext : SubContextBase<ProvincePopulationContext, KeyValuePair<string, PopulationData>>
    {
        public string City { get; private set; } = default!;
        public PopulationData PopulationData { get; private set; } = default!;

        public override void Initialize(ProvincePopulationContext parent, KeyValuePair<string, PopulationData> item)
        {
            City = item.Key;
            PopulationData = item.Value;
            base.Initialize(parent, item);
        }
    }

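The nine corresponding Pipe types are defined below. Each one gives a short description through its overridden Description property. Its overridden Invoke method writes out which data it is processing: for the province and city Pipes, this includes the name of the province or city.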

    public sealed class FooStatePipe : PipeBase<StatePopulationContext>
    {
        public override string Description => "State Population Processor Foo";
        protected override void Invoke(StatePopulationContext context) => Console.WriteLine("Foo: Process state population");
    }

    public sealed class BarStatePipe : PipeBase<StatePopulationContext>
    {
        public override string Description => "State Population Processor Bar";
        protected override void Invoke(StatePopulationContext context) => Console.WriteLine("Bar: Process state population");
    }

    public sealed class BazStatePipe : PipeBase<StatePopulationContext>
    {
        public override string Description => "State Population Processor Baz";
        protected override void Invoke(StatePopulationContext context) => Console.WriteLine("Baz: Process state population");
    }

    public sealed class FooProvincePipe : PipeBase<ProvincePopulationContext>
    {
        public override string Description => "Province Population Processor Foo";
        protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($"\tFoo: Process population of the province {context.Province}");
    }

    public sealed class BarProvincePipe : PipeBase<ProvincePopulationContext>
    {
        public override string Description => "Province Population Processor Bar";
        protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($"\tBar: Process population of the province {context.Province}");
    }

    public sealed class BazProvincePipe : PipeBase<ProvincePopulationContext>
    {
        public override string Description => "Province Population Processor Baz";
        protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($"\tBaz: Process population of the province {context.Province}");
    }

    public sealed class FooCityPipe : PipeBase<CityPopulationContext>
    {
        public override string Description => "City Population Processor Foo";
        protected override void Invoke(CityPopulationContext context) => Console.WriteLine($"\t\tFoo: Process population of the city {context.City}");
    }

    public sealed class BarCityPipe : PipeBase<CityPopulationContext>
    {
        public override string Description => "City Population Processor Bar";
        protected override void Invoke(CityPopulationContext context) => Console.WriteLine($"\t\tBar: Process population of the city {context.City}");
    }

    public sealed class BazCityPipe : PipeBase<CityPopulationContext>
    {
        public override string Description => "City Population Processor Baz";
        protected override void Invoke(CityPopulationContext context) => Console.WriteLine($"\t\tBaz: Process population of the city {context.City}");
    }

The Pipeline is then registered in BuildPipelines as follows. Each ForEach call registers a sub-Pipeline that runs once for every province or city.

    static void BuildPipelines(IPipelineProvider pipelineProvider)
    {
        pipelineProvider.AddPipeline<StatePopulationContext>(
            name: "PopulationProcessor",
            setup: builder => builder
                .Use<StatePopulationContext, FooStatePipe>()
                .Use<StatePopulationContext, BarStatePipe>()
                // Sub-Pipeline executed once per province
                .ForEach<StatePopulationContext, ProvincePopulationContext, KeyValuePair<string, ProvincePopulationData>>(
                    description: "For each province",
                    collectionAccessor: context => context.PopulationData.Provinces,
                    subPipelineSetup: provinceBuilder => provinceBuilder
                        .Use<ProvincePopulationContext, FooProvincePipe>()
                        .Use<ProvincePopulationContext, BarProvincePipe>()
                        // Nested sub-Pipeline executed once per city of the current province
                        .ForEach<ProvincePopulationContext, CityPopulationContext, KeyValuePair<string, PopulationData>>(
                            description: "For each city",
                            collectionAccessor: context => context.Cities,
                            subPipelineSetup: cityBuilder => cityBuilder
                                .Use<CityPopulationContext, FooCityPipe>()
                                .Use<CityPopulationContext, BarCityPipe>()
                                .Use<CityPopulationContext, BazCityPipe>())
                        .Use<ProvincePopulationContext, BazProvincePipe>())
                .Use<StatePopulationContext, BazStatePipe>());
    }

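A per-item sub-Pipeline can be modeled like this. For each element returned by the collection accessor, create a fresh sub-context, initialize it with the parent context and the element, and run the sub-Pipeline on it. The sketch below is a hypothetical model: MiniSubContext and the static ForEach helper are not Pipelines types. It mirrors how ProvincePopulationContext.Initialize copies the key/value pair and then calls the base implementation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for SubContextBase<TParent, TItem>.
public abstract class MiniSubContext<TParent, TItem>
{
    public TParent Parent { get; private set; } = default!;

    public virtual void Initialize(TParent parent, TItem item) => Parent = parent;
}

public sealed class StateContext
{
    public Dictionary<string, List<string>> Provinces { get; } = new();
    public List<string> Trace { get; } = new();
}

// Mirrors ProvincePopulationContext: copy what it needs from the item, then call the base Initialize.
public sealed class ProvinceContext : MiniSubContext<StateContext, KeyValuePair<string, List<string>>>
{
    public string Province { get; private set; } = default!;
    public List<string> Cities { get; private set; } = default!;

    public override void Initialize(StateContext parent, KeyValuePair<string, List<string>> item)
    {
        Province = item.Key;
        Cities = item.Value;
        base.Initialize(parent, item);
    }
}

public static class ForEachDemo
{
    // Hypothetical ForEach: one fresh sub-context per item, then the sub-pipeline runs on it.
    public static void ForEach<TParent, TSub, TItem>(
        TParent parent,
        Func<TParent, IEnumerable<TItem>> collectionAccessor,
        Action<TSub> subPipeline)
        where TSub : MiniSubContext<TParent, TItem>, new()
    {
        foreach (var item in collectionAccessor(parent))
        {
            var sub = new TSub();
            sub.Initialize(parent, item);
            subPipeline(sub);
        }
    }

    public static List<string> Run()
    {
        var state = new StateContext();
        state.Provinces["Jiangsu"] = new List<string> { "Suzhou", "Wuxi" };
        state.Provinces["Zhejiang"] = new List<string> { "Hangzhou" };

        state.Trace.Add("State: before");
        ForEach<StateContext, ProvinceContext, KeyValuePair<string, List<string>>>(
            state,
            s => s.Provinces,
            p => p.Parent.Trace.Add($"Province {p.Province}: {string.Join("/", p.Cities)}"));
        state.Trace.Add("State: after");
        return state.Trace;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

As with the Pipelines calls above, all three type arguments of ForEach must be written out. C# cannot infer TSub from an untyped lambda parameter, and it does not allow specifying only some of the type arguments. This verbosity is what the final section addresses.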
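When the endpoint handler runs the new Pipeline, it builds the StatePopulationContext as shown below. The population data covers three provinces (Jiangsu, Shandong, and Zhejiang), each with population data for three cities.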

    var builder = WebApplication.CreateBuilder(args);
    builder.Services.AddPipelines(BuildPipelines);
    var app = builder.Build();

    app.MapGet("/test", async (HttpContext httpContext, IPipelineProvider provider, HttpResponse response) =>
    {
        Console.WriteLine("Execute PopulationProcessor pipeline");
        // Three provinces, each with three cities
        var data = new StatePopulationData
        {
            Provinces = new Dictionary<string, ProvincePopulationData>()
        };
        data.Provinces.Add("Jiangsu", new ProvincePopulationData
        {
            Cities = new Dictionary<string, PopulationData>
            {
                { "Suzhou", new PopulationData() },
                { "Wuxi", new PopulationData() },
                { "Changzhou", new PopulationData() },
            }
        });
        data.Provinces.Add("Shandong", new ProvincePopulationData
        {
            Cities = new Dictionary<string, PopulationData>
            {
                { "Qingdao", new PopulationData() },
                { "Jinan", new PopulationData() },
                { "Yantai", new PopulationData() },
            }
        });
        data.Provinces.Add("Zhejiang", new ProvincePopulationData
        {
            Cities = new Dictionary<string, PopulationData>
            {
                { "Hangzhou", new PopulationData() },
                { "Ningbo", new PopulationData() },
                { "Wenzhou", new PopulationData() },
            }
        });

        var context = new StatePopulationContext(data);
        var pipeline = provider.GetPipeline<StatePopulationContext>("PopulationProcessor");
        await pipeline.ProcessAsync(context);
        return Results.Ok();
    });

    app.Run();

After the application starts, the Pipeline export page again shows the Pipeline's complete processing flow.
When we request "/test", the console output shows how the Pipeline processes the national population data.
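5. Making Pipeline construction more concise with extension methods

The code that builds a Pipeline expresses the complete processing flow, so it should be as concise as possible. Ideally, even non-technical people should be able to read it. The Pipelines APIs for registering Pipes are all generic methods with two or three generic arguments that must be given explicitly, which makes them somewhat cumbersome. Custom extension methods solve this problem.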

    public static class Extensions
    {
        // Register a Pipe for country-level data
        public static IPipelineBuilder<StatePopulationContext> UseStatePipe<TPipe>(this IPipelineBuilder<StatePopulationContext> builder)
            where TPipe : Pipe<StatePopulationContext>
            => builder.Use<StatePopulationContext, TPipe>();

        // Register a Pipe for province-level data
        public static IPipelineBuilder<ProvincePopulationContext> UseProvincePipe<TPipe>(this IPipelineBuilder<ProvincePopulationContext> builder)
            where TPipe : Pipe<ProvincePopulationContext>
            => builder.Use<ProvincePopulationContext, TPipe>();

        // Register a Pipe for city-level data
        public static IPipelineBuilder<CityPopulationContext> UseCityPipe<TPipe>(this IPipelineBuilder<CityPopulationContext> builder)
            where TPipe : Pipe<CityPopulationContext>
            => builder.Use<CityPopulationContext, TPipe>();

        // Register the sub-Pipeline executed for each province
        public static IPipelineBuilder<StatePopulationContext> ForEachProvince(this IPipelineBuilder<StatePopulationContext> builder, Action<IPipelineBuilder<ProvincePopulationContext>> setup)
            => builder.ForEach("For each province", it => it.PopulationData.Provinces, (_, _) => true, setup);

        // Register the sub-Pipeline executed for each city
        public static IPipelineBuilder<ProvincePopulationContext> ForEachCity(this IPipelineBuilder<ProvincePopulationContext> builder, Action<IPipelineBuilder<CityPopulationContext>> setup)
            => builder.ForEach("For each city", it => it.Cities, (_, _) => true, setup);
    }

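As the snippet above shows, we define UseStatePipe, UseProvincePipe, and UseCityPipe to register Pipes at each of the three data levels (country, province, and city). We also define ForEachProvince and ForEachCity to register the sub-Pipelines that process province and city population data. With these five extension methods, the code that builds the whole Pipeline becomes simple and clear. Even without comments, everyone, including non-developers, should be able to follow the processing flow.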

    static void BuildPipelines(IPipelineProvider pipelineProvider)
    {
        pipelineProvider.AddPipeline<StatePopulationContext>(
            name: "PopulationProcessor",
            setup: builder => builder
                .UseStatePipe<FooStatePipe>()
                .UseStatePipe<BarStatePipe>()
                .ForEachProvince(provinceBuilder => provinceBuilder
                    .UseProvincePipe<FooProvincePipe>()
                    .UseProvincePipe<BarProvincePipe>()
                    .ForEachCity(cityBuilder => cityBuilder
                        .UseCityPipe<FooCityPipe>()
                        .UseCityPipe<BarCityPipe>()
                        .UseCityPipe<BazCityPipe>())
                    .UseProvincePipe<BazProvincePipe>())
                .UseStatePipe<BazStatePipe>());
    }

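These extension methods help because of a C# rule: type arguments are either all inferred or all specified. TPipe never appears as a parameter type, so every call to a method shaped like Use<TContext, TPipe>() must spell out both. Fixing TContext inside a domain-specific extension method leaves only TPipe. The sketch below shows the same technique outside the Pipelines framework, using hypothetical Builder<TContext> and IStep<TContext> types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical pipe and builder types, shaped like the Pipelines registration API.
public interface IStep<TContext>
{
    string Name { get; }
}

public sealed class Builder<TContext>
{
    public List<string> Registered { get; } = new();
}

public static class BuilderExtensions
{
    // General form: TPipe appears in no parameter, so it cannot be inferred, and because
    // C# has no partial inference, callers must also spell out TContext.
    public static Builder<TContext> Use<TContext, TPipe>(this Builder<TContext> builder)
        where TPipe : IStep<TContext>, new()
    {
        builder.Registered.Add(new TPipe().Name);
        return builder;
    }
}

public sealed class StateContext { }

// Domain-specific shortcut: TContext is fixed, so only the pipe type remains.
public static class StateBuilderExtensions
{
    public static Builder<StateContext> UseStatePipe<TPipe>(this Builder<StateContext> builder)
        where TPipe : IStep<StateContext>, new()
        => builder.Use<StateContext, TPipe>();
}

public sealed class FooStep : IStep<StateContext> { public string Name => "Foo"; }
public sealed class BarStep : IStep<StateContext> { public string Name => "Bar"; }

public static class ExtensionDemo
{
    public static (List<string> Verbose, List<string> Concise) Run()
    {
        var verbose = new Builder<StateContext>().Use<StateContext, FooStep>().Use<StateContext, BarStep>();
        var concise = new Builder<StateContext>().UseStatePipe<FooStep>().UseStatePipe<BarStep>();
        return (verbose.Registered, concise.Registered);
    }

    public static void Main()
    {
        var (verbose, concise) = Run();
        Console.WriteLine(string.Join(", ", verbose)); // Foo, Bar
        Console.WriteLine(string.Join(", ", concise)); // Foo, Bar
    }
}
```

Both chains register the same steps in the same order. The shortcut only removes the type argument that the context type already determines.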