在.Net Framework環境下,我們使用Windows Workflow Foundation(WF)作為專案的工作流引擎,可是.Net Core已經不支援WF了,需要為基於.Net Core的專案選擇新的工作流引擎。基本要求如下:
符合上述要求的開源專案有幾個,這裡介紹開源專案WorkflowCore,專案地址:https://github.com/danielgerlag/workflow-core。
本文的範例可以從github下載:https://github.com/zhenl/ZL.WorflowCoreDemo 。
首先,使用Visual Studio建立一個.Net Core的控制檯專案,在NuGet管理器中引入下面程式包:
然後,建立兩個工作流的步驟:
using WorkflowCore.Interface;
using WorkflowCore.Models;
namespace WorkflowCoreTest
{
public class HelloWorld : StepBody
{
public override ExecutionResult Run(IStepExecutionContext context)
{
Console.WriteLine("你好");
return ExecutionResult.Next();
}
}
}
using WorkflowCore.Interface;
using WorkflowCore.Models;
namespace WorkflowCoreTest
{
public class GoodbyeWorld : StepBody
{
public override ExecutionResult Run(IStepExecutionContext context)
{
Console.WriteLine("再見");
return ExecutionResult.Next();
}
}
}
接下來使用這兩個步驟定義一個工作流:
using WorkflowCore.Interface;
namespace WorkflowCoreTest
{
public class HelloWorldWorkflow : IWorkflow
{
public string Id => "HelloWorld";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith<HelloWorld>()
.Then<GoodbyeWorld>();
}
}
}
最後,在主程式中,建立WorkflowHost,註冊並執行工作流,程式碼如下:
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading;
using WorkflowCore.Interface;
namespace WorkflowCoreTest
{
class Program
{
static void Main(string[] args)
{
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<HelloWorldWorkflow>();
host.Start();
host.StartWorkflow("HelloWorld", 1, null);
Console.ReadLine();
host.Stop();
}
private static IServiceProvider ConfigureServices()
{
//setup dependency injection
IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow();
var serviceProvider = services.BuildServiceProvider();
return serviceProvider;
}
}
}
簡單的工作流就完成了。
上一節通過一個簡單的控制檯例子介紹了WorkflowCore工作流的定義和執行過程,從例子中可以看到,工作流是執行在WorkflowHost範例中的,再看一下程式碼:
static void Main(string[] args)
{
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<HelloWorldWorkflow>();
host.Start();
host.StartWorkflow("HelloWorld", 1, null);
Console.ReadLine();
host.Stop();
}
WorkflowHost的工作過程是這樣的,首先需要獲取WorkflowHost的範例,然後註冊工作流,這裡可以註冊多個工作流,接下來,啟動host,然後可以啟動工作流,這裡可以啟動多個工作流範例,最後,關閉host。
我們需要對WorkflowHost有進一步的瞭解,第一個問題,每次使用serviceProvider.GetService
var host = serviceProvider.GetService<IWorkflowHost>();
var host1 = serviceProvider.GetService<IWorkflowHost>();
Console.WriteLine(host == host1);
我們獲取兩個host變數比較一下看是否指向相同的物件,結果是True,也就是使用serviceProvider.GetService<IWorkflowHost()獲得的是相同的物件。
第二個問題,呼叫host.Stop是否會影響正在執行的流程?
我們修改一下程式碼,啟動流程範例後,馬上執行host.Stop():
host.RegisterWorkflow<HelloWorldWorkflow>();
host.Start();
host.StartWorkflow("HelloWorld", 1, null);
host.Stop();
Console.ReadLine();
我們發現,沒有輸出結果,也就是host.Stop()終止了所有流程。
第三個問題,host中啟動的流程是否在同一執行緒執行?
我們啟動多個流程,看一下輸出結果:
host.RegisterWorkflow<HelloWorldWorkflow>();
host.Start();
host.StartWorkflow("HelloWorld", 1, null);
host.StartWorkflow("HelloWorld", 1, null);
host.StartWorkflow("HelloWorld", 1, null);
host.Stop();
Console.ReadLine();
說明每個流程是一個獨立的執行緒,並行執行。
下一步我們需要了解流程的引數傳遞。
我們已經知道了如何使用Fluent API定義流程和如何註冊流程,現在我們需要了解如何定義流程需要處理的資料,和如何進行資料傳遞。這裡舉一個最簡單的例子來說明。在前面的例子中,我們輸出「你好」和「再見」,現在擴充套件這個需求,流程啟動後,等待使用者輸入名字,然後輸出「你好,<輸入的名字>」和「<輸入的名字>,再見」。為了完成這個需求,需要:
namespace ZL.WorflowCoreDemo.InputDataToStep
{
public class MyNameClass
{
public string MyName { get; set; }
}
}
然後,修改流程的定義:
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.InputDataToStep
{
public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
{
public string Id => "HelloWithNameWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith(context => ExecutionResult.Next())
.WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
.Output(data => data.MyName, step => step.EventData)
.Then<HelloWithName>()
.Input(step => step.Name, data => data.MyName)
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
這裡,流程宣告為 IWorkflow
這段程式碼中還使用WaitFor定義了一個事件,這個事件的輸出是將事件接收的外部引數(step.EventData)傳遞給流程的MyName屬性。
還需要修改兩個步驟,增加名稱欄位:
using System;
using System.Collections.Generic;
using WorkflowCore.Interface;
using WorkflowCore.Models;
namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
{
public class HelloWithName : StepBody
{
public string Name { get; set; }
public override ExecutionResult Run(IStepExecutionContext context)
{
Console.WriteLine("你好," + Name);
return ExecutionResult.Next();
}
}
}
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;
namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
{
public class GoodbyeWithName : StepBody
{
public string Name { get; set; }
public override ExecutionResult Run(IStepExecutionContext context)
{
Console.WriteLine(Name + ",再見");
return ExecutionResult.Next();
}
}
}
下面是流程註冊和執行的程式碼:
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.InputDataToStep
{
public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
{
public string Id => "HelloWithNameWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith(context => ExecutionResult.Next())
.WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
.Output(data => data.MyName, step => step.EventData)
.Then<HelloWithName>()
.Input(step => step.Name, data => data.MyName)
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
using System;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using WorkflowCore.Interface;
namespace ZL.WorflowCoreDemo.InputDataToStep
{
public class FlowRun
{
public static void Run()
{
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<HelloWithNameWorkflow, MyNameClass>();
host.Start();
var initialData = new MyNameClass();
var workflowId = host.StartWorkflow("HelloWithNameWorkflow", 1, initialData).Result;
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.PublishEvent("MyEvent", workflowId, value);
Console.ReadLine();
host.Stop();
}
private static IServiceProvider ConfigureServices()
{
//setup dependency injection
IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow();
var serviceProvider = services.BuildServiceProvider();
return serviceProvider;
}
}
}
我們也可以使用字典作為資料物件,流程的定義如下:
using System;
using System.Collections.Generic;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.InputDataToStep
{
public class HelloWithNameWorkflowDynamic : IWorkflow<Dictionary<string,string>>
{
public string Id => "HelloWithNameWorkflowDynamic";
public int Version => 1;
public void Build(IWorkflowBuilder<Dictionary<string, string>> builder)
{
builder
.StartWith(context => ExecutionResult.Next())
.WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
.Output((step,data)=>data.Add("Name",(string)step.EventData))
.Then<HelloWithName>()
.Input(step => step.Name, data => data["Name"])
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data["Name"]);
}
}
}
這裡沒有使用自定義的類,而是使用了字典Dictionary<string, string>,流程的執行程式碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>();
host.Start();
var initialData = new Dictionary<string,string>();
var workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData).Result;
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.PublishEvent("MyEvent", workflowId, value);
Console.ReadLine();
foreach (var key in initialData.Keys)
{
Console.WriteLine(key + ":" + initialData[key]);
}
Console.ReadLine();
host.Stop();
WorkflowCore 支援採用JSON或者YAML格式定義流程,使用時通過使用IDefintionLoader載入流程來替代RegisterWorkflow。我們仍然通過簡單的例子來說明。在我們現有的工程中已經定義了幾個簡單的流程步驟,我們用JSON格式將這幾個步驟組成簡單的工作流。
首先,在現有的解決方案中增加一個.Net Core的控制檯專案,名稱為ZL.WorkflowCoreDemo.Json,使用NuGet引入WorkflowCore,Microsoft.Extensions.Logging,還有WorkflowCore.DSL,然後,我們在專案中增加一個json檔案,將檔案的屬性「複製到輸出目錄」修改為「始終複製」:
在json檔案中定義流程:
{
"Id": "HelloWorld",
"Version": 1,
"Steps": [
{
"Id": "Hello",
"StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
"NextStepId": "Bye"
},
{
"Id": "Bye",
"StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
}
]
}
Json定義格式符合WorkflowCore的DSL,這裡不進行DSL的詳細介紹,我們重點關注流程如何定義,載入和執行。
我們可以將前面專案中的程式碼拷貝過來進行修改,首先修改下面的函數:
private static IServiceProvider ConfigureServices()
{
//setup dependency injection
IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow();
//這是新增加的服務
services.AddWorkflowDSL();
var serviceProvider = services.BuildServiceProvider();
return serviceProvider;
}
ConfigureServices新增加了services.AddWorkflowDSL();
在主函數中,使用IDefintionLoader載入JSON格式的流程定義:
static void Main(string[] args)
{
IServiceProvider serviceProvider = ConfigureServices();
var loader = serviceProvider.GetService<IDefinitionLoader>();
var json = System.IO.File.ReadAllText("myflow.json");
loader.LoadDefinition(json, Deserializers.Json);
var host = serviceProvider.GetService<IWorkflowHost>();
host.Start();
host.StartWorkflow("HelloWorld", 1, null);
Console.ReadLine();
host.Stop();
}
現在,流程可以執行了。
在研究過程中發現了一個坑,可能需要注意。在這個例子中我們使用了前面專案定義的流程步驟,如果在本專案中定義流程步驟,會出現找不到相應動態庫的錯誤,不知道是否是一個缺陷。
前面我們分別討論了使用Fluent API定義流程和使用JSON格式定義流程,按照以前的使用經驗,感覺這兩種定義方式應該可以互相轉換,互相代替,但在實際應用中發現並不是如此,兩種方式都有不能被替代的功能。
我們可以在流程中直接使用Lambda表示式定義步驟,而不需要定義類,比如:
public class HelloWorldWorkflow : IWorkflow
{
public string Id => "HelloWorld";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith(context =>
{
Console.WriteLine("你好");
return ExecutionResult.Next();
})
.Then(context =>
{
Console.WriteLine("再見");
return ExecutionResult.Next();
});
}
}
這種方式無法使用JSON等格式實現。
採用JSON等DSL格式時,每個步驟有明示的ID,步驟轉移通過ID標識進行,這樣可以很方便地進行步驟間的跳轉。而採用Fluent API則沒有這麼靈活,我們看以下的定義:
{
"Id": "HelloWorld",
"Version": 1,
"Steps": [
{
"Id": "Hello",
"StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
"NextStepId": "Bye"
},
{
"Id": "Bye",
"StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
"NextStepId": "Hello"
}
]
}
步驟「Hello」執行完成後,執行"Bye",「Bye」執行完又回到「Hello」,如此迴圈。但在Fluent API中就沒有這麼方便,必須使用迴圈或者其它的方式。而這種跳轉方式在實際應用中非常常見,最常見的場景就是審批流程中的提交/駁回,提交-駁回過程可以形成多次迴圈,這種流程模式,採用帶有步驟標記的跳轉很容易實現。
流程相關的資料類和流程步驟中的屬性在理論上是沒有限制的,我們可以使用複雜的資料型別,比如Dictionary<string,string>或者具有複雜層次的資料類,但在研究中我們發現由於JSON DSL定義的限制,我們無法實現複雜資料結構的資料傳遞。使用Fluent API定義的流程中,可以使用Lamdba 表示式,但在JSON DSL中沒找到更好的方法。
下面的程式碼展示通過Lamdba表示式實現兩個Dictionary<string,string>之間的資料傳遞,但在DSL中沒有對應的方式:
.Output((step, data)=> {
var dic = step.EventData as Dictionary<string, object>;
foreach (var key in dic.Keys)
{
if (data.MyDic.ContainsKey(key)) data.MyDic[key] = dic[key];
else data.MyDic.Add(key, dic[key]);
}
而在實際應用中,我們需要使用流程定義檔案而不是寫死的程式碼來定義流程,這樣在流程修改時,就不需要修改程式碼和重新編譯部署。這個限制是WorkflowCore在實際專案中落地的一個主要障礙。
WorkflowCore提供了幾乎針對流行資料庫的各種持久化方式,支援SqlServer、Sqlite等關聯式資料庫,也支援MongoDb、Redis等非關聯式資料庫。預設使用的是在記憶體中儲存流程資料,但在實際應用中,必須將流程資料持久化以保證系統的可靠性。當系統因為計劃內或者意外原因出現異常後,正在執行的流程應該能夠在斷點處恢復並繼續執行。我們改造一下第一部分的例子,增加持久化設定,並模擬流程中斷和恢復過程。
首先,我們需要使用NuGet引入SqlServer持久化Provider:WorkflowCore.Persistence.SqlServer,當然也可以使用其它型別的資料儲存。
然後,修改ConfigureServices,將services.AddWorkflow()修改為:
services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore;Trusted_Connection=True;", true, true));
最後修改一下執行程式碼,增加流程Id輸入和恢復程式碼:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>();
host.Start();
var initialData = new Dictionary<string,string>();
Console.WriteLine("請輸入需要恢復的流程編號,如執行新流程直接回車:");
string workflowId = Console.ReadLine();
if (string.IsNullOrEmpty(workflowId))
{
workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData).Result;
Console.WriteLine(workflowId);
}
else
{
host.ResumeWorkflow(workflowId);
}
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.PublishEvent("MyEvent", workflowId, value);
下面,我們模擬中斷-恢復過程。首先,執行程式,不輸入流程id,直接按回車,會生成新的流程,並輸出流程Id,拷貝這個流程ID,並退出程式:
再次執行程式,輸入或貼上上一次生成的流程編號,可以繼續執行流程:
我們已經建立簡單的工作流,並可以在控制檯環境執行,現在我們可以為工作流建立簡單的單元測試,這裡我們使用xUnit作為測試框架。
在ZL.WorkflowCoreDemo解決方案中增加一個xUnit測試專案,命名為ZL.WorkflowCoreDemo.Test,建立好的專案中已經包含xunit和xunit.runner.visualstudio。我們還需要使用NuGet引入其它的框架,首先要引入FluentAssertions,這個框架結合xUnit,可以讓 我們在測試中使用Should斷言。還需要引入WorkflowCore和WorkflowCore.Testing以及我們需要測試的專案。這裡我們測試最簡單的HelloWorldWorkflow。
接下來編寫測試程式碼,測試類需要繼承WorkflowTest<流程類,流程相關的資料類>,由於HelloWorldWorkflow沒有相關的資料類,我們使用dynamic代替,類的定義如下:
using System;
using Xunit;
using WorkflowCore.Testing;
using ZL.WorflowCoreDemo.Basic;
using WorkflowCore.Models;
using System.Threading;
using FluentAssertions;
namespace ZL.WorkflowCoreDemo.Test
{
public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
{
public DemoUnitTest()
{
Setup();
}
[Fact]
public void Test1()
{
dynamic data = new { };
var workflowId = StartWorkflow(data);
WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
WorkflowStatus status = GetStatus(workflowId);
status.Should().Be(WorkflowStatus.Complete);
UnhandledStepErrors.Count.Should().Be(0);
}
}
}
需要注意的是在測試類別建構函式中呼叫Setup(),用來初始化流程引擎。
現在我們可以在測試資源管理器中執行測試專案,如果一切順利的化,結果是這樣的:
但有時候理想和現實總是有些差距,我在執行時遇到了如下的異常:
通過研究發現我參照的WorkflowCore是最新的3.1.2版本,而WorkflowCore.Testing的版本是2.2,應該是版本不一致造成的問題,WorkflowCore和WorkflowCore.Testing的更新不同步。這時,開源專案的好處就體現出來了,通過檢視程式碼,改寫測試類如下:
using System;
using Xunit;
using WorkflowCore.Testing;
using ZL.WorflowCoreDemo.Basic;
using WorkflowCore.Models;
using System.Threading;
using FluentAssertions;
namespace ZL.WorkflowCoreDemo.Test
{
public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
{
public DemoUnitTest()
{
Setup();
}
[Fact]
public void Test1()
{
dynamic data = new { };
var workflowId = StartWorkflow(data);
WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
WorkflowStatus status = GetStatus(workflowId);
status.Should().Be(WorkflowStatus.Complete);
UnhandledStepErrors.Count.Should().Be(0);
}
protected new WorkflowStatus GetStatus(string workflowId)
{
var instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result;
return instance.Status;
}
protected new void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut)
{
var status = GetStatus(workflowId);
var counter = 0;
while ((status == WorkflowStatus.Runnable) && (counter < (timeOut.TotalMilliseconds / 100)))
{
Thread.Sleep(100);
counter++;
status = GetStatus(workflowId);
}
}
}
}
再次執行,測試通過了。
前面提到了使用WaitFor暫停工作流,等待人工輸入後釋出事件重新啟用流程,今天介紹另一種方式,使用WorkflowCore的Activity,它的作用就是等待資料輸入,資料輸入完成後,工作流繼續執行。下面是簡單的例子:
using WorkflowCore.Interface;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.ActivityWorker
{
public class MyActivityWorkflow : IWorkflow<MyNameClass>
{
public string Id => "MyActivityWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith<HelloWithName>().Input(data => data.Name, step => step.MyName)
.Activity("activity-1", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result)
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName)
.Activity("activity-2", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result)
.Then<HelloWithName>().Input(step => step.Name, data => data.MyName)
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
這個例子很簡單,使用了我們前面定義的兩個步驟,HelloWithName和GoodbyeWithName,Activity在這裡就是接收外部輸入的Name。流程的執行程式碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<MyActivityWorkflow, MyNameClass>();
host.Start();
var myClass = new MyNameClass { MyName = "張三" };
host.StartWorkflow("MyActivityWorkflow", 1, myClass);
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
}
activity = host.GetPendingActivity("activity-2", "worker2", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
}
Console.ReadLine();
host.Stop();
工作流啟動後,需要通過host.GetPendingActivity獲取Activity,獲取成功,就從外部獲取資料,然後使用host.SubmitActivitySuccess提交資料。
使用WorkflowCore獲取外部資料時,有兩種方法可以讓流程等待外部資料,一是使用WaitFor註冊一個事件,外部資料輸入完成後,通過PublishEvent返回流程;另一種是使用Activity,註冊一個人工活動,執行到這個活動時,工作流等待,外部程式碼通過GetPendingActivity獲取相應的Activity,通過SubmitActivitySuccess提交資料。看起來兩種都可以完成外部資料輸入的任務,但實際中發現GetPendingActivity無法獲取是哪一個工作流範例的活動,如果有兩個範例同時執行,就沒有辦法分清除向哪個流程提報資料:
var id1=host.StartWorkflow("MyActivityWorkflow", 1, myClass).Result;
var id2 = host.StartWorkflow("MyActivityWorkflow", 1, myClass).Result;
//上面兩個範例中有相同的activity-1,無法知道這裡獲取的是哪一個範例的活動,
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
WairFor事件釋出時有工作流範例ID傳入:
host.PublishEvent("MyEvent", workflowId, value);
沒有上面的缺陷。
如果需要同時執行多個過程相同的而輸入不同的流程,可以使用ForEach控制語句,一定要注意,這裡的ForEach不是迴圈,不是一個流程執行完再執行另一個流程,我們仍然使用前面定義的簡單的步驟來組織ForEach範例流程,程式碼如下:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.Paralle
{
public class ParalleWorkflow : IWorkflow
{
public string Id => "ParalleWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith(context => { Console.WriteLine("開始"); ExecutionResult.Next(); })
.ForEach(data => new List<string>() { "張三", "李四", "王五", "趙六" })
.Do(x => x
.StartWith<HelloWithName>()
.Input(step => step.Name, (data, context) => context.Item as string)
.Then<GoodbyeWithName>()
.Input(step => step.Name, (data, context) => context.Item as string)
)
.Then(context => { Console.WriteLine("結束"); ExecutionResult.Next(); });
}
}
}
在這個例子裡,我們沒有定義相關的資料類,需要輸入的人名作為ForEach中的迴圈變數,這些變數儲存在context中,輸入到相應的環節中。執行程式碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<ParalleWorkflow>();
host.Start();
host.StartWorkflow("ParalleWorkflow", 1, null);
Console.ReadLine();
host.Stop();
前面我們提到了使用ForEach執行並行流程,這些流程的執行過程相同,不同的只是輸入的引數。如果需要並行執行多個不同的流程,需要使用Parallel,範例程式碼如下:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
namespace ZL.WorflowCoreDemo.Paralle
{
public class ParallePathWorkflow : IWorkflow
{
public string Id => "ParallePathWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith(context => { Console.WriteLine("開始"); ExecutionResult.Next(); })
.Parallel()
.Do(then =>
then.StartWith(context=>{ Console.WriteLine("分支一開始"); ExecutionResult.Next(); })
.Then(context => { Console.WriteLine("分支一結束"); ExecutionResult.Next(); }))
.Do(then =>
then.StartWith(context => { Console.WriteLine("分支二開始"); ExecutionResult.Next(); })
.Then(context => { Console.WriteLine("分支二結束"); ExecutionResult.Next(); }))
.Do(then =>
then.StartWith(context => { Console.WriteLine("分支二開始"); ExecutionResult.Next(); })
.Then(context => { Console.WriteLine("分支二結束"); ExecutionResult.Next(); }))
.Join()
.Then(context => { Console.WriteLine("結束"); ExecutionResult.Next(); });
}
}
}
為了說明分支語句的構成,這個流程沒有使用關聯的資料類,也沒有使用類定義步驟,全部使用Lambda表示式。Parallel的結構是分支的開始是Parallel(),結束是Join(),每個分支在Do語句中表示。流程的執行程式碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<ParallePathWorkflow>();
host.Start();
host.StartWorkflow("ParallePathWorkflow", 1, null);
Console.ReadLine();
host.Stop();
While迴圈會重複執行某些步驟,直到條件得到滿足再繼續執行下面的流程。使用While迴圈可以實現審批流程中的「提交/駁回」,如果審批沒有通過,駁回重新輸入,直到審批通過或者駁回次數到達上限。這裡舉一個簡單的例子說明使用方法,結合前面提到的Activity,可以實現對輸入進行判斷,如果輸入不滿足要求,就重新輸入。流程定義如下:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.ControlStructures
{
public class WhileWorkflow : IWorkflow<MyNameClass>
{
public string Id => "WhileWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith<HelloWithName>()
.Input(step => step.Name, data => data.MyName)
.While(data => data.MyName.Length < 3)
.Do(x => x
.StartWith(context=> { Console.WriteLine("輸入小於3個字元"); ExecutionResult.Next(); })
.Activity("activity-1", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result))
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
流程執行的程式碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<WhileWorkflow, MyNameClass>();
host.Start();
var myClass = new MyNameClass { MyName = "張三" };
host.StartWorkflow("WhileWorkflow", 1, myClass);
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
while (activity != null)
{
Console.WriteLine("輸入大於3個字元的名字結束,小於3個字元的名字繼續");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
}
Console.ReadLine();
host.Stop();
If判斷比較簡單,根據流程關聯的資料物件中的值進行判斷,如果條件滿足執行相應的分支。需要注意的是沒有else相關語句,如果需要實現相關邏輯,需要再次進行一次條件相反的判斷。下面是簡單的例子,仍然使用前面定義的資料類和步驟,輸入採用Activity:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.ControlStructures
{
public class IfWorkflow : IWorkflow<MyNameClass>
{
public string Id => "IfWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith(context=> ExecutionResult.Next())
.Activity("activity-1", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result)
.If(data => data.MyName.Length < 3)
.Do(then=>then
.StartWith(context => { Console.WriteLine("輸入小於3個字元"); ExecutionResult.Next(); }))
.If(data => data.MyName.Length >= 3)
.Do(then => then
.StartWith(context => { Console.WriteLine("輸入大於等於3個字元"); ExecutionResult.Next(); }))
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
流程的執行程式碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<IfWorkflow, MyNameClass>();
host.Start();
var myClass = new MyNameClass { MyName = "張三" };
host.StartWorkflow("IfWorkflow", 1, myClass);
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
}
Console.ReadLine();
host.Stop();
Decision Branches有點類似於switch語句,可以為每個條件建立一個分支,這些分支相對獨立,根據不同的條件選擇執行。如果使用Fluent API,可以使用CreateBranch方法建立分支,然後在流程中使用分支。為了說明問題,我們改造前面的If流程,使用Decision Branches實現相同的功能,流程定義的程式碼如下:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.ControlStructures
{
public class DecisionWorkflow : IWorkflow<MyNameClass>
{
public string Id => "DecisionWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
var branch1 = builder.CreateBranch()
.StartWith(context => { Console.WriteLine("輸入小於3個字元"); ExecutionResult.Next(); });
var branch2 = builder.CreateBranch()
.StartWith(context => { Console.WriteLine("輸入大於等於3個字元"); ExecutionResult.Next(); });
builder
.StartWith(context => ExecutionResult.Next())
.Activity("activity-1", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result)
.Decide(data => data.MyName.Length)
.Branch((data, outcome) => data.MyName.Length<3, branch1)
.Branch((data, outcome) => data.MyName.Length >= 3, branch2)
.Then<GoodbyeWithName>()
.Input(step => step.Name, data => data.MyName);
}
}
}
流程執行定義的程式碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<DecisionWorkflow, MyNameClass>();
host.Start();
var myClass = new MyNameClass { MyName = "張三" };
host.StartWorkflow("DecisionWorkflow", 1, myClass);
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
}
Console.ReadLine();
host.Stop();
WorkflowCore 提供了定時執行後臺任務的功能,使用Schedule可以定義非同步執行的任務,在工作流的後臺執行。範例程式碼如下:
using System;
using WorkflowCore.Interface;
namespace ZL.WorflowCoreDemo.ControlStructures
{
public class ScheduleWorkflow : IWorkflow
{
public string Id => "ScheduleWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith(context => Console.WriteLine("開始"))
.Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
.StartWith(context => Console.WriteLine("後臺工作")))
.Then(context => Console.WriteLine("前臺工作"));
}
}
}
在上面的程式碼中,工作流開始後,定義了一個Schedule,這個任務在延時5秒後,啟動一個後臺流程。流程的執行程式碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<ScheduleWorkflow>();
host.Start();
var workflowId = host.StartWorkflow("ScheduleWorkflow", 1, null).Result;
Console.ReadLine();
host.Stop();
流程的執行程式碼與前面的例子基本類似,執行結果如下:
執行時,前臺任務完成5秒後,後臺工作才執行。
前面介紹的Schedule可以啟動一個後臺的定時任務,這個任務只執行一次。如果需要執行多次固定間隔的任務,可以使用Recur,當條件滿足時任務不再執行。Recur的定義與Schedule類似,只是多了條件判斷輸入,流程定義的程式碼如下:
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;
namespace ZL.WorflowCoreDemo.ControlStructures
{
public class RecurWorkflow : IWorkflow<MyNameClass>
{
public string Id => "RecurWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyNameClass> builder)
{
builder
.StartWith(context => Console.WriteLine("開始"))
.Recur(data => TimeSpan.FromSeconds(5),data=>data.MyName.Length>5).Do(recur => recur
.StartWith<HelloWithName>()
.Input(step => step.Name, data => data.MyName))
.Then(context => Console.WriteLine("前臺工作"))
.Activity("activity-1", (data) => data.MyName)
.Output(data => data.MyName, step => step.Result);
}
}
}
這流程稍微複雜一點,我們增加了使用Activity的輸入,目的是看一下前臺的輸入等待是否會影響後臺的程序執行,還有就是前臺輸入的資料,能否正確傳遞到後臺,流程的執行程式碼如下:
IServiceProvider serviceProvider = ConfigureServices();
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<RecurWorkflow,MyNameClass>();
host.Start();
var myClass = new MyNameClass { MyName = "張三" };
var workflowId = host.StartWorkflow("RecurWorkflow", 1, myClass).Result;
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
Console.WriteLine("輸入名字");
string value = Console.ReadLine();
host.SubmitActivitySuccess(activity.Token, value);
}
Console.ReadLine();
host.Stop();
執行效果如下:
可以看出,前臺需要的輸入等待並沒有影響後臺的執行,我們輸入一個新名字後:
WorkflowCore 自身的查詢功能很弱,不過它提供了Elasticsearch的plugin,可以使用Elasticsearch對流程進行索引和查詢。不太方便的地方是必須要安裝Elasticsearch。這裡先簡單介紹一下Elasticsearch,它是基於Lucene的搜尋伺服器,提供了分散式多使用者的全文檢索引擎,基於RESTful web介面。網上關於Elasticsearch的資料很多,可以自行搜尋。
如果希望使用Elasticsearch索引工作流,需要在專案中安裝WorkflowCore.Providers.Elasticsearch,使用NuGet安裝這個外掛,然後在services中進行設定:
using Nest;
...
services.AddWorkflow(cfg =>
{
...
cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://localhost:9200")), "index_name");
});
在程式碼中,通過依賴注入引入ISearchIndex,使用Search方法進行搜尋:
Search(string terms, int skip, int take, params SearchFilter[] filters)
檢索的範圍包括流程的定義、描述、狀態等。如果流程相關的自定義資料類需要檢索,資料類需要實現ISearchable介面。
WorkflowCore啟動的流程多執行緒的方式執行,如果流程中出現的異常不會丟擲到主程式,很多情況下感覺流程莫名奇妙地結束了。為了避免這種情況,需要顯示地宣告流程步驟的例外處理。如果使用Fluent API定義流程,可以在流程後附加OnError處理異常,但我們更希望對異常進行集中處理和記錄,這時可以使用WorkflowHost服務的OnStepError事件。定義如下:
var host = serviceProvider.GetService<IWorkflowHost>();
host.OnStepError += Host_OnStepError;
例外處理程式碼可以寫在Host_OnStepError中:
private static void Host_OnStepError(WorkflowCore.Models.WorkflowInstance workflow, WorkflowCore.Models.WorkflowStep step, Exception exception)
{
}
到這裡,我們介紹了WorkflowCore的使用,下面談一下這個專案在實際使用時遇到一些問題。
本文來自部落格園,作者:尋找無名的特質,轉載請註明原文連結:https://www.cnblogs.com/zhenl/p/16495977.html