Saga State Machines(狀態機)以前被稱為Automatonymous,從v8開始被合併到masstrtransit程式碼庫中。
Automatonymous是.Net的State Machines(狀態機)類庫,它提供了一種C#語法來定義State Machines,包括狀態、事件和行為。MassTransit包括Automatonymous,並新增了範例儲存、事件關聯、訊息繫結、請求和響應支援以及排程。
Automatonymous不再是一個獨立的NuGet包,它已經被MassTransit包含了。在以前的版本中,需要額外的包參照。所以之前如果參照了Automatonymous,則必須刪除該參照,因為它不再相容。
State Machine(狀態機)定義狀態、事件和行為。實現一個派生自MassTransitStateMachine<T>
的狀態機類,該類只建立一次,然後用於將事件觸發的行為應用於狀態機範例。
public class OrderStateMachine:MassTransitStateMachine<OrderState>
{}
Instance包含狀態機範例的資料。當沒有找到具有相同CorrelationId
的現有範例時,將為每個已消費的初始事件建立一個新範例。一個Saga Repository用於持久化範例。範例是類,並且必須實現SagaStateMachineInstance
介面。
public class OrderState :
SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
InstanceState(x => x.CurrentState);
}
}
Instance範例必須儲存當前狀態(CurrentState),它可以是以下三種型別之一:
型別 | 描述 |
---|---|
State | 介面狀態SagaStateMachineInstance 。可能難以序列化,通常僅用於記憶體範例,但如果repository儲存引擎支援將使用者型別對映到儲存型別,則可以使用。 |
string | State的名稱。但是,它佔用了大量空間,因為每個範例都重複狀態名。 |
int | 小,快,但要求指定每個可能的狀態,以便為每個狀態分配int值。 |
如果
CurrentState
範例狀態屬性是state,則自動設定它。對於string或int型別,必須使用InstanceState方法。
要指定int狀態值,請設定instance範例狀態,如下所示。
public class OrderState :
SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public int CurrentState { get; set; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
InstanceState(x => x.CurrentState, Submitted, Accepted);
}
}
結果如下值:
0 - None, 1 - Initial, 2 - Final, 3 - Submitted, 4 - Accepted
States(狀態)表示事件(events)消費後範例的當前狀態。一個範例在給定時間只能處於一種狀態。新範例預設為初始(Initial)狀態,這是自動定義的。還為所有狀態機定義了最終(Final)狀態,並用於表示範例已達到最終狀態。
在這個例子中,宣告了兩個狀態(State)。狀態由MassTransitStateMachine
基礎類別建構函式自動初始化。
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; }
public State Accepted { get; private set; }
}
事件(Event)是可能導致狀態(State)變化的發生的事情。事件(Event)可以新增或更新範例資料,也可以更改範例(instance)的當前狀態。Event
在下面的範例中,SubmitOrder
訊息被宣告為一個事件,包括如何將該事件與範例關聯。
除非事件實現了 CorrelatedBy
,否則它們必須用關聯表示式宣告。
public interface SubmitOrder
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
}
public Event<SubmitOrder> SubmitOrder { get; private set; }
}
行為是指在狀態(state)中發生事件(event)時所發生的情況。
下面,Initial
塊用於定義在Initial
狀態期間SubmitOrder
事件的行為。當使用SubmitOrder
訊息並且沒有找到具有與OrderId
匹配的CorrelationId
的範例時,將在Initial
狀態下建立一個新範例。TransitionTo
activity 將範例轉換到Submitted
狀態,之後使用saga repository
持久化範例。
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.TransitionTo(Submitted));
}
}
隨後,OrderAccepted
事件可以通過下面所示的行為來處理。
public interface OrderAccepted
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId));
During(Submitted,
When(OrderAccepted)
.TransitionTo(Accepted));
}
public Event<OrderAccepted> OrderAccepted { get; private set; }
}
Message brokers(MQ)通常不保證訊息順序。因此,在狀態機(state machine)設計中考慮無序訊息是很重要的。
在上面的範例中,在OrderAccepted
事件之後接收SubmitOrder
訊息可能會導致SubmitOrder
訊息在_error
佇列中結束。如果OrderAccepted
事件首先被接收,它將被丟棄,因為它在初始(Initial)狀態下不被接受。下面是處理這兩種場景的更新狀態機。
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.TransitionTo(Submitted),
When(OrderAccepted)
.TransitionTo(Accepted));
During(Submitted,
When(OrderAccepted)
.TransitionTo(Accepted));
During(Accepted,
Ignore(SubmitOrder));
}
}
在更新後的範例中,在接受(Accepted)狀態下接收SubmitOrder
訊息會忽略該事件。然而,事件中的資料可能是有用的。在這種情況下,可以新增將資料複製到範例的行為。下面,在兩個場景中捕獲事件的資料。
public interface SubmitOrder
{
Guid OrderId { get; }
DateTime OrderDate { get; }
}
public class OrderState :
SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public DateTime? OrderDate { get; set; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.Then(x => x.Saga.OrderDate = x.Message.OrderDate)
.TransitionTo(Submitted),
When(OrderAccepted)
.TransitionTo(Accepted));
During(Submitted,
When(OrderAccepted)
.TransitionTo(Accepted));
During(Accepted,
When(SubmitOrder)
.Then(x => x.Saga.OrderDate = x.Message.OrderDate));
}
}
設定saga state machine(狀態機)
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<OrderStateMachine, OrderState>()
.InMemoryRepository();
});
上面的範例使用記憶體中的saga repository,但是可以使用任何saga repository。永續性部分包括受支援的saga repository的詳細資訊。
要測試state machine(狀態機),請參閱測試部分。
如上所示,事件(event)是狀態機(state machine)可以使用的訊息。事件(event)可以指定任何有效的訊息型別,並且可以設定每個事件。有幾種事件設定方法可用。
內建的CorrelatedBy<Guid>
介面可以在訊息約定中使用,以指定事件CorrelationId
。
public interface OrderCanceled :
CorrelatedBy<Guid>
{
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => OrderCanceled); // not required, as it is the default convention
}
}
雖然上面顯式宣告了事件(event),但這不是必需的。預設將會自動的設定為CorrelatedBy<Guid>
介面的事件(event)。
雖然方便,但有些人認為介面是對訊息契約基礎設施的入侵。MassTransit還支援一種宣告性方法來為事件指定CorrelationId
。通過設定全域性訊息拓撲,可以指定要用於關聯的訊息屬性。
public interface SubmitOrder
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
// this is shown here, but can be anywhere in the application as long as it executes
// before the state machine instance is created. Startup, etc. is a good place for it.
// It only needs to be called once per process.
static OrderStateMachine()
{
GlobalTopology.Send.UseCorrelationId<SubmitOrder>(x => x.OrderId);
}
public OrderStateMachine()
{
Event(() => SubmitOrder);
}
public Event<SubmitOrder> SubmitOrder { get; private set; }
}
另一種方法是宣告事件相關性,如下所示。當上述兩種方法都未使用時,應使用此方法。
public interface SubmitOrder
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
}
public Event<SubmitOrder> SubmitOrder { get; private set; }
}
因為OrderId是一個Guid,所以它可以用於事件關聯。當在初始狀態下接受SubmitOrder時,由於OrderId是Guid,因此新範例上的CorrelationId會自動分配OrderId值。
還可以使用查詢表示式關聯事件,當事件沒有與範例的CorrelationId屬性關聯時,需要使用查詢表示式。查詢的開銷更大,並且可能匹配多個範例,在設計狀態機和事件時應該考慮到這一點。
只要可能,嘗試使用CorrelationId進行關聯。如果需要查詢,則可能需要在屬性上建立索引,以便優化資料庫查詢。
要使用另一種型別關聯事件,需要額外的設定。
public interface ExternalOrderSubmitted
{
string OrderNumber { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => ExternalOrderSubmitted, e => e
.CorrelateBy(i => i.OrderNumber, x => x.Message.OrderNumber)
.SelectId(x => NewId.NextGuid()));
}
public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }
}
還可以使用兩個引數編寫查詢,這兩個引數直接傳遞給repository(並且必須得到後臺資料庫的支援)。
public interface ExternalOrderSubmitted
{
string OrderNumber { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => ExternalOrderSubmitted, e => e
.CorrelateBy((instance,context) => instance.OrderNumber == context.Message.OrderNumber)
.SelectId(x => NewId.NextGuid()));
}
public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }
}
當事件沒有與範例唯一相關的Guid時,必須設定.selectid
表示式。在上面的範例中,NewId用於生成一個順序識別符號,該識別符號將分配給範例CorrelationId。事件上的任何屬性都可以用來初始化CorrelationId。
可能有必要忽略給定狀態下的事件,以避免錯誤生成,或者防止訊息被移動到_skip佇列。要忽略某個狀態中的事件,請使用ignore方法。
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.TransitionTo(Submitted),
When(OrderAccepted)
.TransitionTo(Accepted));
During(Submitted,
When(OrderAccepted)
.TransitionTo(Accepted));
During(Accepted,
Ignore(SubmitOrder));
}
}
通過指定一個或多個必須使用的事件來設定組合事件,之後將引發組合事件。組合事件使用範例屬性來跟蹤所需的事件,這是在設定期間指定的。
要定義組合事件,必須首先設定所需的事件以及任何事件行為,然後才能設定組合事件。
public class OrderState :
SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public int ReadyEventStatus { get; set; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.TransitionTo(Submitted),
When(OrderAccepted)
.TransitionTo(Accepted));
During(Submitted,
When(OrderAccepted)
.TransitionTo(Accepted));
CompositeEvent(() => OrderReady, x => x.ReadyEventStatus, SubmitOrder, OrderAccepted);
DuringAny(
When(OrderReady)
.Then(context => Console.WriteLine("Order Ready: {0}", context.Saga.CorrelationId)));
}
public Event OrderReady { get; private set; }
}
一旦使用了SubmitOrder
和OrderAccepted
事件,就會觸發OrderReady
事件。
如果事件與範例不匹配,則可以設定缺失的範例行為
public interface RequestOrderCancellation
{
Guid OrderId { get; }
}
public interface OrderNotFound
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => OrderCancellationRequested, e =>
{
e.CorrelateById(context => context.Message.OrderId);
e.OnMissingInstance(m =>
{
return m.ExecuteAsync(x => x.RespondAsync<OrderNotFound>(new { x.OrderId }));
});
});
}
public Event<RequestOrderCancellation> OrderCancellationRequested { get; private set; }
}
在本例中,當在沒有匹配範例的情況下使用取消訂單請求時,將傳送未找到訂單的響應。響應更顯式,而不是生成Fault
。其他缺少的範例選項包括Discard
、Fault
和Execute
(ExecuteAsync
的同步版本)。
為了提高新範例的效能,將事件設定為直接插入到saga repository中可以減少鎖爭用。要設定要插入的事件,它應該位於initial塊中,並指定一個saga工廠。
public interface SubmitOrder
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => SubmitOrder, e =>
{
e.CorrelateById(context => context.Message.OrderId));
e.InsertOnInitial = true;
e.SetSagaFactory(context => new OrderState
{
CorrelationId = context.Message.OrderId
})
});
Initially(
When(SubmitOrder)
.TransitionTo(Submitted));
}
public Event<SubmitOrder> SubmitOrder { get; private set; }
}
在使用InsertOnInitial時,至關重要的是,saga repository能夠檢測重複的鍵(在本例中,是使用OrderId初始化的CorrelationId)。在這種情況下,在CorrelationId上使用叢集主鍵可以防止插入重複的範例。如果使用不同的屬性關聯事件,請確保資料庫對範例屬性實施唯一約束,並且saga工廠使用事件屬性值初始化範例屬性。
public interface ExternalOrderSubmitted
{
string OrderNumber { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => ExternalOrderSubmitted, e =>
{
e.CorrelateBy(i => i.OrderNumber, x => x.Message.OrderNumber)
e.SelectId(x => NewId.NextGuid());
e.InsertOnInitial = true;
e.SetSagaFactory(context => new OrderState
{
CorrelationId = context.CorrelationId ?? NewId.NextGuid(),
OrderNumber = context.Message.OrderNumber,
})
});
Initially(
When(SubmitOrder)
.TransitionTo(Submitted));
}
public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }
}
資料庫將對OrderNumber使用唯一約束來防止重複,saga repository將將其檢測為現有範例,然後載入該範例以使用事件。
預設情況下,範例不會從saga repository中刪除。若要設定已完成的範例刪除,請指定用於確定範例是否已完成的方法。
public interface OrderCompleted
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));
DuringAny(
When(OrderCompleted)
.Finalize());
();
}
public Event<OrderCompleted> OrderCompleted { get; private set; }
}
當範例使用OrderCompleted事件時,範例將被完成(它將範例轉換為Final狀態)。SetCompletedWhenFinalized
方法將一個處於Final狀態的範例定義為已完成——然後由saga repository使用它來刪除該範例。
要使用不同的完成表示式,例如檢查範例是否處於完成狀態的表示式,請使用SetCompleted方法,如下所示。
public interface OrderCompleted
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));
DuringAny(
When(OrderCompleted)
.TransitionTo(Completed));
SetCompleted(async instance =>
{
State<TInstance> currentState = await this.GetState(instance);
return Completed.Equals(currentState);
});
}
public State Completed { get; private set; }
public Event<OrderCompleted> OrderCompleted { get; private set; }
}
狀態機行為被定義為響應事件而執行的一系列活動。除了automautonomous
中包含的活動之外,MassTransit還包括用於傳送、釋出和排程訊息以及發起和響應請求的活動。
要釋出事件,請新增publish活動。
public interface OrderSubmitted
{
Guid OrderId { get; }
}
public class OrderSubmittedEvent :
OrderSubmitted
{
public OrderSubmittedEvent(Guid orderId)
{
OrderId = orderId;
}
public Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.Publish(context => (OrderSubmitted)new OrderSubmittedEvent(context.Saga.CorrelationId))
.TransitionTo(Submitted));
}
}
或者,可以使用訊息初始化器來去除Event類。
public interface OrderSubmitted
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.PublishAsync(context => context.Init<OrderSubmitted>(new { OrderId = context.Saga.CorrelationId }))
.TransitionTo(Submitted));
}
}
要傳送訊息,請新增send活動。
public interface UpdateAccountHistory
{
Guid OrderId { get; }
}
public class UpdateAccountHistoryCommand :
UpdateAccountHistory
{
public UpdateAccountHistoryCommand(Guid orderId)
{
OrderId = orderId;
}
public Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine(OrderStateMachineSettings settings)
{
Initially(
When(SubmitOrder)
.Send(settings.AccountServiceAddress, context => new UpdateAccountHistoryCommand(context.Saga.CorrelationId))
.TransitionTo(Submitted));
}
}
或者,可以使用訊息初始化器來去除Command類。
public interface UpdateAccountHistory
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine(OrderStateMachineSettings settings)
{
Initially(
When(SubmitOrder)
.SendAsync(settings.AccountServiceAddress, context => context.Init<UpdateAccountHistory>(new { OrderId = context.Saga.CorrelationId }))
.TransitionTo(Submitted));
}
}
狀態機可以通過將請求訊息型別設定為事件,並使用response方法來響應請求。在設定請求事件時,建議設定缺失的實體方法,以提供更好的響應體驗(通過不同的響應型別,或者通過指示未找到範例的響應)。
public interface RequestOrderCancellation
{
Guid OrderId { get; }
}
public interface OrderCanceled
{
Guid OrderId { get; }
}
public interface OrderNotFound
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => OrderCancellationRequested, e =>
{
e.CorrelateById(context => context.Message.OrderId);
e.OnMissingInstance(m =>
{
return m.ExecuteAsync(x => x.RespondAsync<OrderNotFound>(new { x.OrderId }));
});
});
DuringAny(
When(OrderCancellationRequested)
.RespondAsync(context => context.Init<OrderCanceled>(new { OrderId = context.Saga.CorrelationId }))
.TransitionTo(Canceled));
}
public State Canceled { get; private set; }
public Event<RequestOrderCancellation> OrderCancellationRequested { get; private set; }
}
有些場景需要等待狀態機的響應。在這些場景中,應該儲存響應原始請求所需的資訊。
public record CreateOrder(Guid CorrelationId) : CorrelatedBy<Guid>;
public record ProcessOrder(Guid OrderId, Guid ProcessingId);
public record OrderProcessed(Guid OrderId, Guid ProcessingId);
public record OrderCancelled(Guid OrderId, string Reason);
public class ProcessOrderConsumer : IConsumer<ProcessOrder>
{
public async Task Consume(ConsumeContext<ProcessOrder> context)
{
await context.RespondAsync(new OrderProcessed(context.Message.OrderId, context.Message.ProcessingId));
}
}
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public Guid? ProcessingId { get; set; }
public Guid? RequestId { get; set; }
public Uri ResponseAddress { get; set; }
public Guid OrderId { get; set; }
}
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public State Created { get; set; }
public State Cancelled { get; set; }
public Event<CreateOrder> OrderSubmitted { get; set; }
public Request<OrderState, ProcessOrder, OrderProcessed> ProcessOrder { get; set; }
public OrderStateMachine()
{
InstanceState(m => m.CurrentState);
Event(() => OrderSubmitted);
Request(() => ProcessOrder, order => order.ProcessingId, config => { config.Timeout = TimeSpan.Zero; });
Initially(
When(OrderSubmitted)
.Then(context =>
{
context.Saga.CorrelationId = context.Message.CorrelationId;
context.Saga.ProcessingId = Guid.NewGuid();
context.Saga.OrderId = Guid.NewGuid();
context.Saga.RequestId = context.RequestId;
context.Saga.ResponseAddress = context.ResponseAddress;
})
.Request(ProcessOrder, context => new ProcessOrder(context.Saga.OrderId, context.Saga.ProcessingId!.Value))
.TransitionTo(ProcessOrder.Pending));
During(ProcessOrder.Pending,
When(ProcessOrder.Completed)
.TransitionTo(Created)
.ThenAsync(async context =>
{
var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
await endpoint.Send(context.Saga, r => r.RequestId = context.Saga.RequestId);
}),
When(ProcessOrder.Faulted)
.TransitionTo(Cancelled)
.ThenAsync(async context =>
{
var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
await endpoint.Send(new OrderCancelled(context.Saga.OrderId, "Faulted"), r => r.RequestId = context.Saga.RequestId);
}),
When(ProcessOrder.TimeoutExpired)
.TransitionTo(Cancelled)
.ThenAsync(async context =>
{
var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
await endpoint.Send(new OrderCancelled(context.Saga.OrderId, "Time-out"), r => r.RequestId = context.Saga.RequestId);
}));
}
}
狀態機可以排程事件,它使用訊息排程器來排程要傳遞給範例的訊息。首先,必須宣告Schedule。
public interface OrderCompletionTimeoutExpired
{
Guid OrderId { get; }
}
public class OrderState :
SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public Guid? OrderCompletionTimeoutTokenId { get; set; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Schedule(() => OrderCompletionTimeout, instance => instance.OrderCompletionTimeoutTokenId, s =>
{
s.Delay = TimeSpan.FromDays(30);
s.Received = r => r.CorrelateById(context => context.Message.OrderId);
});
}
public Schedule<OrderState, OrderCompletionTimeoutExpired> OrderCompletionTimeout { get; private set; }
}
設定指定了可以被排程活動覆蓋的Delay,以及Received事件的相關表示式。狀態機可以使用Received事件,如下所示。
OrderCompletionTimeoutTokenId是一個Guid?用於跟蹤計劃訊息tokenId的範例屬性,稍後可使用該屬性取消對事件的計劃。
public interface OrderCompleted
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
During(Accepted,
When(OrderCompletionTimeout.Received)
.PublishAsync(context => context.Init<OrderCompleted>(new { OrderId = context.Saga.CorrelationId }))
.Finalize());
}
public Schedule<OrderState, OrderCompletionTimeoutExpired> OrderCompletionTimeout { get; private set; }
}
可以使用Schedule
活動安排事件。
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
During(Submitted,
When(OrderAccepted)
.Schedule(OrderCompletionTimeout, context => context.Init<OrderCompletionTimeoutExpired>(new { OrderId = context.Saga.CorrelationId }))
.TransitionTo(Accepted));
}
}
如上所述,可以通過Schedule活動覆蓋延遲。範例和訊息(context.Data)內容都可以用來計算延遲。
public interface OrderAccepted
{
Guid OrderId { get; }
TimeSpan CompletionTime { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
During(Submitted,
When(OrderAccepted)
.Schedule(OrderCompletionTimeout, context => context.Init<OrderCompletionTimeoutExpired>(new { OrderId = context.Saga.CorrelationId }),
context => context.Message.CompletionTime)
.TransitionTo(Accepted));
}
}
一旦收到預定的事件,就會清除OrderCompletionTimeoutTokenId屬性。
如果不再需要計劃的事件,則可以使用Unschedule活動。
public interface OrderAccepted
{
Guid OrderId { get; }
TimeSpan CompletionTime { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
DuringAny(
When(OrderCancellationRequested)
.RespondAsync(context => context.Init<OrderCanceled>(new { OrderId = context.Saga.CorrelationId }))
.Unschedule(OrderCompletionTimeout)
.TransitionTo(Canceled));
}
}
狀態機可以使用request方法傳送請求,該方法指定了請求型別和響應型別。可以指定其他請求設定,包括ServiceAddress和Timeout。
如果指定了ServiceAddress,它應該是將響應請求的服務的端點地址。如果沒有指定,請求將被髮布。
預設超時時間為30秒,但任何大於或等於TimeSpan.Zero的值都可以。當傳送的請求超時大於零時,將排程TimeoutExpired訊息。指定TimeSpan.Zero 不會排程超時訊息,並且請求永遠不會超時。
在定義請求時,應該指定一個範例屬性來儲存用於將響應與狀態機範例相關聯的RequestId。當請求掛起時,RequestId儲存在屬性中。當請求完成後,該屬性被清除。如果請求超時或出現錯誤,則保留requesttid,以便在請求最終完成後進行關聯(例如將請求從_error佇列移回服務佇列)。
最近的增強使此屬性成為可選屬性,而不是使用範例的CorrelationId作為請求訊息RequestId。這可以簡化響應相關性,並且還避免了在saga repository上新增索引的需要。但是,在高度複雜的系統中,為請求重用CorrelationId可能會導致問題。所以在選擇使用哪種方法時要考慮到這一點。
要宣告請求,請新增request屬性並使用request方法對其進行設定。
public interface ProcessOrder
{
Guid OrderId { get; }
}
public interface OrderProcessed
{
Guid OrderId { get; }
Guid ProcessingId { get; }
}
public class OrderState :
SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public Guid? ProcessOrderRequestId { get; set; }
public Guid? ProcessingId { get; set; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine(OrderStateMachineSettings settings)
{
Request(
() => ProcessOrder,
x => x.ProcessOrderRequestId, // Optional
r => {
r.ServiceAddress = settings.ProcessOrderServiceAddress;
r.Timeout = settings.RequestTimeout;
});
}
public Request<OrderState, ProcessOrder, OrderProcessed> ProcessOrder { get; private set; }
}
一旦定義, request活動就可以新增到行為中。
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
During(Submitted,
When(OrderAccepted)
.Request(ProcessOrder, x => x.Init<ProcessOrder>(new { OrderId = x.Saga.CorrelationId}))
.TransitionTo(ProcessOrder.Pending));
During(ProcessOrder.Pending,
When(ProcessOrder.Completed)
.Then(context => context.Saga.ProcessingId = context.Message.ProcessingId)
.TransitionTo(Processed),
When(ProcessOrder.Faulted)
.TransitionTo(ProcessFaulted),
When(ProcessOrder.TimeoutExpired)
.TransitionTo(ProcessTimeoutExpired));
}
public State Processed { get; private set; }
public State ProcessFaulted { get; private set; }
public State ProcessTimeoutExpired { get; private set; }
}
Request包括三個事件:Completed、Faulted和TimeoutExpired。這些事件可以在任何狀態中使用,但是,請求包含一個Pending狀態,可以使用它來避免宣告單獨的Pending狀態。
如果在收到響應、錯誤或超時之前完成了saga範例,則可能會設定一個缺失的範例處理程式,類似於常規事件。
Request(() => ProcessOrder, x => x.ProcessOrderRequestId, r =>
{
r.Completed = m => m.OnMissingInstance(i => i.Discard());
r.Faulted = m => m.OnMissingInstance(i => i.Discard());
r.TimeoutExpired = m => m.OnMissingInstance(i => i.Discard());
});
在某些情況下,事件行為可能具有需要在作用域級別管理的依賴關係,例如資料庫連線,或者複雜性最好封裝在單獨的類中,而不是作為狀態機本身的一部分。開發人員可以建立自己的活動以供狀態機使用,也可以選擇建立自己的擴充套件方法以將其新增到行為中。
要建立一個activity,需要建立一個類來實現IStateMachineActivity<TInstance, TData>
如圖所示。
public class PublishOrderSubmittedActivity :
IStateMachineActivity<OrderState, SubmitOrder>
{
readonly ISomeService _service;
public PublishOrderSubmittedActivity(ISomeService service)
{
_service = service;
}
public void Probe(ProbeContext context)
{
context.CreateScope("publish-order-submitted");
}
public void Accept(StateMachineVisitor visitor)
{
visitor.Visit(this);
}
public async Task Execute(BehaviorContext<OrderState, SubmitOrder> context, IBehavior<OrderState, SubmitOrder> next)
{
await _service.OnOrderSubmitted(context.Saga.CorrelationId);
// always call the next activity in the behavior
await next.Execute(context).ConfigureAwait(false);
}
public Task Faulted<TException>(BehaviorExceptionContext<OrderState, SubmitOrder, TException> context,
IBehavior<OrderState, SubmitOrder> next)
where TException : Exception
{
return next.Faulted(context);
}
}
對於ISomeService,在使用IPublishEndpoint釋出事件的類中實現介面,如下所示。
public class SomeService :
ISomeService
{
IPublishEndpoint _publishEndpoint;
public SomeService(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task OnOrderSubmitted(Guid orderId)
{
await _publishEndpoint.Publish<OrderSubmitted>(new { OrderId = orderId });
}
}
建立後,在狀態機中設定活動,如圖所示。
public interface OrderSubmitted
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.Activity(x => x.OfType<PublishOrderSubmittedActivity>())
.TransitionTo(Submitted));
}
}
當使用SubmitOrder事件時,狀態機將從容器中解析活動,並呼叫Execute方法。活動將被限定範圍,因此任何依賴都將在訊息ConsumeContext中解析。
在上面的例子中,事件型別是事先已知的。如果需要任何事件型別的活動,則可以在不指定事件型別的情況下建立該活動。
public class PublishOrderSubmittedActivity :
IStateMachineActivity<OrderState>
{
readonly ISomeService _service;
public PublishOrderSubmittedActivity(ISomeService service)
{
_service = service;
}
public void Probe(ProbeContext context)
{
context.CreateScope("publish-order-submitted");
}
public void Accept(StateMachineVisitor visitor)
{
visitor.Visit(this);
}
public async Task Execute(BehaviorContext<OrderState> context, IBehavior<OrderState> next)
{
await _service.OnOrderSubmitted(context.Saga.CorrelationId);
await next.Execute(context).ConfigureAwait(false);
}
public async Task Execute<T>(BehaviorContext<OrderState, T> context, IBehavior<OrderState, T> next)
{
await _service.OnOrderSubmitted(context.Saga.CorrelationId);
await next.Execute(context).ConfigureAwait(false);
}
public Task Faulted<TException>(BehaviorExceptionContext<OrderState, TException> context, IBehavior<OrderState> next)
where TException : Exception
{
return next.Faulted(context);
}
public Task Faulted<T, TException>(BehaviorExceptionContext<OrderState, T, TException> context, IBehavior<OrderState, T> next)
where TException : Exception
{
return next.Faulted(context);
}
}
要註冊範例活動,請使用以下語法。
public interface OrderSubmitted
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.Activity(x => x.OfInstanceType<PublishOrderSubmittedActivity>())
.TransitionTo(Submitted));
}
}