RocketMq5.0 任意延遲時間 TimerMessageStore 原始碼解析
2023-07-06 15:00:51
TimerMessageStore 簡略介紹
- 延遲佇列
rmq_sys_wheel_timer
- 指定時間的延遲訊息。會先投遞到
rmq_sys_wheel_timer
佇列中
- 然後由
TimerMessageStore
消費佇列資料,將資料消費到 timerWheel
使用時間輪演演算法,實現秒級任務
TimerMessageStore 操作的檔案
store\consumequeue\rmq_sys_wheel_timer
從佇列中讀取訊息, 提取資料存到 timerlog
與 timerwheel
中
store\checkpoint
對應 TimerMessageStore#timerCheckpoint
lastReadTimeMs
上次消費的時間節點
lastTimerLogFlushPos
最後重新整理 log的 pos
lastTimerQueueOffset
最後一次消費的佇列節點
masterTimerQueueOffset
主 Broker 的佇列消費節點
store\timerwheel
時間輪,內由 Slot
組成 結構如下
timeMs
訊息到達時間
firstPos
開始的 pos
lastPos
結束的 pos 在 timerLog 中讀取資料, 後面會講具體邏輯
num
訊息數量
magic
no use now, just keep it
store\timerlog
對應 TimerMessageStore#timerCheckpoint
裡邊也是由多個 mappedFile
組成。
主要是儲存原msg的資料,
因為從 rmq_sys_wheel_timer
消費了之後,
會存到 timerwheel
與 timerlog
中
TimerMessageStore 啟動
- enqueueGetService.start();
- enqueuePutService.start();
- dequeueWarmService.start();
- dequeueGetService.start();
- timerFlushService.start();
- dequeueGetMessageServices[getThreadNum].start();
- dequeuePutMessageServices[getThreadNum].start();
深入 TimerMessageStore 之 TimerEnqueueGetService
TimerMessageStore.this.enqueue
預設 100毫秒執行一次
- 從 訊息佇列
rmq_sys_wheel_timer
消費資料 ps: currQueueOffset
從 checkpoint
讀取出來的
- 將消費出來的資料, 封裝成 TimerRequest 投入到
enqueuePutQueue
中
currQueueOffset + 1
進入下一個迴圈 消費下一個 offset 節點
深入 TimerMessageStore 之 TimerEnqueuePutService
- 消費
enqueuePutQueue
中的資料
shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs
檢查消費的訊息是否已到達投遞時間。
- 到達時間。投遞到
dequeuePutQueue.put(req);
中
- 訊息未到達時間
doEnqueue
->
timerWheel.getSlot(delayedTime)
獲取延遲時間插槽。
- 構建
ByteBuffer
投入 timerLog
中資料結構為:
- |訊息大小|前一個節點的pos|magic|log寫入時間|延遲時間|offsetPy|sizePy|realTopic|0
timerLog.append
返回插入位置 ret
- 構建
timerWheel
|訊息到達時間戳|firstPos|ret (timerLog.append返回位置)| 訊息數量| 0|
深入 TimerMessageStore 之 TimerDequeueGetService
- 消費
timerWheel
中的資料
- 根據
currReadTimeMs
來獲取 timerWheel
插槽資料
currReadTimeMs
初始化的時候 timerCheckpoint.getLastReadTimeMs()
讀取的是上次最後消費的資料
- 假設broker 宕機了一段時間。那麼
currReadTimeMs
會按照上一次宕機的時間開始搜尋資料, 這樣子宕機訊息也不會丟失。會在啟動的那段時間被投遞出去
currReadTimeMs
在 moveReadTime
方法中會自增
timerWheel.getSlot(currReadTimeMs);
讀取插槽資料
long currOffsetPy = slot.lastPos;
讀取插槽屬性, 最後一個pos節點
timerLog.getWholeBuffer(currOffsetPy)
根據 currOffsetPy
獲取 SelectMappedBufferResult
- 從
timerLog
的 SelectMappedBufferResult
中獲取資料。
prevPos
上一個節點資料
enqueueTime
放入 timerLog 的時間
delayedTime
訊息到達時間戳
offsetPy
commitLog的資料位置
sizePy
commitLog的資料大小
- 構建
TimerRequest
講訊息投遞到 dequeueGetQueue
中
currOffsetPy = prevPos
將位置移動到前一個,進行遍歷
深入 TimerMessageStore 之 TimerDequeueGetMessageService
- 預設有三個
TimerDequeueGetMessageService
範例同時消費 dequeueGetQueue
getMessageByCommitOffset
從 commitLog
中讀取原投遞的訊息資料
- 讀取
uniqkey
判斷不在 deleteList
中的時候 將訊息投遞到 dequeuePutQueue
中去
深入 TimerMessageStore 之 TimerDequeuePutMessageService
- 預設有三個
TimerDequeuePutMessageService
範例同時消費 dequeuePutQueue
convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
將訊息轉換成原始的 topic 訊息,清除無用屬性
doPut
-> messageStore.putMessage(message)
將訊息投遞到指定 messageQueue
中
TimerFlushService
timerLog
刷盤
timerWheel
刷盤
timerCheckpoint
刷盤
TimerMessageStore 初始化載入原始碼
timerLog.load()
載入檔案
timerMetrics.load
載入檔案
recover
->
recoverAndRevise(lastFlushPos, true)
ps: (用於 timerWhel
跟 timerLog
的資料保持一致重新整理)
lastFlushPos
最後一次刷盤的位置, 其實最終是拿到 timerlog -> mappedFile
的第幾個檔案
- 遍歷這個
mappedFile
的資料
timerWheel.reviseSlot
修改插槽資料。 檢查這個時間的插槽是否已經有填充資料。
- 如果有的話,重新整理
lastPos
(順序遍歷。這裡最終還是會是最後一個 lastPos)
- 如果不存在插槽資料 則插入插槽資料
putSlot
reviseQueueOffset(processOffset);
讀取 timerLog
最後一個資料, 為了校驗最後一個資料是否正常,是否能讀取到訊息。
- 確認
currQueueOffset
資料
- 確認
currReadTimeMs
資料