開源輕量級工作流WorkflowCore介紹

2022-07-20 09:01:42

在.Net Framework環境下,我們使用Windows Workflow Foundation(WF)作為專案的工作流引擎,可是.Net Core已經不支援WF了,需要為基於.Net Core的專案選擇新的工作流引擎。基本要求如下:

  • 輕量級,部署和使用都很簡單。
  • 有相當數量的使用者,往往使用的人越多,產品也就越可靠,遇到問題也容易找到解決辦法。
  • 支援使用組態檔定義工作流,而不僅僅是使用程式碼定義。

符合上述要求的開源專案有幾個,這裡介紹開源專案WorkflowCore,專案地址:https://github.com/danielgerlag/workflow-core。
本文的範例可以從github下載:https://github.com/zhenl/ZL.WorflowCoreDemo

簡單的控制檯專案

首先,使用Visual Studio建立一個.Net Core的控制檯專案,在NuGet管理器中引入下面程式包:

  • WorkflowCore
  • Microsoft.Extensions.DependencyInjection
  • Microsoft.Extensions.Logging

然後,建立兩個工作流的步驟:

using WorkflowCore.Interface;
using WorkflowCore.Models;

namespace WorkflowCoreTest
{
    public class HelloWorld : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("你好");
            return ExecutionResult.Next();
        }
    }
}

using WorkflowCore.Interface;
using WorkflowCore.Models;

namespace WorkflowCoreTest
{
    public class GoodbyeWorld : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("再見");
            return ExecutionResult.Next();
        }
    }
}

接下來使用這兩個步驟定義一個工作流:

using WorkflowCore.Interface;

namespace WorkflowCoreTest
{
    public class HelloWorldWorkflow : IWorkflow
    {
        public string Id => "HelloWorld";
        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith<HelloWorld>()
                .Then<GoodbyeWorld>();
        }
    }
}

最後,在主程式中,建立WorkflowHost,註冊並執行工作流,程式碼如下:

using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading;
using WorkflowCore.Interface;

namespace WorkflowCoreTest
{
    class Program
    {
        static void Main(string[] args)
        {
            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<HelloWorldWorkflow>();
            host.Start();

            host.StartWorkflow("HelloWorld", 1, null);
            Console.ReadLine();
            host.Stop();
        }

        private static IServiceProvider ConfigureServices()
        {
            //setup dependency injection
            IServiceCollection services = new ServiceCollection();
            services.AddLogging();
            services.AddWorkflow();
                        
            var serviceProvider = services.BuildServiceProvider();

            return serviceProvider;
        }
    }
}

簡單的工作流就完成了。

WorkflowHost

上一節通過一個簡單的控制檯例子介紹了WorkflowCore工作流的定義和執行過程,從例子中可以看到,工作流是執行在WorkflowHost範例中的,再看一下程式碼:

static void Main(string[] args)
        {
            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
                       
            host.RegisterWorkflow<HelloWorldWorkflow>();
            host.Start();
            host.StartWorkflow("HelloWorld", 1, null);
            
            Console.ReadLine();
            host.Stop();
        }

WorkflowHost的工作過程是這樣的,首先需要獲取WorkflowHost的範例,然後註冊工作流,這裡可以註冊多個工作流,接下來,啟動host,然後可以啟動工作流,這裡可以啟動多個工作流範例,最後,關閉host。

我們需要對WorkflowHost有進一步的瞭解,第一個問題,每次使用serviceProvider.GetService()獲得的host是否是同一物件?為了回答這個問題,我們增加一些程式碼:

            var host = serviceProvider.GetService<IWorkflowHost>();
            var host1 = serviceProvider.GetService<IWorkflowHost>();

            Console.WriteLine(host == host1);

我們獲取兩個host變數比較一下看是否指向相同的物件,結果是True,也就是使用serviceProvider.GetService<IWorkflowHost()獲得的是相同的物件。

第二個問題,呼叫host.Stop是否會影響正在執行的流程?
我們修改一下程式碼,啟動流程範例後,馬上執行host.Stop():

            host.RegisterWorkflow<HelloWorldWorkflow>();
            host.Start();
            host.StartWorkflow("HelloWorld", 1, null);
            host.Stop();
            Console.ReadLine();
            

我們發現,沒有輸出結果,也就是host.Stop()終止了所有流程。
第三個問題,host中啟動的流程是否在同一執行緒執行?
我們啟動多個流程,看一下輸出結果:

            host.RegisterWorkflow<HelloWorldWorkflow>();
            host.Start();
            host.StartWorkflow("HelloWorld", 1, null);
            host.StartWorkflow("HelloWorld", 1, null);
            host.StartWorkflow("HelloWorld", 1, null);
            host.Stop();
            Console.ReadLine();

說明每個流程是一個獨立的執行緒,並行執行。

下一步我們需要了解流程的引數傳遞。

流程的資料物件和資料傳遞

我們已經知道了如何使用Fluent API定義流程和如何註冊流程,現在我們需要了解如何定義流程需要處理的資料,和如何進行資料傳遞。這裡舉一個最簡單的例子來說明。在前面的例子中,我們輸出「你好」和「再見」,現在擴充套件這個需求,流程啟動後,等待使用者輸入名字,然後輸出「你好,<輸入的名字>」和「<輸入的名字>,再見」。為了完成這個需求,需要:

  • 定義一個資料結構用來儲存輸入的名字
  • 將這個資料結構與流程關聯起來
  • 修改流程,讓流程等待使用者輸入
  • 將使用者輸入的變數傳遞給流程
    首先我們定義一個簡單的類,用來儲存輸入的名字:
namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class MyNameClass
    {
        public string MyName { get; set; }
    }
}

然後,修改流程的定義:

using System;

using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "HelloWithNameWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            builder
                .StartWith(context => ExecutionResult.Next())
                .WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
                    .Output(data => data.MyName, step => step.EventData)
                .Then<HelloWithName>()
                    .Input(step => step.Name, data => data.MyName)
                .Then<GoodbyeWithName>()
                    .Input(step => step.Name, data => data.MyName);
        }
    }
}

這裡,流程宣告為 IWorkflow,說明流程使用這個類儲存資料,在流程定義中,可以使用data操作相關的資料物件,比如: .Input(step => step.Name, data => data.MyName) 就是將流程資料中的MyName傳遞給步驟中的Name(step.Name)。

這段程式碼中還使用WaitFor定義了一個事件,這個事件的輸出是將事件接收的外部引數(step.EventData)傳遞給流程的MyName屬性。

還需要修改兩個步驟,增加名稱欄位:

using System;
using System.Collections.Generic;
using WorkflowCore.Interface;
using WorkflowCore.Models;


namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
{
    public class HelloWithName : StepBody
    {
        public string Name { get; set; }
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("你好," + Name);
            return ExecutionResult.Next();
        }
    }
}

using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;


namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
{
    public class GoodbyeWithName : StepBody
    {
        public string Name { get; set; }
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine(Name + ",再見");
            return ExecutionResult.Next();
        }
    }
}

下面是流程註冊和執行的程式碼:

using System;

using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "HelloWithNameWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            builder
                .StartWith(context => ExecutionResult.Next())
                .WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
                    .Output(data => data.MyName, step => step.EventData)
                .Then<HelloWithName>()
                    .Input(step => step.Name, data => data.MyName)
                .Then<GoodbyeWithName>()
                    .Input(step => step.Name, data => data.MyName);
        }
    }
}
using System;
using System.Collections.Generic;

using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using WorkflowCore.Interface;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class FlowRun
    {
        public static void Run()
        {
            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            
            host.RegisterWorkflow<HelloWithNameWorkflow, MyNameClass>();
            host.Start();

            var initialData = new MyNameClass();
            var workflowId = host.StartWorkflow("HelloWithNameWorkflow", 1, initialData).Result;
            
            Console.WriteLine("輸入名字");
            string value = Console.ReadLine();
            host.PublishEvent("MyEvent", workflowId, value);

            Console.ReadLine();
            host.Stop();
        }

        private static IServiceProvider ConfigureServices()
        {
            //setup dependency injection
            IServiceCollection services = new ServiceCollection();
            services.AddLogging();
            services.AddWorkflow();

            var serviceProvider = services.BuildServiceProvider();

            return serviceProvider;
        }
    }
}

我們也可以使用字典作為資料物件,流程的定義如下:

using System;
using System.Collections.Generic;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class HelloWithNameWorkflowDynamic : IWorkflow<Dictionary<string,string>>
    {
        public string Id => "HelloWithNameWorkflowDynamic";
        public int Version => 1;

        public void Build(IWorkflowBuilder<Dictionary<string, string>> builder)
        {
            builder
                .StartWith(context => ExecutionResult.Next())
                .WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
                    .Output((step,data)=>data.Add("Name",(string)step.EventData))
                .Then<HelloWithName>()
                    .Input(step => step.Name, data => data["Name"])
                .Then<GoodbyeWithName>()
                    .Input(step => step.Name, data => data["Name"]);
        }
    }
}

這裡沒有使用自定義的類,而是使用了字典Dictionary<string, string>,流程的執行程式碼如下:

IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            
            host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>();
            host.Start();

            var initialData = new Dictionary<string,string>();
            var workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData).Result;
            
            Console.WriteLine("輸入名字");
            string value = Console.ReadLine();
            host.PublishEvent("MyEvent", workflowId, value);

            
            Console.ReadLine();
            foreach (var key in initialData.Keys)
            {
                Console.WriteLine(key + ":" + initialData[key]);
            }
            Console.ReadLine();
            host.Stop();

採用JSON格式定義流程

WorkflowCore 支援採用JSON或者YAML格式定義流程,使用時通過使用IDefintionLoader載入流程來替代RegisterWorkflow。我們仍然通過簡單的例子來說明。在我們現有的工程中已經定義了幾個簡單的流程步驟,我們用JSON格式將這幾個步驟組成簡單的工作流。

首先,在現有的解決方案中增加一個.Net Core的控制檯專案,名稱為ZL.WorkflowCoreDemo.Json,使用NuGet引入WorkflowCore,Microsoft.Extensions.Logging,還有WorkflowCore.DSL,然後,我們在專案中增加一個json檔案,將檔案的屬性「複製到輸出目錄」修改為「始終複製」:

在json檔案中定義流程:

{
  "Id": "HelloWorld",
  "Version": 1,
  "Steps": [
    {
      "Id": "Hello",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
      "NextStepId": "Bye"
    },
    {
      "Id": "Bye",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
      
    }
  ]
}

Json定義格式符合WorkflowCore的DSL,這裡不進行DSL的詳細介紹,我們重點關注流程如何定義,載入和執行。
我們可以將前面專案中的程式碼拷貝過來進行修改,首先修改下面的函數:

private static IServiceProvider ConfigureServices()
        {
            //setup dependency injection
            IServiceCollection services = new ServiceCollection();
            services.AddLogging();
            services.AddWorkflow();
            //這是新增加的服務
            services.AddWorkflowDSL();

            var serviceProvider = services.BuildServiceProvider();

            return serviceProvider;
        }

ConfigureServices新增加了services.AddWorkflowDSL();
在主函數中,使用IDefintionLoader載入JSON格式的流程定義:

static void Main(string[] args)
        {
            IServiceProvider serviceProvider = ConfigureServices();

            var loader = serviceProvider.GetService<IDefinitionLoader>();

            var json = System.IO.File.ReadAllText("myflow.json");
            loader.LoadDefinition(json, Deserializers.Json);
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.Start();
            host.StartWorkflow("HelloWorld", 1, null);
            
            Console.ReadLine();
            host.Stop();
        }

現在,流程可以執行了。

在研究過程中發現了一個坑,可能需要注意。在這個例子中我們使用了前面專案定義的流程步驟,如果在本專案中定義流程步驟,會出現找不到相應動態庫的錯誤,不知道是否是一個缺陷。

JSON格式(DSL)定義流程與使用Fluent API定義流程的比較

前面我們分別討論了使用Fluent API定義流程和使用JSON格式定義流程,按照以前的使用經驗,感覺這兩種定義方式應該可以互相轉換,互相代替,但在實際應用中發現並不是如此,兩種方式都有不能被替代的功能。

使用Fluent API可以使用Lambda 表示式定義步驟

我們可以在流程中直接使用Lambda表示式定義步驟,而不需要定義類,比如:

public class HelloWorldWorkflow : IWorkflow
{
    public string Id => "HelloWorld";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith(context =>
            {
                Console.WriteLine("你好");
                return ExecutionResult.Next();
            })
            .Then(context =>
            {
                Console.WriteLine("再見");
                return ExecutionResult.Next();
            });
    }
}

這種方式無法使用JSON等格式實現。

採用JSON等DSL格式可以方便地定義步驟間的跳轉

採用JSON等DSL格式時,每個步驟有明示的ID,步驟轉移通過ID標識進行,這樣可以很方便地進行步驟間的跳轉。而採用Fluent API則沒有這麼靈活,我們看以下的定義:

{
  "Id": "HelloWorld",
  "Version": 1,
  "Steps": [
    {
      "Id": "Hello",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
      "NextStepId": "Bye"
    },
    {
      "Id": "Bye",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
      "NextStepId": "Hello"
    }
  ]
}

步驟「Hello」執行完成後,執行"Bye",「Bye」執行完又回到「Hello」,如此迴圈。但在Fluent API中就沒有這麼方便,必須使用迴圈或者其它的方式。而這種跳轉方式在實際應用中非常常見,最常見的場景就是審批流程中的提交/駁回,提交-駁回過程可以形成多次迴圈,這種流程模式,採用帶有步驟標記的跳轉很容易實現。

流程資料類的侷限性

流程相關的資料類和流程步驟中的屬性在理論上是沒有限制的,我們可以使用複雜的資料型別,比如Dictionary<string,string>或者具有複雜層次的資料類,但在研究中我們發現由於JSON DSL定義的限制,我們無法實現複雜資料結構的資料傳遞。使用Fluent API定義的流程中,可以使用Lamdba 表示式,但在JSON DSL中沒找到更好的方法。

下面的程式碼展示通過Lamdba表示式實現兩個Dictionary<string,string>之間的資料傳遞,但在DSL中沒有對應的方式:

                    .Output((step, data)=> {
                        var dic = step.EventData as Dictionary<string, object>;
                        foreach (var key in dic.Keys)
                        {
                            if (data.MyDic.ContainsKey(key)) data.MyDic[key] = dic[key];
                            else data.MyDic.Add(key, dic[key]);
                        }

而在實際應用中,我們需要使用流程定義檔案而不是寫死的程式碼來定義流程,這樣在流程修改時,就不需要修改程式碼和重新編譯部署。這個限制是WorkflowCore在實際專案中落地的一個主要障礙。

工作流持久化與恢復

WorkflowCore提供了幾乎針對流行資料庫的各種持久化方式,支援SqlServer、Sqlite等關聯式資料庫,也支援MongoDb、Redis等非關聯式資料庫。預設使用的是在記憶體中儲存流程資料,但在實際應用中,必須將流程資料持久化以保證系統的可靠性。當系統因為計劃內或者意外原因出現異常後,正在執行的流程應該能夠在斷點處恢復並繼續執行。我們改造一下第一部分的例子,增加持久化設定,並模擬流程中斷和恢復過程。

首先,我們需要使用NuGet引入SqlServer持久化Provider:WorkflowCore.Persistence.SqlServer,當然也可以使用其它型別的資料儲存。

然後,修改ConfigureServices,將services.AddWorkflow()修改為:

services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore;Trusted_Connection=True;", true, true));

最後修改一下執行程式碼,增加流程Id輸入和恢復程式碼:

IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            
            host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>();
            host.Start();

            var initialData = new Dictionary<string,string>();

            Console.WriteLine("請輸入需要恢復的流程編號,如執行新流程直接回車:");
            string workflowId = Console.ReadLine();
            
            if (string.IsNullOrEmpty(workflowId))
            {
                workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData).Result;
                Console.WriteLine(workflowId);
            }
            else
            {
                host.ResumeWorkflow(workflowId);
            }
              

            
            Console.WriteLine("輸入名字");
            string value = Console.ReadLine();
            host.PublishEvent("MyEvent", workflowId, value);

下面,我們模擬中斷-恢復過程。首先,執行程式,不輸入流程id,直接按回車,會生成新的流程,並輸出流程Id,拷貝這個流程ID,並退出程式:

再次執行程式,輸入或貼上上一次生成的流程編號,可以繼續執行流程:

單元測試

我們已經建立簡單的工作流,並可以在控制檯環境執行,現在我們可以為工作流建立簡單的單元測試,這裡我們使用xUnit作為測試框架。

在ZL.WorkflowCoreDemo解決方案中增加一個xUnit測試專案,命名為ZL.WorkflowCoreDemo.Test,建立好的專案中已經包含xunit和xunit.runner.visualstudio。我們還需要使用NuGet引入其它的框架,首先要引入FluentAssertions,這個框架結合xUnit,可以讓 我們在測試中使用Should斷言。還需要引入WorkflowCore和WorkflowCore.Testing以及我們需要測試的專案。這裡我們測試最簡單的HelloWorldWorkflow。

接下來編寫測試程式碼,測試類需要繼承WorkflowTest<流程類,流程相關的資料類>,由於HelloWorldWorkflow沒有相關的資料類,我們使用dynamic代替,類的定義如下:

using System;
using Xunit;
using WorkflowCore.Testing;
using ZL.WorflowCoreDemo.Basic;
using WorkflowCore.Models;
using System.Threading;
using FluentAssertions;

namespace ZL.WorkflowCoreDemo.Test
{
    public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
    {
        public DemoUnitTest()
        {
            Setup();
        }

        [Fact]
        public void Test1()
        {
            dynamic data = new { };
            var workflowId = StartWorkflow(data);
            WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));

            WorkflowStatus status = GetStatus(workflowId);
            status.Should().Be(WorkflowStatus.Complete);
            UnhandledStepErrors.Count.Should().Be(0);
           
        }

    }
}

需要注意的是在測試類別建構函式中呼叫Setup(),用來初始化流程引擎。

現在我們可以在測試資源管理器中執行測試專案,如果一切順利的化,結果是這樣的:

但有時候理想和現實總是有些差距,我在執行時遇到了如下的異常:

通過研究發現我參照的WorkflowCore是最新的3.1.2版本,而WorkflowCore.Testing的版本是2.2,應該是版本不一致造成的問題,WorkflowCore和WorkflowCore.Testing的更新不同步。這時,開源專案的好處就體現出來了,通過檢視程式碼,改寫測試類如下:

using System;
using Xunit;
using WorkflowCore.Testing;
using ZL.WorflowCoreDemo.Basic;
using WorkflowCore.Models;
using System.Threading;
using FluentAssertions;

namespace ZL.WorkflowCoreDemo.Test
{
    public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
    {
        public DemoUnitTest()
        {
            Setup();
        }

        [Fact]
        public void Test1()
        {
            dynamic data = new { };
            var workflowId = StartWorkflow(data);
            WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));

            WorkflowStatus status = GetStatus(workflowId);
            status.Should().Be(WorkflowStatus.Complete);
            UnhandledStepErrors.Count.Should().Be(0);
           
        }

        protected new WorkflowStatus GetStatus(string workflowId)
        {
            var instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result;
            return instance.Status;
        }

        protected new void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut)
        {
            var status = GetStatus(workflowId);
            var counter = 0;
            while ((status == WorkflowStatus.Runnable) && (counter < (timeOut.TotalMilliseconds / 100)))
            {
                Thread.Sleep(100);
                counter++;
                status = GetStatus(workflowId);
            }
        }
    }
}

再次執行,測試通過了。

Activity Workers

前面提到了使用WaitFor暫停工作流,等待人工輸入後釋出事件重新啟用流程,今天介紹另一種方式,使用WorkflowCore的Activity,它的作用就是等待資料輸入,資料輸入完成後,工作流繼續執行。下面是簡單的例子:

using WorkflowCore.Interface;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ActivityWorker
{
    public class MyActivityWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "MyActivityWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {

            builder
                .StartWith<HelloWithName>().Input(data => data.Name, step => step.MyName)
                    .Activity("activity-1", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result)
                    .Then<GoodbyeWithName>()
                        .Input(step => step.Name, data => data.MyName)
                    .Activity("activity-2", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result)
                     .Then<HelloWithName>().Input(step => step.Name, data => data.MyName)
                    .Then<GoodbyeWithName>()
                        .Input(step => step.Name, data => data.MyName);
        }
    }
}

這個例子很簡單,使用了我們前面定義的兩個步驟,HelloWithName和GoodbyeWithName,Activity在這裡就是接收外部輸入的Name。流程的執行程式碼如下:

IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<MyActivityWorkflow, MyNameClass>();

            host.Start();

            var myClass = new MyNameClass { MyName = "張三" };

            host.StartWorkflow("MyActivityWorkflow", 1, myClass);

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;

            if (activity != null)
            {
                Console.WriteLine("輸入名字");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);
            }

            activity = host.GetPendingActivity("activity-2", "worker2", TimeSpan.FromMinutes(1)).Result;

            if (activity != null)
            {
                Console.WriteLine("輸入名字");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);
            }

            Console.ReadLine();
            host.Stop();

工作流啟動後,需要通過host.GetPendingActivity獲取Activity,獲取成功,就從外部獲取資料,然後使用host.SubmitActivitySuccess提交資料。

WaitFor vs Activity

使用WorkflowCore獲取外部資料時,有兩種方法可以讓流程等待外部資料,一是使用WaitFor註冊一個事件,外部資料輸入完成後,通過PublishEvent返回流程;另一種是使用Activity,註冊一個人工活動,執行到這個活動時,工作流等待,外部程式碼通過GetPendingActivity獲取相應的Activity,通過SubmitActivitySuccess提交資料。看起來兩種都可以完成外部資料輸入的任務,但實際中發現GetPendingActivity無法獲取是哪一個工作流範例的活動,如果有兩個範例同時執行,就沒有辦法分清除向哪個流程提報資料:

            var id1=host.StartWorkflow("MyActivityWorkflow", 1, myClass).Result;
            var id2 = host.StartWorkflow("MyActivityWorkflow", 1, myClass).Result;

             //上面兩個範例中有相同的activity-1,無法知道這裡獲取的是哪一個範例的活動,         
            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;

WairFor事件釋出時有工作流範例ID傳入:

host.PublishEvent("MyEvent", workflowId, value);

沒有上面的缺陷。

使用ForEach並行執行多個流程

如果需要同時執行多個過程相同的而輸入不同的流程,可以使用ForEach控制語句,一定要注意,這裡的ForEach不是迴圈,不是一個流程執行完再執行另一個流程,我們仍然使用前面定義的簡單的步驟來組織ForEach範例流程,程式碼如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.Paralle
{
    public class ParalleWorkflow : IWorkflow
    {
        public string Id => "ParalleWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
            .StartWith(context => { Console.WriteLine("開始"); ExecutionResult.Next(); })
            .ForEach(data => new List<string>() { "張三", "李四", "王五", "趙六" })
                .Do(x => x
                    .StartWith<HelloWithName>()
                        .Input(step => step.Name, (data, context) => context.Item as string)
                    .Then<GoodbyeWithName>()
                        .Input(step => step.Name, (data, context) => context.Item as string)
                    )
            .Then(context => { Console.WriteLine("結束"); ExecutionResult.Next(); });
        }
    }
}

在這個例子裡,我們沒有定義相關的資料類,需要輸入的人名作為ForEach中的迴圈變數,這些變數儲存在context中,輸入到相應的環節中。執行程式碼如下:

            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<ParalleWorkflow>();

            host.Start();
            host.StartWorkflow("ParalleWorkflow", 1, null);


            Console.ReadLine();
            host.Stop();

Parallel並行執行多個流程

前面我們提到了使用ForEach執行並行流程,這些流程的執行過程相同,不同的只是輸入的引數。如果需要並行執行多個不同的流程,需要使用Parallel,範例程式碼如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;

namespace ZL.WorflowCoreDemo.Paralle
{
    public class ParallePathWorkflow : IWorkflow
    {
        public string Id => "ParallePathWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
            .StartWith(context => { Console.WriteLine("開始"); ExecutionResult.Next(); })
            .Parallel()
                .Do(then =>
                    then.StartWith(context=>{ Console.WriteLine("分支一開始"); ExecutionResult.Next(); })
                        .Then(context => { Console.WriteLine("分支一結束"); ExecutionResult.Next(); }))
                .Do(then =>
                    then.StartWith(context => { Console.WriteLine("分支二開始"); ExecutionResult.Next(); })
                        .Then(context => { Console.WriteLine("分支二結束"); ExecutionResult.Next(); }))
                .Do(then =>
                    then.StartWith(context => { Console.WriteLine("分支二開始"); ExecutionResult.Next(); })
                        .Then(context => { Console.WriteLine("分支二結束"); ExecutionResult.Next(); }))
            .Join()
            .Then(context => { Console.WriteLine("結束"); ExecutionResult.Next(); });
        }
    }
}

為了說明分支語句的構成,這個流程沒有使用關聯的資料類,也沒有使用類定義步驟,全部使用Lambda表示式。Parallel的結構是分支的開始是Parallel(),結束是Join(),每個分支在Do語句中表示。流程的執行程式碼如下:

IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<ParallePathWorkflow>();
            host.Start();
            host.StartWorkflow("ParallePathWorkflow", 1, null);
            Console.ReadLine();
            host.Stop();

While迴圈

While迴圈會重複執行某些步驟,直到條件得到滿足再繼續執行下面的流程。使用While迴圈可以實現審批流程中的「提交/駁回」,如果審批沒有通過,駁回重新輸入,直到審批通過或者駁回次數到達上限。這裡舉一個簡單的例子說明使用方法,結合前面提到的Activity,可以實現對輸入進行判斷,如果輸入不滿足要求,就重新輸入。流程定義如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class WhileWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "WhileWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            builder
                .StartWith<HelloWithName>()
                    .Input(step => step.Name, data => data.MyName)
                .While(data => data.MyName.Length < 3)
                    .Do(x => x
                        .StartWith(context=> { Console.WriteLine("輸入小於3個字元"); ExecutionResult.Next(); })
                        .Activity("activity-1", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result))
                .Then<GoodbyeWithName>()
                   .Input(step => step.Name, data => data.MyName);
        }
    }
}

流程執行的程式碼如下:

            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<WhileWorkflow, MyNameClass>();

            host.Start();

            var myClass = new MyNameClass { MyName = "張三" };

            host.StartWorkflow("WhileWorkflow", 1, myClass);

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;

            
            while (activity != null)
            {
                Console.WriteLine("輸入大於3個字元的名字結束,小於3個字元的名字繼續");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);
                activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
            }
                        
            Console.ReadLine();
            host.Stop();

If判斷

If判斷比較簡單,根據流程關聯的資料物件中的值進行判斷,如果條件滿足執行相應的分支。需要注意的是沒有else相關語句,如果需要實現相關邏輯,需要再次進行一次條件相反的判斷。下面是簡單的例子,仍然使用前面定義的資料類和步驟,輸入採用Activity:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class IfWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "IfWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            builder
                .StartWith(context=> ExecutionResult.Next())
                .Activity("activity-1", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result)    
                .If(data => data.MyName.Length < 3)
                    .Do(then=>then
                        .StartWith(context => { Console.WriteLine("輸入小於3個字元"); ExecutionResult.Next(); }))
                .If(data => data.MyName.Length >= 3)
                    .Do(then => then
                        .StartWith(context => { Console.WriteLine("輸入大於等於3個字元"); ExecutionResult.Next(); }))
                .Then<GoodbyeWithName>()
                   .Input(step => step.Name, data => data.MyName);
        }
    }
}

流程的執行程式碼如下:

            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<IfWorkflow, MyNameClass>();

            host.Start();

            var myClass = new MyNameClass { MyName = "張三" };

            host.StartWorkflow("IfWorkflow", 1, myClass);

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;


            if (activity != null)
            {
                Console.WriteLine("輸入名字");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);
                
            }

            Console.ReadLine();
            host.Stop();

條件分支Decision Branches

Decision Branches有點類似於switch語句,可以為每個條件建立一個分支,這些分支相對獨立,根據不同的條件選擇執行。如果使用Fluent API,可以使用CreateBranch方法建立分支,然後在流程中使用分支。為了說明問題,我們改造前面的If流程,使用Decision Branches實現相同的功能,流程定義的程式碼如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class DecisionWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "DecisionWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            var branch1 = builder.CreateBranch()
                .StartWith(context => { Console.WriteLine("輸入小於3個字元"); ExecutionResult.Next(); });
            var branch2 = builder.CreateBranch()
                .StartWith(context => { Console.WriteLine("輸入大於等於3個字元"); ExecutionResult.Next(); });

            builder
                .StartWith(context => ExecutionResult.Next())
                .Activity("activity-1", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result)
                .Decide(data => data.MyName.Length)
                     .Branch((data, outcome) => data.MyName.Length<3, branch1)
                     .Branch((data, outcome) => data.MyName.Length >= 3, branch2)
                .Then<GoodbyeWithName>()
                   .Input(step => step.Name, data => data.MyName);
        }
    }
}

流程執行定義的程式碼如下:

            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<DecisionWorkflow, MyNameClass>();

            host.Start();

            var myClass = new MyNameClass { MyName = "張三" };

            host.StartWorkflow("DecisionWorkflow", 1, myClass);

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;


            if (activity != null)
            {
                Console.WriteLine("輸入名字");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);
                
            }

            Console.ReadLine();
            host.Stop();

使用Schedule執行定時任務

WorkflowCore 提供了定時執行後臺任務的功能,使用Schedule可以定義非同步執行的任務,在工作流的後臺執行。範例程式碼如下:

using System;
using WorkflowCore.Interface;


namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class ScheduleWorkflow : IWorkflow
    {
        public string Id => "ScheduleWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith(context => Console.WriteLine("開始"))
                    .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
                    .StartWith(context => Console.WriteLine("後臺工作")))
                .Then(context => Console.WriteLine("前臺工作"));
        }
    }
}

在上面的程式碼中,工作流開始後,定義了一個Schedule,這個任務在延時5秒後,啟動一個後臺流程。流程的執行程式碼如下:

           IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();

            host.RegisterWorkflow<ScheduleWorkflow>();
            host.Start();

            
            var workflowId = host.StartWorkflow("ScheduleWorkflow", 1, null).Result;

            Console.ReadLine();
            host.Stop();

流程的執行程式碼與前面的例子基本類似,執行結果如下:

執行時,前臺任務完成5秒後,後臺工作才執行。

使用Recur執行重複的後臺任務

前面介紹的Schedule可以啟動一個後臺的定時任務,這個任務只執行一次。如果需要執行多次固定間隔的任務,可以使用Recur,當條件滿足時任務不再執行。Recur的定義與Schedule類似,只是多了條件判斷輸入,流程定義的程式碼如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class RecurWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "RecurWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            builder
                .StartWith(context => Console.WriteLine("開始"))
                    .Recur(data => TimeSpan.FromSeconds(5),data=>data.MyName.Length>5).Do(recur => recur
                    .StartWith<HelloWithName>()
                    .Input(step => step.Name, data => data.MyName))
                .Then(context => Console.WriteLine("前臺工作"))
                .Activity("activity-1", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result);
        }
    }
}

這流程稍微複雜一點,我們增加了使用Activity的輸入,目的是看一下前臺的輸入等待是否會影響後臺的程序執行,還有就是前臺輸入的資料,能否正確傳遞到後臺,流程的執行程式碼如下:

            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();

            host.RegisterWorkflow<RecurWorkflow,MyNameClass>();
            host.Start();

            var myClass = new MyNameClass { MyName = "張三" };

            var workflowId = host.StartWorkflow("RecurWorkflow", 1, myClass).Result;

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;


            if (activity != null)
            {
                Console.WriteLine("輸入名字");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);

            }

            Console.ReadLine();
            host.Stop();

執行效果如下:

可以看出,前臺需要的輸入等待並沒有影響後臺的執行,我們輸入一個新名字後:

整合Elasticsearch

WorkflowCore 自身的查詢功能很弱,不過它提供了Elasticsearch的plugin,可以使用Elasticsearch對流程進行索引和查詢。不太方便的地方是必須要安裝Elasticsearch。這裡先簡單介紹一下Elasticsearch,它是基於Lucene的搜尋伺服器,提供了分散式多使用者的全文檢索引擎,基於RESTful web介面。網上關於Elasticsearch的資料很多,可以自行搜尋。

如果希望使用Elasticsearch索引工作流,需要在專案中安裝WorkflowCore.Providers.Elasticsearch,使用NuGet安裝這個外掛,然後在services中進行設定:

using Nest;
...
services.AddWorkflow(cfg =>
{
    ...
    cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://localhost:9200")), "index_name");
});

在程式碼中,通過依賴注入引入ISearchIndex,使用Search方法進行搜尋:

Search(string terms, int skip, int take, params SearchFilter[] filters)

檢索的範圍包括流程的定義、描述、狀態等。如果流程相關的自定義資料類需要檢索,資料類需要實現ISearchable介面。

例外處理

WorkflowCore啟動的流程多執行緒的方式執行,如果流程中出現的異常不會丟擲到主程式,很多情況下感覺流程莫名奇妙地結束了。為了避免這種情況,需要顯示地宣告流程步驟的例外處理。如果使用Fluent API定義流程,可以在流程後附加OnError處理異常,但我們更希望對異常進行集中處理和記錄,這時可以使用WorkflowHost服務的OnStepError事件。定義如下:

 var host = serviceProvider.GetService<IWorkflowHost>();
 host.OnStepError += Host_OnStepError;

例外處理程式碼可以寫在Host_OnStepError中:

private static void Host_OnStepError(WorkflowCore.Models.WorkflowInstance workflow, WorkflowCore.Models.WorkflowStep step, Exception exception)
        {
            
        }

實際使用中的問題

到這裡,我們介紹了WorkflowCore的使用,下面談一下這個專案在實際使用時遇到一些問題。

  • 輕量級,部署和使用都很簡單。專案本身滿足這個條件,但對流程相關的查詢功能很弱,如果需要增強,需要Elasticsearch的支援。部署和使用Elasticsearch帶來了額外的工作量。
  • WorkflowCore支援使用JSON格式定義工作流,然而從功能上要弱於使用Fluent API定義的工作流,因為不具備解析Lambda表示式的能力
  • 引數傳遞功能相對較弱,無法傳遞複雜物件。
    上述問題是我們在實際中遇到的,希望對大家有所幫助。