MQ系列1:訊息中介軟體執行原理
MQ系列2:訊息中介軟體的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ訊息的傳送模式
MQ系列6:訊息的消費
MQ系列7:訊息通訊,追求極致效能
1 介紹
在之前的章節中,我們介紹了訊息的傳送 和 訊息通訊 的原理。但是這邊有一個比較核心的關鍵點,那就是如果已經把訊息傳遞給Broker。在Broker在被消費之前,如何保證訊息的穩定性,避免訊息丟失和資料。
這時候就需要資料持久化資料來進行保障了。
根據之前我們 MQ系列2:訊息中介軟體的技術選型 章節做的分析,RabbitMQ支援 1W+ 級別的吞吐,
Kafka 和 Rocket 支援 10W+ 級別的吞吐,想要實現這麼大的吞吐,必須具備一個很強悍的儲存功能。下面我們來看看。
2 Broker 儲存架構
RocketMQ採用檔案儲存機制(類似Kafka),即直接在磁碟上使用檔案來儲存訊息,而不是採用Redis或者MySQL之類的持久化工具。
它會把訊息儲存所屬相關的檔案儲存在ROCKETMQ_HOME下,包含三個部分:
2.1 CommitLog 訊息後設資料
儲存訊息的後設資料,所有訊息都會順序存入到CommitLog檔案中。CommitLog由多個檔案組成,每個檔案固定大小1G。它有如下特徵:
- 單個檔案預設大小為1G
- 檔名稱長度20,儲存偏移量,偏移量不夠20位的補0。
- 如第1個檔案沒有偏移量,則為:00000000000000000000
- 第2個檔案起始偏移量為1073741824(1G=1073740842),則檔名為 00000000001073741824。
- 第一個1G檔案檔案寫滿之後之後轉入第2個檔案,如此反覆,因為是順序的,所以寫入效率較高。
2.2 ConsumeQueue 訊息邏輯佇列
ConsumeQueue是指儲存訊息在CommitLog上的索引,一個MessageQueue一個檔案,記錄當前MessageQueue被哪些消費者組消費到了哪一條CommitLog。它有如下特徵:
- ComsumeQueue的結構組成共 20 個位元組,包含 8 位元組的 commitlog 物理偏移量、4 位元組的訊息長度、8 位元組 tag hashcode
- ConsumeQueue 裡只存偏移量資訊,內容精悍。載入到記憶體中,操作效率非常高。
- 一致性保障,CommitLog 裡儲存了 ConsumeQueues、Message Key、Tag 等所有資訊,在 ConsumeQueue 丟失或者故障時候,資料可快速回復。
- 因為每個Topic下可能有多個Queueu,所以儲存結構為:HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
2.3 IndexFile 索引檔案
IndexFile 是一種可選索引檔案,提供了一種可以通過 key 或時間區間來查詢訊息的方法,並且這種查詢訊息的方法不影響傳送與消費訊息的主流程。它的特徵如下:
- 演演算法原理:IndexFile 索引檔案的底層實現 為 hash 索引,可以對照 Java 的HashMap比較,通過計算 Key 的 hashcode, 取餘獲得 hash 槽,並通過拉鍊法解決雜湊衝突。
- 大小限制:IndexFile 以建立時間戳命名,單個 IndexFile 檔案大小約為 400M,一個 IndexFile 可以儲存 2000W 個索引。
通過上面的三個部件說明可以瞭解到,RocketMQ 訊息儲存結構主要是由 CommitLog,ConsumeQueue,IndexFile 三部分組成的。當我們傳送訊息的時候,會執行如下過程:
- 訊息格式化成 CommitLog的欄位結構,並按照順序寫入到CommitLog 檔案中。
- Broker會按照 ConsumeQueue 的欄位結構的要求建立一條索引記錄。
- 按需建立IndexFile索引檔案。
3 儲存的執行過程
通過上面我們已經瞭解到了,Kafka 和 Rocket 均支援 10W+ 級別的吞吐,那麼上述的儲存結構是如何保持這樣的高超效能的呢?
- 之前的章節我們已經瞭解到,Broker 啟動時同步啟動 NettyRemotingServer 進行埠監聽,等坐等使用者端的連線。
- 當用戶端傳送請求時,NettyRemotingServer WorkerGroup 處理可讀事件,執行 processRequestCommand 處理來源訊息資料。
- 接收到訊息之後就需要儲存下來了,DefaultMessageStore對資料進行校驗,校驗如下,校驗完成之後傳送儲存指令。
- Broker 無響應時拒絕訊息寫入
- Broker 角色 為 SLAVE 時也拒絕寫入
- 判斷是否支援寫入,不支援寫入時也拒絕
- topic length 小於等於 256 字元,否則拒絕訊息寫入
- 訊息 length 小於等於 65536 字元,否則拒絕訊息寫入
- PageCache 繁忙時報錯誤訊息,無法寫入
- DefaultMessageStore 呼叫 CommitLog.putMessage 存入訊息
- 獲取可以寫入的 CommitLog 進行寫入
- CommitLog(每個CommitLog預設1G大小) 對應 MappedFile(程式視角),當有多個 MappedFiled 時,組成 MappedFileQueue。
- MappedFile 持有物理 CommitLog 的 fileChannel (Java NIO 檔案讀寫的通道),但並沒有通過 fileChannel 直接存取物理 CommitLog 檔案,而是對映到一個 MappedByteBuffer,並把序列化後的訊息寫入這個 ByteBuffer 中,已達到提升執行效率的目的。
- 最後寫入 MappedFile 相對應的 CommitLog 檔案中。
4 總結
- 理解好RabbitMQ 中 Broker 儲存的組成要素 CommitLog,ConsumeQueue,IndexFile。
- 當 Broker 收到訊息儲存請求時,通過呼叫 CommitLog 對應的 MappedFile,把訊息寫入MappedFile的MeppedByteBuffer(記憶體對映)。