三此君看了好幾本書,看了很多遍原始碼整理的 一張圖進階 RocketMQ 圖片連結,關於 RocketMQ 你只需要記住這張圖!覺得不錯的話,記得點贊關注哦。
【重要】視訊在 B 站同步更新,歡迎圍觀,輕輕鬆鬆漲姿勢。一張圖進階 RocketMQ-訊息傳送(視訊版)
本文是「一張圖進階 RocketMQ」 系列第 3 篇,對 RocketMQ 不瞭解的同學可以先看看三此君的
一張圖進階 RocketMQ-整體架構,一張圖進階 RocketMQ - NameServer。
在瞭解了 RocketMQ 的整體架構之後,我們來深入的分析下生產者訊息傳送的設計與實現。本文從一個生產者範例開始,以兩行程式碼為切入點,逐步剖析生產者啟動流程以及同步訊息傳送流程。
訊息傳送分為同步訊息、非同步訊息和單向訊息,簡單來說:
我們先來回顧下同步訊息傳送的例子:
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 範例化訊息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設定NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer範例
producer.start();
// 建立訊息,並指定Topic,Tag和訊息體
Message msg = new Message("sancijun","order", "orderId", "我一定會關注三此君".getBytes("UTF-8"));
// 傳送訊息到一個Broker
SendResult sendResult = producer.send(msg);
// 通過sendResult返回訊息是否成功送達
System.out.printf("%s%n", sendResult);
// 如果不再傳送訊息,關閉Producer範例。
producer.shutdown();
}
}
producer
,並告訴它 NameServer 的地址,這樣生產者才能從 NameServer 獲取路由資訊。producer
得做一些初始化(這是很關鍵的步驟),它要和 NameServer 通訊,要先初始化通訊模組等。producer
已經準備好了,那得準備好要發的內容,把 "我一定會關注三此君" 傳送到 Topic=」sanicjun「。producer
就可以把訊息傳送出去了。producer
怎麼知道 Broker 地址呢?他會去 NameServer 獲取路由資訊,得到 Broker 的地址是 localhost:10909,然後通過網路通訊將訊息傳送給 Broker。其中有兩個關鍵的地方:producer.start()
及 producer.send()
,也就是生產者初始化及訊息傳送。我們以這兩行程式碼為切入點,看看 RocketMQ Producer 的設計與實現。
Tips:因為本文是RocketMQ 設計與實現分析,雖然不會貼上任何原始碼,但是圖文中會有大量的類名和方法名,看的時候不必執著於這些陌生的類名和方法名,三此君會解釋這些類和方法的用途。
目標:將訊息傳送給 Broker 進行儲存
關鍵點 1: 怎樣根據 topic+路由資訊 建立網路通道,進行訊息的傳送
關鍵點 2: 訊息在傳送過程中又經過了哪些處理?
我們範例化一個生產者 DefaultMQProducer,並呼叫 DefaultMQProducer.start() 方法進行初始化:
啟動流程比較長,其實最重要的就是初始化了通訊模組,並啟動了多個定時任務,這些在後面的訊息傳送過程中都會用到:
檢查設定是否合法:生產者組名是否為空、是否滿足命名規則、長度是否滿足等。
啟動通訊模組服務 Netty RemotingClient:RemotingClient 是一個介面,底層使用的通訊框架是Netty,提供了實現類 NettyRemotingClient,RemotingClient 在初始化的時候範例化 Bootstrap,方便後續用來建立 SocketChannel;後文會介紹 RocketMQ 的通訊機制,大家稍安勿躁。
啟動 5 個後臺定時任務:定時更新 NameServerAddr 資訊,定時更新 topic 的路由資訊,定時向 Broker 傳送心跳及清理下線的 Broker,定時持久化 Consumer 的 Offset 資訊,定時調整執行緒池;
生產者每 30s 會從某臺 NameServer 獲取 Topic 和 Broker 的對映關係(路由資訊)存在本地記憶體中,如果發現新的 Broker 就會和其建立長連線,每 30s 會傳送心跳至 Broker 維護連線。
Tips:生產者為什麼要啟動訊息拉取服務?重平衡服務是什麼?簡單來說,這兩個服務都是用於消費者的,這裡我們暫且不理會。訊息拉取服務 pullMessageService 是從 Broker 拉取訊息的服務 ,重平衡服務 rebalanceService 用於消費者的負載均衡,負責分配消費者可消費的訊息佇列。
總體上講,訊息傳送可以劃分為三個層級:
我們通過前面的範例來看整個同步訊息傳送的處理流程,整個過程我們的主要目標就是把訊息傳送到 Broker:
第一步:業務層構建待傳送訊息 Message msg = new Message("sancijun","order", "orderId", "我一定會關注三此君".getBytes("UTF-8"));
第二步:然後我們呼叫 producer.send(msg)
傳送訊息,可是 producer 怎麼知道發給誰呢?訊息本身又需要經過哪些處理呢?我們進入呼叫鏈直到 sendDefaultImpl
檢查訊息是否為空,訊息的 Topic 的名字是否為空或者是否符合規範,訊息體大小是否符合要求,最大值為4MB,可以通過 maxMessageSize 進行設定。
執行 tryToFindTopicPublishInfo() 方法:獲取 Topic 路由資訊,如果不存在則丟擲異常。如果本地快取沒有路由資訊,就通過Namesrv 獲取路由資訊,更新到本地。訊息構建的時候我們指定了訊息所屬 Topic,根據 Topic 路由資訊我們可以找到對應的 Broker。
Tips:從 NameServer 獲取的路由資訊 TopicRouteData 會包含指定 Topic 的 topicQueueTable、brokerAddrTable。在 NameServer 叢集後設資料管理部分我們講過,通過 topicName 從 topicQueueTable 獲取對應的 brokerName,再根據 brokerName 從 brokerAddrTable 中獲取 Broker IP 地址。
計算訊息傳送的重試次數,同步重試和非同步重試的執行方式是不同的。在同步傳送情況下如果傳送失敗會預設重投兩次(預設retryTimesWhenSendFailed = 2),並且不會選擇上次失敗的 Broker,會向其他 Broker 投遞。
執行佇列選擇方法 selectOneMessageQueue()。根據 lastBrokerName(上次傳送訊息失敗的 Broker 的名字)和 Topic 路由資訊選一個 MessageQueue。
首次傳送時 lastBrokerName 為 null,採用輪詢策略選擇一個 MessageQueue。如果上次傳送失敗,也是採用輪詢策略選擇一個 MessageQueue,但是會跳過上次傳送失敗 Broker 的 MessageQueue,也就是換一個 Broker 傳送。
Tips:選擇一個 MessageQueue,什麼是 MessageQueue 呢?這和 Broker 的儲存結構相關,我們會在儲存部分詳細介紹,這裡先說結論,我們建立 Topic 時指定了這個 Topic 的讀寫佇列數,每個 MessageQueue 有不同的 queueId(0-3)。
我們也可以通過sendLatencyFaultEnable 來設定是否總是傳送到延遲級別較低的Broker,預設值為False,我麼這裡就不展開討論了。
執行 sendKernelImpl() 方法。
第三步:sendDefaultImpl 做了一系列邏輯處理,我們已經得到了待傳送的 BrokerName,而我們的目標是把訊息傳送到 Broker。sendKernelImpl 方法是傳送訊息的核心方法,主要用於準備通訊層的入參(比如Broker地址、請求體等),將請求傳遞給通訊層。
根據 MessageQueue.brokerName 獲取 Broker IP 地址,給 message 新增全域性唯一 ID。
Tips:sendKernelImpl 也有很多的邏輯處理,我們暫時先略過這裡的壓縮、事務訊息、勾點函數、重試訊息:
對大於4k的普通訊息進行壓縮,並設定訊息的系統標記為MessageSysFlag.COMPRESSED_FLAG。
如果是事務Prepared訊息,則設定訊息的系統標記為MessageSysFlag.TRANSACTION_PREPARED_TYPE
如果註冊了訊息傳送勾點函數,則執行訊息傳送之前的增強邏輯,通過DefaultMQProducerImpl#registerSendMessageHook註冊勾點處理類,並且可以註冊多個。
構建傳送訊息請求頭:生產者組、主題名稱、預設建立主題Key、該主題在單個Broker預設佇列數、佇列ID(佇列序號)、訊息系統標記(MessageSysFlag)、訊息傳送時間、訊息標記、訊息擴充套件屬性、訊息重試次數、是否是批次訊息等
處理重試訊息。
呼叫 MQClientAPIImpl.sendMessage(),首先構建一個遠端請求 RemotingCommand,根據傳送型別(同步或非同步)呼叫不同的通訊層實現方法。我們這裡是同步訊息,則呼叫 RemotingClient.invokeSync()。
處理返回結果,將通訊層返回的結果封裝成 SendResult 物件返回給業務層。
第四步:RemotingClient 是基於 Netty 實現的,熟悉 Netty 的同學已經大概知道後面的流程,不熟悉的同學也沒有關係,這裡先混個眼熟,下面我們會對 Netty 做簡單的介紹。
第五步:結果處理及返回。
到這裡,生產者已經將訊息傳送到指定的 Broker 了,其中包括了訊息的層層校驗及封裝;還有很重要的是如何選擇一個 MessageQueue 進行傳送(重試),重試是保證訊息傳送可靠的關鍵步驟;最後通過 Netty 將請求傳送給 Broker。我們先不管 Broker 收到請求如何處理,但是要明白訊息如何送到 Broker 進行儲存,需要對 Netty 有簡單的理解。
以上就是 RocketMQ 訊息傳送的主要內容,我們簡單的總結下:
丁威, 周繼鋒. RocketMQ技術內幕:RocketMQ架構設計與實現原理. 機械工業出版社, 2019-01.
李偉. RocketMQ分散式訊息中介軟體:核心原理與最佳實踐. 電子工業出版社, 2020-08.
楊開元. RocketMQ實戰與原理解析. 機械工業出版社, 2018-06.