使用c#的 async/await編寫 長時間執行的基於程式碼的工作流的 持久任務框架

2022-11-25 18:04:09

持久任務框架 (DTF) 是基於async/await 工作流執行框架。工作流的解決方案很多,包括Windows Workflow Foundation,BizTalk,Logic Apps, Workflow-CoreElsa-Core。最近我在Dapr 的倉庫裡跟蹤工作流構建塊的進展時,深入瞭解了一下,這個DTFx在Azure 基礎設施有大量的應用,現在Dapr團隊正在把這個實踐抽象成工作流構建塊,具體參看https://github.com/dapr/dapr/issues/4576。DTFx 正好是.NET開發的,所以對他多了幾分關注,以前沒有深入進去看看,現在我覺得是值得推薦給大家的一個工作流方案,它足夠輕量級,而且非常簡單,依賴很少。

持久任務框架是一個開源框架,它為 .NET 平臺中的工作流即程式碼提供了基礎。GitHub上:https://github.com/Azure/durabletask

它有兩個主要元件:業務流程和任務。業務流程「編排」應用程式邏輯,以內聯方式執行自定義程式碼並呼叫任務。自定義業務流程派生自 TaskOrchestration<TResult, TInput>自定義任務派生自 TaskActivity<TInput, TResult>。

推薦大家從這兩個倉庫可用來學習和生產使用。

Microsoft.Extensions.Hosting包裝器: https://github.com/jviau/durabletask-hosting

持久任務框架擴充套件: https://github.com/lucaslorentz/durabletask-extensions


我們一起來看下持久任務框架的Hello world: 程式碼來自https://github.com/jviau/durabletask-hosting 的 DurableTask.Samples:

這個非常簡單的業務流程「GreetingsOrchestration」,有兩個稱為任務「GetUserTask」,它執行名稱提示和「SendGreetingTask」,它將問候語寫入控制檯。

GreetingsOrchestration 派生自 TaskOrchestration<string、string> 並具有呼叫 GetUserTask 和 SendGreetingTask 的 RunTask 方法。

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

/// <summary>
/// A task orchestration for greeting a user.
/// </summary>
public class GreetingsOrchestration : TaskOrchestration<string, string>
{
     /// <inheritdoc />
     public override async Task<string> RunTask(OrchestrationContext context, string input)
     {
         string user = await context.ScheduleTask<string>(typeof(GetUserTask));
         string greeting = await context.ScheduleTask<string>(typeof(SendGreetingTask), user);
         return greeting;
     }
}

GetUserTask 派生自 TaskActivity<string,string> 並實現了 Execute 方法

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

/// <summary>
/// A task activity for getting a username from console.
/// </summary>
public class GetUserTask : TaskActivity<string, string>
{
     private readonly IConsole _console;

    /// <summary>
     /// Initializes a new instance of the <see cref="GetUserTask"/> class.
     /// </summary>
     /// <param name="console">The console output helper.</param>
     public GetUserTask(IConsole console)
     {
         _console = console ?? throw new ArgumentNullException(nameof(console));
     }

    /// <inheritdoc />
     protected override string Execute(TaskContext context, string input)
     {
         _console.WriteLine("Please enter your name:");
         return _console.ReadLine();
     }
}

SendGreetingTask 派生自 TaskActivity<string、string> 並實現了 Excute 方法

using DurableTask.Core;

namespace DurableTask.Samples.Greetings;

/// <summary>
/// A task for sending a greeting.
/// </summary>
public sealed class SendGreetingTask : AsyncTaskActivity<string, string>
{
     private readonly IConsole _console;

    /// <summary>
     /// Initializes a new instance of the <see cref="SendGreetingTask"/> class.
     /// </summary>
     /// <param name="console">The console output helper.</param>
     public SendGreetingTask(IConsole console)
     {
         _console = console ?? throw new ArgumentNullException(nameof(console));
     }

    /// <inheritdoc />
     protected override async Task<string> ExecuteAsync(TaskContext context, string user)
     {
         string message;
         if (!string.IsNullOrWhiteSpace(user) && user.Equals("TimedOut"))
         {
             message = "GetUser Timed out!!!";
             _console.WriteLine(message);
         }
         else
         {
             _console.WriteLine("Sending greetings to user: " + user + "...");
             await Task.Delay(5 * 1000);
             message = "Greeting sent to " + user;
             _console.WriteLine(message);
         }

        return message;
     }
}

上面的這個例子非常基礎,我們在專案中要把它用起來就要用到這個擴充套件專案 https://github.com/lucaslorentz/durabletask-extensions。這個專案通過更多功能擴充套件持久任務框架,並使其更易於使用,目前還在開發過程中,尚未達到投入生產的程度。包含了下列這些功能,讓你在任何地方都可以執行。

  • 更多定義儲存功能的介面
  • 依賴注入整合
  • EF Core MySql/PostgreSQL/SqlServer storages
  • 分散式工作執行緒:允許在多個工作執行緒中拆分業務流程/活動實現
  • 通過 GRPC 協定進行間接儲存存取:將您的儲存選擇和設定集中在單個元件中。
  • 使用者介面
  • BPMN 執行器

範例資料夾中,您可以找到經典書籍《飛行、汽車、酒店》的實現,其中包含補償問題。

該範例旨在演示具有以下元件的微服務體系結構:

  • 伺服器:連線到儲存並將其公開為 GRPC 終結點。
  • 應用程式介面:公開 REST API 以管理業務流程。
  • 使用者介面:公開用於管理業務流程的 UI。
  • 業務流程工作執行緒:為給定問題實現BookParallelBookSquential業務流程。
  • 飛行工作人員:實施預訂航班和取消航班活動。
  • 車伕:實施「預訂汽車」和「取消汽車」活動。
  • 酒店工作人員:實施預訂酒店和取消酒店活動。
  • BPMNWorker:一個建立在持久任務之上的實驗性 BPMN 執行器。對於給定的問題,還有BookParallelBookSequentialBPMN 工作流。

image