Apache RocketMQ 5.0 筆記

2023-01-18 21:00:49

RocketMQ 5.0:雲原生「訊息、事件、流」實時資料處理平臺,覆蓋雲邊端一體化資料處理場景。

核心特性

  • 雲原生:生與雲,長與雲,無限彈性擴縮,K8s友好
  • 高吞吐:萬億級吞吐保證,同時滿足微服務與巨量資料場景
  • 流處理:提供輕量、高擴充套件、高效能和豐富功能的流計算引擎
  • 金融級:金融級的穩定性,廣泛用於交易核心鏈路
  • 架構極簡:零外部依賴,Shared-nothing 架構
  • 生態友好:無縫對接微服務、實時計算、資料湖等周邊生態

1. 基本概念

1、訊息由生產者初始化並行送到Apache RocketMQ 伺服器端。

2、訊息按照到達Apache RocketMQ 伺服器端的順序儲存到主題的指定佇列中。

3、消費者按照指定的訂閱關係從Apache RocketMQ 伺服器端中獲取訊息並消費。

1.1. 訊息

訊息是 Apache RocketMQ 中的最小資料傳輸單元。生產者將業務資料的負載和拓展屬性包裝成訊息傳送到 Apache RocketMQ 伺服器端,伺服器端按照相關語意將訊息投遞到消費端進行消費。

RocketMQ 訊息構成非常簡單,如下所示:

  • topic:表示要傳送的訊息的主題
  • body:表示訊息的儲存內容
  • properties:表示訊息屬性
  • transactionId:會在事務訊息中使用

訊息內部屬性

欄位名 必填 說明
主題名稱

當前訊息所屬的主題的名稱。叢集內全域性唯一。

訊息體 訊息體
訊息型別

Normal:普通訊息,訊息本身無特殊語意,訊息之間也沒有任何關聯。

FIFO:順序訊息,Apache RocketMQ 通過訊息分組MessageGroup標記一組特定訊息的先後順序,可以保證訊息的投遞順序嚴格按照訊息傳送時的順序。

Delay:定時/延時訊息,通過指定延時時間控制訊息生產後不要立即投遞,而是在延時間隔後才對消費者可見。

Transaction:事務訊息,Apache RocketMQ 支援分散式事務訊息,支援應用資料庫更新和訊息呼叫的事務一致性保障。

過濾標籤Tag 方便伺服器過濾使用,消費者可通過Tag對訊息進行過濾,僅接收指定標籤的訊息。目前只支援每個訊息設定一個。
索引Key 訊息的索引鍵,可通過設定不同的Key區分訊息和快速查詢訊息。
定時時間   定時場景下,訊息觸發延時投遞的毫秒級時間戳。
消費重試次數 否   訊息消費失敗後,Apache RocketMQ 伺服器端重新投遞的次數。每次重試後,重試次數加1。
業務自定義屬性   生產者可以自定義設定的擴充套件資訊。

系統預設的訊息最大限制如下:

  • 普通和順序訊息:4 MB
  • 事務和定時或延時訊息:64 KB

1.2. Tag

Topic 與 Tag 都是業務上用來歸類的標識,區別在於 Topic 是一級分類,而 Tag 可以理解為是二級分類。使用 Tag 可以實現對 Topic 中的訊息進行過濾。

提示:

  • Topic:訊息主題,通過 Topic 對不同的業務訊息進行分類。
  • Tag:訊息標籤,用來進一步區分某個 Topic 下的訊息分類,訊息從生產者發出即帶上的屬性。

Topic 和 Tag 的關係如下圖所示:

什麼時候該用 Topic,什麼時候該用 Tag?

可以從以下幾個方面進行判斷:

  • 訊息型別是否一致:如普通訊息、事務訊息、定時(延時)訊息、順序訊息,不同的訊息型別使用不同的 Topic,無法通過 Tag 進行區分。
  • 業務是否相關聯:沒有直接關聯的訊息,如淘寶交易訊息,京東物流訊息使用不同的 Topic 進行區分;而同樣是天貓交易訊息,電器類訂單、女裝類訂單、化妝品類訂單的訊息可以用 Tag 進行區分。
  • 訊息優先順序是否一致:如同樣是物流訊息,盒馬必須小時內送達,天貓超市 24 小時內送達,淘寶物流則相對會慢一些,不同優先順序的訊息用不同的 Topic 進行區分。
  • 訊息量級是否相當:有些業務訊息雖然量小但是實時性要求高,如果跟某些萬億量級的訊息使用同一個 Topic,則有可能會因為過長的等待時間而「餓死」,此時需要將不同量級的訊息進行拆分,使用不同的 Topic。

通常情況下,不同的 Topic 之間的訊息沒有必然的聯絡,而 Tag 則用來區分同一個 Topic 下相互關聯的訊息,例如全集和子集的關係、流程先後的關係。

1.3. Keys

Apache RocketMQ 每個訊息可以在業務層面的設定唯一標識碼 keys 欄位,方便將來定位訊息丟失問題。 Broker 端會為每個訊息建立索引(雜湊索引),應用可以通過 topic、key 來查詢這條訊息內容,以及訊息被誰消費。由於是雜湊索引,請務必保證 key 儘可能唯一,這樣可以避免潛在的雜湊衝突。

// 訂單Id
String orderId = "20034568923546";
message.setKeys(orderId);

1.4. 佇列

一個 Topic 可能有多個佇列,並且可能分佈在不同的 Broker 上。

佇列天然具備順序性,即訊息按照進入佇列的順序寫入儲存,同一佇列間的訊息天然存在順序關係,佇列頭部為最早寫入的訊息,佇列尾部為最新寫入的訊息。訊息在佇列中的位置和訊息之間的順序通過位點(Offset)進行標記管理。

Apache RocketMQ 預設提供訊息可靠儲存機制,所有傳送成功的訊息都被持久化儲存到佇列中,配合生產者和消費者使用者端的呼叫可實現至少投遞一次的可靠性語意。

Apache RocketMQ 佇列模型和Kafka的分割區(Partition)模型類似。在 Apache RocketMQ 訊息收發模型中,佇列屬於主題的一部分,雖然所有的訊息資源以主題粒度管理,但實際的操作實現是面向佇列。例如,生產者指定某個主題,向主題內傳送訊息,但實際訊息傳送到該主題下的某個佇列中。

Apache RocketMQ 中通過修改佇列數量,以此實現橫向的水平擴容和縮容。

一般來說一條訊息,如果沒有重複傳送(比如因為伺服器端沒有響應而進行重試),則只會存在在 Topic 的其中一個佇列中,訊息在佇列中按照先進先出的原則儲存,每條訊息會有自己的位點,每個佇列會統計當前訊息的總條數,這個稱為最大位點 MaxOffset;佇列的起始位置對應的位置叫做起始位點 MinOffset。佇列可以提升訊息傳送和消費的並行度。

注意:按照實際業務消耗設定佇列數,佇列數量的設定應遵循少用夠用原則,避免隨意增加佇列數量。

1.5. 生產者

生產者(Producer)就是訊息的傳送者,Apache RocketMQ 擁有豐富的訊息型別,可以支援不同的應用場景,在不同的場景中,需要使用不同的訊息進行傳送。比如在電商交易中超時未支付關閉訂單的場景,在訂單建立時會傳送一條延時訊息。這條訊息將會在 30 分鐘以後投遞給消費者,消費者收到此訊息後需要判斷對應的訂單是否已完成支付。如支付未完成,則關閉訂單。如已完成支付則忽略,此時就需要用到延遲訊息;電商場景中,業務上要求同一訂單的訊息保持嚴格順序,此時就要用到順序訊息。在紀錄檔處理場景中,可以接受的比較大的傳送延遲,但對吞吐量的要求很高,希望每秒能處理百萬條紀錄檔,此時可以使用批次訊息。在銀行扣款的場景中,要保持上游的扣款操作和下游的簡訊通知保持一致,此時就要使用事務訊息。

注意:不要在同一個主題內使用多種訊息型別

生產者通常被整合在業務系統中,將業務訊息按照要求封裝成 Apache RocketMQ 的訊息(Message)並行送至伺服器端。

生產者和主題的關係為多對多關係,即同一個生產者可以向多個主題傳送訊息,同一個主題也可以接收多個生產者的訊息。

注意:不建議頻繁建立和銷燬生產者

Producer p = ProducerBuilder.build();
for (int i =0;i<n;i++){
    Message m= MessageBuilder.build();
    p.send(m);
}
p.shutdown();

1.6. 消費者與消費者組

如果多個消費者設定了相同的Consumer Group,我們認為這些消費者在同一個消費組內。同一個消費組的多個消費者必須保持消費邏輯和設定一致,共同分擔該消費組訂閱的訊息,實現消費能力的水平擴充套件。

在 Apache RocketMQ 有兩種消費模式,分別是:

  • 叢集消費模式:當使用叢集消費模式時,RocketMQ 認為任意一條訊息只需要被消費組內的任意一個消費者處理即可。
  • 廣播消費模式:當使用廣播消費模式時,RocketMQ 會將每條訊息推播給消費組所有的消費者,保證訊息至少被每個消費者消費一次。

負載均衡

RocketMQ的負載均衡策略與Kafka極其類似,幾乎一毛一樣

叢集模式下,同一個消費組內的消費者會分擔收到的全量訊息,這裡的分配策略是怎樣的?如果擴容消費者是否一定能提升消費能力?

Apache RocketMQ 提供了多種叢集模式下的分配策略,包括平均分配策略、機房優先分配策略、一致性hash分配策略等,可以通過如下程式碼進行設定相應負載均衡策略。

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());

預設的分配策略是平均分配,這也是最常見的策略。平均分配策略下消費組內的消費者會按照類似分頁的策略均攤消費。

在平均分配的演演算法下,可以通過增加消費者的數量來提高消費的並行度。比如下圖中,通過增加消費者來提高消費能力。

但也不是一味地增加消費者就能提升消費能力的,比如下圖中Topic的總佇列數小於消費者的數量時,消費者將分配不到佇列,即使消費者再多也無法提升消費能力。

1.7. 消費者分類

如上圖所示, Apache RocketMQ 的消費者處理訊息時主要經過以下階段:訊息獲取--->訊息處理--->消費狀態提交。

針對以上幾個階段,Apache RocketMQ 提供了不同的消費者型別: PushConsumer 、SimpleConsumer 和 PullConsumer。這幾種型別的消費者通過不同的實現方式和介面可滿足您在不同業務場景下的消費需求。具體差異如下:

注:在實際使用場景中,PullConsumer 僅推薦在流處理框架中整合使用,大多數訊息收發場景使用 PushConsumer 和 SimpleConsumer 就可以滿足需求。

PushConsumer

PushConsumers是一種高度封裝的消費者型別,消費訊息僅通過消費監聽器處理業務並返回消費結果。訊息的獲取、消費狀態提交以及消費重試都通過 Apache RocketMQ 的使用者端SDK完成。

SimpleConsumer

SimpleConsumer 是一種介面原子型的消費者型別,訊息的獲取、消費狀態提交以及消費重試都是通過消費者業務邏輯主動發起呼叫完成。

補充:

rocketmq-client中定義的:

  • DefaultMQProducer
  • DefaultMQPushConsumer
  • DefaultLitePullConsumer

rocketmq-client-java中定義的:

  • Producer
  • PushConsumer
  • SimpleConsumer

1.8. 消費位點

訊息是按到達Apache RocketMQ 伺服器端的先後順序儲存在指定主題的多個佇列中,每條訊息在佇列中都有一個唯一的Long型別座標,這個座標被定義為訊息位點。一條訊息被某個消費者消費完成後不會立即從佇列中刪除,Apache RocketMQ 會基於每個消費者分組記錄消費過的最新一條訊息的位點,即消費位點

如上圖所示,在Apache RocketMQ中每個佇列都會記錄自己的最小位點、最大位點。針對於消費組,還有消費位點的概念,在叢集模式下,消費位點是由使用者端提給交伺服器端儲存的,在廣播模式下,消費位點是由使用者端自己儲存的。一般情況下消費位點正常更新,不會出現訊息重複,但如果消費者發生崩潰或有新的消費者加入群組,就會觸發重平衡,重平衡完成後,每個消費者可能會分配到新的佇列,而不是之前處理的佇列。為了能繼續之前的工作,消費者需要讀取每個佇列最後一次的提交的消費位點,然後從消費位點處繼續拉取訊息。但在實際執行過程中,由於使用者端提交給伺服器端的消費位點並不是實時的,所以重平衡就可能會導致訊息少量重複。

1.9. 訂閱關係

一個訂閱關係指的是指定某個消費者分組對於某個主題的訂閱。

不同消費者分組對於同一個主題的訂閱相互獨立如下圖所示,消費者分組Group A和消費者分組Group B分別以不同的訂閱關係訂閱了同一個主題Topic A,這兩個訂閱關係互相獨立,可以各自定義,不受影響。

同一個消費者分組對於不同主題的訂閱也相互獨立如下圖所示,消費者分組Group A訂閱了兩個主題Topic A和Topic B,對於Group A中的消費者來說,訂閱的Topic A為一個訂閱關係,訂閱的Topic B為另外一個訂閱關係,且這兩個訂閱關係互相獨立,可以各自定義,不受影響。

2. 訊息型別

1、順序訊息(FIFO):這類訊息必須設定 message group,這種型別的訊息需要與FIFO消費者組一起使用

2、延遲訊息(DELAY):訊息被傳送後不會立即對消費者可見,這種型別的訊息必須設定delivery timestamp以決定對消費者可見的時間;

3、事務訊息(TRANSACTIONAL):將一個或多個訊息的釋出包裝到一個事務中,提供提交/回滾方法來決定訊息的可見性;

4、普通訊息(NORMAL):預設型別

不同的型別是互斥的,當意味著要釋出的訊息不能同時是FIFO型別和DELAY型別。實際上,主題的型別決定了訊息的型別。例如,FIFO主題不允許釋出其他型別的訊息。

2.1. 普通訊息

普通訊息一般應用於微服務解耦、事件驅動、資料整合等場景,這些場景大多數要求資料傳輸通道具有可靠傳輸的能力,且對訊息的處理時機、處理順序沒有特別要求。

典型場景一:微服務非同步解耦

如上圖所示,以線上的電商交易場景為例,上游訂單系統將使用者下單支付這一業務事件封裝成獨立的普通訊息並行送至Apache RocketMQ伺服器端,下游按需從伺服器端訂閱訊息並按照本地消費邏輯處理下游任務。每個訊息之間都是相互獨立的,且不需要產生關聯。

典型場景二:資料整合傳輸

如上圖所示,以離線的紀錄檔收集場景為例,通過埋點元件收集前端應用的相關操作紀錄檔,並轉發到 Apache RocketMQ 。每條訊息都是一段紀錄檔資料,Apache RocketMQ 不做任何處理,只需要將紀錄檔資料可靠投遞到下游的儲存系統和分析系統即可,後續功能由後端應用完成。

2.2. 順序訊息

應用場景

在有序事件處理、撮合交易、資料實時增量同步等場景下,異構系統間需要維持強一致的狀態同步,上游的事件變更需要按照順序傳遞到下游進行處理。在這類場景下使用 Apache RocketMQ 的順序訊息可以有效保證資料傳輸的順序性。

典型場景一:撮合交易

以證券、股票交易撮合場景為例,對於出價相同的交易單,堅持按照先出價先交易的原則,下游處理訂單的系統需要嚴格按照出價順序來處理訂單。

典型場景二:資料實時增量同步

以資料庫變更增量同步場景為例,上游源端資料庫按需執行增刪改操作,將二進位制操作紀錄檔作為訊息,通過 Apache RocketMQ 傳輸到下游搜尋系統,下游系統按順序還原訊息資料,實現狀態資料按序重新整理。如果是普通訊息則可能會導致狀態混亂,和預期操作結果不符,基於順序訊息可以實現下游狀態和上游操作結果一致。

功能原理

順序訊息是 Apache RocketMQ 提供的一種高階訊息型別,支援消費者按照傳送訊息的先後順序獲取訊息,從而實現業務場景中的順序處理。 相比其他型別訊息,順序訊息在傳送、儲存和投遞的處理過程中,更多強調多條訊息間的先後順序關係。

Apache RocketMQ 順序訊息的順序關係通過訊息組(MessageGroup)判定和識別,傳送順序訊息時需要為每條訊息設定歸屬的訊息組,相同訊息組的多條訊息之間遵循先進先出的順序關係,不同訊息組、無訊息組的訊息之間不涉及順序性。

基於訊息組的順序判定邏輯,支援按照業務邏輯做細粒度拆分,可以在滿足業務區域性順序的前提下提高系統的並行度和吞吐能力。

如何保證訊息的順序性?

Apache RocketMQ 的訊息的順序性分為兩部分,生產順序性和消費順序性。

1、生產順序性

如需保證訊息生產的順序性,則必須滿足以下條件:

  • 單一生產者:訊息生產的順序性僅支援單一生產者,不同生產者分佈在不同的系統,即使設定相同的訊息組,不同生產者之間產生的訊息也無法判定其先後順序。
  • 序列傳送:Apache RocketMQ 生產者使用者端支援多執行緒安全存取,但如果生產者使用多執行緒並行傳送,則不同執行緒間產生的訊息將無法判定其先後順序。

滿足以上條件的生產者,將順序訊息傳送至 Apache RocketMQ 後,會保證設定了同一訊息組的訊息,按照傳送順序儲存在同一佇列中。伺服器端順序儲存邏輯如下:

  • 相同訊息組的訊息按照先後順序被儲存在同一個佇列。
  • 不同訊息組的訊息可以混合在同一個佇列中,且不保證連續。

2、消費順序性

如需保證訊息消費的順序性,則必須滿足以下條件:

  • 投遞順序:Apache RocketMQ 通過使用者端SDK和伺服器端通訊協定保障訊息按照伺服器端儲存順序投遞,但業務方消費訊息時需要嚴格按照接收---處理---應答的語意處理訊息,避免因非同步處理導致訊息亂序。
  • 有限重試:Apache RocketMQ 順序訊息投遞僅在重試次數限定範圍內,即一條訊息如果一直重試失敗,超過最大重試次數後將不再重試,跳過這條訊息消費,不會一直阻塞後續訊息處理。對於需要嚴格保證消費順序的場景,請務設定合理的重試次數,避免引數不合理導致訊息亂序。

生產順序性和消費順序性組合

如果訊息需要嚴格按照先進先出(FIFO)的原則處理,即先傳送的先消費、後傳送的後消費,則必須要同時滿足生產順序性和消費順序性。

一般業務場景下,同一個生產者可能對接多個下游消費者,不一定所有的消費者業務都需要順序消費,您可以將生產順序性和消費順序性進行差異化組合,應用於不同的業務場景。例如傳送順序訊息,但使用非順序的並行消費方式來提高吞吐能力。更多組合方式如下表所示:

生產順序 消費順序 順序性效果
設定訊息組,保證訊息順序傳送。 順序消費 按照訊息組粒度,嚴格保證訊息順序。 同一訊息組內的訊息的消費順序和傳送順序完全一致。
設定訊息組,保證訊息順序傳送。 並行消費 並行消費,儘可能按時間順序處理。
未設定訊息組,訊息亂序傳送。 順序消費 按佇列儲存粒度,嚴格順序。 基於 Apache RocketMQ 本身佇列的屬性,消費順序和佇列儲存的順序一致,但不保證和傳送順序一致。
未設定訊息組,訊息亂序傳送。 並行消費 並行消費,儘可能按照時間順序處理。

2.3. 定時/延時訊息

注:定時訊息和延時訊息本質相同,都是伺服器端根據訊息設定的定時時間在某一固定時刻將訊息投遞給消費者消費。

應用場景

在分散式定時排程觸發、任務超時處理等場景,需要實現精準、可靠的定時事件觸發。使用 Apache RocketMQ 的定時訊息可以簡化定時排程任務的開發邏

輯,實現高效能、可延伸、高可靠的定時觸發能力。

典型場景一:分散式定時排程

在分散式定時排程場景下,需要實現各類精度的定時任務,例如每天5點執行檔案清理,每隔2分鐘觸發一次訊息推播等需求。基於 Apache RocketMQ 的定時訊息可以封裝出多種型別的定時觸發器。

典型場景二:任務超時處理

以電商交易場景為例,訂單下單後暫未支付,此時不可以直接關閉訂單,而是需要等待一段時間後才能關閉訂單。使用 Apache RocketMQ 定時訊息可以實現超時任務的檢查觸發。

基於定時訊息的超時任務處理具備如下優勢:

  • 精度高、開發門檻低:基於訊息通知方式不存在定時階梯間隔。可以輕鬆實現任意精度事件觸發,無需業務去重。
  • 高效能可延伸:傳統的資料庫掃描方式較為複雜,需要頻繁呼叫介面掃描,容易產生效能瓶頸。 Apache RocketMQ 的定時訊息具有高並行和水平擴充套件的能力。

功能原理

定時時間設定原則

Apache RocketMQ 定時訊息設定的定時時間是一個預期觸發的系統時間戳,延時時間也需要轉換成當前系統時間後的某一個時間戳,而不是一段延時時長。

投遞等級

Apache RocketMQ 一共支援18個等級的延遲投遞,具體時間如下:

2.4. 事務訊息

以電商交易場景為例,使用者支付訂單這一核心操作的同時會涉及到下游物流發貨、積分變更、購物車狀態清空等多個子系統的變更。當前業務的處理分支包括:

  • 主分支訂單系統狀態更新:由未支付變更為支付成功。
  • 物流系統狀態新增:新增待發貨物流記錄,建立訂單物流記錄。
  • 積分系統狀態變更:變更使用者積分,更新使用者積分表。
  • 購物車系統狀態變更:清空購物車,更新使用者購物車記錄。

使用普通訊息和訂單事務無法保證一致的原因,本質上是由於普通訊息無法像單機資料庫事務一樣,具備提交、回滾和統一協調的能力。而基於 RocketMQ 的分散式事務訊息功能,在普通訊息基礎上,支援二階段的提交能力。將二階段提交和本地事務繫結,實現全域性提交結果的一致性。

事務訊息傳送分為兩個階段。第一階段會傳送一個半事務訊息,半事務訊息是指暫不能投遞的訊息,生產者已經成功地將訊息傳送到了 Broker,但是Broker 未收到生產者對該訊息的二次確認,此時該訊息被標記成「暫不能投遞」狀態,如果傳送成功則執行本地事務,並根據本地事務執行成功與否,向 Broker 半事務訊息狀態(commit或者rollback),半事務訊息只有 commit 狀態才會真正向下游投遞。如果由於網路閃斷、生產者應用重啟等原因,導致某條事務訊息的二次確認丟失,Broker 端會通過掃描發現某條訊息長期處於「半事務訊息」時,需要主動向訊息生產者詢問該訊息的最終狀態(Commit或是Rollback)。這樣最終保證了本地事務執行成功,下游就能收到訊息,本地事務執行失敗,下游就收不到訊息。總而保證了上下游資料的一致性。(PS:重點是兩階段提交

事務訊息處理流程

1、生產者將訊息傳送至Apache RocketMQ伺服器端。

2、Apache RocketMQ伺服器端將訊息持久化成功之後,向生產者返回Ack確認訊息已經傳送成功,此時訊息被標記為"暫不能投遞",這種狀態下的訊息即為半事務訊息。

3、生產者開始執行本地事務邏輯。

4、生產者根據本地事務執行結果向伺服器端提交二次確認結果(Commit或是Rollback),伺服器端收到確認結果後處理邏輯如下:

  • 二次確認結果為Commit:伺服器端將半事務訊息標記為可投遞,並投遞給消費者。
  • 二次確認結果為Rollback:伺服器端將回滾事務,不會將半事務訊息投遞給消費者。

5、在斷網或者是生產者應用重啟的特殊情況下,若伺服器端未收到傳送者提交的二次確認結果,或伺服器端收到的二次確認結果為Unknown未知狀態,經過固定時間後,伺服器端將對訊息生產者即生產者叢集中任一生產者範例發起訊息回查。

6、生產者收到訊息回查後,需要檢查對應訊息的本地事務執行的最終結果。

7、生產者根據檢查到的本地事務的最終狀態再次提交二次確認,伺服器端仍按照步驟4對半事務訊息進行處理。

3. 機制

3.1. 訊息傳送重試機制

Apache RocketMQ 使用者端連線伺服器端發起訊息傳送請求時,可能會因為網路故障、服務異常等原因導致呼叫失敗。為保證訊息的可靠性, Apache RocketMQ 在使用者端SDK中內建請求重試邏輯,嘗試通過重試傳送達到最終呼叫成功的效果。

同步傳送和非同步傳送模式均支援訊息傳送重試。

重試觸發條件:

  • 使用者端訊息傳送請求呼叫失敗或請求超時
  • 網路異常造成連線失敗或請求超時
  • 伺服器端節點處於重啟或下線等狀態造成連線失敗
  • 伺服器端執行慢造成請求超時
  • 伺服器端返回失敗錯誤碼

重試流程:

生產者在初始化時設定訊息傳送最大重試次數,當出現上述觸發條件的場景時,生產者使用者端會按照設定的重試次數一直重試傳送訊息,直到訊息傳送成功或達到最大重試次數重試結束,並在最後一次重試失敗後返回撥用錯誤響應。

  • 同步傳送:呼叫執行緒會一直阻塞,直到某次重試成功或最終重試失敗,丟擲錯誤碼和異常。
  • 非同步傳送:呼叫執行緒不會阻塞,但呼叫結果會通過異常事件或者成功事件返回。

重試間隔

  • 除伺服器端返回系統流控錯誤場景,其他觸發條件觸發重試後,均會立即進行重試,無等待間隔。
  • 若由於伺服器端返回流控錯誤觸發重試,系統會按照指數退避策略進行延遲重試。指數退避演演算法通過以下引數控制重試行為:
    • INITIAL_BACKOFF: 第一次失敗重試前後需等待多久,預設值:1秒
    • MULTIPLIER :指數退避因子,即退避倍率,預設值:1.6
    • JITTER :隨機抖動因子,預設值:0.2
    • MAX_BACKOFF :等待間隔時間上限,預設值:120秒
    • MIN_CONNECT_TIMEOUT :最短重試間隔,預設值:20秒 

3.2. 訊息流控機制

訊息流控指的是系統容量或水位過高, Apache RocketMQ 伺服器端會通過快速失敗返回流控錯誤來避免底層資源承受過高壓力。

觸發條件

  • 儲存壓力大:消費者分組的初始消費位點為當前佇列的最大消費位點。
  • 伺服器端請求任務排隊溢位:若消費者消費能力不足,導致佇列中有大量堆積訊息,當堆積訊息超過一定數量後會觸發訊息流控,減少下游消費系統壓力。

流控行為

當系統觸發訊息傳送流控時,使用者端會收到系統限流錯誤和異常,錯誤碼資訊如下:

  • reply-code:530
  • reply-text:TOO_MANY_REQUESTS

3.3. 消費重試

消費者出現異常,消費某條訊息失敗時, Apache RocketMQ 會根據消費重試策略重新投遞該訊息。消費重試主要解決的是業務處理邏輯失敗導致的消費完整性問題,是一種為業務兜底的策略,不應該被用做業務流程控制。

推薦使用訊息重試場景如下:

  • 業務處理失敗,且失敗原因跟當前的訊息內容相關,比如該訊息對應的事務狀態還未獲取到,預期一段時間後可執行成功。
  • 消費失敗的原因不會導致連續性,即當前訊息消費失敗是一個小概率事件,不是常態化的失敗,後面的訊息大概率會消費成功。此時可以對當前訊息進行重試,避免程序阻塞。

消費重試策略

消費重試指的是,消費者在消費某條訊息失敗後,Apache RocketMQ 伺服器端會根據重試策略重新消費該訊息,超過一次定數後若還未消費成功,則該訊息將不再繼續重試,直接被傳送到死信佇列中。

訊息重試的觸發條件

  • 消費失敗,包括消費者返回訊息失敗狀態標識或丟擲非預期異常。
  • 訊息處理超時,包括在PushConsumer中排隊超時。

重試策略差異

3.4. 消費進度

訊息位點(Offset)

訊息是按到達伺服器端的先後順序儲存在指定主題的多個佇列中,每條訊息在佇列中都有一個唯一的Long型別座標,這個座標被定義為訊息位點。

任意一個訊息佇列在邏輯上都是無限儲存,即訊息位點會從0到Long.MAX無限增加。通過主題、佇列和位點就可以定位任意一條訊息的位置,具體關係如下圖所示:

Apache RocketMQ 定義佇列中最早一條訊息的位點為最小訊息位點(MinOffset);最新一條訊息的位點為最大訊息位點(MaxOffset)。雖然訊息佇列邏輯上是無限儲存,但由於伺服器端物理節點的儲存空間有限, Apache RocketMQ 會捲動刪除佇列中儲存最早的訊息。因此,訊息的最小消費位點和最大消費位點會一直遞增變化。

消費位點(ConsumerOffset)

Apache RocketMQ 領域模型為釋出訂閱模式,每個主題的佇列都可以被多個消費者分組訂閱。若某條訊息被某個消費者消費後直接被刪除,則其他訂閱了該主題的消費者將無法消費該訊息。

因此,Apache RocketMQ 通過消費位點管理訊息的消費進度。每條訊息被某個消費者消費完成後不會立即在佇列中刪除,Apache RocketMQ 會基於每個消費者分組維護一份消費記錄,該記錄指定消費者分組消費某一個佇列時,消費過的最新一條訊息的位點,即消費位點。

當消費者使用者端離線,又再次重新上線時,會嚴格按照伺服器端儲存的消費進度繼續處理訊息。如果伺服器端儲存的歷史位點資訊已過期被刪除,此時消費位點向前移動至伺服器端儲存的最小位點。

注:消費位點的儲存和恢復是基於 Apache RocketMQ 伺服器端的儲存實現,和任何消費者無關。

佇列中訊息位點MinOffset、MaxOffset和每個消費者分組的消費位點ConsumerOffset的關係如下:

ConsumerOffset≤MaxOffset:

  • 當消費速度和生產速度一致,且全部訊息都處理完成時,最大訊息位點和消費位點相同,即ConsumerOffset=MaxOffset
  • 當消費速度較慢小於生產速度時,佇列中會有部分訊息未消費,此時消費位點小於最大訊息位點,即ConsumerOffset<MaxOffset,兩者之差就是該佇列中堆積的訊息量

ConsumerOffset≥MinOffset:

  • 正常情況下有效的消費位點ConsumerOffset必然大於等於最小訊息位點MinOffset。消費位點小於最小訊息位點時是無效的,相當於消費者要消費的訊息已經從佇列中刪除了,是無法消費到的,此時伺服器端會將消費位點強制糾正到合法的訊息位點。

消費位點初始值

消費位點初始值指的是消費者分組首次啟動消費者消費訊息時伺服器端儲存的消費位點的初始值。Apache RocketMQ 定義消費位點的初始值為消費者首次獲取訊息時,該時刻佇列中的最大訊息位點。相當於消費者將從佇列中最新的訊息開始消費。

3.5. 訊息儲存機制

Apache RocketMQ 使用儲存時長作為訊息儲存的依據,即每個節點對外承諾訊息的儲存時長。在儲存時長範圍內的訊息都會被保留,無論訊息是否被消費;超過時長限制的訊息則會被清理掉。

4. 架構

4.1. 技術架構

RocketMQ架構上主要分為四部分,如上圖所示:

  • Producer:訊息釋出的角色,支援分散式叢集方式部署。Producer通過MQ的負載均衡模組選擇相應的Broker叢集佇列進行訊息投遞,投遞的過程支援快速失敗並且低延遲。
  • Consumer:訊息消費的角色,支援分散式叢集方式部署。支援以push推,pull拉兩種模式對訊息進行消費。同時也支援叢集方式和廣播方式的消費,它提供實時訊息訂閱機制,可以滿足大多數使用者的需求。
  • NameServer:NameServer是一個非常簡單的Topic路由註冊中心,其角色類似Dubbo中的zookeeper,支援Broker的動態註冊與發現。主要包括兩個功能:Broker管理,NameServer接受Broker叢集的註冊資訊並且儲存下來作為路由資訊的基本資料。然後提供心跳檢測機制,檢查Broker是否還存活;路由資訊管理,每個NameServer將儲存關於Broker叢集的整個路由資訊和用於使用者端查詢的佇列資訊。然後Producer和Consumer通過NameServer就可以知道整個Broker叢集的路由資訊,從而進行訊息的投遞和消費。NameServer通常也是叢集的方式部署,各範例間相互不進行資訊通訊。Broker是向每一臺NameServer註冊自己的路由資訊,所以每一個NameServer範例上面都儲存一份完整的路由資訊。當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由資訊,Producer和Consumer仍然可以動態感知Broker的路由的資訊。
  • BrokerServer:Broker主要負責訊息的儲存、投遞和查詢以及服務高可用保證,為了實現這些功能,Broker包含了以下幾個重要子模組。

4.2. 部署架構

RocketMQ 網路部署特點:

  • NameServer是一個幾乎無狀態節點,可叢集部署,節點之間無任何資訊同步。
  • Broker部署相對複雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave 的對應關係通過指定相同的BrokerName,不同的BrokerId 來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer叢集中的所有節點建立長連線,定時註冊Topic資訊到所有NameServer。 注意:當前RocketMQ版本在部署架構上支援一Master多Slave,但只有BrokerId=1的從伺服器才會參與訊息的讀負載。
  • Producer與NameServer叢集中的其中一個節點(隨機選擇)建立長連線,定期從NameServer獲取Topic路由資訊,並向提供Topic 服務的Master建立長連線,且定時向Master傳送心跳。Producer完全無狀態,可叢集部署。

  • Consumer與NameServer叢集中的其中一個節點(隨機選擇)建立長連線,定期從NameServer獲取Topic路由資訊,並向提供Topic服務的Master、Slave建立長連線,且定時向Master、Slave傳送心跳。Consumer既可以從Master訂閱訊息,也可以從Slave訂閱訊息,消費者在向Master拉取訊息時,Master伺服器會根據拉取偏移量與最大偏移量的距離(判斷是否讀老訊息,產生讀I/O),以及從伺服器是否可讀等因素建議下一次是從Master還是Slave拉取。

結合部署架構圖,描述叢集工作流程:

  • 啟動NameServer,NameServer起來後監聽埠,等待Broker、Producer、Consumer連上來,相當於一個路由控制中心。
  • Broker啟動,跟所有的NameServer保持長連線,定時傳送心跳包。心跳包中包含當前Broker資訊(IP+埠等)以及儲存所有Topic資訊。註冊成功後,NameServer叢集中就有Topic跟Broker的對映關係。
  • 收發訊息前,先建立Topic,建立Topic時需要指定該Topic要儲存在哪些Broker上,也可以在傳送訊息時自動建立Topic。
  • Producer傳送訊息,啟動時先跟NameServer叢集中的其中一臺建立長連線,並從NameServer中獲取當前傳送的Topic存在哪些Broker上,輪詢從佇列列表中選擇一個佇列,然後與佇列所在的Broker建立長連線從而向Broker發訊息。
  • Consumer跟Producer類似,跟其中一臺NameServer建立長連線,獲取當前訂閱Topic存在哪些Broker上,然後直接跟Broker建立連線通道,開始消費訊息。

5. 使用者端

在編寫使用者端程式碼時,首先準備一個簡單的環境,可以選用Local模式。這裡不多介紹,只說一句,啟動broker的時候可以-c指定組態檔,啟動完以後通過jps檢視程序

通過mqadmin命令建立並檢視主題

mqadmin updateTopic -n localhost:9876 -b 172.16.52.116:10911 -t TEST_TOPIC
mqadmin topicList -n localhost:9876

具體命令引數,參見  https://rocketmq.apache.org/zh/docs/deploymentOperations/16admintool/

也可以通過RocketMQ Dashboard建立主題

5.1. rocketmq-client

引入依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.0.0</version>
</dependency>

程式碼片段

public class AppTest extends TestCase {

    private String producerGroupName = "MyProducerGroup";
    private String consumerGroupName = "MyConsumerGroup";

    /**
     * 傳送同步訊息
     */
    @Test
    public void testSyncProducer() throws Exception {
        // 範例化訊息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
        // 設定NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 啟動Producer範例
        producer.start();
        // 傳送訊息
        Message message = new Message("TEST_TOPIC", "A", "UserID12345", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);
        // 關閉Producer範例
        producer.shutdown();
    }

    /**
     * 傳送非同步訊息
     */
    @Test
    public void testAsyncProducer() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        Message msg = new Message("TEST_TOPIC", "B", "OrderID12346", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // SendCallback接收非同步返回結果的回撥
        producer.send(msg, new SendCallback() {
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
        //  等待5秒
        TimeUnit.SECONDS.sleep(5);
    }

    /**
     * 單向傳送訊息
     * 這種方式主要用在不特別關心傳送結果的場景,例如紀錄檔傳送。
     */
    @Test
    public void testOnewayProducer() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TEST_TOPIC", "C", "OrderID12348", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 傳送單向訊息,沒有任何返回結果
        producer.sendOneway(msg);
    }

    /**
     * 消費訊息
     */
    @Test
    public void testConsumer() throws Exception {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroupName);
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TEST_TOPIC", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.start();

        while (true) {
            List<MessageExt> messageExts = consumer.poll();
            if (messageExts.isEmpty()) {
                continue;
            }
            messageExts.forEach(msg -> {
                System.out.println(String.format("MsgId: %s, MsgBody: %s", msg.getMsgId(), new String(msg.getBody())));
            });
            consumer.commitSync();
        }
    }
}

5.2. rocketmq-spring-boot-starter

依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

application.yml

設定項詳見 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties

rocketmq:
  name-server: localhost:9876
  producer:
    group: MyProducerGroup
    send-message-timeout: 10000
  consumer:
    group: MyConsumerGroup

傳送訊息

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author: ChengJianSheng
 * @Date: 2023/1/18
 */
@RestController
@RequestMapping("/message")
public class MessageController {

    private String springTopic = "SPRING_TOPIC";
    private String userTopic = "USER_TOPIC";
    private String orderTopic = "ORDER_TOPIC";
    private String extTopic = "EXT_TOPIC";
    private String reqTopic = "REQ_TOPIC";
    private String objTopic = "OBJECT_TOPIC";


    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send")
    public String send() {

        SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello World");

        Message message = MessageBuilder.withPayload("Hello World!2222".getBytes()).build();
        sendResult = rocketMQTemplate.syncSend(springTopic, message);

        message = MessageBuilder.withPayload("Hello, World! I'm from spring message").build();
        sendResult = rocketMQTemplate.syncSend(springTopic, message);


        sendResult = rocketMQTemplate.syncSend(userTopic, new User("zhangsan", 20));

        message = MessageBuilder.withPayload(new User("lisi", 21))
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)
                .build();
        sendResult = rocketMQTemplate.syncSend(userTopic, message);


        rocketMQTemplate.asyncSend(orderTopic, new Order("oid1234", "4.56"), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("async onSucess SendResult=%s %n", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.printf("async onException Throwable=%s %n", throwable);
            }
        });

        rocketMQTemplate.convertAndSend(extTopic + ":tag0", "I'm from tag0");
        rocketMQTemplate.convertAndSend(extTopic + ":tag1", "I'm from tag1");

        String replyString = rocketMQTemplate.sendAndReceive(reqTopic, "request string", String.class);
        System.out.printf("receive %s %n", replyString);
        User replyUser = rocketMQTemplate.sendAndReceive(objTopic, new User("wangwu", 21), User.class);
        System.out.printf("receive %s %n", replyUser);

        return "ok";
    }
}

接收訊息

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "SPRING_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class StringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringConsumer received: %s \n", message);
    }
}

@Component
@RocketMQMessageListener(topic = "ORDER_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class OrderConsumer implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order message) {
        System.out.printf("------- OrderConsumer received: %s [orderId : %s]\n", message, message.getOrderNo());
    }
}

@Component
@RocketMQMessageListener(topic = "USER_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class UserConsumer implements RocketMQListener<User>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(User message) {
        System.out.printf("------ UserConsumer received: %s ; age: %s ; name: %s \n", message, message.getAge(), message.getName());
    }
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    }
}

@Component
@RocketMQMessageListener(topic = "REQ_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
    @Override
    public String onMessage(String message) {
        System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
        return "reply string";
    }
}

@Component
@RocketMQMessageListener(topic = "OBJECT_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
    @Override
    public User onMessage(User message) {
        System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", message);
        return new User("tom", 8);
    }
}

@Component
@RocketMQMessageListener(topic = "EXT_TOPIC", selectorExpression = "tag0||tag1", consumerGroup = "${rocketmq.consumer.group}")
public class MessageExtConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        System.out.printf("------- MessageExtConsumer received message, msgId: %s, body:%s \n", message.getMsgId(), new String(message.getBody()));
    }
}

6. 檔案

https://rocketmq.apache.org/zh/

https://rocketmq.apache.org/zh/docs/deploymentOperations/15deploy/

https://github.com/apache/rocketmq/tree/rocketmq-all-5.0.0/docs/cn

https://github.com/apache/rocketmq/blob/rocketmq-all-5.0.0/docs/cn/architecture.md

https://github.com/apache/rocketmq/blob/rocketmq-all-5.0.0/docs/cn/RocketMQ_Example.md

https://github.com/apache/rocketmq-dashboard

https://github.com/apache/rocketmq-spring

https://github.com/apache/rocketmq