MQ系列7:訊息通訊,追求極致效能

2022-10-28 18:02:32

MQ系列1:訊息中介軟體執行原理
MQ系列2:訊息中介軟體的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ訊息的傳送模式
MQ系列6:訊息的消費

1 介紹

前面的章節我學習了 NameServer的原理,訊息的生產傳送,以及訊息的消費的全過程。
我們來回顧一下:
RocketMQ 訊息佇列架構主要包括NameServe、Broker(Master/Slave)、Producer、Consumer 4個核心部件,基本執行流程如下:

  1. NameServer 優先啟動。NameServer 是整個 RocketMQ 的「中央大腦」 ,作為 RocketMQ 的服務註冊中心,所以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。
  2. Broker 啟動後,需要將自己註冊至NameServer中,並 保持長連線,每 30s 傳送一次傳送心跳包,來確保Broker是否存活。並將 Broker 資訊 ( IP+、埠等資訊)以及Broker中儲存的Topic資訊上報。註冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的對映關係。
  3. NameServer 如果檢測到Broker 宕機(因為使用心跳機制, 如果檢測超120s(兩分鐘)無響應),則從路由登入檔中將其移除。
  4. 生產者在傳送某個主題的訊息之前先從 NamerServer 獲取 Broker 伺服器地址列表(Broker可能是Cluster模式),然後根據負載均衡演演算法從列表中選擇1臺Broker ,建立連線通道,進行訊息傳送。
  5. 消費者在訂閱某個topic的訊息之前從 NamerServer 獲取 Broker 伺服器地址列表(Broker可能是Cluster模式),包括關聯的全部Topic佇列資訊。進而獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連線通道,開始消費資料。
  6. 生產者和消費者預設每30s 從 NamerServer 獲取 Broker 伺服器地址列表,以及關聯的所有Topic佇列資訊,更新到Client本地。
    2 ~ 4 步驟實際上是 Producer、Broker 以及NameServer 之間整個進行資料通訊的過程,面對複雜的訊息佇列系統,一個效能優良,穩定性高的網路通訊模組是非常重要的,它體現了RocketMQ叢集訊息的整體吞吐和負載能力。也是RocketMQ保證高效能、高穩定性的基石。

2 網路通訊過程分析

2.1 通訊類(rocketmq-remoting )的結構解析


通過上圖可以看到,在整個RocketMQ佇列系統中,rocketmq-remoting 這個module是專門用來負責網路通訊職能的。
並且從模組依賴關係中可以看出 ,rocketmq-client(client)、rocketmq-broker(broker)、rocketmq-namesrv(namesrc 命名服務) 等模組均依賴了它。

通訊層是基於 Netty 進行擴充套件的,並自定義了通訊協定,用於將訊息傳遞給 Broker 進行儲存。實現Client與Server之間高效的資料請求與接收。

2.2 協定結構設計

因為是基於Netty進行擴充套件的,所以自定義了RocketMQ的訊息協定,在傳輸過程的資料進行結構制定、封裝、編解碼的過程。
在RocketMQ中,負責這個工作的就是RemotingCommand類,我們來看看這個類的幾個重要屬性:

欄位 型別 Request維度 Response維度
code int 請求操作碼,依據不同的請求碼做不同的業務處理 應答響應碼:0成功,非0標識對應的錯誤
language LanguageCode 列舉(JAVA、CPP、PYThON、GO等):請求方實現的編碼語言 應答方實現的編碼語言
version int 請求方程式的版本 應答方版本
opaque int 類似請求ID:reqeustId,唯一識別碼,區分每一個獨立的請求 response的時候直接返回
flag int 區分是普通還是oneway的RPC:RPC_ONEWAY = 1; RPC = 0。 區分是普通還是oneway RPC
remark String 自定義備註資訊 自定義備註資訊
extFields HashMap<String, String> Request自定義擴充套件的欄位屬性 Response自定義擴充套件的欄位屬性

2.3 訊息內容的組成結構

傳輸的訊息內容主要由一下幾個部分組成:

組成部分 說明
訊息長度 訊息的總長度,int型別,四個位元組儲存
序列化型別+訊息頭length int型別,位元組1表示序列化型別,位元組2~4表示訊息頭長度
訊息頭的資料 序列化後的訊息頭資料
訊息主體資料 訊息主體資料內容,二進位制位元組

2.4 RocketMQ 訊息通訊流程

在RocketMQ訊息佇列中支援通訊的模式主要有

  • sync 同步傳送模式
  • async 非同步傳送模式
  • oneway 單向模式,無需關注Response

2.4.1 通訊流程說明

下圖從 NettyRemotingClient 初始化,NettyRemotingServer 初始化,基於 NettyRemotingClient 的訊息傳送,以及Handler 處理過程來說明。

  • Broker 和 NameServer 啟動時同步呼叫 NettyRemotingServer.start() 方法, 初始化 Netty 伺服器
    • 設定 BossGroup/WorkerGroup NioEventLoopGroup 執行緒組
    • 設定 Channel
    • 新增 NettyServerHandler
    • 呼叫 serverBootstrap.bind() 監聽埠,等待client的connection
  • Producer 和 Consumer 同樣需要啟動 Netty 的使用者端,通過呼叫NettyRemotingClient.start() 初始化 Netty 使用者端
    • 設定使用者端 NioEventLoopGroup 執行緒組
    • 設定 Channel
    • 新增 NettyClientHandler
  • 傳送同步訊息時,呼叫 NettyRemoteClient.invokeSync(),從 channelTables 快取中獲取或者建立用於通訊的 Channel 通道。
  • 建立完 Channel 後,生產者 Producer 呼叫 Channel.writeAndFlush() 傳送資料
  • NettyRemotingServer 伺服器端執行緒組 處理可讀事件,呼叫 NettyServerHandler 處理資料。
  • 下一步,NettyServerHandler 呼叫 processMessageReceived方法,接收並處理傳送過來的資料。
  • 根據請求碼 RequestCode 區別不同的請求,來執行不同的 Processor。
    • 說明:Processor 在伺服器端初始化的時候,將 RequestCode 新增到 Processor 快取中。訊息的存、查、拉取都是不同的請求碼。
  • processMessageReceived 從ResponseTables(key 為 opaque) 快取中取出 ResponseFuture,並將將返回結果設定到 ResponseFuture。同步模式下執行 responseFuture.putResponse()方法,非同步呼叫執行回撥方法。
  • NettyRemotingClient 收到可讀事件,呼叫 NettyClientHandler 讀取並處理返回事件。

2.4.2 Reactor多執行緒設計

上面我們說過了,RocketMQ的通訊是採用Netty元件作為底層通訊庫。同樣的,它也遵循Reactor多執行緒模型,並在此基礎上做了一些優化。

上面圖中四個圖形可以大致說明NettyRemotingServer的Reactor 多執行緒模型,在RocketMQ中的存在形式。

  • M:1個 Reactor 主執行緒:eventLoopGroupBoss,它的職能是負責監聽 TCP網路連線請求,有連線請求過來時候,建立SocketChannel,並註冊到selector上。
  • S:RocketMQ的原始碼中會選擇NIO或Epoll,來監聽網路資料,當監聽到網路資料過來時,讀取資料並丟給Worker執行緒池:eventLoopGroupSelector,Rocket原始碼中預設設定執行緒數為3。
  • M1:執行業務之前的各種雜事(SSL認證、空閒檢查、網路連線檢查、編解碼、序列化反序列化 等等),交付給 這些工作交給defaultEventExecutorGroup 去處理,RocketMQ原始碼中預設執行緒數設定為8。
  • M2:剩下處理業務的操作,就直接放在業務執行緒池中執行了。按照之前說的,依據RequestCode去processorTable 本地快取中找到對應的 processor,並封裝成task任務,在丟給對應的業務processor執行緒池來處理。
執行緒數標識 執行緒名 說明
1 NettyBoss Reactor 主執行緒,預設1
N NettyServerEPOLLSelector Reactor 執行緒池,預設3
M1 NettyServerCodecThread Worker 執行緒池,預設8
M2 RemotingExecutorThread Processor執行緒池,處理業務邏輯

完整的可以參照官網的這張圖:

總結

上面介紹了 RocketMQ 訊息通訊的主要內容,我們用幾句話總結下:

  • 整個RocketMQ佇列系統中,rocketmq-remoting Module是專門用來負責網路通訊職能的。
  • 網路通訊模組基於Netty進行擴充套件的,所以自定義了RocketMQ的訊息協定,在傳輸過程的資料進行結構制定、封裝、編解碼的過程。
  • 理解 NettyRemotingServer/NettyRemotingClient 的初始化過程,以及呼叫 NettyServerHandler/NettyClienthandler 進行處理的執行流程。
  • 同步非同步:同步和非同步消核心區別是 同步訊息通過 Netty 傳送請求後會執行 ResponseFuture.waitResponse() 阻塞等待,非同步的請求則 SendCallback 相應的方法進行回撥處理。
  • 多執行緒模式下會通過1個Reactor 主執行緒(監聽連線),以及Reactor 執行緒池(監聽資料)、Worker 執行緒池(處理前置工作)、Processor執行緒池(處理業務邏輯) 來處理通訊過程。