產品程式碼都給你看了,可別再說不會DDD(九):領域事件

2023-10-29 18:00:14

這是一個講解DDD落地的文章系列,作者是《實現領域驅動設計》的譯者滕雲。本文章系列以一個真實的並已成功上線的軟體專案——碼如雲https://www.mryqr.com)為例,系統性地講解DDD在落地實施過程中的各種典型實踐,以及在面臨實際業務場景時的諸多取捨。

本系列包含以下文章:

  1. DDD入門
  2. DDD概念大白話
  3. 戰略設計
  4. 程式碼工程結構
  5. 請求處理流程
  6. 聚合根與資源庫
  7. 實體與值物件
  8. 應用服務與領域服務
  9. 領域事件(本文)
  10. CQRS

案例專案介紹

既然DDD是「領域」驅動,那麼我們便不能拋開業務而只講技術,為此讓我們先從業務上了解一下貫穿本文章系列的案例專案 —— 碼如雲(不是馬雲,也不是碼雲)。如你已經在本系列的其他文章中瞭解過該案例,可跳過。

碼如雲是一個基於二維條碼的一物一碼管理平臺,可以為每一件「物品」生成一個二維條碼,並以該二維條碼為入口展開對「物品」的相關操作,典型的應用場景包括固定資產管理、裝置巡檢以及物品標籤等。

在使用碼如雲時,首先需要建立一個應用(App),一個應用包含了多個頁面(Page),也可稱為表單,一個頁面又可以包含多個控制元件(Control),比如單選框控制元件。應用建立好後,可在應用下建立多個範例(QR)用於表示被管理的物件(比如機器裝置)。每個範例均對應一個二維條碼,手機掃碼便可對範例進行相應操作,比如檢視範例相關資訊或者填寫頁面表單等,對錶單的一次填寫稱為提交(Submission);更多概念請參考碼如雲術語

在技術上,碼如雲是一個無程式碼平臺,包含了表單引擎、審批流程和資料包表等多個功能模組。碼如雲全程採用DDD完成開發,其後端技術棧主要有Java、Spring Boot和MongoDB等。

碼如雲的原始碼是開源的,可以通過以下方式存取:

碼如雲原始碼:https://github.com/mryqr-com/mry-backend

領域事件

領域事件(Domain Event)中的「事件」即事件驅動架構(Event Driven Architecture, EDA)中的「事件」之意。事件驅動架構被廣泛地用於計算機的硬體和軟體中,DDD也不例外。狹義地理解「事件」,你可能會認為不就是諸如Kafka或者RabbitMQ之類的訊息佇列(Message Queue)麼?可不止那麼簡單,在本文中,我們將對DDD中領域事件的建模、產生、傳送和消費做詳細講解。

為了方便讀者直觀概括性地瞭解領域事件的全景,我們先將從事件釋出到消費整個過程中的關鍵節點展示在下圖。在閱讀過程中,讀者可返回該圖進行對應。

領域事件建模

領域事件表示在領域模型中已經發生過的重要事件,主要用於軟體中各個元件、模組、子系統甚至與第三方系統之間的資料同步和整合。所謂「重要」,指的是所發出的事件會引起進一步的動作,以此形成更大範圍的業務閉環。舉個例子,在電商系統中,「使用者已下單」則是一個領域事件,它可能會進一步引起支付、物流、積分等一些列後續業務動作。

當然,對於「重要」的定義是相對的,需要視實際所處業務場景而定。例如,在碼如雲中,使用者可以自行更改頭像,整個業務閉環到此為止,因此我們並沒有為此建立相應的領域事件;不過,對於其他一些系統來說,使用者更新了頭像後,可能需要將頭像資訊同步到另外的子系統,那麼此時便可發出「使用者頭像已更新」事件,其他子系統通過訂閱監聽該事件完成頭像資料的同步。

領域事件的命名一般採用「XX已XX」的形式,前一個「XX」通常表示一個名詞,後一個「XX」則表示一個動詞,比如「訂單已支付」、「表單已提交」等。在實際建模時,通常先建立一個公共基礎類別DomainEvent,其他實際的事件類均繼承自該基礎類別。

//DomainEvent

public abstract class DomainEvent {
    private String id;//事件ID
    private DomainEventType type;//事件型別

    //狀態,CREATED(剛建立),PUBLISH_SUCCEED(已釋出成功), PUBLISH_FAILED(釋出失敗)
    private DomainEventStatus status;

    private Instant raisedAt;//事件產生時間

    protected DomainEvent(DomainEventType type,) {
        requireNonNull(type, "Domain event type must not be null.");
        this.id = newSnowflakeId();
        this.type = type;
        this.raisedAt = now();
    }
}

原始碼出處:com/mryqr/core/common/domain/event/DomainEvent.java

領域事件基礎類別DomainEvent包含了事件標識id,事件型別type,事件狀態status,以及事件產生的時間raisedAt,根據自身情況還可以新增更多的公共欄位,比如事件產生時的操作人等。

具體的事件類繼承自DomainEvent,以「成員已建立(MemberCreatedEvent)」事件為例:

//MemberCreatedEvent

public class MemberCreatedEvent extends DomainEvent {
    private String memberId;

    public MemberCreatedEvent(String memberId) {
        super(MEMBER_CREATED);
        this.memberId = memberId;
    }
}

原始碼出處:com/mryqr/core/member/domain/event/MemberCreatedEvent.java

領域事件中應該包含恰如其分的資料資訊,且所包含的資訊應該與其所產生時的上下文強相關。比如本例中,MemberCreatedEvent事件對應新成員已建立的業務場景,此時最重要的是記錄下這個新成員的唯一標識memberId。又比如,對於「成員修改自己姓名」的業務場景,其所發出的「成員姓名已更新」事件MemberNameChangedEvent則應該同時包含修改前的姓名oldName和修改後的姓名newName

//MemberNameChangedEvent

public class MemberNameChangedEvent extends DomainEvent {
    private String memberId;
    private String newName;
    private String oldName;

    public MemberNameChangedEvent(String memberId, String newName, String oldName) {
        super(MEMBER_NAME_CHANGED);
        this.memberId = memberId;
        this.newName = newName;
        this.oldName = oldName;
    }
}

原始碼出處:com/mryqr/core/member/domain/event/MemberNameChangedEvent.java

這裡有兩個需要注意的問題,第一個是對於「成員已建立」事件MemberCreatedEvent來說,除了唯一的memberId欄位之外,為什麼不包含其他資訊呢,比如建立成員時所輸入的姓名、郵箱和電話號碼等,這些資訊不也是和場景強相關的嗎?這個問題涉及到事件驅動架構的架構模式問題,通常來說有2種模式: (1)事件作為通知機制 (2)事件攜帶狀態轉移(Event Carried State Transfer)

對於第(1)種「事件作為通知機制」來說,領域事件主要起到一個通知作用,事件消費方在得到通知後需要反過來呼叫事件釋出方提供的API以獲取更多的業務資料,這種方式主要用於處理一些資料同步的場景,優點是可以保證任何時候事件的消費者都能獲取到最新的資料,而不用擔心事件的延遲消費或者亂序消費等問題,這種方式的缺點是增加了一次額外的API呼叫,並且在事件的傳送方和消費方之間多了一層耦合。

對於第(2)種「事件攜帶狀態轉移」來說,事件消費方無需額外的API呼叫,而是從事件本身中即可獲取到業務資料,降低了系統之間的耦合,通常用於比單純的資料同步更復雜的業務場景,不過缺點則是可能導致消費方所獲取到的資料不再是最新的,舉個例子,對於「成員姓名已更新」事件(MemberNameChangedEvent)來說,假設成員的姓名先後更新的2次,首先將newName更新為「張三」,然後更新為「李四」,但是由於訊息機制的不確定性等原因,可能更新為「李四」的事件先於「張三」事件而到達,最終導致的結果是消費方中成員的姓名依然為「張三」,而不是最新的「李四」,當然可以通過更多的手段來解決這個問題,比如消費方可以對事件產生的時間進行檢查,如果發現事件產生的時間早於最近一次已處理事件的產生時間,則不再處理,不過這樣一來引入了一些新的成本。

至於選擇哪一種架構模式,並不是一個確定性的問題,開發團隊需要根據自身系統的業務場景以及自身的團隊情況做出決定。在碼如雲,我們選擇了第(1)種,即將事件作為通知機制,因為碼如雲系統中的領域事件多數是用來處理純粹的事件同步的。

另一個問題是,對於「成員姓名已更新」事件(MemberNameChangedEvent)來講,一般來說消費方更關心變更後的姓名newName,誰會去關心那個老姓名oldName呢?這樣一來是不是可以將oldName刪除掉?答案是否定的,因為事件的釋出者應該是一個「獨善其身」式的存在,應該按照自身的業務場景行事,而不應該因為消費方不需要而省略掉與上下文強相關的資訊。

領域事件的產生

使用領域事件的一種直接做法是:在應用服務(Application Service)中產生事件並行布出去。例如,對於「成員更新姓名」的用例來講,對應的應用服務MemberCommandService實現如下:

@Transactional
public void updateMyName(UpdateMemberNameCommand command, User user) {
    Member member = memberRepository.byId(user.getMemberId());
    String oldName = member.getName();
    String newName = command.getName();
    
    member.updateName(newName, user);
    memberRepository.save(member);

    MemberNameChangedEvent event = new MemberNameChangedEvent(member.getId(), newName, oldName);
    eventPublisher.publish(event);

    log.info("Member name updated by member[{}].", member.getId());
}

原始碼出處:com/mryqr/core/member/command/MemberCommandService.java

這裡,在更新了成員的姓名之後,即刻呼叫事件釋出器eventPublisher.publish()將事件傳送到訊息佇列(Redis Stream)中。雖然這種方式比較流行,但它至少存在2個問題:

  1. 領域事件本應屬於領域模型的一部分,也即應該從領域模型中產生,而這裡卻在應用服務中產生
  2. 對聚合根(本例中的Member)的持久化和對事件的釋出可能導致資料不一致問題

對於第1個問題,我們可以採用「從領域模型中返回領域事件」的方式:

@Transactional
public void updateMyName(UpdateMemberNameCommand command, User user) {
    Member member = memberRepository.byId(user.getMemberId());
    String oldName = member.getName();
    String newName = command.getName();

    MemberNameChangedEvent event = member.updateName(newName, user);
    memberRepository.save(member);
    eventPublisher.publish(event);

    log.info("Member name updated by member[{}].", member.getId());
}

原始碼出處:com/mryqr/core/member/command/MemberCommandService.java

在本例中,Member.updateName()方法不再返回void,而是返回領域事件MemberNameChangedEvent,然後由eventPublisher.publish(event)釋出。更多關於此種方式的討論,請參考這篇文章

這種方式保證了領域事件是從領域模型中產生,也即解決了第1個問題,但是依然存在第2個問題,接下來我們詳細解釋一下第2個問題。第2個問題中所謂的「資料一致性」,表示的是將聚合根儲存到資料庫和將領域事件釋出到訊息佇列之間的一致性。由於資料庫和訊息佇列屬於異構的資料來源,要保證他們之間的資料一致性需要引入分散式事務,比如JTA(Java Transaction API)。但是分散式事務通常是比較重量級的,再加上當下的諸多常見訊息佇列均不支援分散式事務(比如Kafka),因此我們並不建議使用分散式事務來解決這個問題。不過不要擔心,有人專門研究過這個問題的解決方案,並形成了一種設計模式——Transactional Outbox。概括來說,這種方式將一個分散式事務的問題拆解為多個本地事務,並採用「至少一次投遞(At Least Once Delivery)」原則保證訊息的釋出。具體來講,釋出方在與業務資料相同的資料庫中為領域事件建立相應的事件釋出表(Outbox table),然後在儲存業務資料的同時將所產生的事件儲存到事件釋出表中,由於此時二者都屬於同一個資料庫的本地事務所管轄,因此保證了「業務操作」與「事件產生」之間的一致性。此時的程式碼變成了:

@Transactional
public void updateMyName(UpdateMemberNameCommand command, User user) {
    Member member = memberRepository.byId(user.getMemberId());
    String oldName = member.getName();
    String newName = command.getName();

    MemberNameChangedEvent event = member.updateName(newName, user);
    memberRepository.save(member);
    eventStore.save(event);

    log.info("Member name updated by member[{}].", member.getId());
}

原始碼出處:com/mryqr/core/member/command/MemberCommandService.java

此例和上例唯一的區別在於:先前的eventPublisher.publish(event)被替換成了eventStore.save(event),也即應用服務不再將事件直接釋出出去,而是將事件儲存到資料庫中,之後,另一個模組將從資料庫中讀取事件並行布(對此我們將在下文進行講解)。

然而,這種方式依然有個缺點:每個需要產生領域事件的場景都需要應用服務先後呼叫repository.save()eventStore.save(),導致了程式碼重複。有沒有一種「一勞永逸」的方法呢?答案是有的,為此請允許我們隆重地介紹處理領域事件的一枚「銀彈」——在聚合根中臨時儲存領域事件,然後在資源庫中同時儲存聚合根和領域事件到資料庫。開玩笑的啦,「銀彈」這個梗,我們怎麼可能不給自己留點後路呢?雖然不是「銀彈」,但是這種方式的確有其好處,在碼如雲,我們採用了這種方式,算得上是屢試不爽了。在這種方式下,首先需要在聚合根的基礎類別中完成與領域事件相關的各種設施,包括建立臨時性的事件容器events以及通用的事件產生方法raiseEvent()

//AggregateRoot

@Getter
public abstract class AggregateRoot implements Identified {
    private String id;
    private String tenantId;

    private List<DomainEvent> events;//領域事件列表,用於臨時存放完成某個業務流程中所發出的事件,會被BaseRepository儲存到事件表中

    //此處省略其他程式碼

    protected void raiseEvent(DomainEvent event) {//將領域事件新增到臨時性的events容器中
        allEvents().add(event);
    }

    public void clearEvents() {//清空所有的事件,在聚合根落庫之前需要完成此操作
        this.events = null;
    }

    private List<DomainEvent> allEvents() {
        if (events == null) {
            this.events = new ArrayList<>();
        }

        return events;
    }
}

原始碼出處:com/mryqr/core/common/domain/AggregateRoot.java

在聚合根基礎類別AggregateRoot中,events欄位用於臨時儲存聚合根中所產生的所有事件,各實際的聚合根類通過呼叫raiseEvent()events中新增事件。比如,對於「成員修改姓名」用例而言,Member實現如下:

//Member

public void updateName(String name, User user) {
    if (Objects.equals(this.name, name)) {
        return;
    }

    String oldName = this.name;
    this.name = name;
    raiseEvent(new MemberNameChangedEvent(this.getId(), name, oldName));
}

原始碼出處:com/mryqr/core/member/domain/Member.java

這裡,聚合根Member不再返回領域事件,而是將領域事件通過AggregateRoot.raiseEvent()暫時性地儲存到自身的events中。之後在儲存Member時,資源庫的公共基礎類別BaseRepositorysave()方法同時完成對聚合根和領域事件的持久化:

//MongoBaseRepository

public void save(AR it) {
    requireNonNull(it, "AR must not be null.");

    if (!isEmpty(it.getEvents())) {
        saveEvents(it.getEvents());
        it.clearEvents();
    }

    mongoTemplate.save(it);
}

原始碼出處:com/mryqr/common/mongo/MongoBaseRepository.java

這裡了的AR是表示所有聚合根類的泛型,在save()方法中,首先獲取到聚合根中的所有領域事件,然後通過saveEvents()方法將它們儲存到釋出事件表中,最後通過mongoTemplate.save(it)儲存聚合根。需要注意的是,在這種方式下,AggregateRoot中的events欄位是不能被持久化的,因為我們需要保證每次從資料庫中載入出聚合根時events都是空的,為此我們在saveEvents()儲存了領域事件後,立即呼叫it.clearEvents()將所有的領域事件清空掉,以免領域事件隨著聚合根一道被持久化到資料庫中。

到目前為止,我們對領域事件的處理都還沒有涉及到與任何訊息中介軟體相關的內容,也即事件的產生是一個完全獨立於訊息佇列的關注點,此時我們不用關心領域事件之後將以何種形式釋出出去,Kafka也好,RabbitMQ也罷。除了關注點分離的好處外,這種解耦也使得系統在有可能切換訊息中介軟體時更加的簡單。

領域事件的釋出

對於上文中的「在應用服務中通過eventPublisher.publish()直接釋出事件」而言,對事件的產生和釋出是同時完成的;但是對於「在聚合根中臨時性儲存領域事件」的方式來說,它只解決了事件的產生問題,並未解決事件的釋出問題,在本小節中,我們將詳細講解在這種方式下如何釋出領域事件。

事件的釋出方應該採用「發射後不管(Fire And Forget)」的原則,即釋出方無需瞭解消費方是如何處理領域事件的,甚至都不需要知道事件被哪些消費方所消費。

在將業務資料和領域事件同時儲存到資料庫之後,接下來的事情便是如何將領域事件釋出出去了。在釋出事件時,應該從資料庫的事件釋出表中載入領域事件,然後通過訊息中介軟體的API將事件傳送出去,這裡需要解決以下2個問題:

  1. 什麼時候啟動對領域事件的釋出?
  2. 如何處理髮布失敗的情況?

對於第1個問題,需要資料庫事務執行完畢之後,也即保證領域事件落盤之後,才可進行對事件的釋出,顯然從應用服務中釋出並不滿足此條件(因為@Transactional註解是打在應用服務上的,應用服務的方法在執行過程中事務尚未結束),除此之外便只有Controller了,但是如果在Controller中釋出領域事件又會導致需要在每個Controller中都重複呼叫事件釋出邏輯的程式碼。有沒有其他辦法呢?有,一是可以通過AOP的方式在每個Controller方法執行完畢之後啟動對事件的釋出,另一種是通過Spring框架提供的HandlerInterceptor對每個HTTP請求進行攔截並啟動對事件的釋出,在碼如雲中,我們採用了HandlerInterceptor的方式:

public class DomainEventHandlingInterceptor implements HandlerInterceptor {
    private final DomainEventPublisher eventPublisher;

    @Override
    public void postHandle(HttpServletRequest request,
                           HttpServletResponse response,
                           Object handler,
                           ModelAndView modelAndView) {

        //從資料庫中載入所有尚未釋出的事件(status=CREATED或PUBLISH_FAILED)並行布
        eventPublisher.publishEvents();
    }
}

原始碼出處:com/mryqr/common/event/publish/interception/DomainEventHandlingInterceptor.java

這裡,DomainEventHandlingInterceptorpostHandel()方法將在每個HTTP請求完成之後執行,eventPublisher.publishEvents()並不接受任何引數,其實現邏輯是從資料庫中載入出所有尚未傳送的事件並行布(可以通過DomainEventstatus來判斷事件是否已經傳送)。

這種方式依然不完美,因為即便一個請求中沒有任何事件產生,也將導致一次對資料庫的查詢操作,如果有種方式可以記住請求中所產生的事件ID,然後再針對性的傳送相應的事件就好了,答案是有的:使用Java的ThreadLocal(粗略可以理解為執行緒級別的全域性變數)記錄下一次請求中所產生的事件ID。為此,需要在BaseRepository對事件落庫的時候將所有的事件ID記錄到ThreadLocal中:

//MongoBaseRepository

private void saveEvents(List<DomainEvent> events) {
    if (!isEmpty(events)) {
        domainEventDao.insert(events);//儲存事件到資料庫
        ThreadLocalDomainEventIdHolder.addEvents(events);//記錄事件ID以備後用
    }
}

原始碼出處:com/mryqr/common/mongo/MongoBaseRepository.java

這裡的ThreadLocalDomainEventIdHolder.addEvents()將使用ThreadLocal將本次請求中的所有事件ID記錄下來以備後用。ThreadLocalDomainEventIdHolder實現如下:

//ThreadLocalDomainEventIdHolder

public class ThreadLocalDomainEventIdHolder {
    private static final ThreadLocal<LinkedList<String>> THREAD_LOCAL_EVENT_IDS = withInitial(LinkedList::new);

    public static void clear() {
        eventIds().clear();
    }

    public static void remove() {
        THREAD_LOCAL_EVENT_IDS.remove();
    }

    public static List<String> allEventIds() {
        List<String> eventIds = eventIds();
        return isNotEmpty(eventIds) ? List.copyOf(eventIds) : List.of();
    }
    
    public static void addEvents(List<DomainEvent> events) {//新增事件ID
        events.forEach(ThreadLocalDomainEventIdHolder::addEvent);
    }

    public static void addEvent(DomainEvent event) {//新增事件ID
        LinkedList<String> eventIds = eventIds();
        eventIds.add(event.getId());
    }

    private static LinkedList<String> eventIds() {
        return THREAD_LOCAL_EVENT_IDS.get();
    }
}

原始碼出處:com/mryqr/common/event/publish/interception/ThreadLocalDomainEventIdHolder.java

現在,執行緒中有了已產生事件的ID,接下來便可在DomainEventHandlingInterceptor獲取這些事件ID並行布對應事件了:

//DomainEventHandlingInterceptor

public class DomainEventHandlingInterceptor implements HandlerInterceptor {
    private final DomainEventPublisher eventPublisher;

    @Override
    public void postHandle(HttpServletRequest request, 
                           HttpServletResponse response,
                           Object handler, 
                           ModelAndView modelAndView) {
        
        List<String> eventIds = ThreadLocalDomainEventIdHolder.allEventIds();
        try {
            eventPublisher.publish(eventIds);
        } finally {
            ThreadLocalDomainEventIdHolder.remove();
        }
    }
}

原始碼出處:com/mryqr/common/event/publish/interception/DomainEventHandlingInterceptor.java

在傳送事件時,可以採用同步的方式,也可以採用非同步的方式,同步方式即事件的傳送與業務請求的處理在同一個執行緒中完成,這種方式可能導致系統響應時間延長,在高並行場景下可能影響系統吞吐量,因此一般建議採用非同步方式,即通過一個單獨的執行緒池完成對事件的釋出。非同步傳送的程式碼如下:

public class AsynchronousDomainEventPublisher implements DomainEventPublisher {
    private final TaskExecutor taskExecutor;
    private final DomainEventJobs domainEventJobs;

    @Override
    public void publish(List<String> eventIds) {
        if (isNotEmpty(eventIds)) {
            taskExecutor.execute(domainEventJobs::publishDomainEvents);
        }
    }
}

原始碼出處:com/mryqr/common/event/publish/AsynchronousDomainEventPublisher.java

可以看到,AsynchronousDomainEventPublisher通過TaskExecutor完成了事件釋出的非同步化。不過需要注意的是,這種使用ThreadLocal來記錄事件ID的方式只適合於基於執行緒的Web容器,比如Servlet容器,而對於Webflux則不支援了。

在通過DomainEventJobs.publishDomainEvents()傳送領域事件時,先通過DomainEventDao.tobePublishedEvents()獲取到尚未釋出的領域事件,然後根據時間產生順序進行傳送。另外,由於多個執行緒可能同時執行事件傳送邏輯,導致事件的發生順序無法得到保證,因此我們使用了分散式鎖LockingTaskExecutor來保證某個時刻只有事件傳送任務可以工作。

// DomainEventJobs

    public int publishDomainEvents() {
        try {
            //通過分散式鎖保證只有一個publisher工作,以此保證訊息傳送的順序
            TaskResult<Integer> result = lockingTaskExecutor.executeWithLock(this::doPublishDomainEvents,
                    new LockConfiguration(now(), "publish-domain-events", ofMinutes(1), ofMillis(1)));
            Integer publishedCount = result.getResult();
            return publishedCount != null ? publishedCount : 0;
        } catch (Throwable e) {
            log.error("Error while publish domain events.", e);
            return 0;
        }
    }

    private int doPublishDomainEvents() {
        int count = 0;
        int max = 10000;//每次執行最多傳送的條數
        String startEventId = "EVT00000000000000001";//從最早的ID開始算起

        while (true) {
            List<DomainEvent> domainEvents = domainEventDao.tobePublishedEvents(startEventId, 100);
            if (isEmpty(domainEvents)) {
                break;
            }

            for (DomainEvent event : domainEvents) {
                redisDomainEventSender.send(event);
            }

            count = domainEvents.size() + count;
            if (count >= max) {
                break;
            }
            startEventId = domainEvents.get(domainEvents.size() - 1).getId();//下一次直接從最後一條開始查詢
        }

        return count;
    }

原始碼出處:com/mryqr/common/event/DomainEventJobs.java

事件釋出有可能不成功,比如訊息佇列連線不上等原因,此時我們則需要建立事件兜底機制,即在每次請求正常釋出事件之外,還需要定時(比如每2分鐘)掃描資料庫中尚未成功釋出的事件並行布。

    @Scheduled(cron = "0 */2 * * * ?")
    public void houseKeepPublishDomainEvent() {
        int count = domainEventJobs.publishDomainEvents();
        if (count > 0) {
            log.info("House keep published {} domain events.", count);
        }
    }

原始碼出處:com/mryqr/common/scheduling/SchedulingConfiguration.java

這也意味著我們需要記錄每一個事件的釋出狀態status。在事件釋出到訊息中介軟體之後,更新事件的狀態:

public class RedisDomainEventSender {
    private final MryObjectMapper mryObjectMapper;
    private final MryRedisProperties mryRedisProperties;
    private final StringRedisTemplate stringRedisTemplate;
    private final DomainEventDao domainEventDao;

    public void send(DomainEvent event) {
        try {
            String eventString = mryObjectMapper.writeValueAsString(event);
            ObjectRecord<String, String> record = StreamRecords.newRecord()
                    .ofObject(eventString)
                    .withStreamKey(mryRedisProperties.getDomainEventStream());
            stringRedisTemplate.opsForStream().add(record);
            domainEventDao.successPublish(event);
        } catch (Throwable t) {
            log.error("Error happened while publish domain event[{}:{}] to redis.", event.getType(), event.getId(), t);
            domainEventDao.failPublish(event);
        }
    }
}

原始碼出處:com/mryqr/common/event/publish/RedisDomainEventSender.java

這裡,當事件釋出成功後呼叫domainEventDao.successPublish(event)將事件狀態設定為「釋出成功」(status=PUBLISH_SUCCEED),反之將事件狀態設定為「釋出失敗」(status=PUBLISH_FAILED)。事實上,將status放在DomainEvent上並不是一種好的實踐,因為這裡的status主要用於釋出方,對消費方來說則無端地多了一個無用欄位,更好的方式是在釋出方另行建立一張資料庫表來記錄每個事件的釋出狀態。不過,在碼如雲,由於我們採用了單體架構,事件的釋出方和消費方均在同一個程序空間中,為了方便實用起見,我們做出了妥協,即依然將status欄位保留在DomainEvent中。

有趣的是,這裡的RedisDomainEventSender讓我們再次陷入了分散式事務的困境,因為傳送事件需要操作訊息中介軟體,而更新事件狀態需要運算元據庫。在不使用分散式事務的情況下(我們也不想使用),此時的程式碼對於「事件釋出成功 + 資料庫落庫成功」來講是皆大歡喜的,但是依然無法排除有很小的概率導致事件傳送成功了但是狀態卻為得到更新的情況。要解決這個問題,我們做了一個妥協,即事件釋出方無法保證事件的「精確一次性投遞(Exactly Once)」,而是保證「至少一次投遞(At Least Once)」。假設在事件釋出成功之後,由於種種原因導致事件的狀態未得到更新,即依然為CREATED狀態,那麼稍後,當事件兜底機制啟動時,它將載入系統中尚未釋出的事件進行釋出,其中就包含狀態為CREATED的事件,進而導致事件的重複投遞。

「至少一次投遞」將更多的負擔轉嫁給了事件的消費方,使得事件傳送方得以全身而退,在下文中我們將講到對事件的消費。

領域事件的消費

事件消費的重點在於如何解決釋出方的「至少一次投遞」問題。舉個例子,假設在電商系統中,訂單子系統釋出了「訂單已成交」(OrderPlacedEvent)事件,積分子系統消費這個事件時會給使用者新增與訂單價格等額的積分,但是對事件的「至少一次投遞」有可能導致該事件被重複投遞進而導致重複給使用者積分的情況產生。解決這個問題通常有2種方式:

  1. 將消費方自身的處理邏輯設計為冪等的,即多次執行和一次執行的結果是相同的
  2. 消費方在資料庫中建立一個事件消費表,用於跟蹤已經被消費的事件

第1種方式是最理想的,消費方不用引入額外的支撐性機制,但是這種方式對消費方的要求太高,並不是所有場景都能將消費方本身的處理邏輯設計為冪等。因此,實踐中主要採用第2種方式。

在消費事件時,通過DomainEventConsumer類作為事件處理的統一入口,其中將遍歷所有可以處理給定事件的DomainEventHandler,這些DomainEventHandler中包含對事件的實際處理邏輯:

public class DomainEventConsumer {
    private final List<DomainEventHandler> handlers;
    private final DomainEventDao domainEventDao;

    public DomainEventConsumer(List<DomainEventHandler> handlers, DomainEventDao domainEventDao) {
        this.handlers = handlers;
        this.handlers.sort(comparingInt(DomainEventHandler::priority));
        this.domainEventDao = domainEventDao;
    }

    //所有能處理事件的handler依次處理,全部處理成功記錄消費成功,否則記錄為消費失敗;
    //消費失敗後,兜底機制將重新傳送事件,重新傳送最多不超過3次
    public void consume(DomainEvent domainEvent) {
        log.info("Start consume domain event[{}:{}].", domainEvent.getType(), domainEvent.getId());

        boolean hasError = false;
        MryTaskRunner taskRunner = newTaskRunner();

        for (DomainEventHandler handler : handlers) {
            try {
                if (handler.canHandle(domainEvent)) {
                    handler.handle(domainEvent, taskRunner);
                }
            } catch (Throwable t) {
                hasError = true;
                log.error("Error while handle domain event[{}:{}] by [{}].",
                        domainEvent.getType(), domainEvent.getId(), handler.getClass().getSimpleName(), t);
            }
        }

        if (taskRunner.isHasError()) {
            hasError = true;
        }

        if (hasError) {
            domainEventDao.failConsume(domainEvent);
        } else {
            domainEventDao.successConsume(domainEvent);
        }
    }
}

原始碼出處:com/mryqr/core/common/domain/event/DomainEventConsumer.java

對於事件處理器DomainEventHandler而言,其地位與應用服務相當,也即它並不處理具體的業務邏輯,而是代理給領域模型進行處理。舉個例子,在碼如雲,當成員姓名更新後,系統中所有記錄該成員姓名的聚合根均需要做相應同步,此時「成員姓名已更新」(MemberNameChangedEvent)事件對應的處理器為:

//MemberNameChangedEventHandler

public class MemberNameChangedEventHandler implements DomainEventHandler {
    private final MemberRepository memberRepository;

    @Override
    public boolean canHandle(DomainEvent domainEvent) {
        return domainEvent.getType() == MEMBER_NAME_CHANGED;
    }

    @Override
    public void handle(DomainEvent domainEvent, MryTaskRunner taskRunner) {
        MemberNameChangedEvent event = (MemberNameChangedEvent) domainEvent;
        memberRepository.byIdOptional(event.getMemberId())
                .ifPresent(memberRepository::syncMemberNameToAllArs);
    }
}

原始碼出處:com/mryqr/core/member/eventhandler/MemberNameChangedEventHandler.java

可以看到,DomainEventHandler並沒有直接完成對姓名的同步,而是將其代理給了領域模型中的MemberRepository,因此DomainEventHandler也應該是很薄的一層。另外,DomainEventHandler是與訊息中介軟體無關的,不管底層使用的是Kafka還是RabbitMQ,DomainEventHandler是不用變化的。

總結

在DDD中,領域事件是用於解耦各個模組(子系統)的常用方式。另外,領域事件的產生、釋出和消費彼此也是解耦的。產生領域事件時,通過本地事件釋出表表保證事件產生和業務操作之間的資料一致性,然後通過「至少一次投遞」的方式釋出事件,消費方通過本地事件消費表的方式保證事件消費的冪等性。在整個釋出和消費的過程中,只有少數幾處存在對訊息中介軟體(Redis Stream)的依賴,其他地方,包括髮布方對事件的產生以及持久化,消費方的各個事件處理器(DomainEventHandler)均是中立於訊息基礎設施的。在下一篇CQRS中,我們將對DDD中的讀寫分離模式進行講解。