以管道的方式來完成複雜的流程處理

2023-06-30 09:01:44

之前參與一個機票價格計算的專案,為他們設計了基本的處理流程,但是由於整個計算流程相當複雜,而且變化非常頻繁,導致日常的修改、維護和升級也變得越來越麻煩,當我後來再接手的時候已經看不懂計算邏輯了。為了解決這個問題,我借鑑了「工作流」的思路,試圖將整個計算過程設計成一個工作流。但是我又不想引入一個獨立的工作流引擎,於是寫了一個名為Pipelines的框架。顧名思義,Pipelines通過構建Pipeline的方式完成所需的處理流程,整個處理邏輯被分解並實現在若干Pipe中,這些Pipe按照指定的順序將完成的Pipeline構建出來。Pipeline本質上就是一個簡單的順序工作流,它僅僅按序執行註冊的Pipe。這個簡單的Pipelines框架被放在這裡,這裡我不會介紹它的設計實現,只是簡單地介紹它的用法,有興趣的可以檢視原始碼

一、構建並執行管道
二、Pipeline的「內部中斷」
三、Pipeline的「外部中斷」
四、處理層次化資料結構
五、利用擴充套件方法時Pipeline構建更簡潔

一、構建並執行管道

Pipelines旨在提供一個用於處理資料的順序工作流或者管道(以下簡稱Pipeline),該Pipeline在一個強型別的上下文中被執行,管道可以利用此上下文得到需要處理的資料,並將處理的結果(含中間結果)儲存在上下文中。接下來我們來演示如何利用Pipelines框架處理人口統計資料的範例。如下所示的兩個型別分別表示人口統計資料和處理上下文,後者繼承基礎類別ContextBase。

public class PopulationData
{
    public object Statistics { get; set; } = default!;
}
public sealed class PopulationContext : ContextBase
{
    public PopulationContext(PopulationData data)=> Data = data;
    public PopulationData Data { get; }
}

Pipeline由一系列Pipe物件按照註冊的順序組合而成。通過繼承基礎類別PipeBase<PopulationContext>,我們定義了三個Pipe類來完成針對人口統計資料的三項基本處理任務。

public sealed class FooPopulationPipe : PipeBase<PopulationContext>
{
    public override string Description => "Global PopulationProcessor Foo";
    protected override void Invoke(PopulationContext context) =>Console.WriteLine($"{nameof(FooPopulationPipe)} is invoked.");
}
public sealed class BarPopulationPipe : PipeBase<PopulationContext>
{
    public override string Description => "Global PopulationProcessor Bar";
    protected override void Invoke(PopulationContext context) => Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked.");
}
public sealed class BazPopulationPipe : PipeBase<PopulationContext>
{
    public override string Description => "Global PopulationProcessor Baz";
    protected override void Invoke(PopulationContext context) => Console.WriteLine($"{nameof(BazPopulationPipe)} is invoked.");
}

我設計Pipelines的初衷是讓每個參與者(包含非技術人員)在程式碼的頻繁迭代過程中,可以清晰地瞭解當前的處理流程,所以我會將當前應用構建的所有Pipeline的處理流程匯出來。基於這個目的,每個Pipe型別都需要利用其Description屬性提供一段描述當前處理邏輯的文字。Pipe具體的處理邏輯實現在重寫的Invoke方法中。如果涉及非同步處理,需要繼承更上層的基礎類別Pipe<TContext>(PipeBase<TContext>的基礎類別)並重寫非同步的InvokeAsync方法。

Pipeline的構建實現在如下所示的BuildPipelines方法中,我們利用該方法提供的IPipelineProvider物件註冊了一個命名為「PopulationProcessor」的Pipeline。具體來說,我們呼叫的是它的AddPipeline<TContext>方法,該方法提供的第一個引數為Pipeline的註冊名稱,另一個引數是一個型別為Action<IPipelineBuilder<TContext>>的委託,它利用提供的IPipelineBuilder<TContext>物件完成了上面定義的三個Pipe的註冊。

using App;
using Artech.Pipelines;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddPipelines(BuildPipelines);
var app = builder.Build();
app.MapGet("/test", async (IPipelineProvider provider, HttpResponse response) => {
    Console.WriteLine("Execute PopulationProcessor pipeline");
    var context = new PopulationContext(new PopulationData());
    var pipeline = provider.GetPipeline<PopulationContext>("PopulationProcessor");
    await pipeline.ProcessAsync(context);
    return Results.Ok();
});
app.Run();

static void BuildPipelines(IPipelineProvider pipelineProvider)
{
    pipelineProvider.AddPipeline<PopulationContext>(
        name: "PopulationProcessor",
        setup: builder => builder
            .Use<PopulationContext, FooPopulationPipe>()
            .Use<PopulationContext, BarPopulationPipe>()
            .Use<PopulationContext, BazPopulationPipe>());
}

Pipelines框架涉及的服務通過IServiceCollection介面的AddPipelines方法進行註冊,BuildPipelines方法轉換成委託作為該方法的引數。我們註冊了一個指向「/test」 的路由終結點來演示針對管道的執行。如程式碼片段所示,我們利用注入的IPipelineProvider物件根據註冊名稱得到具體的Pipeline物件,並建立出相應的PopulationContext上下文作為引數來執行此Pipeline物件。程式執行後,請求路徑」/pipelines」可以得到一個Pipeline的列表,點選具體的連結,對應Pipeline體現的流程就會呈現出來。

clip_image002

如果請求路徑「/test」來執行構建的管道,管道執行的軌跡將會體現在控制檯的輸出結果上。

clip_image003

二、Pipeline的「內部中斷」

構成Pipeline的每個Pipe都可以根據處理邏輯的需要立即中斷管道的執行。在如下這個重寫的BarPopulationPipe型別的Invoke方法中,如果生成的亂數為偶數,它會呼叫上下文物件的Abort方法立即終止Pipeline的執行。

public sealed class BarPopulationPipe : PipeBase<PopulationContext>
{
    private readonly Random _random = new();
    public override string Description => "Global PopulationProcessor Bar";
    protected override void Invoke(PopulationContext context)
    {
        Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked.");
        if (_random.Next() % 2 == 0)
        {
            context.Abort();
        }
    }
}

這樣的化,當我們構建的Pipeline在執行過程中,有一半的機率BazPopulationPipe將不會執行,如下所示的輸出結果體現了這一點。

clip_image004

對於繼承自Pipe<TContext>的Pipe型別,其實現的InvokeAsync方法可以採用如下的方式中止當前Pipeline的執行,因為引數next返回的委託用於呼叫後續Pipe。如果不執行此委託,就意味著針對Pipeline的執行到此為止。

public sealed class BarPopulationPipe : Pipe<PopulationContext>
{
    private readonly Random _random = new();
    public override string Description => "Global PopulationProcessor Bar";
    public override ValueTask InvokeAsync(PopulationContext context, Func<PopulationContext, ValueTask> next)
    {
        Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked.");
        if (_random.Next() % 2 != 0)
        {
            return next(context);
        }
        return ValueTask.CompletedTask;
    }
}

三、Pipeline的「外部中斷」

在呼叫Pipeline時,我們可以利用執行上下文提供的CancellationToken中止Pipeline的執行。我們按照如下的方式再次改寫了BarPopulationPipe的執行邏輯,如下面的程式碼片段所示,我們不再呼叫Abort方法,而是選擇延遲2秒執行後續操作。

public sealed class BarPopulationPipe : Pipe<PopulationContext>
{
    private readonly Random _random = new();
    public override string Description => "Global PopulationProcessor Bar";
    public override async ValueTask InvokeAsync(PopulationContext context, Func<PopulationContext, ValueTask> next)
    {
        Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked.");
        if (_random.Next() % 2 != 0)
        {
            await Task.Delay(2000);
        }
        await next(context);
    }
}

我們按照如下的方式重寫了PopulationContext的CancellationToken屬性。我們為建構函式新增了兩個引數,一個代表當前HttpContext上下文,另一個表示設定的超時時限。CancellationToken根據這兩個引數建立而成,意味著管道不僅具有預設的超時時間,也可以通過HTTP呼叫方中止執行。

public sealed class PopulationContext: ContextBase
{
    public PopulationContext(PopulationData data, HttpContext httpContext, TimeSpan timeout)
    {
        Data = data;
        CancellationToken = CancellationTokenSource.CreateLinkedTokenSource(httpContext.RequestAborted, new CancellationTokenSource(timeout).Token).Token;
    }
    public PopulationData Data { get; }
    public override CancellationToken CancellationToken { get; }
}

在註冊的終結點處理器中,我們在執行Pipeline之前,將作為引數傳入的PopulationContext上下文的超時時間設定為1秒。

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddPipelines(BuildPipelines);
var app = builder.Build();
app.MapGet("/test", async (HttpContext httpContext,IPipelineProvider provider, HttpResponse response) => {
    Console.WriteLine("Execute PopulationProcessor pipeline");
    var context = new PopulationContext(new PopulationData(), httpContext, TimeSpan.FromSeconds(1));

    var pipeline = provider.GetPipeline<PopulationContext>("PopulationProcessor");
    await pipeline.ProcessAsync(context);
    return Results.Ok();
});
app.Run();

根據BarPopulationPipe的執行邏輯,Pipeline的執行具有一半的機率會超時,一旦超時就會立即丟擲一個OperationCancellationToken異常。

clip_image005

四、處理層次化資料結構

Pipelines設計的主要目的是用來處理層次化的資料結構,這涉及到子Pipeline的應用。目前我們處理的人口資料體現為一個簡單的資料型別,現在我們讓它變得更復雜一些。假設我們需要處理國家、省份和城市三個等級的人口資料,其中StatePopulationData代表整個國家的人口資料,它的Provinces屬性承載了每個省份的資料。ProvincePopulationData代表具體某個省份的人口資料,其Cities屬性承載了每個城市的人口資料。

public class PopulationData
{
    public object Statistics { get; set; } = default!;
}

public class StatePopulationData
{
    public IDictionary<string, ProvincePopulationData> Provinces { get; set; } = default!;
}

public class ProvincePopulationData
{
    public IDictionary<string, PopulationData> Cities { get; set; } = default!;
}

現在我們需要構建一個Pipeline來處理通過StatePopulationData型別表示的整個國家的人口資料,具體的處理流程為:

  • 利用FooStatePipe處理國家人口資料
  • 利用BarStatePipe處理國家人口資料
  • 構建子Pipeline處理每個省份人口資料,子Pipeline處理邏輯:
    • 利用FooProvincePipe處理省份人口資料
    • 利用BarProvincePipe處理省份人口資料、
    • 構建子Pipeline處理每個城市人口資料,子Pipeline處理邏輯
      • 利用FooCityPipe處理城市人口資料
      • 利用BarCityPipe處理城市人口資料
      • 利用BazCityPipe處理城市人口資料
    • 利用BazProvincePipe處理省份人口資料
  • 利用BazStatePipe處理國家人口資料

為此我們需要定義9個Pipe型別,以及3個執行上下文。如下所示的是三個執行上下文型別的具體定義:

public sealed class StatePopulationContext: ContextBase
{
    public StatePopulationData PopulationData { get; }
    public StatePopulationContext(StatePopulationData populationData) => PopulationData = populationData;
}

public sealed class ProvincePopulationContext : SubContextBase<StatePopulationContext, KeyValuePair<string, ProvincePopulationData>>
{
    public string Province { get; private set; } = default!;
    public IDictionary<string, PopulationData> Cities { get; private set; } = default!;
    public override void Initialize(StatePopulationContext parent, KeyValuePair<string, ProvincePopulationData> item)
    {
        Province = item.Key;
        Cities = item.Value.Cities;
        base.Initialize(parent, item);
    }
}

public sealed class CityPopulationContext: SubContextBase<ProvincePopulationContext, KeyValuePair<string, PopulationData>>
{
    public string City { get; private set; } = default!;
    public PopulationData PopulationData { get; private set; } = default!;
    public override void Initialize(ProvincePopulationContext parent, KeyValuePair<string, PopulationData> item)
    {
        City = item.Key;
        PopulationData = item.Value;
        base.Initialize(parent, item);
}
}

9個對應的Pipe型別定義如下。每個型別利用重寫的Description提供一個簡單的描述,重寫的Invoke方法輸出當前怎樣的資料(那個省/市的人口資料)。

public sealed class FooStatePipe : PipeBase<StatePopulationContext>
{
    public override string Description => "State Population Processor Foo";
    protected override void Invoke(StatePopulationContext context)=>Console.WriteLine("Foo: Process state population");
}
public sealed class BarStatePipe : PipeBase<StatePopulationContext>
{
    public override string Description => "State Population Processor Bar";
    protected override void Invoke(StatePopulationContext context) => Console.WriteLine("Bar: Process state population");
}
public sealed class BazStatePipe : PipeBase<StatePopulationContext>
{
    public override string Description => "State Population Processor Baz";
    protected override void Invoke(StatePopulationContext context) => Console.WriteLine("Baz: Process state population");
}

public sealed class FooProvincePipe : PipeBase<ProvincePopulationContext>
{
    public override string Description => "Province Population Processor Foo";
    protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($"\tFoo: Process population of the province {context.Province}");
}

public sealed class BarProvincePipe : PipeBase<ProvincePopulationContext>
{
    public override string Description => "Province Population Processor Bar";
    protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($"\tBar: Process population of the province {context.Province}");

}

public sealed class BazProvincePipe : PipeBase<ProvincePopulationContext>
{
    public override string Description => "Province Population Processor Baz";
    protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($"\tBaz: Process population of the province {context.Province}");
}

public sealed class FooCityPipe : PipeBase<CityPopulationContext>
{
    public override string Description => "City Population Processor Foo";
    protected override void Invoke(CityPopulationContext context) => Console.WriteLine($"\t\tFoo: Process population of the city {context.City}");
}

public sealed class BarCityPipe : PipeBase<CityPopulationContext>
{
    public override string Description => "City Population Processor Bar";
    protected override void Invoke(CityPopulationContext context) => Console.WriteLine($"\t\tBar: Process population of the city {context.City}");

}

public sealed class BazCityPipe : PipeBase<CityPopulationContext>
{
    public override string Description => "City Population Processor Baz";
    protected override void Invoke(CityPopulationContext context) => Console.WriteLine($"\t\tBaz: Process population of the city {context.City}");
}
用於構建這個Pipeline的BuildPipelines方法根據構建的Pipeline結構進行了如下的改寫:子Pipeline通過IPipelineBuilder<TContext>介面的ForEach<TContext, TSubContext, TItem>擴充套件方法構建,三個泛型引數型別分別表示當前執行上下文型別、子上下文型別和子Pipeline處理資料。它具有三個引數,description提供到處文字,collectionAccessor利用一個委託獲取一個集合物件(構建的子Pipeline用於處理它的每一個元素),subPipelineSetup提供的委託完整最終子Pipeline的構建。雖然看起來複雜,但是其結構還是很清晰的,即使是非技術人員也能明白這個Pipeline體現的處理流程。
static void BuildPipelines(IPipelineProvider pipelineProvider)
{
    pipelineProvider.AddPipeline<StatePopulationContext>(name: "PopulationProcessor", setup: builder => builder
      .Use<StatePopulationContext, FooStatePipe>()
      .Use<StatePopulationContext, BarStatePipe>()
      .ForEach<StatePopulationContext, ProvincePopulationContext, KeyValuePair<string, ProvincePopulationData>>(
            description: "For each province",
            collectionAccessor: context => context.PopulationData.Provinces,
            subPipelineSetup: provinceBuilder => provinceBuilder
                .Use<ProvincePopulationContext, FooProvincePipe>()
                .Use<ProvincePopulationContext, BarProvincePipe>()
                .ForEach<ProvincePopulationContext, CityPopulationContext, KeyValuePair<string, PopulationData>>(
                    description: "For each city",
                    collectionAccessor: context => context.Cities,
                    subPipelineSetup: cityBuilder => cityBuilder
                        .Use<CityPopulationContext, FooCityPipe>()
                        .Use<CityPopulationContext, BarCityPipe>()
                        .Use<CityPopulationContext, BazCityPipe>())
                .Use<ProvincePopulationContext, BazProvincePipe>())
      .Use<StatePopulationContext, BazStatePipe>());
}

終結點處理程式在執行新的Pipeline時,會按照如下的形式將StatePopulationContext上下文構建出來。處理人口資料涉及三個省份(江蘇、山東和浙江),每個省份包含三個城市的人口資料。

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddPipelines(BuildPipelines);
var app = builder.Build();
app.MapGet("/test", async (HttpContext httpContext, IPipelineProvider provider, HttpResponse response) => {
    Console.WriteLine("Execute PopulationProcessor pipeline");
    var data = new StatePopulationData
    {
        Provinces = new Dictionary<string, ProvincePopulationData>()
    };
    data.Provinces.Add("Jiangsu", new ProvincePopulationData
    {
        Cities = new Dictionary<string, PopulationData>
        {
            {"Suzhou", new PopulationData() },
            {"Wuxi", new PopulationData() },
            {"Changezhou", new PopulationData() },
        }
    });

    data.Provinces.Add("Shandong", new ProvincePopulationData
    {
        Cities = new Dictionary<string, PopulationData>
        {
            {"Qingdao", new PopulationData() },
            {"Jinan", new PopulationData() },
            {"Yantai", new PopulationData() },
        }
    });

    data.Provinces.Add("Zhejiang", new ProvincePopulationData
    {
        Cities = new Dictionary<string, PopulationData>
        {
            {"Hangzhou", new PopulationData() },
            {"Ningbo", new PopulationData() },
            {"Wenzhou", new PopulationData() },
        }
    });

    var context = new StatePopulationContext(data);

    var pipeline = provider.GetPipeline<StatePopulationContext>("PopulationProcessor");
    await pipeline.ProcessAsync(context);
    return Results.Ok();
});
app.Run();

應用啟動後,我們依然可以從Pipeline匯出頁面看到整個Pipeline的處理流程。

clip_image006

當我們請求「/test」,Pipeline針對國家人口資料的執行流程體現在控制檯輸出上。

clip_image008

五、利用擴充套件方法時Pipeline構建更簡潔

Pipeline的構建過程體現了完整的處理流程,所以我們應該構建程式碼儘可能地簡潔,最理想的狀態就是讓非技術人員也能看懂。Pipelines提供的用於註冊Pipe的API均為泛型方法,並且會涉及兩到三個必須顯式指定的泛型引數,使用起來還不是很方便。不過這個問題可以通過自定義擴充套件方法來解決。

public static class Extensions
{
    public static IPipelineBuilder<StatePopulationContext> UseStatePipe<TPipe>(this IPipelineBuilder<StatePopulationContext> builder)
        where TPipe : Pipe<StatePopulationContext>
        => builder.Use<StatePopulationContext, TPipe>();
    public static IPipelineBuilder<ProvincePopulationContext> UseProvincePipe<TPipe>(this IPipelineBuilder<ProvincePopulationContext> builder)
        where TPipe : Pipe<ProvincePopulationContext>
        => builder.Use<ProvincePopulationContext, TPipe>();
    public static IPipelineBuilder<CityPopulationContext> UseCityPipe<TPipe>(this IPipelineBuilder<CityPopulationContext> builder)
        where TPipe : Pipe<CityPopulationContext>
        => builder.Use<CityPopulationContext, TPipe>();

    public static IPipelineBuilder<StatePopulationContext> ForEachProvince(this IPipelineBuilder<StatePopulationContext> builder, Action<IPipelineBuilder<ProvincePopulationContext>> setup)
        => builder.ForEach("For each province", it => it.PopulationData.Provinces, (_, _) => true, setup);
    public static IPipelineBuilder<ProvincePopulationContext> ForEachCity(this IPipelineBuilder<ProvincePopulationContext> builder, Action<IPipelineBuilder<CityPopulationContext>> setup)
        => builder.ForEach("For each city", it => it.Cities, (_, _) => true, setup);
}

如上面的程式碼片段所示,我們針對三個資料層次(國家、省份、城市)定義了註冊對應Pipe的擴充套件方法UseStatePipe、UseProvincePipe和UseCityPipe。還分別定義了ForEachProvince和ForEachCity這兩個擴充套件方法來註冊構建處理省份/城市人口資料的子Pipeline。有了這5個擴充套件方法,構建整個Pipeline的程式碼就可以變得非常簡單而清晰,即使不寫任何的註釋,相信每個人(包括非開發人員)都能讀懂涉及的處理流程。

static void BuildPipelines(IPipelineProvider pipelineProvider)
{
    pipelineProvider.AddPipeline<StatePopulationContext>(name: "PopulationProcessor", setup: builder => builder
      .UseStatePipe<FooStatePipe>()
      .UseStatePipe<BarStatePipe>()
      .ForEachProvince(provinceBuilder => provinceBuilder
          .UseProvincePipe<FooProvincePipe>()
          .UseProvincePipe<BarProvincePipe>()
          .ForEachCity(cityBuilder => cityBuilder
              .UseCityPipe<FooCityPipe>()
              .UseCityPipe<BarCityPipe>()
              .UseCityPipe<BazCityPipe>())
          .UseProvincePipe<BazProvincePipe>())
      .UseStatePipe<BazStatePipe>());
}