MASA Framework -- EventBus入門與設計

2022-11-14 12:10:39

概述

事件匯流排是一種事件釋出/訂閱結構,通過釋出訂閱模式可以解耦不同架構層級,同樣它也可以來解決業務之間的耦合,它有以下優點

  • 鬆耦合
  • 橫切關注點
  • 可測試性
  • 事件驅動

釋出訂閱模式

通過下圖我們可以快速瞭解釋出訂閱模式的本質

  1. 訂閱者將自己關心的事件在排程中心進行註冊
  2. 事件的釋出者通過排程中心把事件釋出出去
  3. 訂閱者收到自己關心的事件變更並執行相對應業務

其中釋出者無需知道訂閱者是誰,訂閱者彼此之間也互不認識,彼此之間互不干擾

事件匯流排型別

在Masa Framework中,將事件劃分為

  • 程序內事件 (Event)

本地事件,它的釋出與訂閱需要在同一個程序中,訂閱方與釋出方需要在同一個專案中

整合事件,它的釋出與訂閱一定不在同一個程序中,訂閱方與釋出方可以在同一個專案中,也可以在不同的專案中

下面我們會用一個註冊使用者的例子來說明如何使用本地事件

入門

  1. 新建ASP.NET Core 空專案Assignment.InProcessEventBus,並安裝Masa.Contrib.Dispatcher.Events
dotnet new web -o Assignment.InProcessEventBus
cd Assignment.InProcessEventBus
dotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.7
  1. 註冊EventBus (用於釋出本地事件), 修改Program.cs
builder.Services.AddEventBus();
  1. 新增RegisterUserEvent類並繼承Event,用於釋出註冊使用者事件
public record RegisterEvent : Event
{
    public string Account { get; set; }

    public string Email { get; set; }

    public string Password { get; set; }
}
  1. 新增註冊使用者處理程式

在指定事件處理程式方法上增加特性 EventHandler,並在方法中增加引數 RegisterUserEvent

public class UserHandler
{
    private readonly ILogger<UserHandler>? _logger;

    public UserHandler(ILogger<UserHandler>? logger = null)
    {
        //todo: 根據需要可在建構函式中注入其它服務 (需支援從DI獲取)
        _logger = logger;
    }

    [EventHandler]
    public void RegisterUser(RegisterUserEvent @event)
    {
        //todo: 1. 編寫註冊使用者業務
        _logger?.LogDebug("-----------{Message}-----------", "檢測使用者是否存在並註冊使用者");
        
        //todo: 2. 編寫傳送註冊通知等
        _logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "傳送郵件提示註冊成功");
    }
}

註冊使用者的處理程式可以放到任意一個類中,但其建構函式引數必須支援從DI獲取,且處理程式的方法僅支援 TaskVoid 兩種, 不支援其它型別

  1. 傳送註冊使用者事件,修改Program.cs
app.MapPost("/register", async (RegisterUserEvent @event, IEventBus eventBus) =>
{
    await eventBus.PublishAsync(@event);
});

進階

處理流程

EventBus的 請求管道包含一系列請求委託,依次呼叫。 它們與ASP.NET Core中介軟體有異曲同工之妙,區別點在於中介軟體的執行順序與註冊順序相反,最先註冊的最後執行

每個委託均可在下一個委託前後執行操作,其中TransactionMiddleware是EventBus釋出後第一個要進入的中介軟體 (預設提供),並且它是不支援多次巢狀的。

EventBus 支援巢狀,這意味著我們可以在Handler中重新發佈一個新的Event,但TransactionMiddleware僅會在最外層進入時被觸發一次

自定義中介軟體

根據需要我們可以自定義中介軟體,並註冊到EventBus的請求管道中,比如通過增加FluentValidation, 將引數驗證從業務程式碼中剝離開來,從而使得處理程式更專注於業務

  1. 註冊FluentValidation, 修改Program.cs
builder.Services.AddValidatorsFromAssembly(Assembly.GetEntryAssembly());
  1. 自定義驗證中介軟體ValidatorMiddleware.cs,用於驗證引數
public class ValidatorMiddleware<TEvent> : Middleware<TEvent>
    where TEvent : IEvent
{
    private readonly ILogger<ValidatorMiddleware<TEvent>>? _logger;
    private readonly IEnumerable<IValidator<TEvent>> _validators;

    public ValidatorMiddleware(IEnumerable<IValidator<TEvent>> validators, ILogger<ValidatorMiddleware<TEvent>>? logger = null)
    {
        _validators = validators;
        _logger = logger;
    }

    public override async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
    {
        var typeName = @event.GetType().FullName;

        _logger?.LogDebug("----- Validating command {CommandType}", typeName);

        var failures = _validators
            .Select(v => v.Validate(@event))
            .SelectMany(result => result.Errors)
            .Where(error => error != null)
            .ToList();

        if (failures.Any())
        {
            _logger?.LogError("Validation errors - {CommandType} - Event: {@Command} - Errors: {@ValidationErrors}",
                typeName,
                @event,
                failures);

            throw new ValidationException("Validation exception", failures);
        }

        await next();
    }
}
  1. 註冊EventBus並使用驗證中介軟體ValidatorMiddleware
builder.Services.AddEventBus(eventBusBuilder=>eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
  1. 新增註冊使用者驗證類RegisterUserEventValidator.cs
public class RegisterUserEventValidator : AbstractValidator<RegisterUserEvent>
{
    public RegisterUserEventValidator()
    {
        RuleFor(e => e.Account).NotNull().WithMessage("使用者名稱不能為空");
        RuleFor(e => e.Email).NotNull().WithMessage("郵箱不能為空");
        RuleFor(e => e.Password)
            .NotNull().WithMessage("密碼不能為空")
            .MinimumLength(6)
            .WithMessage("密碼必須大於6位")
            .MaximumLength(20)
            .WithMessage("密碼必須小於20位");
    }
}

編排

EventBus 支援事件編排,它們可以用來處理一些對執行順序有要求的業務,比如: 註冊使用者必須成功之後才可以傳送註冊郵件通知,傳送獎勵等等,那我們可以這樣做

將註冊使用者業務拆分為三個Handler,並通過指定Order的值來對執行事件排序

public class UserHandler
{
    private readonly ILogger<UserHandler>? _logger;

    public UserHandler(ILogger<UserHandler>? logger = null)
    {
        _logger = logger;
    }

    [EventHandler(1)]
    public void RegisterUser(RegisterUserEvent @event)
    {
        _logger?.LogDebug("-----------{Message}-----------", "檢測使用者是否存在並註冊使用者");
        //todo: 編寫註冊使用者業務
    }

    [EventHandler(2)]
    public void SendAwardByRegister(RegisterUserEvent @event)
    {
        _logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "傳送註冊獎勵");
        //todo: 編寫傳送獎勵等
    }

    [EventHandler(3)]
    public void SendNoticeByRegister(RegisterUserEvent @event)
    {
        _logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "傳送註冊成功郵件");
        //todo: 編寫傳送註冊通知等
    }
}

Saga

EventBus支援Saga模式

具體是怎麼做呢?

[EventHandler(1, IsCancel = true)]
public void CancelSendAwardByRegister(RegisterUserEvent @event)
{
    _logger?.LogDebug("-----------{Account} 註冊成功,發放獎勵失敗 {Message}-----------", @event.Account, "發放獎勵補償");
}

當傳送獎勵出現異常時,則執行補償機制,執行順序為 (2 - 1) > 0,由於目前僅存在一個Order為1的Handler,則執行獎勵補償後退出

但對於部分不需要執行失敗但不需要執行回退的方法,我們可以修改 FailureLevels 確保不會因為當前方法的異常而導致執行補償機制

[EventHandler(3, FailureLevels = FailureLevels.Ignore)]
public void SendNoticeByRegister(RegisterUserEvent @event)
{
    _logger?.LogDebug("-----------{Account} 註冊成功 {Message}-----------", @event.Account, "傳送郵件提示註冊成功");
    //todo: 編寫傳送註冊通知等
}

原始碼解讀

EventHandler

  • FailureLevels: 失敗級別, 預設: Throw
    • Throw:發生異常後,依次執行Order小於當前Handler的Order的取消動作,比如:Handler順序為 1、2、3,CancelHandler為 1、2、3,如果執行 Handler3 異常,則依次執行 2、1
    • ThrowAndCancel:發生異常後,依次執行Order小於等於當前Handler的Order的取消動作,比如:Handler順序為 1、2、3,CancelHandler為 1、2、3,如果執行 Handler3 異常,則依次執行 3、2、1
    • Ignore:發生異常後,忽略當前異常(不執行取消動作),繼續執行其他Handler
  • Order: 執行順序,預設: int.MaxValue,用於控制當前方法的執行順序
  • EnableRetry: 當Handler異常後是否啟用重試, 預設: false
  • RetryTimes: 重試次數,當出現異常後執行多少次重試, 需開啟重試設定
  • IsCancel: 是否是補償機制,預設: false

Middleware

  • SupportRecursive: 是否支援遞迴 (巢狀), 預設: true
    • 部分中介軟體僅在最外層被觸發一次,像TransactionMiddleware 就是如此,但也有很多中介軟體是需要被多次執行的,比如ValidatorMiddleware,每次釋出事件時都需要驗證引數是否正確
  • HandleAsync(TEvent @event, EventHandlerDelegate next): 處理程式,通過呼叫 next() 使得請求進入下一個Handler

IEventHandler 與 ISagaEventHandler

  • HandleAsync(TEvent @event): 提供事件的Handler
  • CancelAsync(TEvent @event): 提供事件的補償Handler

EventHandler功能類似,提供基本的Handler以及補償Handler,推薦使用EventHandler的方式使用

TransactionMiddleware

提供事務中介軟體,當EventBusUoW以及Masa提供的Repository來使用時,當存在待提交的資料時,會自動執行儲存並提交,當出現異常後,會執行事務回滾,無需擔心臟資料入庫

效能測試

與市面上使用較多的MeidatR作了對比,結果如下圖所示:

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1023 (21H1/May2021Update)
11th Gen Intel Core i7-11700 2.50GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-preview.4.22252.9
[Host] : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT DEBUG
Job-MHJZJL : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT

Runtime=.NET 6.0 IterationCount=100 RunStrategy=ColdStart

Method Mean Error StdDev Median Min Max
AddShoppingCartByEventBusAsync 124.80 us 346.93 us 1,022.94 us 8.650 us 6.500 us 10,202.4 us
AddShoppingCartByMediatRAsync 110.57 us 306.47 us 903.64 us 7.500 us 5.300 us 9,000.1 us

根據效能測試我們發現,EventBus與MediatR效能差距很小,但EventBus提供的功能卻要強大的多

常見問題

  1. 按照檔案操作,通過EventBus釋出事件後,對應的Handler並沒有執行,也沒有發現錯誤?

①. EventBus.PublishAsync(@event) 是非同步方法,確保等待方法呼叫成功,檢查是否出現同步方法呼叫非同步方法的情況
②. 註冊EventBus時指定程式集集合, Assembly被用於註冊時獲取並儲存事件與Handler的對應關係

var assemblies = new[]
{
    typeof(UserHandler).Assembly
};
builder.Services.AddEventBus(assemblies);

程式集: 手動指定Assembly集合 -> MasaApp.GetAssemblies() -> AppDomain.CurrentDomain.GetAssemblies()

但由於NetCore按需載入,未使用的程式集在當前域中不存在,因此可能會導致部分事件以及Handler的對應關係未正確儲存,因此可通過手動指定Assembly集合或者修改全域性設定中的Assembly集合來修復這個問題

  1. 通過EventBus釋出事件,Handler出錯,但資料依然儲存到資料庫中

①. 檢查是否禁用事務

  1. DisableRollbackOnFailure是否為true (是否失敗時禁止回滾)
  2. UseTransaction是否為false (禁止使用事務)

②. 檢查當前資料庫是否支援回滾。例如: 使用的是Mysql資料庫,但回滾資料失敗,請檢視

本章原始碼

Assignment11

https://github.com/zhenlei520/MasaFramework.Practice

開源地址

MASA.Framework:https://github.com/masastack/MASA.Framework

MASA.EShop:https://github.com/masalabs/MASA.EShop

MASA.Blazor:https://github.com/BlazorComponent/MASA.Blazor

如果你對我們的 MASA Framework 感興趣,無論是程式碼貢獻、使用、提 Issue,歡迎聯絡我們