MassTransit | .NET 分散式應用框架

2022-10-23 21:01:08

引言

A free, open-source distributed application framework for .NET.
一個免費、開源的.NET 分散式應用框架。 -- MassTransit 官網

MassTransit,直譯公共交通, 是由Chris Patterson開發的基於訊息驅動的.NET 分散式應用框架,其核心思想是藉助訊息來實現服務之間的鬆耦合非同步通訊,進而確保應用更高的可用性、可靠性和可延伸性。通過對訊息模型的高度抽象,以及對主流的訊息代理(包括RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS等)的整合,大大簡化了基於訊息驅動的開發門檻,同時內建了連線管理、訊息序列化和消費者生命週期管理,以及諸如重試、限流、斷路器等例外處理機制,讓開發者更好的專注於業務實現。
簡而言之,MassTransit實現了訊息代理透明化。無需訊息導向代理程式設計進行諸如連線管理、佇列的申明和繫結等操作,即可輕鬆實現應用間訊息的傳遞和消費。

快速體驗

空口無憑,建立一個專案快速體驗一下。

  1. 基於worker模板建立一個基礎專案:dotnet new worker -n MassTransit.Demo
  2. 開啟專案,新增NuGet包:MassTransit
  3. 定義訂單建立事件訊息契約:
using System;

namespace MassTransit.Demo
{
    public record OrderCreatedEvent
    {
        public Guid OrderId { get; set; }
    }
}
  1. 修改Worker類,傳送訂單建立事件:
namespace MassTransit.Demo;

public class Worker : BackgroundService
{
    readonly IBus _bus;//註冊匯流排
    public Worker(IBus bus)
    {
        _bus = bus;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            //模擬並行送訂單建立事件
            await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken);
            await Task.Delay(1000, stoppingToken);
        }
    }
}

  1. 僅需實現IConsumer<OrderCreatedEvent>泛型介面,即可實現訊息的訂閱:
public class OrderCreatedEventConsumer: IConsumer<OrderCreatedEvent>
{
    private readonly ILogger<OrderCreatedEventConsumer> _logger;
    public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger)
    {
        _logger = logger;
    }
    public Task Consume(ConsumeContext<OrderCreatedEvent> context)
    {
        _logger.LogInformation($"Received Order:{context.Message.OrderId}");
        return Task.CompletedTask;
    }
}
  1. 註冊服務:
using MassTransit;
using MassTransit.Demo;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddHostedService<Worker>();
        services.AddMassTransit(configurator =>
        {
            //註冊消費者
            configurator.AddConsumer<OrderCreatedEventConsumer>();
            //使用基於記憶體的訊息路由傳輸
            configurator.UsingInMemory((context, cfg) =>
            {
                cfg.ConfigureEndpoints(context);
            });
        });
    })
    .Build();

await host.RunAsync();

  1. 執行專案,一個簡單的程序內事件釋出訂閱的應用就完成了。

如果需要使用RabbitMQ 訊息代理進行訊息傳輸,則僅需安裝MassTransit.RabbitMQNuGet包,然後指定使用RabbitMQ 傳輸訊息即可。

using MassTransit;
using MassTransit.Demo;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddHostedService<Worker>();
        services.AddMassTransit(configurator =>
        {
            configurator.AddConsumer<OrderCreatedEventConsumer>();
            
            // configurator.UsingInMemory((context, cfg) =>
            // {
            //     cfg.ConfigureEndpoints(context);
            // });
            
            configurator.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(
                    host: "localhost",
                    port: 5672,
                    virtualHost: "/",
                    configure: hostConfig =>
                    {
                        hostConfig.Username("guest");
                        hostConfig.Password("guest");
                    });
                cfg.ConfigureEndpoints(context);
            });
        });
    })
    .Build();

await host.RunAsync();

執行專案,MassTransit會自動在指定的RabbitMQ上建立一個型別為fanoutMassTransit.Demo.OrderCreatedEventExchange和一個與OrderCreatedEvent同名的佇列進行訊息傳輸,如下圖所示。

核心概念

MassTranist 為了實現訊息代理的透明化和應用間訊息的高效傳輸,抽象了以下概念,其中訊息流轉流程如下圖所示:

  1. Message:訊息契約,定義了訊息生產者和訊息消費者之間的契約。
  2. Producer:生產者,傳送訊息的一方都可以稱為生產者。
  3. SendEndpoint:傳送端點,用於將訊息內容序列化,並行送到傳輸模組。
  4. Transport:傳輸模組,訊息代理透明化的核心,用於和訊息代理通訊,負責傳送和接收訊息。
  5. ReceiveEndpoint:接收端點,用於從傳輸模組接收訊息,反序列化訊息內容,並將訊息路由到消費者。
  6. Consumer:消費者,用於訊息消費。

從上圖可知,本質上還是釋出訂閱模式的實現,接下來就核心概念進行詳解。

Message

Message:訊息,可以使用class、interface、struct和record來建立,訊息作為一個契約,需確保建立後不能篡改,因此應只保留唯讀屬性且不應包含方法和行為。MassTransit使用的是包含名稱空間的完全限定名即typeof(T).FullName來表示特定的訊息型別。因此若在另外的專案中消費同名的訊息型別,需確保訊息的名稱空間相同。另外需注意訊息不應繼承,以避免傳送基礎類別訊息型別造成的不可預期的結果。為避免此類情況,官方建議使用介面來定義訊息。在MassTransit中,訊息主要分為兩種型別:

  1. Command:命令,用於告訴服務做什麼,命令被傳送到指定端點,僅被一個服務接收並執行。一般以動名詞結構命名,如:UpdateAddress、CancelOrder。
  2. Event:事件,用於告訴服務什麼發生了,事件被釋出到多個端點,可以被多個服務消費。 一般以過去式結構命名,如:AddressUpdated,OrderCanceled。

經過MassTransit傳送的訊息,會使用信封包裝,包含一些附加資訊,資料結構舉例如下:

{
    "messageId": "6c600000-873b-00ff-9a8f-08da8da85542",
    "requestId": null,
    "correlationId": null,
    "conversationId": "6c600000-873b-00ff-9526-08da8da85544",
    "initiatorId": null,
    "sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true",
    "destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent",
    "responseAddress": null,
    "faultAddress": null,
    "messageType": [
        "urn:message:MassTransit.Demo:OrderCreatedEvent"
    ],
    "message": {
        "orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8"
    },
    "expirationTime": null,
    "sentTime": "2022-09-03T12:32:15.0796943Z",
    "headers": {},
    "host": {
        "machineName": "THINKPAD",
        "processName": "MassTransit.Demo",
        "processId": 24684,
        "assembly": "MassTransit.Demo",
        "assemblyVersion": "1.0.0.0",
        "frameworkVersion": "6.0.5",
        "massTransitVersion": "8.0.6.0",
        "operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0"
    }
}

從以上訊息範例中可以看出一個包裝後的訊息包含以下核心屬性:

  1. messageId:全域性唯一的訊息ID
  2. messageType:訊息型別
  3. message:訊息體,也就是具體的訊息範例
  4. sourceAddress:訊息來源地址
  5. destinationAddress:訊息目標地址
  6. responseAddress:響應地址,在請求響應模式中使用
  7. faultAddress:訊息異常傳送地址,用於儲存異常消費訊息
  8. headers:訊息頭,允許應用自定義擴充套件資訊
  9. correlationId:關聯Id,在Saga狀態機中會用到,用來關聯絡列事件
  10. host:宿主,訊息來源應用的宿主資訊

Producer

Producer,生產者,即用於生產訊息。在MassTransit主要藉助以下物件進行命令的傳送和事件的釋出。

從以上類圖可以看出,訊息的傳送主要核心依賴於兩個介面:

  1. ISendEndpoint:提供了Send方法,用於傳送命令。
  2. IPublishEndpoint:提供了Publish方法,用於釋出事件。

但基於上圖的繼承體系,可以看出通過IBusISendEndpointProviderConsumeContext進行命令的傳送;通過IBusIPublishEndpointProvider進行事件的釋出。具體舉例如下:

傳送命令

  1. 通過IBus傳送:
private readonly IBus _bus;
public async Task Post(CreateOrderRequest request)
{
    //通過以下方式設定對應訊息型別的目標地址
    EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));
    await _bus.Send(request);
}
  1. 通過ISendEndpointProvider傳送:
private readonly ISendEndpointProvider  _sendEndpointProvider;
public async Task Post(CreateOrderRequest request)
{
    var serviceAddress = new Uri("queue:create-order");
    var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);
    await endpoint.Send(request);
}
  1. 通過ConsumeContext傳送:
public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>
{    
    public async Task Consume(ConsumeContext<CreateOrderRequest> context)
    {
    	//do something else
        var destinationAddress = new Uri("queue:lock-stock");
        var command = new LockStockRequest(context.Message.OrderId);
       
        await context.Send<LockStockRequest>(destinationAddress, command);
 		// 也可以通過獲取`SendEndpoint`傳送命令
        // var endpoint = await context.GetSendEndpoint(destinationAddress);
        // await endpoint.Send<LockStockRequest>(command);
    	
    }
}

釋出事件

  1. 通過IBus釋出:
private readonly IBus _bus;
public async Task Post(CreateOrderRequest request)
{
    //do something
    await _bus.Publish(request);
}
  1. 通過IPublishEndpoint釋出:
private readonly IPublishEndpoint _publishEndpoint;
public async Task Post(CreateOrderRequest request)
{
    //do something
    var order = CreateOrder(request);
    await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));
}
  1. 通過ConsumeContext釋出:
public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest>
{    
    public async Task Consume(ConsumeContext<CreateOrderRequest> context)
    {
        var order = CreateOrder(conext.Message);
    	await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));
    }
}

Consumer

Consumer,消費者,即用於消費訊息。MassTransit 包括多種消費者型別,主要分為無狀態和有狀態兩種消費者型別。

無狀態消費者

無狀態消費者,即消費者無狀態,訊息消費完畢,消費者就釋放。主要的消費者型別有:IConsumer<TMessage>JobConsumerIActivityRoutingSlip等。其中IConsumer<TMessage>已經在上面的快速體驗部分舉例說明。而JobConsumer<TMessage>主要是對IConsumer<TMessage>的補充,其主要應用場景在於執行耗時任務。
而對於IActivityRoutingSlip則是MassTransit Courier的核心物件,主要用於實現Saga模式的分散式事務。MassTransit Courier 實現了Routing Slip模式,通過按需有序組合一系列的Activity,得到一個用來限定訊息處理順序的Routing Slip。而每個Activity的具體抽象就是IActivityIExecuteActivity。二者的差別在於IActivity定義了ExecuteCompensate兩個方法,而IExecuteActivitiy僅定義了Execute方法。其中Execute代表正向操作,Compensate代表反向補償操作。用一個簡單的下單流程:建立訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。而對於具體實現,可參閱文章:AspNetCore&MassTransit Courier實現分散式事務

有狀態消費者

有狀態消費者,即消費者有狀態,其狀態會持久化,代表的消費者型別為MassTransitStateMachineMassTransitStateMachineMassTransit Automatonymous 庫定義的,Automatonymous 是一個.NET 狀態機庫,用於定義狀態機,包括狀態、事件和行為。MassTransitStateMachine就是狀態機的具體抽象,可以用其編排一系列事件來實現狀態的流轉,也可以用來實現Saga模式的分散式事務。並支援與EF Core和Dapper整合將狀態持久化到關係型資料庫,也支援將狀態持久化到MongoDB、Redis等資料庫。MassTransitStateMachine對於Saga模式分散式事務的實現方式與RoutingSlip不同,還是以簡單的下單流程:建立訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。基於MassTransitStateMachine 實現分散式事務詳參後續文章。

從上圖可知,通過MassTransitStateMachine可以將事件的執行順序邏輯編排在一個集中的狀態機中,通過傳送命令和訂閱事件來推動狀態流轉,而這也正是Saga編排模式的實現。

應用場景

瞭解完MassTransit的核心概念,接下來再來看下MassTransit的核心特性以及應用場景:

  1. 基於訊息的請求響應模式:可用於同步通訊
  2. Mediator模式:中間者模式的實現,類似MediatR,但功能更完善
  3. 計劃任務:可用於執行定時任務
  4. Routing Slip 模式:可用於實現Saga模式的分散式事務
  5. Saga 狀態機:可用於實現Saga模式的分散式事務
  6. 本地訊息表:類似DotNetCore.Cap,用於實現最終一致性

總體而言,MassTransit是一款優秀的分散式應用框架,可作為分散式應用的訊息匯流排,也可以用作單體應用的事件匯流排。感興趣的朋友不妨一觀。