RocketMq5.0 任意延遲時間 TimerMessageStore 原始碼解析

2023-07-06 15:00:51

TimerMessageStore 簡略介紹

  • 延遲佇列 rmq_sys_wheel_timer
  • 指定時間的延遲訊息。會先投遞到 rmq_sys_wheel_timer 佇列中
  • 然後由 TimerMessageStore 消費佇列資料,將資料消費到 timerWheel 使用時間輪演演算法,實現秒級任務

TimerMessageStore 操作的檔案

  • store\consumequeue\rmq_sys_wheel_timer 從佇列中讀取訊息, 提取資料存到 timerlogtimerwheel
  • store\checkpoint 對應 TimerMessageStore#timerCheckpoint
    • lastReadTimeMs 上次消費的時間節點
    • lastTimerLogFlushPos 最後重新整理 log的 pos
    • lastTimerQueueOffset 最後一次消費的佇列節點
    • masterTimerQueueOffset 主 Broker 的佇列消費節點
  • store\timerwheel 時間輪,內由 Slot 組成 結構如下
    • timeMs 訊息到達時間
    • firstPos 開始的 pos
    • lastPos 結束的 pos 在 timerLog 中讀取資料, 後面會講具體邏輯
    • num 訊息數量
    • magic no use now, just keep it
  • store\timerlog 對應 TimerMessageStore#timerCheckpoint
    裡邊也是由多個 mappedFile 組成。
    主要是儲存原msg的資料,
    因為從 rmq_sys_wheel_timer 消費了之後,
    會存到 timerwheeltimerlog

TimerMessageStore 啟動

  • enqueueGetService.start();
  • enqueuePutService.start();
  • dequeueWarmService.start();
  • dequeueGetService.start();
  • timerFlushService.start();
  • dequeueGetMessageServices[getThreadNum].start();
  • dequeuePutMessageServices[getThreadNum].start();

深入 TimerMessageStore 之 TimerEnqueueGetService

  • TimerMessageStore.this.enqueue 預設 100毫秒執行一次
  • 從 訊息佇列 rmq_sys_wheel_timer 消費資料 ps: currQueueOffsetcheckpoint 讀取出來的
  • 將消費出來的資料, 封裝成 TimerRequest 投入到 enqueuePutQueue
  • currQueueOffset + 1 進入下一個迴圈 消費下一個 offset 節點

深入 TimerMessageStore 之 TimerEnqueuePutService

  • 消費 enqueuePutQueue 中的資料
  • shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs 檢查消費的訊息是否已到達投遞時間。
    • 到達時間。投遞到 dequeuePutQueue.put(req);
    • 訊息未到達時間 doEnqueue ->
      • timerWheel.getSlot(delayedTime) 獲取延遲時間插槽。
      • 構建 ByteBuffer 投入 timerLog 中資料結構為:
      • |訊息大小|前一個節點的pos|magic|log寫入時間|延遲時間|offsetPy|sizePy|realTopic|0
      • timerLog.append 返回插入位置 ret
      • 構建 timerWheel |訊息到達時間戳|firstPos|ret (timerLog.append返回位置)| 訊息數量| 0|

深入 TimerMessageStore 之 TimerDequeueGetService

  • 消費 timerWheel 中的資料
  • 根據 currReadTimeMs 來獲取 timerWheel 插槽資料
    • currReadTimeMs 初始化的時候 timerCheckpoint.getLastReadTimeMs() 讀取的是上次最後消費的資料
    • 假設broker 宕機了一段時間。那麼 currReadTimeMs 會按照上一次宕機的時間開始搜尋資料, 這樣子宕機訊息也不會丟失。會在啟動的那段時間被投遞出去
    • currReadTimeMsmoveReadTime 方法中會自增
  • timerWheel.getSlot(currReadTimeMs); 讀取插槽資料
    • long currOffsetPy = slot.lastPos; 讀取插槽屬性, 最後一個pos節點
    • timerLog.getWholeBuffer(currOffsetPy) 根據 currOffsetPy 獲取 SelectMappedBufferResult
    • timerLogSelectMappedBufferResult 中獲取資料。
      • prevPos 上一個節點資料
      • enqueueTime 放入 timerLog 的時間
      • delayedTime 訊息到達時間戳
      • offsetPy commitLog的資料位置
      • sizePy commitLog的資料大小
    • 構建 TimerRequest 講訊息投遞到 dequeueGetQueue
    • currOffsetPy = prevPos 將位置移動到前一個,進行遍歷

深入 TimerMessageStore 之 TimerDequeueGetMessageService

  • 預設有三個 TimerDequeueGetMessageService 範例同時消費 dequeueGetQueue
  • getMessageByCommitOffsetcommitLog 中讀取原投遞的訊息資料
  • 讀取 uniqkey 判斷不在 deleteList 中的時候 將訊息投遞到 dequeuePutQueue 中去

深入 TimerMessageStore 之 TimerDequeuePutMessageService

  • 預設有三個 TimerDequeuePutMessageService 範例同時消費 dequeuePutQueue
  • convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic())); 將訊息轉換成原始的 topic 訊息,清除無用屬性
  • doPut -> messageStore.putMessage(message) 將訊息投遞到指定 messageQueue

TimerFlushService

  • timerLog 刷盤
  • timerWheel 刷盤
  • timerCheckpoint 刷盤

TimerMessageStore 初始化載入原始碼

  • timerLog.load() 載入檔案
  • timerMetrics.load 載入檔案
  • recover ->
    • recoverAndRevise(lastFlushPos, true) ps: (用於 timerWheltimerLog 的資料保持一致重新整理)
      • lastFlushPos 最後一次刷盤的位置, 其實最終是拿到 timerlog -> mappedFile 的第幾個檔案
      • 遍歷這個 mappedFile 的資料
      • timerWheel.reviseSlot 修改插槽資料。 檢查這個時間的插槽是否已經有填充資料。
        • 如果有的話,重新整理 lastPos (順序遍歷。這裡最終還是會是最後一個 lastPos)
        • 如果不存在插槽資料 則插入插槽資料 putSlot
    • reviseQueueOffset(processOffset); 讀取 timerLog 最後一個資料, 為了校驗最後一個資料是否正常,是否能讀取到訊息。
    • 確認 currQueueOffset 資料
    • 確認 currReadTimeMs 資料