MASA Framework -- 跨程序事件 IntegrationEventBus入門與設計

2022-11-22 12:01:38

概述

跨程序事件匯流排允許釋出和訂閱跨服務傳輸的訊息, 服務的釋出與訂閱不在同一個程序中

在Masa Framework中, 跨程序匯流排事件提供了一個可以被開箱即用的程式

入門

跨程序事件與Dapr並不是強繫結的, Masa Framework使用了Dapr提供的pub/sub的能力, 如果你不想使用它, 你也可以更換為其它實現, 但目前Masa Framwork中僅提供了Dapr的實現

  1. 新建ASP.NET Core 空專案Assignment.IntegrationEventBus,並安裝Masa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFMasa.Contrib.Data.EFCore.SqliteMasa.Contrib.Data.UoW.EFCoreMasa.Contrib.Development.DaprStarter.AspNetCoreMicrosoft.EntityFrameworkCore.Design
dotnet new web -o Assignment.IntegrationEventBus
cd Assignment.IntegrationEventBus

dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.7.0-preview.8 // 使用dapr提供的pubsub能力
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EF --version 0.7.0-preview.8 //本地訊息表
dotnet add package Masa.Contrib.Data.EFCore.Sqlite --version 0.7.0-preview.8 //使用EfCore.Sqlite
dotnet add package Masa.Contrib.Data.UoW.EFCore --version 0.7.0-preview.8 //使用工作單元
dotnet add package Masa.Contrib.Development.DaprStarter.AspNetCore --version 0.7.0-preview.8 //開發環境使用DaprStarter協助管理Dapr Sidecar
dotnet add package Microsoft.EntityFrameworkCore.Design --version 6.0.6 //方便後續通過CodeFirst遷移資料庫
  1. 新建使用者上下文類UserDbContext,並繼承MasaDbContext
public class UserDbContext : MasaDbContext
{
    public UserDbContext(MasaDbContextOptions<UserDbContext> options) : base(options)
    {
    }
}
  1. 註冊DaprStarter, 協助管理Dapr Sidecar, 修改Program.cs
if (builder.Environment.IsDevelopment())
{
    builder.Services.AddDaprStarter();
}

通過Dapr釋出整合事件需要執行Dapr, 線上環境可通過Kubernetes來執行, 開發環境可藉助Dapr Starter執行Dapr, 因此僅需要在開發環境使用它

  1. 註冊跨程序事件匯流排,修改類Program
builder.Services.AddIntegrationEventBus(option =>
{
    option.UseDapr()
        .UseEventLog<UserDbContext>()
        .UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
});
var app = builder.Build();

#region dapr 訂閱整合事件使用
app.UseRouting();

app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{
    endpoints.MapSubscribeHandler();
});
#endregion
  1. 新增使用者註冊事件的整合事件 RegisterUserEvent
public record RegisterUserEvent : IntegrationEvent
{
    public override string Topic { get; set; } = nameof(RegisterUserEvent);

    public string Account { get; set; }

    public string Mobile { get; set; }
}
  1. 開啟Assignment.IntegrationEventBus所在資料夾,開啟cmd或Powershell執行
dotnet ef migrations add init //建立遷移
dotnet ef database update //更新資料庫
  1. 傳送跨程序事件,修改Program
app.MapPost("/register", async (IIntegrationEventBus eventBus) =>
{
    //todo: 模擬註冊使用者並行布註冊使用者事件
    await eventBus.PublishAsync(new RegisterUserEvent()
    {
        Account = "Tom",
        Mobile = "19999999999"
    });
});
  1. 訂閱事件,修改Program
app.MapPost("/IntegrationEvent/RegisterUser", [Topic("pubsub", nameof(RegisterUserEvent))](RegisterUserEvent @event) =>
{
    Console.WriteLine($"註冊使用者成功: {@event.Account}");
});

訂閱事件暫時未抽象,目前使用的是Dapr原生的訂閱方式,後續我們會支援Bind,屆時不會由於更換pubsub的實現而導致訂閱方式的改變

儘管跨程序事件目前僅支援了Dapr,但這不代表你與RabbitMqKafka等無緣,釋出/訂閱是Dapr抽象出的能力,實現釋出訂閱的元件有很多種,RabbitMqKafka是其中一種實現,如果你想深入瞭解他們之間的關係,可以參考:

  1. 手把手教你學Dapr
  2. PubSub代理

原始碼解讀

首先我們先要知道的基礎知識點:

  • IIntegrationEvent: 整合事件介面, 繼承 IEvent (本地事件介面)、ITopic (訂閱介面, 釋出訂閱的主題)、ITransaction (事務介面)
  • IIntegrationEventBus: 整合事件匯流排介面、用於提供傳送整合事件的功能
  • IIntegrationEventLogService: 整合事件紀錄檔服務的介面 (提供儲存本地紀錄檔、修改狀態為進行中、成功、失敗、刪除過期紀錄檔、獲取等待重試紀錄檔列表的功能)
  • IntegrationEventLog: 整合事件紀錄檔, 提供本地訊息表的模型
  • IHasConcurrencyStamp: 並行標記介面 (實現此介面的類會自動為RowVersion賦值)

Masa.Contrib.Dispatcher.IntegrationEvents

提供了整合事件介面的實現類, 並支援了發件箱模式, 其中:

  • IPublisher: 整合事件的傳送者
  • IProcessingServer: 後臺服務介面
  • IProcessor: 處理程式介面 (後臺處理程式中會獲取所有的程式程式)
    • DeleteLocalQueueExpiresProcessor: 刪除過期程式 (從本地佇列刪除)
    • DeletePublishedExpireEventProcessor: 刪除已過期的釋出成功的本地訊息程式 (從Db刪除)
    • RetryByLocalQueueProcessor: 重試本地訊息記錄 (從本地佇列中獲取, 條件: 傳送狀態為失敗或進行中且重試次數小於最大重試次數且重試間隔大於最小重試間隔)
    • RetryByDataProcessor: 重試本地訊息記錄 (從Db獲取, 條件: 傳送狀態為失敗或進行中且重試次數小於最大重試次數且重試間隔大於最小重試間隔, 且不在本地重試佇列中)
  • IntegrationEventBus: IIntegrationEvent的實現

Masa.Contrib.Dispatcher.IntegrationEvents中僅提供了發件箱的功能, 但整合事件的釋出是由 IPublisher的實現類來提供, 由Db獲取本地訊息表的功能是由IIntegrationEventLogService的實現類來提供, 它們分別屬於Masa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore的功能, 這也是為什麼使用整合事件需要參照包

  • Masa.Contrib.Dispatcher.IntegrationEvents
  • Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
  • Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore

如何快速接入其它實現

那會有小夥伴問了, 我現在沒有使用Dapr, 未來一段時間暫時也還不希望接入Dapr, 我想自己接入, 以實現整合事件的釋出可以嗎?

當然是可以的, 如果你希望自行實現整合事件, 那麼這個時候你會遇到兩種情況

接入方支援發件箱模式

以社群用的較多的庫CAP為例, 由於它本身已經完成了發件箱模式, 我們不需要再處理本地訊息表, 也無需考慮本地訊息記錄的管理, 那我們可以這樣做

  1. 新建類庫Masa.Contrib.Dispatcher.IntegrationEvents.Cap, 新增Masa.BuildingBlocks.Dispatcher.IntegrationEvents的參照, 並安裝DotNetCore.CAP
dotnet add package DotNetCore.CAP
  1. 新增類IntegrationEventBus, 並實現IIntegrationEventBus
public class IntegrationEventBus : IIntegrationEventBus
{
    private readonly ICapPublisher _publisher;
    private readonly ICapTransaction _capTransaction;
    private readonly IUnitOfWork? _unitOfWork;
    public IntegrationEventBus(ICapPublisher publisher, ICapTransaction capTransaction, IUnitOfWork? unitOfWork = null)
    {
        _publisher = publisher;
        _capTransaction = capTransaction;
        _unitOfWork = unitOfWork;
    }
    
    public Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        // 如果使用事務
        // _publisher.Transaction.Value.DbTransaction = unitOfWork.Transaction;
        // _publisher.Publish(@event.Topic, @event);
        throw new NotImplementedException();
    }

    public IEnumerable<Type> GetAllEventTypes()
    {
        throw new NotImplementedException();
    }

    public Task CommitAsync(CancellationToken cancellationToken = default)
    {
        throw new NotImplementedException();
    }
}

CAP已支援本地事務, 使用當前IUnitOfWork提供的事務, 確保資料的原子性

  1. 新建類ServiceCollectionExtensions, 將自定義Publisher註冊到服務集合
public static class ServiceCollectionExtensions
{
    public static DispatcherOptions UseRabbitMq(this IServiceCollection services)
    {
         //todo: 註冊RabbitMq資訊
         services.TryAddScoped<IIntegrationEventBus, IntegrationEventBus>();
         return dispatcherOptions;
    }
}

已經實現發件箱模式的可以直接使用, 而不需要參照

  • Masa.Contrib.Dispatcher.IntegrationEvents
  • Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
  • Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore

以上未經過實際驗證, 感興趣的可以嘗試下, 歡迎隨時提pr

接入方不支援發件箱模式

我希望直接接入RabbitMq, 但我自己沒有做發件箱模式, 那我可以怎麼做呢?

由於Masa.Contrib.Dispatcher.IntegrationEvents已提供發件箱模式, 如果僅僅希望更換一個釋出事件的實現者, 那我們僅需要實現IPublisher即可

  1. 新建類庫Masa.Contrib.Dispatcher.IntegrationEvents.RabbitMq, 新增Masa.Contrib.Dispatcher.IntegrationEvents專案參照, 並安裝RabbitMQ.Client
dotnet add package RabbitMQ.Client //使用RabbitMq
  1. 新增類Publisher,並實現IPublisher
public class Publisher : IPublisher
{
    public async Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default) where T : IIntegrationEvent
    {
        //todo: 通過 RabbitMQ.Client 傳送訊息到RabbitMq
        throw new NotImplementedException();
    }
}
  1. 新建類DispatcherOptionsExtensions, 將自定義Publisher註冊到服務集合
public static class DispatcherOptionsExtensions
{
    public static DispatcherOptions UseRabbitMq(this Masa.Contrib.Dispatcher.IntegrationEvents.Options.DispatcherOptions options)
    {
         //todo: 註冊RabbitMq資訊
         dispatcherOptions.Services.TryAddSingleton<IPublisher, Publisher>();
         return dispatcherOptions;
    }
}
  1. 如何使用自定義實現RabbitMq
builder.Services.AddIntegrationEventBus(option =>
{
    option.UseRabbitMq();//修改為使用RabbitMq
    option.UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
    option.UseEventLog<UserDbContext>();
});

本章原始碼

Assignment12

https://github.com/zhenlei520/MasaFramework.Practice

開源地址

MASA.Framework:https://github.com/masastack/MASA.Framework

MASA.EShop:https://github.com/masalabs/MASA.EShop

MASA.Blazor:https://github.com/BlazorComponent/MASA.Blazor

如果你對我們的 MASA Framework 感興趣,無論是程式碼貢獻、使用、提 Issue,歡迎聯絡我們