跨程序事件匯流排允許釋出和訂閱跨服務傳輸的訊息, 服務的釋出與訂閱不在同一個程序中
在Masa Framework中, 跨程序匯流排事件提供了一個可以被開箱即用的程式
跨程序事件與Dapr
並不是強繫結的, Masa Framework使用了Dapr
提供的pub/sub的能力, 如果你不想使用它, 你也可以更換為其它實現, 但目前Masa Framwork中僅提供了Dapr
的實現
Assignment.IntegrationEventBus
,並安裝Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
、Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EF
、Masa.Contrib.Data.EFCore.Sqlite
、Masa.Contrib.Data.UoW.EFCore
、Masa.Contrib.Development.DaprStarter.AspNetCore
、Microsoft.EntityFrameworkCore.Design
dotnet new web -o Assignment.IntegrationEventBus
cd Assignment.IntegrationEventBus
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.7.0-preview.8 // 使用dapr提供的pubsub能力
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EF --version 0.7.0-preview.8 //本地訊息表
dotnet add package Masa.Contrib.Data.EFCore.Sqlite --version 0.7.0-preview.8 //使用EfCore.Sqlite
dotnet add package Masa.Contrib.Data.UoW.EFCore --version 0.7.0-preview.8 //使用工作單元
dotnet add package Masa.Contrib.Development.DaprStarter.AspNetCore --version 0.7.0-preview.8 //開發環境使用DaprStarter協助管理Dapr Sidecar
dotnet add package Microsoft.EntityFrameworkCore.Design --version 6.0.6 //方便後續通過CodeFirst遷移資料庫
UserDbContext
,並繼承MasaDbContext
public class UserDbContext : MasaDbContext
{
public UserDbContext(MasaDbContextOptions<UserDbContext> options) : base(options)
{
}
}
DaprStarter
, 協助管理Dapr Sidecar
, 修改Program.cs
if (builder.Environment.IsDevelopment())
{
builder.Services.AddDaprStarter();
}
通過
Dapr
釋出整合事件需要執行Dapr
, 線上環境可通過Kubernetes
來執行, 開發環境可藉助Dapr Starter執行Dapr
, 因此僅需要在開發環境使用它
Program
builder.Services.AddIntegrationEventBus(option =>
{
option.UseDapr()
.UseEventLog<UserDbContext>()
.UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
});
var app = builder.Build();
#region dapr 訂閱整合事件使用
app.UseRouting();
app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();
});
#endregion
RegisterUserEvent
public record RegisterUserEvent : IntegrationEvent
{
public override string Topic { get; set; } = nameof(RegisterUserEvent);
public string Account { get; set; }
public string Mobile { get; set; }
}
Assignment.IntegrationEventBus
所在資料夾,開啟cmd或Powershell執行dotnet ef migrations add init //建立遷移
dotnet ef database update //更新資料庫
Program
app.MapPost("/register", async (IIntegrationEventBus eventBus) =>
{
//todo: 模擬註冊使用者並行布註冊使用者事件
await eventBus.PublishAsync(new RegisterUserEvent()
{
Account = "Tom",
Mobile = "19999999999"
});
});
Program
app.MapPost("/IntegrationEvent/RegisterUser", [Topic("pubsub", nameof(RegisterUserEvent))](RegisterUserEvent @event) =>
{
Console.WriteLine($"註冊使用者成功: {@event.Account}");
});
訂閱事件暫時未抽象,目前使用的是
Dapr
原生的訂閱方式,後續我們會支援Bind,屆時不會由於更換pubsub的實現而導致訂閱方式的改變
儘管跨程序事件目前僅支援了Dapr
,但這不代表你與RabbitMq
、Kafka
等無緣,釋出/訂閱是Dapr
抽象出的能力,實現釋出訂閱的元件有很多種,RabbitMq
、Kafka
是其中一種實現,如果你想深入瞭解他們之間的關係,可以參考:
首先我們先要知道的基礎知識點:
RowVersion
賦值)提供了整合事件介面的實現類, 並支援了發件箱模式, 其中:
在Masa.Contrib.Dispatcher.IntegrationEvents
中僅提供了發件箱的功能, 但整合事件的釋出是由 IPublisher
的實現類來提供, 由Db獲取本地訊息表的功能是由IIntegrationEventLogService
的實現類來提供, 它們分別屬於Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
、Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
的功能, 這也是為什麼使用整合事件需要參照包
Masa.Contrib.Dispatcher.IntegrationEvents
Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
那會有小夥伴問了, 我現在沒有使用Dapr
, 未來一段時間暫時也還不希望接入Dapr
, 我想自己接入, 以實現整合事件的釋出可以嗎?
當然是可以的, 如果你希望自行實現整合事件, 那麼這個時候你會遇到兩種情況
以社群用的較多的庫CAP為例, 由於它本身已經完成了發件箱模式, 我們不需要再處理本地訊息表, 也無需考慮本地訊息記錄的管理, 那我們可以這樣做
Masa.Contrib.Dispatcher.IntegrationEvents.Cap
, 新增Masa.BuildingBlocks.Dispatcher.IntegrationEvents
的參照, 並安裝DotNetCore.CAP
dotnet add package DotNetCore.CAP
IntegrationEventBus
, 並實現IIntegrationEventBus
public class IntegrationEventBus : IIntegrationEventBus
{
private readonly ICapPublisher _publisher;
private readonly ICapTransaction _capTransaction;
private readonly IUnitOfWork? _unitOfWork;
public IntegrationEventBus(ICapPublisher publisher, ICapTransaction capTransaction, IUnitOfWork? unitOfWork = null)
{
_publisher = publisher;
_capTransaction = capTransaction;
_unitOfWork = unitOfWork;
}
public Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
// 如果使用事務
// _publisher.Transaction.Value.DbTransaction = unitOfWork.Transaction;
// _publisher.Publish(@event.Topic, @event);
throw new NotImplementedException();
}
public IEnumerable<Type> GetAllEventTypes()
{
throw new NotImplementedException();
}
public Task CommitAsync(CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
}
CAP已支援本地事務, 使用當前
IUnitOfWork
提供的事務, 確保資料的原子性
ServiceCollectionExtensions
, 將自定義Publisher
註冊到服務集合public static class ServiceCollectionExtensions
{
public static DispatcherOptions UseRabbitMq(this IServiceCollection services)
{
//todo: 註冊RabbitMq資訊
services.TryAddScoped<IIntegrationEventBus, IntegrationEventBus>();
return dispatcherOptions;
}
}
已經實現發件箱模式的可以直接使用, 而不需要參照
Masa.Contrib.Dispatcher.IntegrationEvents
Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
以上未經過實際驗證, 感興趣的可以嘗試下, 歡迎隨時提
pr
我希望直接接入RabbitMq
, 但我自己沒有做發件箱模式, 那我可以怎麼做呢?
由於Masa.Contrib.Dispatcher.IntegrationEvents
已提供發件箱模式, 如果僅僅希望更換一個釋出事件的實現者, 那我們僅需要實現IPublisher
即可
Masa.Contrib.Dispatcher.IntegrationEvents.RabbitMq
, 新增Masa.Contrib.Dispatcher.IntegrationEvents
專案參照, 並安裝RabbitMQ.Client
dotnet add package RabbitMQ.Client //使用RabbitMq
Publisher
,並實現IPublisher
public class Publisher : IPublisher
{
public async Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default) where T : IIntegrationEvent
{
//todo: 通過 RabbitMQ.Client 傳送訊息到RabbitMq
throw new NotImplementedException();
}
}
DispatcherOptionsExtensions
, 將自定義Publisher
註冊到服務集合public static class DispatcherOptionsExtensions
{
public static DispatcherOptions UseRabbitMq(this Masa.Contrib.Dispatcher.IntegrationEvents.Options.DispatcherOptions options)
{
//todo: 註冊RabbitMq資訊
dispatcherOptions.Services.TryAddSingleton<IPublisher, Publisher>();
return dispatcherOptions;
}
}
RabbitMq
builder.Services.AddIntegrationEventBus(option =>
{
option.UseRabbitMq();//修改為使用RabbitMq
option.UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
option.UseEventLog<UserDbContext>();
});
Assignment12
https://github.com/zhenlei520/MasaFramework.Practice
MASA.Framework:https://github.com/masastack/MASA.Framework
MASA.EShop:https://github.com/masalabs/MASA.EShop
MASA.Blazor:https://github.com/BlazorComponent/MASA.Blazor
如果你對我們的 MASA Framework 感興趣,無論是程式碼貢獻、使用、提 Issue,歡迎聯絡我們
本文來自部落格園,作者:磊_磊,轉載請註明原文連結:https://www.cnblogs.com/zhenlei520/p/16913798.html
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連結,否則保留追究法律責任的權利