1、活動中心場景介紹
在電商系統上線初期,往往會進行一些「拉新」活動,例如活動部門提出新使用者註冊送積分、送優惠券活動。
基於分散式、微服務的設計理念,通常的架構設計(子系統互動)如下圖所示:
其核心繫統介紹如下:
- 賬戶中心
提供使用者登入、使用者註冊等服務,一個新使用者註冊時,向MQ伺服器中的USER_REGISTER主題傳送一條訊息,主流程結束,與送積分,送優惠券等過程解耦。 - 優惠券(券系統)
提供發放優惠券、使用優惠券等與券相關的基礎服務。 - 積分中心
提供積分相關的服務,例如積分贈送、積分消費、積分查詢等基礎服務。 - 送積分服務(消費者)
訂閱MQ,按照規則決定是否需要贈送積分,如果需要則呼叫積分相關的基礎介面,完成積分的發放。 - 送優惠券(消費者)
訂閱MQ,按照規則決定是否需要贈送優惠券,如果需要則呼叫券系統相關的基礎介面,完成優惠券的發放。
上面的架構設計非常優雅,但並不是無懈可擊,讀者們肯定會想到如果新使用者註冊成功,但訊息傳送到MQ失敗,或者訊息成功傳送到MQ,但傳送完MQ後系統出現異常導致使用者註冊失敗又該如何呢?
上面的問題其實就是典型的分散式事務問題:即如何保證使用者註冊(資料庫操作)與MQ訊息傳送這兩個分散式操作的一致性。
RocketMQ事務訊息閃亮登場。
2、事務訊息實現原理
一言以蔽之:RocketMQ事務訊息要解決的問題是訊息傳送與業務的一致性,其解決思路:二階段提交與事務狀態回查,其具體實現流程如下圖所示:
其核心設計理念:
- 應用程式開啟一個資料庫事務,進行資料庫操作,並且在事務中傳送一條PREPARE訊息,PREPARE訊息傳送成功後通知應用程式記錄本地事務狀態,然後提交本地事務。
- RocketMQ在收到型別為PREPARE的訊息時,首先備份訊息的原主題與原訊息消費佇列,然後將訊息儲存在主題為RMQ_SYS_TRANS_HALF_TOPIC的訊息佇列中,故PREPARE的訊息是不會被使用者端消費的。
- Broker訊息伺服器開啟一個定時任務處理RMQ_SYS_TRANS_HALF_TOPIC中的訊息,會每隔指定時間向訊息傳送者發起事務狀態查詢請求 ,詢問訊息傳送者使用者端本地事務是否成功,然後根據回查狀態決定是提交還是回滾,即對處於PREPARE狀態進行提交或回滾操作。
- 傳送者如果明確得知事務成功,則可以返回COMMIT,伺服器端會提交該條訊息,具體操作是恢復原訊息的主題與佇列,重新傳送到Broker,消費端感知後消費。
- 傳送這如果無法明確得知事務狀態,則返回UNOWN,此時伺服器端會等待一定時間後再次向傳送者詢問,預設詢問15次。
- 傳送者如果非常明確得知事務失敗,則可以返回ROLLBACK。
在具體實踐中,訊息傳送者在無法獲取事務狀態時不要武斷的返回ROLLBACK,而是要返回UNOWN,讓伺服器端定時重試回查,說明如下:
在將PREPARE訊息傳送到Broker後,伺服器端發起事務查詢時本地事務可能還未提交,為了避免無效的事務回查機制,RocketMQ通常至少在收到PREPARE訊息6s後才會發起第一次事務回查,可通過 transactionTimeOut 設定。故使用者端在實現事務回查時無法證明事務狀態時不應該返回ROLLBACK,而是返回UNOWN。
3、事務訊息實戰
光說不練假把式,接下來以一個新使用者註冊送優惠券的場景來詳細介紹如何使用事務訊息。
專案模組職責說明如下:
事務訊息的核心程式碼組裝在transaction-service,其核心類圖如下:
其中核心要點如下:
- UserServiceImpl
Dubbo介面業務實現類,類似MVC的控制層,在這裡做一些引數驗證,但不執行具體的業務邏輯,只是傳送一條事務訊息到MQ。 - UserRegTransactionListener
事務監聽器,在 executeLocalTransaction 方法中執行業務邏輯,資料庫本地事務加在該方法。
溫馨提示:之所以不在UserServicveImpl中執行本地事務,是因為 executeLocalTransaction 中丟擲的異常會被RocketMQ框架捕捉,及異常無法被UserServiceImpl感知,即無法實現其事務的一致性。
接下來展示其核心程式碼,如全部原始碼已上傳到github倉庫。
倉庫地址:https://github.com/dingwpmz/rocketmq-learning
3.1 UserServiceImpl 核心實現
UserServiceImpl 的核心要點如下:
- 首先應該對引數進行校驗、業務邏輯進行校驗,如果不滿足業務條件,會傳送一些無效訊息到MQ,雖然不會造成業務異常,但會消耗效能
- 傳送事務訊息,建議對訊息設定Key,Key的值可以用業務處理流水號(可唯一表示該業務操作)或者核心業務欄位(例如訂單編號)
- 業務入口類可通過事務訊息傳送狀態來判斷業務是否失敗。
3.2 UserRegTransactionListener 核心實現
事務監聽器需要實現執行本地事務與事務回查兩個介面。
3.2.1 實現 executeLocalTransaction
首先需要實現 executeLocalTransaction 方法,執行本地事務,其程式碼如下圖所示:
其中幾個關鍵點說明如下:
- 在該方法上新增資料庫事務標籤。
- 執行業務邏輯,範例Demo只是將使用者資料儲存到資料庫。
- 如果業務執行失敗,可明確告知需要回滾,上層呼叫方也可根據ROLLBACK_MESSAGE進行相應的處理。
- 如果業務成功,不建議直接返回COMMIT,而是建議返回UNKNOW,因為該方法儘管在方法最後一行,但可能發生斷電等異常情況,資料庫並沒有成功。
3.2.2 實現 checkLocalTransaction
其次需要實現事務狀態回查,用來RocketMQ伺服器端感知事務是否成功,其實現原理如下圖所示:
其實現關鍵點如下:
- 如果能明確得知本地事務成功,則返回COMMIT_MESSAGE
- 如該不能明確得知本地事務成功,不能返回ROLLBACK_MESSAGE,而是返回UNKNOW,等待伺服器端下一次事務回查(不會立即觸發),伺服器端預設回查15次,如果15次都得到UNKNOW,則會回滾該訊息。
3.3 程式碼獲取
上文只是將事務訊息的核心程式碼加以解讀,並重點闡述每個步驟的實現關鍵點,筆者基於SpringBoot,嘗試結合場景學習RocketMQ的使用技巧,其程式碼上傳到了github倉庫。
https://github.com/dingwpmz/rocketmq-learning,期待你的Star。
丁威,《RocketMQ技術內幕》作者、CSDN部落格專家,原創公眾號『中介軟體興趣圈』維護者。目前就職於中通快遞研發中心擔任資深架構師,負責訊息中介軟體與全鏈路壓測的實施與落地。歡迎大家加我個人微信:dingwpmz,拉您入技術交流群,共同發展,抱團取暖。擅長JAVA程式設計,對主流中介軟體RocketMQ、Dubbo、ElasticJob、Netty、Sentienl、Mybatis、Mycat等中介軟體有深入研究。