RocketMQ 5.0:雲原生「訊息、事件、流」實時資料處理平臺,覆蓋雲邊端一體化資料處理場景。
核心特性
1. 基本概念
1、訊息由生產者初始化並行送到Apache RocketMQ 伺服器端。
2、訊息按照到達Apache RocketMQ 伺服器端的順序儲存到主題的指定佇列中。
3、消費者按照指定的訂閱關係從Apache RocketMQ 伺服器端中獲取訊息並消費。
1.1. 訊息
訊息是 Apache RocketMQ 中的最小資料傳輸單元。生產者將業務資料的負載和拓展屬性包裝成訊息傳送到 Apache RocketMQ 伺服器端,伺服器端按照相關語意將訊息投遞到消費端進行消費。
RocketMQ 訊息構成非常簡單,如下所示:
訊息內部屬性
欄位名 | 必填 | 說明 |
主題名稱 | 是 |
當前訊息所屬的主題的名稱。叢集內全域性唯一。 |
訊息體 | 是 | 訊息體 |
訊息型別 | 是 |
Normal:普通訊息,訊息本身無特殊語意,訊息之間也沒有任何關聯。 FIFO:順序訊息,Apache RocketMQ 通過訊息分組MessageGroup標記一組特定訊息的先後順序,可以保證訊息的投遞順序嚴格按照訊息傳送時的順序。 Delay:定時/延時訊息,通過指定延時時間控制訊息生產後不要立即投遞,而是在延時間隔後才對消費者可見。 Transaction:事務訊息,Apache RocketMQ 支援分散式事務訊息,支援應用資料庫更新和訊息呼叫的事務一致性保障。 |
過濾標籤Tag | 否 | 方便伺服器過濾使用,消費者可通過Tag對訊息進行過濾,僅接收指定標籤的訊息。目前只支援每個訊息設定一個。 |
索引Key | 否 | 訊息的索引鍵,可通過設定不同的Key區分訊息和快速查詢訊息。 |
定時時間 | 否 | 定時場景下,訊息觸發延時投遞的毫秒級時間戳。 |
消費重試次數 | 否 | 訊息消費失敗後,Apache RocketMQ 伺服器端重新投遞的次數。每次重試後,重試次數加1。 |
業務自定義屬性 | 否 | 生產者可以自定義設定的擴充套件資訊。 |
系統預設的訊息最大限制如下:
1.2. Tag
Topic 與 Tag 都是業務上用來歸類的標識,區別在於 Topic 是一級分類,而 Tag 可以理解為是二級分類。使用 Tag 可以實現對 Topic 中的訊息進行過濾。
提示:
- Topic:訊息主題,通過 Topic 對不同的業務訊息進行分類。
- Tag:訊息標籤,用來進一步區分某個 Topic 下的訊息分類,訊息從生產者發出即帶上的屬性。
Topic 和 Tag 的關係如下圖所示:
什麼時候該用 Topic,什麼時候該用 Tag?
可以從以下幾個方面進行判斷:
通常情況下,不同的 Topic 之間的訊息沒有必然的聯絡,而 Tag 則用來區分同一個 Topic 下相互關聯的訊息,例如全集和子集的關係、流程先後的關係。
1.3. Keys
Apache RocketMQ 每個訊息可以在業務層面的設定唯一標識碼 keys 欄位,方便將來定位訊息丟失問題。 Broker 端會為每個訊息建立索引(雜湊索引),應用可以通過 topic、key 來查詢這條訊息內容,以及訊息被誰消費。由於是雜湊索引,請務必保證 key 儘可能唯一,這樣可以避免潛在的雜湊衝突。
// 訂單Id
String orderId = "20034568923546";
message.setKeys(orderId);
1.4. 佇列
一個 Topic 可能有多個佇列,並且可能分佈在不同的 Broker 上。
佇列天然具備順序性,即訊息按照進入佇列的順序寫入儲存,同一佇列間的訊息天然存在順序關係,佇列頭部為最早寫入的訊息,佇列尾部為最新寫入的訊息。訊息在佇列中的位置和訊息之間的順序通過位點(Offset)進行標記管理。
Apache RocketMQ 預設提供訊息可靠儲存機制,所有傳送成功的訊息都被持久化儲存到佇列中,配合生產者和消費者使用者端的呼叫可實現至少投遞一次的可靠性語意。
Apache RocketMQ 佇列模型和Kafka的分割區(Partition)模型類似。在 Apache RocketMQ 訊息收發模型中,佇列屬於主題的一部分,雖然所有的訊息資源以主題粒度管理,但實際的操作實現是面向佇列。例如,生產者指定某個主題,向主題內傳送訊息,但實際訊息傳送到該主題下的某個佇列中。
Apache RocketMQ 中通過修改佇列數量,以此實現橫向的水平擴容和縮容。
一般來說一條訊息,如果沒有重複傳送(比如因為伺服器端沒有響應而進行重試),則只會存在在 Topic 的其中一個佇列中,訊息在佇列中按照先進先出的原則儲存,每條訊息會有自己的位點,每個佇列會統計當前訊息的總條數,這個稱為最大位點 MaxOffset;佇列的起始位置對應的位置叫做起始位點 MinOffset。佇列可以提升訊息傳送和消費的並行度。
注意:按照實際業務消耗設定佇列數,佇列數量的設定應遵循少用夠用原則,避免隨意增加佇列數量。
1.5. 生產者
生產者(Producer)就是訊息的傳送者,Apache RocketMQ 擁有豐富的訊息型別,可以支援不同的應用場景,在不同的場景中,需要使用不同的訊息進行傳送。比如在電商交易中超時未支付關閉訂單的場景,在訂單建立時會傳送一條延時訊息。這條訊息將會在 30 分鐘以後投遞給消費者,消費者收到此訊息後需要判斷對應的訂單是否已完成支付。如支付未完成,則關閉訂單。如已完成支付則忽略,此時就需要用到延遲訊息;電商場景中,業務上要求同一訂單的訊息保持嚴格順序,此時就要用到順序訊息。在紀錄檔處理場景中,可以接受的比較大的傳送延遲,但對吞吐量的要求很高,希望每秒能處理百萬條紀錄檔,此時可以使用批次訊息。在銀行扣款的場景中,要保持上游的扣款操作和下游的簡訊通知保持一致,此時就要使用事務訊息。
注意:不要在同一個主題內使用多種訊息型別
生產者通常被整合在業務系統中,將業務訊息按照要求封裝成 Apache RocketMQ 的訊息(Message)並行送至伺服器端。
生產者和主題的關係為多對多關係,即同一個生產者可以向多個主題傳送訊息,同一個主題也可以接收多個生產者的訊息。
注意:不建議頻繁建立和銷燬生產者
Producer p = ProducerBuilder.build();
for (int i =0;i<n;i++){
Message m= MessageBuilder.build();
p.send(m);
}
p.shutdown();
1.6. 消費者與消費者組
如果多個消費者設定了相同的Consumer Group,我們認為這些消費者在同一個消費組內。同一個消費組的多個消費者必須保持消費邏輯和設定一致,共同分擔該消費組訂閱的訊息,實現消費能力的水平擴充套件。
在 Apache RocketMQ 有兩種消費模式,分別是:
負載均衡
RocketMQ的負載均衡策略與Kafka極其類似,幾乎一毛一樣
叢集模式下,同一個消費組內的消費者會分擔收到的全量訊息,這裡的分配策略是怎樣的?如果擴容消費者是否一定能提升消費能力?
Apache RocketMQ 提供了多種叢集模式下的分配策略,包括平均分配策略、機房優先分配策略、一致性hash分配策略等,可以通過如下程式碼進行設定相應負載均衡策略。
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
預設的分配策略是平均分配,這也是最常見的策略。平均分配策略下消費組內的消費者會按照類似分頁的策略均攤消費。
在平均分配的演演算法下,可以通過增加消費者的數量來提高消費的並行度。比如下圖中,通過增加消費者來提高消費能力。
但也不是一味地增加消費者就能提升消費能力的,比如下圖中Topic的總佇列數小於消費者的數量時,消費者將分配不到佇列,即使消費者再多也無法提升消費能力。
1.7. 消費者分類
如上圖所示, Apache RocketMQ 的消費者處理訊息時主要經過以下階段:訊息獲取--->訊息處理--->消費狀態提交。
針對以上幾個階段,Apache RocketMQ 提供了不同的消費者型別: PushConsumer 、SimpleConsumer 和 PullConsumer。這幾種型別的消費者通過不同的實現方式和介面可滿足您在不同業務場景下的消費需求。具體差異如下:
注:在實際使用場景中,PullConsumer 僅推薦在流處理框架中整合使用,大多數訊息收發場景使用 PushConsumer 和 SimpleConsumer 就可以滿足需求。
PushConsumer
PushConsumers是一種高度封裝的消費者型別,消費訊息僅通過消費監聽器處理業務並返回消費結果。訊息的獲取、消費狀態提交以及消費重試都通過 Apache RocketMQ 的使用者端SDK完成。
SimpleConsumer
SimpleConsumer 是一種介面原子型的消費者型別,訊息的獲取、消費狀態提交以及消費重試都是通過消費者業務邏輯主動發起呼叫完成。
補充:
rocketmq-client中定義的:
- DefaultMQProducer
- DefaultMQPushConsumer
- DefaultLitePullConsumer
rocketmq-client-java中定義的:
- Producer
- PushConsumer
- SimpleConsumer
1.8. 消費位點
訊息是按到達Apache RocketMQ 伺服器端的先後順序儲存在指定主題的多個佇列中,每條訊息在佇列中都有一個唯一的Long型別座標,這個座標被定義為訊息位點。一條訊息被某個消費者消費完成後不會立即從佇列中刪除,Apache RocketMQ 會基於每個消費者分組記錄消費過的最新一條訊息的位點,即消費位點。
如上圖所示,在Apache RocketMQ中每個佇列都會記錄自己的最小位點、最大位點。針對於消費組,還有消費位點的概念,在叢集模式下,消費位點是由使用者端提給交伺服器端儲存的,在廣播模式下,消費位點是由使用者端自己儲存的。一般情況下消費位點正常更新,不會出現訊息重複,但如果消費者發生崩潰或有新的消費者加入群組,就會觸發重平衡,重平衡完成後,每個消費者可能會分配到新的佇列,而不是之前處理的佇列。為了能繼續之前的工作,消費者需要讀取每個佇列最後一次的提交的消費位點,然後從消費位點處繼續拉取訊息。但在實際執行過程中,由於使用者端提交給伺服器端的消費位點並不是實時的,所以重平衡就可能會導致訊息少量重複。
1.9. 訂閱關係
一個訂閱關係指的是指定某個消費者分組對於某個主題的訂閱。
不同消費者分組對於同一個主題的訂閱相互獨立如下圖所示,消費者分組Group A和消費者分組Group B分別以不同的訂閱關係訂閱了同一個主題Topic A,這兩個訂閱關係互相獨立,可以各自定義,不受影響。
同一個消費者分組對於不同主題的訂閱也相互獨立如下圖所示,消費者分組Group A訂閱了兩個主題Topic A和Topic B,對於Group A中的消費者來說,訂閱的Topic A為一個訂閱關係,訂閱的Topic B為另外一個訂閱關係,且這兩個訂閱關係互相獨立,可以各自定義,不受影響。
2. 訊息型別
1、順序訊息(FIFO):這類訊息必須設定 message group,這種型別的訊息需要與FIFO消費者組一起使用
2、延遲訊息(DELAY):訊息被傳送後不會立即對消費者可見,這種型別的訊息必須設定delivery timestamp以決定對消費者可見的時間;
3、事務訊息(TRANSACTIONAL):將一個或多個訊息的釋出包裝到一個事務中,提供提交/回滾方法來決定訊息的可見性;
4、普通訊息(NORMAL):預設型別
不同的型別是互斥的,當意味著要釋出的訊息不能同時是FIFO型別和DELAY型別。實際上,主題的型別決定了訊息的型別。例如,FIFO主題不允許釋出其他型別的訊息。
2.1. 普通訊息
普通訊息一般應用於微服務解耦、事件驅動、資料整合等場景,這些場景大多數要求資料傳輸通道具有可靠傳輸的能力,且對訊息的處理時機、處理順序沒有特別要求。
典型場景一:微服務非同步解耦
如上圖所示,以線上的電商交易場景為例,上游訂單系統將使用者下單支付這一業務事件封裝成獨立的普通訊息並行送至Apache RocketMQ伺服器端,下游按需從伺服器端訂閱訊息並按照本地消費邏輯處理下游任務。每個訊息之間都是相互獨立的,且不需要產生關聯。
典型場景二:資料整合傳輸
如上圖所示,以離線的紀錄檔收集場景為例,通過埋點元件收集前端應用的相關操作紀錄檔,並轉發到 Apache RocketMQ 。每條訊息都是一段紀錄檔資料,Apache RocketMQ 不做任何處理,只需要將紀錄檔資料可靠投遞到下游的儲存系統和分析系統即可,後續功能由後端應用完成。
2.2. 順序訊息
應用場景
在有序事件處理、撮合交易、資料實時增量同步等場景下,異構系統間需要維持強一致的狀態同步,上游的事件變更需要按照順序傳遞到下游進行處理。在這類場景下使用 Apache RocketMQ 的順序訊息可以有效保證資料傳輸的順序性。
典型場景一:撮合交易
以證券、股票交易撮合場景為例,對於出價相同的交易單,堅持按照先出價先交易的原則,下游處理訂單的系統需要嚴格按照出價順序來處理訂單。
典型場景二:資料實時增量同步
以資料庫變更增量同步場景為例,上游源端資料庫按需執行增刪改操作,將二進位制操作紀錄檔作為訊息,通過 Apache RocketMQ 傳輸到下游搜尋系統,下游系統按順序還原訊息資料,實現狀態資料按序重新整理。如果是普通訊息則可能會導致狀態混亂,和預期操作結果不符,基於順序訊息可以實現下游狀態和上游操作結果一致。
功能原理
順序訊息是 Apache RocketMQ 提供的一種高階訊息型別,支援消費者按照傳送訊息的先後順序獲取訊息,從而實現業務場景中的順序處理。 相比其他型別訊息,順序訊息在傳送、儲存和投遞的處理過程中,更多強調多條訊息間的先後順序關係。
Apache RocketMQ 順序訊息的順序關係通過訊息組(MessageGroup)判定和識別,傳送順序訊息時需要為每條訊息設定歸屬的訊息組,相同訊息組的多條訊息之間遵循先進先出的順序關係,不同訊息組、無訊息組的訊息之間不涉及順序性。
基於訊息組的順序判定邏輯,支援按照業務邏輯做細粒度拆分,可以在滿足業務區域性順序的前提下提高系統的並行度和吞吐能力。
如何保證訊息的順序性?
Apache RocketMQ 的訊息的順序性分為兩部分,生產順序性和消費順序性。
1、生產順序性
如需保證訊息生產的順序性,則必須滿足以下條件:
滿足以上條件的生產者,將順序訊息傳送至 Apache RocketMQ 後,會保證設定了同一訊息組的訊息,按照傳送順序儲存在同一佇列中。伺服器端順序儲存邏輯如下:
2、消費順序性
如需保證訊息消費的順序性,則必須滿足以下條件:
生產順序性和消費順序性組合
如果訊息需要嚴格按照先進先出(FIFO)的原則處理,即先傳送的先消費、後傳送的後消費,則必須要同時滿足生產順序性和消費順序性。
一般業務場景下,同一個生產者可能對接多個下游消費者,不一定所有的消費者業務都需要順序消費,您可以將生產順序性和消費順序性進行差異化組合,應用於不同的業務場景。例如傳送順序訊息,但使用非順序的並行消費方式來提高吞吐能力。更多組合方式如下表所示:
生產順序 | 消費順序 | 順序性效果 |
設定訊息組,保證訊息順序傳送。 | 順序消費 | 按照訊息組粒度,嚴格保證訊息順序。 同一訊息組內的訊息的消費順序和傳送順序完全一致。 |
設定訊息組,保證訊息順序傳送。 | 並行消費 | 並行消費,儘可能按時間順序處理。 |
未設定訊息組,訊息亂序傳送。 | 順序消費 | 按佇列儲存粒度,嚴格順序。 基於 Apache RocketMQ 本身佇列的屬性,消費順序和佇列儲存的順序一致,但不保證和傳送順序一致。 |
未設定訊息組,訊息亂序傳送。 | 並行消費 | 並行消費,儘可能按照時間順序處理。 |
2.3. 定時/延時訊息
注:定時訊息和延時訊息本質相同,都是伺服器端根據訊息設定的定時時間在某一固定時刻將訊息投遞給消費者消費。
應用場景
在分散式定時排程觸發、任務超時處理等場景,需要實現精準、可靠的定時事件觸發。使用 Apache RocketMQ 的定時訊息可以簡化定時排程任務的開發邏
輯,實現高效能、可延伸、高可靠的定時觸發能力。
典型場景一:分散式定時排程
在分散式定時排程場景下,需要實現各類精度的定時任務,例如每天5點執行檔案清理,每隔2分鐘觸發一次訊息推播等需求。基於 Apache RocketMQ 的定時訊息可以封裝出多種型別的定時觸發器。
典型場景二:任務超時處理
以電商交易場景為例,訂單下單後暫未支付,此時不可以直接關閉訂單,而是需要等待一段時間後才能關閉訂單。使用 Apache RocketMQ 定時訊息可以實現超時任務的檢查觸發。
基於定時訊息的超時任務處理具備如下優勢:
功能原理
定時時間設定原則
Apache RocketMQ 定時訊息設定的定時時間是一個預期觸發的系統時間戳,延時時間也需要轉換成當前系統時間後的某一個時間戳,而不是一段延時時長。
投遞等級
Apache RocketMQ 一共支援18個等級的延遲投遞,具體時間如下:
2.4. 事務訊息
以電商交易場景為例,使用者支付訂單這一核心操作的同時會涉及到下游物流發貨、積分變更、購物車狀態清空等多個子系統的變更。當前業務的處理分支包括:
使用普通訊息和訂單事務無法保證一致的原因,本質上是由於普通訊息無法像單機資料庫事務一樣,具備提交、回滾和統一協調的能力。而基於 RocketMQ 的分散式事務訊息功能,在普通訊息基礎上,支援二階段的提交能力。將二階段提交和本地事務繫結,實現全域性提交結果的一致性。
事務訊息傳送分為兩個階段。第一階段會傳送一個半事務訊息,半事務訊息是指暫不能投遞的訊息,生產者已經成功地將訊息傳送到了 Broker,但是Broker 未收到生產者對該訊息的二次確認,此時該訊息被標記成「暫不能投遞」狀態,如果傳送成功則執行本地事務,並根據本地事務執行成功與否,向 Broker 半事務訊息狀態(commit或者rollback),半事務訊息只有 commit 狀態才會真正向下游投遞。如果由於網路閃斷、生產者應用重啟等原因,導致某條事務訊息的二次確認丟失,Broker 端會通過掃描發現某條訊息長期處於「半事務訊息」時,需要主動向訊息生產者詢問該訊息的最終狀態(Commit或是Rollback)。這樣最終保證了本地事務執行成功,下游就能收到訊息,本地事務執行失敗,下游就收不到訊息。總而保證了上下游資料的一致性。(PS:重點是兩階段提交)
事務訊息處理流程
1、生產者將訊息傳送至Apache RocketMQ伺服器端。
2、Apache RocketMQ伺服器端將訊息持久化成功之後,向生產者返回Ack確認訊息已經傳送成功,此時訊息被標記為"暫不能投遞",這種狀態下的訊息即為半事務訊息。
3、生產者開始執行本地事務邏輯。
4、生產者根據本地事務執行結果向伺服器端提交二次確認結果(Commit或是Rollback),伺服器端收到確認結果後處理邏輯如下:
5、在斷網或者是生產者應用重啟的特殊情況下,若伺服器端未收到傳送者提交的二次確認結果,或伺服器端收到的二次確認結果為Unknown未知狀態,經過固定時間後,伺服器端將對訊息生產者即生產者叢集中任一生產者範例發起訊息回查。
6、生產者收到訊息回查後,需要檢查對應訊息的本地事務執行的最終結果。
7、生產者根據檢查到的本地事務的最終狀態再次提交二次確認,伺服器端仍按照步驟4對半事務訊息進行處理。
3. 機制
3.1. 訊息傳送重試機制
Apache RocketMQ 使用者端連線伺服器端發起訊息傳送請求時,可能會因為網路故障、服務異常等原因導致呼叫失敗。為保證訊息的可靠性, Apache RocketMQ 在使用者端SDK中內建請求重試邏輯,嘗試通過重試傳送達到最終呼叫成功的效果。
同步傳送和非同步傳送模式均支援訊息傳送重試。
重試觸發條件:
重試流程:
生產者在初始化時設定訊息傳送最大重試次數,當出現上述觸發條件的場景時,生產者使用者端會按照設定的重試次數一直重試傳送訊息,直到訊息傳送成功或達到最大重試次數重試結束,並在最後一次重試失敗後返回撥用錯誤響應。
重試間隔
3.2. 訊息流控機制
訊息流控指的是系統容量或水位過高, Apache RocketMQ 伺服器端會通過快速失敗返回流控錯誤來避免底層資源承受過高壓力。
觸發條件
流控行為
當系統觸發訊息傳送流控時,使用者端會收到系統限流錯誤和異常,錯誤碼資訊如下:
3.3. 消費重試
消費者出現異常,消費某條訊息失敗時, Apache RocketMQ 會根據消費重試策略重新投遞該訊息。消費重試主要解決的是業務處理邏輯失敗導致的消費完整性問題,是一種為業務兜底的策略,不應該被用做業務流程控制。
推薦使用訊息重試場景如下:
消費重試策略
消費重試指的是,消費者在消費某條訊息失敗後,Apache RocketMQ 伺服器端會根據重試策略重新消費該訊息,超過一次定數後若還未消費成功,則該訊息將不再繼續重試,直接被傳送到死信佇列中。
訊息重試的觸發條件
重試策略差異
3.4. 消費進度
訊息位點(Offset)
訊息是按到達伺服器端的先後順序儲存在指定主題的多個佇列中,每條訊息在佇列中都有一個唯一的Long型別座標,這個座標被定義為訊息位點。
任意一個訊息佇列在邏輯上都是無限儲存,即訊息位點會從0到Long.MAX無限增加。通過主題、佇列和位點就可以定位任意一條訊息的位置,具體關係如下圖所示:
Apache RocketMQ 定義佇列中最早一條訊息的位點為最小訊息位點(MinOffset);最新一條訊息的位點為最大訊息位點(MaxOffset)。雖然訊息佇列邏輯上是無限儲存,但由於伺服器端物理節點的儲存空間有限, Apache RocketMQ 會捲動刪除佇列中儲存最早的訊息。因此,訊息的最小消費位點和最大消費位點會一直遞增變化。
消費位點(ConsumerOffset)
Apache RocketMQ 領域模型為釋出訂閱模式,每個主題的佇列都可以被多個消費者分組訂閱。若某條訊息被某個消費者消費後直接被刪除,則其他訂閱了該主題的消費者將無法消費該訊息。
因此,Apache RocketMQ 通過消費位點管理訊息的消費進度。每條訊息被某個消費者消費完成後不會立即在佇列中刪除,Apache RocketMQ 會基於每個消費者分組維護一份消費記錄,該記錄指定消費者分組消費某一個佇列時,消費過的最新一條訊息的位點,即消費位點。
當消費者使用者端離線,又再次重新上線時,會嚴格按照伺服器端儲存的消費進度繼續處理訊息。如果伺服器端儲存的歷史位點資訊已過期被刪除,此時消費位點向前移動至伺服器端儲存的最小位點。
注:消費位點的儲存和恢復是基於 Apache RocketMQ 伺服器端的儲存實現,和任何消費者無關。
佇列中訊息位點MinOffset、MaxOffset和每個消費者分組的消費位點ConsumerOffset的關係如下:
ConsumerOffset≤MaxOffset:
ConsumerOffset≥MinOffset:
消費位點初始值
消費位點初始值指的是消費者分組首次啟動消費者消費訊息時伺服器端儲存的消費位點的初始值。Apache RocketMQ 定義消費位點的初始值為消費者首次獲取訊息時,該時刻佇列中的最大訊息位點。相當於消費者將從佇列中最新的訊息開始消費。
3.5. 訊息儲存機制
Apache RocketMQ 使用儲存時長作為訊息儲存的依據,即每個節點對外承諾訊息的儲存時長。在儲存時長範圍內的訊息都會被保留,無論訊息是否被消費;超過時長限制的訊息則會被清理掉。
4. 架構
4.1. 技術架構
RocketMQ架構上主要分為四部分,如上圖所示:
4.2. 部署架構
RocketMQ 網路部署特點:
Producer與NameServer叢集中的其中一個節點(隨機選擇)建立長連線,定期從NameServer獲取Topic路由資訊,並向提供Topic 服務的Master建立長連線,且定時向Master傳送心跳。Producer完全無狀態,可叢集部署。
結合部署架構圖,描述叢集工作流程:
5. 使用者端
在編寫使用者端程式碼時,首先準備一個簡單的環境,可以選用Local模式。這裡不多介紹,只說一句,啟動broker的時候可以-c指定組態檔,啟動完以後通過jps檢視程序
通過mqadmin命令建立並檢視主題
mqadmin updateTopic -n localhost:9876 -b 172.16.52.116:10911 -t TEST_TOPIC
mqadmin topicList -n localhost:9876
具體命令引數,參見 https://rocketmq.apache.org/zh/docs/deploymentOperations/16admintool/
也可以通過RocketMQ Dashboard建立主題
5.1. rocketmq-client
引入依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.0.0</version>
</dependency>
程式碼片段
public class AppTest extends TestCase {
private String producerGroupName = "MyProducerGroup";
private String consumerGroupName = "MyConsumerGroup";
/**
* 傳送同步訊息
*/
@Test
public void testSyncProducer() throws Exception {
// 範例化訊息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
// 設定NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer範例
producer.start();
// 傳送訊息
Message message = new Message("TEST_TOPIC", "A", "UserID12345", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
// 關閉Producer範例
producer.shutdown();
}
/**
* 傳送非同步訊息
*/
@Test
public void testAsyncProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
producer.setNamesrvAddr("localhost:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
Message msg = new Message("TEST_TOPIC", "B", "OrderID12346", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收非同步返回結果的回撥
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
public void onException(Throwable e) {
e.printStackTrace();
}
});
// 等待5秒
TimeUnit.SECONDS.sleep(5);
}
/**
* 單向傳送訊息
* 這種方式主要用在不特別關心傳送結果的場景,例如紀錄檔傳送。
*/
@Test
public void testOnewayProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TEST_TOPIC", "C", "OrderID12348", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 傳送單向訊息,沒有任何返回結果
producer.sendOneway(msg);
}
/**
* 消費訊息
*/
@Test
public void testConsumer() throws Exception {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroupName);
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TEST_TOPIC", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.start();
while (true) {
List<MessageExt> messageExts = consumer.poll();
if (messageExts.isEmpty()) {
continue;
}
messageExts.forEach(msg -> {
System.out.println(String.format("MsgId: %s, MsgBody: %s", msg.getMsgId(), new String(msg.getBody())));
});
consumer.commitSync();
}
}
}
5.2. rocketmq-spring-boot-starter
依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
application.yml
設定項詳見 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties
rocketmq:
name-server: localhost:9876
producer:
group: MyProducerGroup
send-message-timeout: 10000
consumer:
group: MyConsumerGroup
傳送訊息
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: ChengJianSheng
* @Date: 2023/1/18
*/
@RestController
@RequestMapping("/message")
public class MessageController {
private String springTopic = "SPRING_TOPIC";
private String userTopic = "USER_TOPIC";
private String orderTopic = "ORDER_TOPIC";
private String extTopic = "EXT_TOPIC";
private String reqTopic = "REQ_TOPIC";
private String objTopic = "OBJECT_TOPIC";
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send")
public String send() {
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello World");
Message message = MessageBuilder.withPayload("Hello World!2222".getBytes()).build();
sendResult = rocketMQTemplate.syncSend(springTopic, message);
message = MessageBuilder.withPayload("Hello, World! I'm from spring message").build();
sendResult = rocketMQTemplate.syncSend(springTopic, message);
sendResult = rocketMQTemplate.syncSend(userTopic, new User("zhangsan", 20));
message = MessageBuilder.withPayload(new User("lisi", 21))
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.build();
sendResult = rocketMQTemplate.syncSend(userTopic, message);
rocketMQTemplate.asyncSend(orderTopic, new Order("oid1234", "4.56"), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("async onSucess SendResult=%s %n", sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.printf("async onException Throwable=%s %n", throwable);
}
});
rocketMQTemplate.convertAndSend(extTopic + ":tag0", "I'm from tag0");
rocketMQTemplate.convertAndSend(extTopic + ":tag1", "I'm from tag1");
String replyString = rocketMQTemplate.sendAndReceive(reqTopic, "request string", String.class);
System.out.printf("receive %s %n", replyString);
User replyUser = rocketMQTemplate.sendAndReceive(objTopic, new User("wangwu", 21), User.class);
System.out.printf("receive %s %n", replyUser);
return "ok";
}
}
接收訊息
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "SPRING_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("------- StringConsumer received: %s \n", message);
}
}
@Component
@RocketMQMessageListener(topic = "ORDER_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order message) {
System.out.printf("------- OrderConsumer received: %s [orderId : %s]\n", message, message.getOrderNo());
}
}
@Component
@RocketMQMessageListener(topic = "USER_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class UserConsumer implements RocketMQListener<User>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(User message) {
System.out.printf("------ UserConsumer received: %s ; age: %s ; name: %s \n", message, message.getAge(), message.getName());
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
}
}
@Component
@RocketMQMessageListener(topic = "REQ_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
@Override
public String onMessage(String message) {
System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
return "reply string";
}
}
@Component
@RocketMQMessageListener(topic = "OBJECT_TOPIC", consumerGroup = "${rocketmq.consumer.group}")
public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
@Override
public User onMessage(User message) {
System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", message);
return new User("tom", 8);
}
}
@Component
@RocketMQMessageListener(topic = "EXT_TOPIC", selectorExpression = "tag0||tag1", consumerGroup = "${rocketmq.consumer.group}")
public class MessageExtConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.printf("------- MessageExtConsumer received message, msgId: %s, body:%s \n", message.getMsgId(), new String(message.getBody()));
}
}
6. 檔案
https://rocketmq.apache.org/zh/
https://rocketmq.apache.org/zh/docs/deploymentOperations/15deploy/
https://github.com/apache/rocketmq/tree/rocketmq-all-5.0.0/docs/cn
https://github.com/apache/rocketmq/blob/rocketmq-all-5.0.0/docs/cn/architecture.md
https://github.com/apache/rocketmq/blob/rocketmq-all-5.0.0/docs/cn/RocketMQ_Example.md
https://github.com/apache/rocketmq-dashboard
https://github.com/apache/rocketmq-spring
https://github.com/apache/rocketmq