MassTransit類庫Saga檔案翻譯

2023-05-28 18:01:01

翻譯自 Saga State Machines

Saga State Machines(狀態機)

Saga State Machines(狀態機)以前被稱為Automatonymous,從v8開始被合併到masstrtransit程式碼庫中。

介紹

Automatonymous是.Net的State Machines(狀態機)類庫,它提供了一種C#語法來定義State Machines,包括狀態、事件和行為。MassTransit包括Automatonymous,並新增了範例儲存、事件關聯、訊息繫結、請求和響應支援以及排程。

Automatonymous不再是一個獨立的NuGet包,它已經被MassTransit包含了。在以前的版本中,需要額外的包參照。所以之前如果參照了Automatonymous,則必須刪除該參照,因為它不再相容。

State Machine(狀態機)

State Machine(狀態機)定義狀態、事件和行為。實現一個派生自MassTransitStateMachine<T>的狀態機類,該類只建立一次,然後用於將事件觸發的行為應用於狀態機範例。

public class OrderStateMachine:MassTransitStateMachine<OrderState>
{}

Instance(範例)

Instance包含狀態機範例的資料。當沒有找到具有相同CorrelationId的現有範例時,將為每個已消費的初始事件建立一個新範例。一個Saga Repository用於持久化範例。範例是類,並且必須實現SagaStateMachineInstance介面。

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);
    }
}

Instance範例必須儲存當前狀態(CurrentState),它可以是以下三種型別之一:

型別 描述
State 介面狀態SagaStateMachineInstance。可能難以序列化,通常僅用於記憶體範例,但如果repository儲存引擎支援將使用者型別對映到儲存型別,則可以使用。
string State的名稱。但是,它佔用了大量空間,因為每個範例都重複狀態名。
int 小,快,但要求指定每個可能的狀態,以便為每個狀態分配int值。

如果CurrentState範例狀態屬性是state,則自動設定它。對於string或int型別,必須使用InstanceState方法。

要指定int狀態值,請設定instance範例狀態,如下所示。

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public int CurrentState { get; set; }
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState, Submitted, Accepted);
    }
}

結果如下值:

0 - None, 1 - Initial, 2 - Final, 3 - Submitted, 4 - Accepted

State(狀態)

States(狀態)表示事件(events)消費後範例的當前狀態。一個範例在給定時間只能處於一種狀態。新範例預設為初始(Initial)狀態,這是自動定義的。還為所有狀態機定義了最終(Final)狀態,並用於表示範例已達到最終狀態。

在這個例子中,宣告了兩個狀態(State)。狀態由MassTransitStateMachine基礎類別建構函式自動初始化。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; }
    public State Accepted { get; private set; }
}

Event(事件)

事件(Event)是可能導致狀態(State)變化的發生的事情。事件(Event)可以新增或更新範例資料,也可以更改範例(instance)的當前狀態。Event是泛型的,其中T必須是有效的訊息型別。

在下面的範例中,SubmitOrder訊息被宣告為一個事件,包括如何將該事件與範例關聯。

除非事件實現了 CorrelatedBy,否則它們必須用關聯表示式宣告。

public interface SubmitOrder
{
    Guid OrderId { get; }    
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
    }
    public Event<SubmitOrder> SubmitOrder { get; private set; }
}

Behavior(行為)

行為是指在狀態(state)中發生事件(event)時所發生的情況。

下面,Initial塊用於定義在Initial狀態期間SubmitOrder事件的行為。當使用SubmitOrder訊息並且沒有找到具有與OrderId匹配的CorrelationId的範例時,將在Initial狀態下建立一個新範例。TransitionTo activity 將範例轉換到Submitted狀態,之後使用saga repository持久化範例。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted));
    }
}

隨後,OrderAccepted事件可以通過下面所示的行為來處理。

public interface OrderAccepted
{
    Guid OrderId { get; }    
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId));
        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));
    }
    public Event<OrderAccepted> OrderAccepted { get; private set; }
}
Message Order(訊息順序)

Message brokers(MQ)通常不保證訊息順序。因此,在狀態機(state machine)設計中考慮無序訊息是很重要的。

在上面的範例中,在OrderAccepted事件之後接收SubmitOrder訊息可能會導致SubmitOrder訊息在_error佇列中結束。如果OrderAccepted事件首先被接收,它將被丟棄,因為它在初始(Initial)狀態下不被接受。下面是處理這兩種場景的更新狀態機。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));
        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));
        During(Accepted,
            Ignore(SubmitOrder));
    }
}

在更新後的範例中,在接受(Accepted)狀態下接收SubmitOrder訊息會忽略該事件。然而,事件中的資料可能是有用的。在這種情況下,可以新增將資料複製到範例的行為。下面,在兩個場景中捕獲事件的資料。

public interface SubmitOrder
{
    Guid OrderId { get; }

    DateTime OrderDate { get; }
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public DateTime? OrderDate { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .Then(x => x.Saga.OrderDate = x.Message.OrderDate)
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Accepted,
            When(SubmitOrder)
                .Then(x => x.Saga.OrderDate = x.Message.OrderDate));
    }
}

Configuration(設定)

設定saga state machine(狀態機)

services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .InMemoryRepository();
});

上面的範例使用記憶體中的saga repository,但是可以使用任何saga repository。永續性部分包括受支援的saga repository的詳細資訊。

要測試state machine(狀態機),請參閱測試部分。

Event(事件)

如上所示,事件(event)是狀態機(state machine)可以使用的訊息。事件(event)可以指定任何有效的訊息型別,並且可以設定每個事件。有幾種事件設定方法可用。

內建的CorrelatedBy<Guid>介面可以在訊息約定中使用,以指定事件CorrelationId

public interface OrderCanceled :
    CorrelatedBy<Guid>
{    
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCanceled); // not required, as it is the default convention
    }
}

雖然上面顯式宣告了事件(event),但這不是必需的。預設將會自動的設定為CorrelatedBy<Guid>介面的事件(event)。

雖然方便,但有些人認為介面是對訊息契約基礎設施的入侵。MassTransit還支援一種宣告性方法來為事件指定CorrelationId。通過設定全域性訊息拓撲,可以指定要用於關聯的訊息屬性。

public interface SubmitOrder
{    
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    // this is shown here, but can be anywhere in the application as long as it executes
    // before the state machine instance is created. Startup, etc. is a good place for it.
    // It only needs to be called once per process.
    static OrderStateMachine()
    {
        GlobalTopology.Send.UseCorrelationId<SubmitOrder>(x => x.OrderId);
    }

    public OrderStateMachine()
    {
        Event(() => SubmitOrder);
    }

    public Event<SubmitOrder> SubmitOrder { get; private set; }
}

另一種方法是宣告事件相關性,如下所示。當上述兩種方法都未使用時,應使用此方法。

public interface SubmitOrder
{    
    Guid OrderId { get; }
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
    }
    public Event<SubmitOrder> SubmitOrder { get; private set; }
}

因為OrderId是一個Guid,所以它可以用於事件關聯。當在初始狀態下接受SubmitOrder時,由於OrderId是Guid,因此新範例上的CorrelationId會自動分配OrderId值。
還可以使用查詢表示式關聯事件,當事件沒有與範例的CorrelationId屬性關聯時,需要使用查詢表示式。查詢的開銷更大,並且可能匹配多個範例,在設計狀態機和事件時應該考慮到這一點。

只要可能,嘗試使用CorrelationId進行關聯。如果需要查詢,則可能需要在屬性上建立索引,以便優化資料庫查詢。

要使用另一種型別關聯事件,需要額外的設定。

public interface ExternalOrderSubmitted
{    
    string OrderNumber { get; }
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => ExternalOrderSubmitted, e => e
            .CorrelateBy(i => i.OrderNumber, x => x.Message.OrderNumber)
            .SelectId(x => NewId.NextGuid()));
    }
    public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }
}

還可以使用兩個引數編寫查詢,這兩個引數直接傳遞給repository(並且必須得到後臺資料庫的支援)。

public interface ExternalOrderSubmitted
{    
    string OrderNumber { get; }
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => ExternalOrderSubmitted, e => e
            .CorrelateBy((instance,context) => instance.OrderNumber == context.Message.OrderNumber)
            .SelectId(x => NewId.NextGuid()));
    }
    public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }
}

當事件沒有與範例唯一相關的Guid時,必須設定.selectid表示式。在上面的範例中,NewId用於生成一個順序識別符號,該識別符號將分配給範例CorrelationId。事件上的任何屬性都可以用來初始化CorrelationId。

Ignore Event(忽略事件)

可能有必要忽略給定狀態下的事件,以避免錯誤生成,或者防止訊息被移動到_skip佇列。要忽略某個狀態中的事件,請使用ignore方法。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Accepted,
            Ignore(SubmitOrder));
    }
}

Composite Event(組合事件)

通過指定一個或多個必須使用的事件來設定組合事件,之後將引發組合事件。組合事件使用範例屬性來跟蹤所需的事件,這是在設定期間指定的。

要定義組合事件,必須首先設定所需的事件以及任何事件行為,然後才能設定組合事件。

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public int ReadyEventStatus { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        CompositeEvent(() => OrderReady, x => x.ReadyEventStatus, SubmitOrder, OrderAccepted);

        DuringAny(
            When(OrderReady)
                .Then(context => Console.WriteLine("Order Ready: {0}", context.Saga.CorrelationId)));
    }

    public Event OrderReady { get; private set; }
}

一旦使用了SubmitOrderOrderAccepted事件,就會觸發OrderReady事件。

Missing Instance

如果事件與範例不匹配,則可以設定缺失的範例行為

public interface RequestOrderCancellation
{    
    Guid OrderId { get; }
}

public interface OrderNotFound
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCancellationRequested, e =>
        {
            e.CorrelateById(context => context.Message.OrderId);

            e.OnMissingInstance(m =>
            {
                return m.ExecuteAsync(x => x.RespondAsync<OrderNotFound>(new { x.OrderId }));
            });
        });
    }

    public Event<RequestOrderCancellation> OrderCancellationRequested { get; private set; }
}

在本例中,當在沒有匹配範例的情況下使用取消訂單請求時,將傳送未找到訂單的響應。響應更顯式,而不是生成Fault。其他缺少的範例選項包括DiscardFaultExecute (ExecuteAsync的同步版本)。

Initial Insert(初始化插入)

為了提高新範例的效能,將事件設定為直接插入到saga repository中可以減少鎖爭用。要設定要插入的事件,它應該位於initial塊中,並指定一個saga工廠。

public interface SubmitOrder
{    
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => SubmitOrder, e => 
        {
            e.CorrelateById(context => context.Message.OrderId));

            e.InsertOnInitial = true;
            e.SetSagaFactory(context => new OrderState
            {
                CorrelationId = context.Message.OrderId
            })
        });

        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted));
    }

    public Event<SubmitOrder> SubmitOrder { get; private set; }
}

在使用InsertOnInitial時,至關重要的是,saga repository能夠檢測重複的鍵(在本例中,是使用OrderId初始化的CorrelationId)。在這種情況下,在CorrelationId上使用叢集主鍵可以防止插入重複的範例。如果使用不同的屬性關聯事件,請確保資料庫對範例屬性實施唯一約束,並且saga工廠使用事件屬性值初始化範例屬性。

public interface ExternalOrderSubmitted
{    
    string OrderNumber { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => ExternalOrderSubmitted, e => 
        {
            e.CorrelateBy(i => i.OrderNumber, x => x.Message.OrderNumber)
            e.SelectId(x => NewId.NextGuid());

            e.InsertOnInitial = true;
            e.SetSagaFactory(context => new OrderState
            {
                CorrelationId = context.CorrelationId ?? NewId.NextGuid(),
                OrderNumber = context.Message.OrderNumber,
            })
        });

        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted));
    }

    public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }
}

資料庫將對OrderNumber使用唯一約束來防止重複,saga repository將將其檢測為現有範例,然後載入該範例以使用事件。

Completed Instance

預設情況下,範例不會從saga repository中刪除。若要設定已完成的範例刪除,請指定用於確定範例是否已完成的方法。

public interface OrderCompleted
{    
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));

        DuringAny(
            When(OrderCompleted)
                .Finalize());

        ();
    }

    public Event<OrderCompleted> OrderCompleted { get; private set; }
}

當範例使用OrderCompleted事件時,範例將被完成(它將範例轉換為Final狀態)。SetCompletedWhenFinalized方法將一個處於Final狀態的範例定義為已完成——然後由saga repository使用它來刪除該範例。

要使用不同的完成表示式,例如檢查範例是否處於完成狀態的表示式,請使用SetCompleted方法,如下所示。

public interface OrderCompleted
{    
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));

        DuringAny(
            When(OrderCompleted)
                .TransitionTo(Completed));

        SetCompleted(async instance => 
        {
            State<TInstance> currentState = await this.GetState(instance);

            return Completed.Equals(currentState);
        });
    }

    public State Completed { get; private set; }
    public Event<OrderCompleted> OrderCompleted { get; private set; }
}

Activities

狀態機行為被定義為響應事件而執行的一系列活動。除了automautonomous中包含的活動之外,MassTransit還包括用於傳送、釋出和排程訊息以及發起和響應請求的活動。

Publish

要釋出事件,請新增publish活動。

public interface OrderSubmitted
{
    Guid OrderId { get; }    
}

public class OrderSubmittedEvent :
    OrderSubmitted
{
    public OrderSubmittedEvent(Guid orderId)
    {
        OrderId = orderId;
    }

    public Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .Publish(context => (OrderSubmitted)new OrderSubmittedEvent(context.Saga.CorrelationId))
                .TransitionTo(Submitted));
    }
}

或者,可以使用訊息初始化器來去除Event類。

public interface OrderSubmitted
{
    Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .PublishAsync(context => context.Init<OrderSubmitted>(new { OrderId = context.Saga.CorrelationId }))
                .TransitionTo(Submitted));
    }
}

Send

要傳送訊息,請新增send活動。

public interface UpdateAccountHistory
{
    Guid OrderId { get; }    
}

public class UpdateAccountHistoryCommand :
    UpdateAccountHistory
{
    public UpdateAccountHistoryCommand(Guid orderId)
    {
        OrderId = orderId;
    }

    public Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine(OrderStateMachineSettings settings)
    {
        Initially(
            When(SubmitOrder)
                .Send(settings.AccountServiceAddress, context => new UpdateAccountHistoryCommand(context.Saga.CorrelationId))
                .TransitionTo(Submitted));
    }
}

或者,可以使用訊息初始化器來去除Command類。

public interface UpdateAccountHistory
{
    Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine(OrderStateMachineSettings settings)
    {
        Initially(
            When(SubmitOrder)
                .SendAsync(settings.AccountServiceAddress, context => context.Init<UpdateAccountHistory>(new { OrderId = context.Saga.CorrelationId }))
                .TransitionTo(Submitted));
    }
}

Respond

狀態機可以通過將請求訊息型別設定為事件,並使用response方法來響應請求。在設定請求事件時,建議設定缺失的實體方法,以提供更好的響應體驗(通過不同的響應型別,或者通過指示未找到範例的響應)。

public interface RequestOrderCancellation
{    
    Guid OrderId { get; }
}

public interface OrderCanceled
{
    Guid OrderId { get; }
}

public interface OrderNotFound
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCancellationRequested, e =>
        {
            e.CorrelateById(context => context.Message.OrderId);

            e.OnMissingInstance(m =>
            {
                return m.ExecuteAsync(x => x.RespondAsync<OrderNotFound>(new { x.OrderId }));
            });
        });

        DuringAny(
            When(OrderCancellationRequested)
                .RespondAsync(context => context.Init<OrderCanceled>(new { OrderId = context.Saga.CorrelationId }))
                .TransitionTo(Canceled));
    }

    public State Canceled { get; private set; }
    public Event<RequestOrderCancellation> OrderCancellationRequested { get; private set; }
}

有些場景需要等待狀態機的響應。在這些場景中,應該儲存響應原始請求所需的資訊。

public record CreateOrder(Guid CorrelationId) : CorrelatedBy<Guid>;

public record ProcessOrder(Guid OrderId, Guid ProcessingId);

public record OrderProcessed(Guid OrderId, Guid ProcessingId);

public record OrderCancelled(Guid OrderId, string Reason);

public class ProcessOrderConsumer : IConsumer<ProcessOrder>
{
    public async Task Consume(ConsumeContext<ProcessOrder> context)
    {
        await context.RespondAsync(new OrderProcessed(context.Message.OrderId, context.Message.ProcessingId));
    }
}

public class OrderState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public Guid? ProcessingId { get; set; }
    public Guid? RequestId { get; set; }
    public Uri ResponseAddress { get; set; }
    public Guid OrderId { get; set; }
}

public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public State Created { get; set; }
    
    public State Cancelled { get; set; }
    
    public Event<CreateOrder> OrderSubmitted { get; set; }
    
    public Request<OrderState, ProcessOrder, OrderProcessed> ProcessOrder { get; set; }
    
    public OrderStateMachine()
    {
        InstanceState(m => m.CurrentState);
        Event(() => OrderSubmitted);
        Request(() => ProcessOrder, order => order.ProcessingId, config => { config.Timeout = TimeSpan.Zero; });

        Initially(
            When(OrderSubmitted)
                .Then(context =>
                {
                    context.Saga.CorrelationId = context.Message.CorrelationId;
                    context.Saga.ProcessingId = Guid.NewGuid();

                    context.Saga.OrderId = Guid.NewGuid();

                    context.Saga.RequestId = context.RequestId;
                    context.Saga.ResponseAddress = context.ResponseAddress;
                })
                .Request(ProcessOrder, context => new ProcessOrder(context.Saga.OrderId, context.Saga.ProcessingId!.Value))
                .TransitionTo(ProcessOrder.Pending));
        
        During(ProcessOrder.Pending,
            When(ProcessOrder.Completed)
                .TransitionTo(Created)
                .ThenAsync(async context =>
                {
                    var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                    await endpoint.Send(context.Saga, r => r.RequestId = context.Saga.RequestId);
                }),
            When(ProcessOrder.Faulted)
                .TransitionTo(Cancelled)
                .ThenAsync(async context =>
                {
                    var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                    await endpoint.Send(new OrderCancelled(context.Saga.OrderId, "Faulted"), r => r.RequestId = context.Saga.RequestId);
                }),
            When(ProcessOrder.TimeoutExpired)
                .TransitionTo(Cancelled)
                .ThenAsync(async context =>
                {
                    var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                    await endpoint.Send(new OrderCancelled(context.Saga.OrderId, "Time-out"), r => r.RequestId = context.Saga.RequestId);
                }));
    }
}

Schedule

狀態機可以排程事件,它使用訊息排程器來排程要傳遞給範例的訊息。首先,必須宣告Schedule。

public interface OrderCompletionTimeoutExpired
{
    Guid OrderId { get; }
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public Guid? OrderCompletionTimeoutTokenId { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Schedule(() => OrderCompletionTimeout, instance => instance.OrderCompletionTimeoutTokenId, s =>
        {
            s.Delay = TimeSpan.FromDays(30);

            s.Received = r => r.CorrelateById(context => context.Message.OrderId);
        });
    }

    public Schedule<OrderState, OrderCompletionTimeoutExpired> OrderCompletionTimeout { get; private set; }
}

設定指定了可以被排程活動覆蓋的Delay,以及Received事件的相關表示式。狀態機可以使用Received事件,如下所示。

OrderCompletionTimeoutTokenId是一個Guid?用於跟蹤計劃訊息tokenId的範例屬性,稍後可使用該屬性取消對事件的計劃。

public interface OrderCompleted
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        During(Accepted,
            When(OrderCompletionTimeout.Received)
                .PublishAsync(context => context.Init<OrderCompleted>(new { OrderId = context.Saga.CorrelationId }))
                .Finalize());
    }

    public Schedule<OrderState, OrderCompletionTimeoutExpired> OrderCompletionTimeout { get; private set; }
}

可以使用Schedule活動安排事件。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        During(Submitted,
            When(OrderAccepted)
                .Schedule(OrderCompletionTimeout, context => context.Init<OrderCompletionTimeoutExpired>(new { OrderId = context.Saga.CorrelationId }))
                .TransitionTo(Accepted));
    }
}

如上所述,可以通過Schedule活動覆蓋延遲。範例和訊息(context.Data)內容都可以用來計算延遲。

public interface OrderAccepted
{
    Guid OrderId { get; }    
    TimeSpan CompletionTime { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        During(Submitted,
            When(OrderAccepted)
                .Schedule(OrderCompletionTimeout, context => context.Init<OrderCompletionTimeoutExpired>(new { OrderId = context.Saga.CorrelationId }),
                    context => context.Message.CompletionTime)
                .TransitionTo(Accepted));
    }
}

一旦收到預定的事件,就會清除OrderCompletionTimeoutTokenId屬性。

如果不再需要計劃的事件,則可以使用Unschedule活動。

public interface OrderAccepted
{
    Guid OrderId { get; }    
    TimeSpan CompletionTime { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        DuringAny(
            When(OrderCancellationRequested)
                .RespondAsync(context => context.Init<OrderCanceled>(new { OrderId = context.Saga.CorrelationId }))
                .Unschedule(OrderCompletionTimeout)
                .TransitionTo(Canceled));
    }
}

Request

狀態機可以使用request方法傳送請求,該方法指定了請求型別和響應型別。可以指定其他請求設定,包括ServiceAddress和Timeout。

如果指定了ServiceAddress,它應該是將響應請求的服務的端點地址。如果沒有指定,請求將被髮布。

預設超時時間為30秒,但任何大於或等於TimeSpan.Zero的值都可以。當傳送的請求超時大於零時,將排程TimeoutExpired訊息。指定TimeSpan.Zero 不會排程超時訊息,並且請求永遠不會超時。

在定義請求時,應該指定一個範例屬性來儲存用於將響應與狀態機範例相關聯的RequestId。當請求掛起時,RequestId儲存在屬性中。當請求完成後,該屬性被清除。如果請求超時或出現錯誤,則保留requesttid,以便在請求最終完成後進行關聯(例如將請求從_error佇列移回服務佇列)。

最近的增強使此屬性成為可選屬性,而不是使用範例的CorrelationId作為請求訊息RequestId。這可以簡化響應相關性,並且還避免了在saga repository上新增索引的需要。但是,在高度複雜的系統中,為請求重用CorrelationId可能會導致問題。所以在選擇使用哪種方法時要考慮到這一點。

Configuration

要宣告請求,請新增request屬性並使用request方法對其進行設定。

public interface ProcessOrder
{
    Guid OrderId { get; }    
}

public interface OrderProcessed
{
    Guid OrderId { get; }
    Guid ProcessingId { get; }
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public Guid? ProcessOrderRequestId { get; set; }
    public Guid? ProcessingId { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine(OrderStateMachineSettings settings)
    {
        Request(
            () => ProcessOrder,
            x => x.ProcessOrderRequestId, // Optional
            r => {
                r.ServiceAddress = settings.ProcessOrderServiceAddress;
                r.Timeout = settings.RequestTimeout;
            });
    }

    public Request<OrderState, ProcessOrder, OrderProcessed> ProcessOrder { get; private set; }
}

一旦定義, request活動就可以新增到行為中。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        During(Submitted,
            When(OrderAccepted)
                .Request(ProcessOrder, x => x.Init<ProcessOrder>(new { OrderId = x.Saga.CorrelationId}))
                .TransitionTo(ProcessOrder.Pending));

        During(ProcessOrder.Pending,
            When(ProcessOrder.Completed)
                .Then(context => context.Saga.ProcessingId = context.Message.ProcessingId)
                .TransitionTo(Processed),
            When(ProcessOrder.Faulted)
                .TransitionTo(ProcessFaulted),
            When(ProcessOrder.TimeoutExpired)
                .TransitionTo(ProcessTimeoutExpired));
    }

    public State Processed { get; private set; }
    public State ProcessFaulted { get; private set; }
    public State ProcessTimeoutExpired { get; private set; }
}

Request包括三個事件:Completed、Faulted和TimeoutExpired。這些事件可以在任何狀態中使用,但是,請求包含一個Pending狀態,可以使用它來避免宣告單獨的Pending狀態。

Missing Instance

如果在收到響應、錯誤或超時之前完成了saga範例,則可能會設定一個缺失的範例處理程式,類似於常規事件。

Request(() => ProcessOrder, x => x.ProcessOrderRequestId, r =>
{
    r.Completed = m => m.OnMissingInstance(i => i.Discard());
    r.Faulted = m => m.OnMissingInstance(i => i.Discard());
    r.TimeoutExpired = m => m.OnMissingInstance(i => i.Discard());
});

Custom

在某些情況下,事件行為可能具有需要在作用域級別管理的依賴關係,例如資料庫連線,或者複雜性最好封裝在單獨的類中,而不是作為狀態機本身的一部分。開發人員可以建立自己的活動以供狀態機使用,也可以選擇建立自己的擴充套件方法以將其新增到行為中。

要建立一個activity,需要建立一個類來實現IStateMachineActivity<TInstance, TData> 如圖所示。

public class PublishOrderSubmittedActivity :
    IStateMachineActivity<OrderState, SubmitOrder>
{
    readonly ISomeService _service;

    public PublishOrderSubmittedActivity(ISomeService service)
    {
        _service = service;
    }

    public void Probe(ProbeContext context)
    {
        context.CreateScope("publish-order-submitted");
    }

    public void Accept(StateMachineVisitor visitor)
    {
        visitor.Visit(this);
    }

    public async Task Execute(BehaviorContext<OrderState, SubmitOrder> context, IBehavior<OrderState, SubmitOrder> next)
    {
        await _service.OnOrderSubmitted(context.Saga.CorrelationId);
        
        // always call the next activity in the behavior
        await next.Execute(context).ConfigureAwait(false);
    }

    public Task Faulted<TException>(BehaviorExceptionContext<OrderState, SubmitOrder, TException> context, 
        IBehavior<OrderState, SubmitOrder> next)
        where TException : Exception
    {
        return next.Faulted(context);
    }
}

對於ISomeService,在使用IPublishEndpoint釋出事件的類中實現介面,如下所示。

public class SomeService :
    ISomeService
{
    IPublishEndpoint _publishEndpoint;
    
    public SomeService(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }
    
    public async Task OnOrderSubmitted(Guid orderId)
    {
        await _publishEndpoint.Publish<OrderSubmitted>(new { OrderId = orderId });
    }
}

建立後,在狀態機中設定活動,如圖所示。

public interface OrderSubmitted
{
    Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .Activity(x => x.OfType<PublishOrderSubmittedActivity>())
                .TransitionTo(Submitted));
    }
}

當使用SubmitOrder事件時,狀態機將從容器中解析活動,並呼叫Execute方法。活動將被限定範圍,因此任何依賴都將在訊息ConsumeContext中解析。

在上面的例子中,事件型別是事先已知的。如果需要任何事件型別的活動,則可以在不指定事件型別的情況下建立該活動。

public class PublishOrderSubmittedActivity :
    IStateMachineActivity<OrderState>
{
    readonly ISomeService _service;

    public PublishOrderSubmittedActivity(ISomeService service)
    {
        _service = service;
    }

    public void Probe(ProbeContext context)
    {
        context.CreateScope("publish-order-submitted");
    }

    public void Accept(StateMachineVisitor visitor)
    {
        visitor.Visit(this);
    }

    public async Task Execute(BehaviorContext<OrderState> context, IBehavior<OrderState> next)
    {
        await _service.OnOrderSubmitted(context.Saga.CorrelationId);

        await next.Execute(context).ConfigureAwait(false);
    }

    public async Task Execute<T>(BehaviorContext<OrderState, T> context, IBehavior<OrderState, T> next)
    {
        await _service.OnOrderSubmitted(context.Saga.CorrelationId);

        await next.Execute(context).ConfigureAwait(false);
    }

    public Task Faulted<TException>(BehaviorExceptionContext<OrderState, TException> context, IBehavior<OrderState> next) 
        where TException : Exception
    {
        return next.Faulted(context);
    }

    public Task Faulted<T, TException>(BehaviorExceptionContext<OrderState, T, TException> context, IBehavior<OrderState, T> next)
        where TException : Exception
    {
        return next.Faulted(context);
    }
}

要註冊範例活動,請使用以下語法。

public interface OrderSubmitted
{
    Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .Activity(x => x.OfInstanceType<PublishOrderSubmittedActivity>())
                .TransitionTo(Submitted));
    }
}