WorkFlowCore
是一個針對.NetCore
的輕量級的工作流引擎,提供了FluentAPI、多工、持久化以及並行處理的功能,適合於小型工作流、責任鏈的需求開發。支援工作流長期執行,提供了各種持久化方式。
本篇開發環境為.Net7
,此處不演示Json
和yaml
設定,詳細檔案請檢視官方檔案和專案原始碼地址
通過以下命令安裝
Install-Package WorkflowCore
然後注入WorkFlowCore
builder.Services.AddWorkflow();
WorkFlowCore
主要分為兩部分:步驟和工作流
步驟
多個步驟組成一個工作流,每個步驟都可以有輸入併產生輸出,這些輸出可以傳遞迴其所在的工作流。通過建立繼承抽象類StepBody或StepBodyAsync的類,並且實現Run或RunAsync方法來定義步驟,很明顯它們的區別是是否非同步
public class FirstStepBody: StepBody { public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine("Hello world!First"); return ExecutionResult.Next(); } }
工作流
通過繼承IWorkflow
介面定義一個工作流,介面只有Id
、Version
和Build
方法(內部可以執行多個步驟),工作流主機使用這些資訊來標識工作流
public class MyWorkflow :IWorkflow { public string Id => "HelloWorld"; public int Version => 1; public void Build(IWorkflowBuilder<object> builder) { builder .StartWith<FirstStepBody>() .Then<FirstStepBody>(); } }
工作流如果想使用必須在工作流主機中通過RegisterWorkflow()
方法註冊,並且通過Start()
方法啟動主機,當然也可以通過Stop()
方法停止工作流。執行工作流需要使用StartWorkflow()
方法,引數為工作流類的Id
,如下
[ApiController] [Route("[controller]")] public class WeatherForecastController : ControllerBase { private readonly IWorkflowHost _workflowHost; public WeatherForecastController(IWorkflowHost workflowHost) { _workflowHost = workflowHost; } [HttpGet(Name = "get")] public ContentResult Get() { if (!_workflowHost.Registry.IsRegistered("HelloWorld",1)) { _workflowHost.RegisterWorkflow<MyWorkflow>(); } _workflowHost.Start(); _workflowHost.StartWorkflow("HelloWorld"); //host.Stop(); return Content("ok"); } }
當然也可以在構建web
服務的時候統一註冊,然後就可以直接執行啦
var host = app.Services.GetService<IWorkflowHost>(); host.RegisterWorkflow<MyWorkflow>(); host.Start();
每個步驟都是一個黑盒,因此它們支援輸入和輸出。這些輸入和輸出可以對映到一個資料類,該資料類定義與每個工作流範例相關的自定義資料。
以下範例顯示瞭如何定義步驟的輸入和輸出,然後顯示瞭如何使用內部資料的型別化類定義工作流,以及如何將輸入和輸出對映到自定義資料類的屬性。
//步驟包含屬性,並且計算 public class FirstStepBody: StepBody { public int Input1 { get; set; } public int Input2 { get; set; } public int Output { get; set; } public override ExecutionResult Run(IStepExecutionContext context) { Output = Input1 + Input2; Console.WriteLine(Output); return ExecutionResult.Next(); } } //工作流包含輸入輸出的賦值 public class MyWorkflow :IWorkflow<MyDataClass> { public string Id => "HelloWorld"; public int Version => 1; public void Build(IWorkflowBuilder<MyDataClass> builder) { builder .StartWith<FirstStepBody>() .Input(step => step.Input1,data => data.Value1) .Input(step => step.Input2, data => 100) .Output(data => data.Answer, step => step.Output) .Then<FirstStepBody>() .Input(step => step.Input1, data => data.Value1) .Input(step => step.Input2, data => data.Answer) .Output(data => data.Answer, step => step.Output); } } //工作流的屬性類 public class MyDataClass { public int Value1 { get; set; } public int Value2 { get; set; } public int Answer { get; set; } } //執行工作流傳入引數 MyDataClass myDataClass = new MyDataClass(); myDataClass.Value1 = 100; myDataClass.Value2 = 200; //不傳入myDataClass則每次執行都是新的資料物件 _workflowHost.StartWorkflow("HelloWorld", myDataClass);
從上述例子可以看到工作流可以定義一個初始的類作為引數傳入,每個步驟可以有自己的屬性欄位去接收引數(可以是工作流類的欄位,也可以是固定值),可以用Input
方法傳入,Output
方法輸出賦值。如果在工作流執行時不傳入引數每次執行都是新的物件的預設值,比如在StartWorkflow
方法中不傳myDataClass
,執行結果是100
和100
,否則是200
和300
工作流可以使用WaitFor
方法進行等待,通過外部觸發此事件,將事件產生的資料傳遞給工作流,並且讓工作流繼續執行下面的步驟。範例如下:
public class MyWorkflow :IWorkflow<MyDataClass> { //省略。。。。 public void Build(IWorkflowBuilder<MyDataClass> builder) { builder .StartWith<FirstStepBody>() .Input(step => step.Input1,data => data.Value1) .Input(step => step.Input2, data => 100) .Output(data => data.Answer, step => step.Output) .WaitFor("MyEvent",key => "EventKey") .Output(data => data.Answer,step => step.EventData) .Then<FirstStepBody>() .Input(step => step.Input1, data => data.Value1) .Input(step => step.Input2, data => data.Answer) .Output(data => data.Answer, step => step.Output); } } //。。。 [HttpGet(Name = "get")] public ContentResult Get() { MyDataClass myDataClass = new MyDataClass(); myDataClass.Value1 = 100; myDataClass.Value2 = 200; _workflowHost.StartWorkflow("HelloWorld", myDataClass); return Content("ok"); } [HttpPost(Name = "event")] public ContentResult PublishEvent() { _workflowHost.PublishEvent("MyEvent", "EventKey", 200); return Content("ok"); }
使用WaitFor
方法可以使工作流等待監聽指定事件的執行,有兩個入參事件名稱和事件關鍵字。通過工作流主機去觸發PublishEvent
執行指定的事件,有三個入參觸發事件名稱、觸發事件關鍵字和事件引數。
需要執行事件,工作流才會繼續下一步,如下動圖演示:
可以為等待事件設定有效時間,在有效時間之前執行事件是不會繼續下一步流程的,只有當大於有效時間之後執行事件才會繼續下一步步驟。如下程式碼設定,為工作流執行時間一天後執行事件才會繼續執行,否則就等待不動。
WaitFor("MyEvent",key => "EventKey", data => DateTime.Now.AddDays(1))
活動被定義為在工作流中可以被等待的外部工作佇列中的步驟。
在本例中,工作流將等待活動activity-1
,直到活動完成才繼續工作流。它還將data.Value1
的值傳遞給活動,然後將活動的結果對映到data.Value2
。
然後我們建立一個worker
來處理活動項的佇列。它使用GetPendingActivity
方法來獲取工作流正在等待的活動和資料。
//..... builder .StartWith<FirstStepBody>() .Input(step => step.Input1,data => data.Value1) .Input(step => step.Input2, data => 100) .Output(data => data.Answer, step => step.Output) .Activity("activity-1", (data) => data.Value1) .Output(data => data.Value2, step => step.Result) .Then<FirstStepBody>() .Input(step => step.Input1, data => data.Value1) .Input(step => step.Input2, data => data.Answer) .Output(data => data.Answer, step => step.Output); //.... [HttpPost(Name = "active")] public ContentResult PublishEvent() { var activity = _workflowHost.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result; if (activity != null) { Console.WriteLine(activity.Parameters); _workflowHost.SubmitActivitySuccess(activity.Token, 100); } return Content("ok"); }
活動可以看作一個等待的步驟可以傳入引數和輸出引數,和事件的區別是事件不能輸入引數而是單純的等待。
每個步驟都可以設定自己的錯誤處理行為,可以在以後重試、掛起工作流或終止工作流。
public void Build(IWorkflowBuilder<object> builder) { builder .StartWith<HelloWorld>() .OnError(WorkflowErrorHandling.Retry,TimeSpan.FromMinutes(10)) .Then<GoodbyeWorld>(); }
工作流的流程控制包括分支、迴圈等各種操作
決策分支
在工作流中定義多個獨立分支,並根據表示式值選擇滿足條件的分支執行。
使用IWorkflowBuilder
的CreateBranch
方法定義分支。然後我們可以使用branch
方法選擇一個分支。
選擇表示式將與通過branch
方法列出的分支相匹配,匹配的分支將安排執行。匹配多個分支將導致並行分支執行。
如果data.Value1
的值為1
,則此工作流將選擇branch1
,如果為2
,則選擇branch2
。
var branch1 = builder.CreateBranch() .StartWith<PrintMessage>() .Input(step => step.Message, data => "hi from 1") .Then<PrintMessage>() .Input(step => step.Message, data => "bye from 1"); var branch2 = builder.CreateBranch() .StartWith<PrintMessage>() .Input(step => step.Message, data => "hi from 2") .Then<PrintMessage>() .Input(step => step.Message, data => "bye from 2"); builder .StartWith<HelloWorld>() .Decide(data => data.Value1) .Branch((data, outcome) => data.Value1 == "one", branch1) .Branch((data, outcome) => data.Value1 == "two", branch2);
並行ForEach
使用ForEach
方法啟動並行for
迴圈
public class ForEachWorkflow : IWorkflow { public string Id => "Foreach"; public int Version => 1; public void Build(IWorkflowBuilder<object> builder) { builder .StartWith<SayHello>() .ForEach(data => new List<int>() { 1, 2, 3, 4 }) .Do(x => x .StartWith<DisplayContext>() .Input(step => step.Message, (data, context) => context.Item) .Then<DoSomething>()) .Then<SayGoodbye>(); } }
While迴圈
使用While
方法啟動while
迴圈
public class WhileWorkflow : IWorkflow<MyData> { public string Id => "While"; public int Version => 1; public void Build(IWorkflowBuilder<MyData> builder) { builder .StartWith<SayHello>() .While(data => data.Counter < 3) .Do(x => x .StartWith<DoSomething>() .Then<IncrementStep>() .Input(step => step.Value1, data => data.Counter) .Output(data => data.Counter, step => step.Value2)) .Then<SayGoodbye>(); } }
If判斷
使用If
方法執行if
判斷
public class IfWorkflow : IWorkflow<MyData> { public void Build(IWorkflowBuilder<MyData> builder) { builder .StartWith<SayHello>() .If(data => data.Counter < 3).Do(then => then .StartWith<PrintMessage>() .Input(step => step.Message, data => "Value is less than 3") ) .If(data => data.Counter < 5).Do(then => then .StartWith<PrintMessage>() .Input(step => step.Message, data => "Value is less than 5") ) .Then<SayGoodbye>(); } }
並行
使用Parallel
方法並行執行任務
public class ParallelWorkflow : IWorkflow<MyData> { public string Id => "parallel-sample"; public int Version => 1; public void Build(IWorkflowBuilder<MyData> builder) { builder .StartWith<SayHello>() .Parallel() .Do(then => then.StartWith<Task1dot1>() .Then<Task1dot2>() .Do(then => then.StartWith<Task2dot1>() .Then<Task2dot2>() .Join() .Then<SayGoodbye>(); } }
Schedule
使用Schedule
方法在工作流中註冊在指定時間後執行的非同步方法
builder .StartWith(context => Console.WriteLine("Hello")) .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule .StartWith(context => Console.WriteLine("Doing scheduled tasks")) ) .Then(context => Console.WriteLine("Doing normal tasks"));
Recur
使用Recure
方法在工作流中設定一組重複的後臺步驟,直到滿足特定條件為止
builder .StartWith(context => Console.WriteLine("Hello")) .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur .StartWith(context => Console.WriteLine("Doing recurring task")) ) .Then(context => Console.WriteLine("Carry on"));
saga
允許在saga transaction
中封裝一系列步驟,併為每一個步驟提供補償步驟,使用CompensateWith
方法在對應的步驟後面新增補償步驟,補償步驟將會在步驟丟擲異常的時候觸發。
如下範例,步驟Task2
如果丟擲一個異常,那麼補償步驟UndoTask2
和UndoTask1
將被觸發。
builder .StartWith(context => Console.WriteLine("Begin")) .Saga(saga => saga .StartWith<Task1>() .CompensateWith<UndoTask1>() .Then<Task2>() .CompensateWith<UndoTask2>() .Then<Task3>() .CompensateWith<UndoTask3>() ) .CompensateWith<CleanUp>() .Then(context => Console.WriteLine("End"));
也可以指定重試策略,在指定時間間隔後重試。
builder .StartWith(context => Console.WriteLine("Begin")) .Saga(saga => saga .StartWith<Task1>() .CompensateWith<UndoTask1>() .Then<Task2>() .CompensateWith<UndoTask2>() .Then<Task3>() .CompensateWith<UndoTask3>() ) .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5)) .Then(context => Console.WriteLine("End"));
可以使用Redis
、Mongdb
、Sqlserver
等持久化,具體可以看檔案,此處使用Redis
,先安裝nuget
包
Install-Package WorkflowCore.Providers.Redis
然後注入就可以了
builder.Services.AddWorkflow(cfg => { cfg.UseRedisPersistence("localhost:6379", "app-name"); cfg.UseRedisLocking("localhost:6379"); cfg.UseRedisQueues("localhost:6379", "app-name"); cfg.UseRedisEventHub("localhost:6379", "channel-name"); //cfg.UseMongoDB(@"mongodb://mongo:27017", "workflow"); //cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://elastic:9200")), "workflows"); });
執行開啟可以看到