RocketMQ 是筆者非常喜歡的訊息佇列,4.9.X 版本是目前使用最廣泛的版本,但它的消費邏輯相對較重,很多同學學習起來沒有頭緒。
這篇文章,筆者梳理了 RocketMQ 的消費邏輯,希望對大家有所啟發。
在展開叢集消費邏輯細節前,我們先對 RocketMQ 4.X 架構做一個概覽。
整體架構中包含四種角色 :
1、NameServer
名字服務是是一個幾乎無狀態節點,可叢集部署,節點之間無任何資訊同步。它是一個非常簡單的 Topic 路由註冊中心,其角色類似 Dubbo 中的 zookeeper ,支援 Broker 的動態註冊與發現。
2、BrokerServer
Broker 主要負責訊息的儲存、投遞和查詢以及服務高可用保證 。
3、Producer
訊息釋出的角色,Producer 通過 MQ 的負載均衡模組選擇相應的 Broker 叢集佇列進行訊息投遞,投遞的過程支援快速失敗並且低延遲。
4、Consumer
訊息消費的角色,支援以 push 推,pull 拉兩種模式對訊息進行消費。
RocketMQ 叢集工作流程:
1、啟動 NameServer,NameServer 起來後監聽埠,等待 Broker、Producer 、Consumer 連上來,相當於一個路由控制中心。
2、Broker 啟動,跟所有的 NameServer 保持長連線,定時傳送心跳包。心跳包中包含當前 Broker資訊( IP+埠等 )以及儲存所有 Topic 資訊。註冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的對映關係。
3、收發訊息前,先建立 Topic,建立 Topic 時需要指定該 Topic 要儲存在哪些 Broker 上,也可以在傳送訊息時自動建立 Topic。
4、Producer 傳送訊息,啟動時先跟 NameServer 叢集中的其中一臺建立長連線,並從 NameServer 中獲取當前傳送的 Topic 存在哪些 Broker 上,輪詢從佇列列表中選擇一個佇列,然後與佇列所在的 Broker 建立長連線從而向 Broker 發訊息。
5、Consumer 跟 Producer 類似,跟其中一臺 NameServer 建立長連線,獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連線通道,開始消費訊息。
RocketMQ 的傳輸模型是:釋出訂閱模型 。
釋出訂閱模型具有如下特點:
消費獨立
相比佇列模型的匿名消費方式,釋出訂閱模型中消費方都會具備的身份,一般叫做訂閱組(訂閱關係),不同訂閱組之間相互獨立不會相互影響。
一對多通訊
基於獨立身份的設計,同一個主題內的訊息可以被多個訂閱組處理,每個訂閱組都可以拿到全量訊息。因此釋出訂閱模型可以實現一對多通訊。
RocketMQ 支援兩種訊息模式:叢集消費( Clustering )和廣播消費( Broadcasting )。
叢集消費:同一 Topic 下的一條訊息只會被同一消費組中的一個消費者消費。也就是說,訊息被負載均衡到了同一個消費組的多個消費者範例上。
廣播消費:當使用廣播消費模式時,每條訊息推播給叢集內所有的消費者,保證訊息至少被每個消費者消費一次。
為了實現這種釋出訂閱模型 , RocketMQ 精心設計了它的儲存模型。先進入 Broker 的檔案儲存目錄。
RocketMQ 採用的是混合型的儲存結構。
1、Broker 單個範例下所有的佇列共用一個資料檔案(commitlog)來儲存
生產者傳送訊息至 Broker 端,然後 Broker 端使用同步或者非同步的方式對訊息刷盤持久化,儲存至 commitlog 檔案中。只要訊息被刷盤持久化至磁碟檔案 commitlog 中,那麼生產者傳送的訊息就不會丟失。
單個檔案大小預設 1G , 檔名長度為 20 位,左邊補零,剩餘為起始偏移量,比如 00000000000000000000 代表了第一個檔案,起始偏移量為 0 ,檔案大小為1 G = 1073741824 。
這種設計有兩個優點:
充分利用順序寫,大大提升寫入資料的吞吐量;
快讀定位訊息。
因為訊息是一條一條寫入到 commitlog 檔案 ,寫入完成後,我們可以得到這條訊息的物理偏移量。
每條訊息的物理偏移量是唯一的, commitlog 檔名是遞增的,可以根據訊息的物理偏移量通過二分查詢,定位訊息位於那個檔案中,並獲取到訊息實體資料。
2、Broker 端的後臺服務執行緒會不停地分發請求並非同步構建 consumequeue(消費檔案)和 indexfile(索引檔案)
進入索引檔案儲存目錄 :
1、消費檔案按照主題儲存,每個主題下有不同的佇列,圖中主題 my-mac-topic 有 16 個佇列 (0 到 15) ;
2、每個佇列目錄下 ,儲存 consumequeue 檔案,每個 consumequeue 檔案也是順序寫入,資料格式見下圖。
每個 consumequeue 檔案包含 30 萬個條目,每個條目大小是 20 個位元組,每個檔案的大小是 30 萬 * 20 = 60萬位元組,每個檔案大小約 5.72M 。
和 commitlog 檔案類似,consumequeue 檔案的名稱也是以偏移量來命名的,可以通過訊息的邏輯偏移量定位訊息位於哪一個檔案裡。
消費檔案按照主題-佇列來儲存 ,這種方式特別適配釋出訂閱模型。
消費者從 Broker 獲取訂閱訊息資料時,不用遍歷整個 commitlog 檔案,只需要根據邏輯偏移量從 consumequeue 檔案查詢訊息偏移量 , 最後通過定位到 commitlog 檔案, 獲取真正的訊息資料。
要實現釋出訂閱模型,還需要一個重要檔案:消費進度檔案。原因有兩點:
因此消費進度檔案需要儲存消費組所訂閱主題的消費進度。
我們瀏覽下叢集消費場景下的 Broker 端的消費進度檔案 consumerOffset.json 。
在進度檔案 consumerOffset.json 裡,資料以 key-value 的結構儲存,key 表示:主題@消費者組 , value 是 consumequeue 中每個佇列對應的邏輯偏移量 。
寫到這裡,我們粗糙模擬下 RocketMQ 儲存模型如何滿足釋出訂閱模型 。
1、傳送訊息:生產者傳送訊息到 Broker ;
2、儲存訊息:Broker 將訊息儲存到 commitlog 檔案 ,非同步執行緒會構建消費檔案 consumequeue ;
3、消費流程:消費者啟動後,會通過負載均衡分配對應的佇列,然後向 Broker 傳送拉取訊息請求。Broker 收到消費者拉取請求之後,根據訂閱組,消費者編號,主題,佇列名,邏輯偏移量等引數 ,從該主題下的 consumequeue 檔案查詢訊息消費條目,然後從 commitlog 檔案中獲取訊息實體。消費者在收到訊息資料之後,執行消費監聽器,消費完訊息;
4、儲存進度:消費者將消費進度提交到 Broker ,Broker 會將該消費組的消費進度儲存在進度檔案裡。
我們重點講解下叢集消費的消費流程 ,因為叢集消費是使用最普遍的消費模式,理解了叢集消費,廣播消費也就能順理成章的掌握了。
叢集消費範例程式碼裡,啟動消費者,我們需要設定三個核心屬性:消費組名、訂閱主題、訊息監聽器,最後呼叫 start 方法啟動。
消費者啟動後,我們可以將整個流程簡化成:
消費端的負載均衡是指將 Broker 端中多個佇列按照某種演演算法分配給同一個消費組中的不同消費者,負載均衡是使用者端開始消費的起點。
RocketMQ 負載均衡的核心設計理念是
負載均衡是每個使用者端獨立進行計算,那麼何時觸發呢 ?
消費端啟動時,立即進行負載均衡;
消費端定時任務每隔 20 秒觸發負載均衡;
消費者上下線,Broker 端通知消費者觸發負載均衡。
負載均衡流程如下:
1、傳送心跳
消費者啟動後,它就會通過定時任務不斷地向 RocketMQ 叢集中的所有 Broker 範例傳送心跳包(訊息消費分組名稱、訂閱關係集合、訊息通訊模式和使用者端範例編號等資訊)。
Broker 端在收到消費者的心跳訊息後,會將它維護在 ConsumerManager 的本地快取變數 consumerTable,同時並將封裝後的使用者端網路通道資訊儲存在本地快取變數 channelInfoTable 中,為之後做消費端的負載均衡提供可以依據的後設資料資訊。
2、啟動負載均衡服務
負載均衡服務會根據消費模式為」廣播模式」還是「叢集模式」做不同的邏輯處理,這裡主要來看下叢集模式下的主要處理流程:
(1) 獲取該主題下的訊息消費佇列集合;
(2) 查詢 Broker 端獲取該消費組下消費者 Id 列表;
(3) 先對 Topic 下的訊息消費佇列、消費者 Id 排序,然後用訊息佇列分配策略演演算法(預設為:訊息佇列的平均分配演演算法),計算出待拉取的訊息佇列;
這裡的平均分配演演算法,類似於分頁的演演算法,將所有 MessageQueue 排好序類似於記錄,將所有消費端排好序類似頁數,並求出每一頁需要包含的平均 size 和每個頁面記錄的範圍 range ,最後遍歷整個 range 而計算出當前消費端應該分配到的記錄。
(4) 分配到的訊息佇列集合與 processQueueTable 做一個過濾比對操作。
消費者範例內 ,processQueueTable 物件儲存著當前負載均衡的佇列 ,以及該佇列的處理佇列 processQueue (消費快照)。
標紅的 Entry 部分表示與分配到的訊息佇列集合互不包含,則需要將這些紅色佇列 Dropped 屬性為 true , 然後從 processQueueTable 物件中移除。
綠色的 Entry 部分表示與分配到的訊息佇列集合的交集,processQueueTable 物件中已經存在該佇列。
黃色的 Entry 部分表示這些佇列需要新增到 processQueueTable 物件中,為每個分配的新佇列建立一個訊息拉取請求 pullRequest
, 在訊息拉取請求中儲存一個處理佇列 processQueue
(佇列消費快照),內部是紅黑樹(TreeMap
),用來儲存拉取到的訊息。
最後建立拉取訊息請求列表,並將請求分發到訊息拉取服務,進入拉取訊息環節。
在負載均衡這一小節,我們已經知道負載均衡觸發了拉取訊息的流程。
消費者啟動的時候,會建立一個拉取訊息服務 PullMessageService ,它是一個單執行緒的服務。
核心流程如下:
1、負載均衡服務將訊息拉取請求放入到拉取請求佇列 pullRequestQueue , 拉取訊息服務從佇列中獲取拉取訊息請求 ;
2、拉取訊息服務向 Brorker 服務傳送拉取請求 ,拉取請求的通訊模式是非同步回撥模式 ;
消費者的拉取訊息服務本身就是一個單執行緒,使用非同步回撥模式,傳送拉取訊息請求到 Broker 後,拉取訊息執行緒並不會阻塞 ,可以繼續處理佇列 pullRequestQueue 中的其他拉取任務。
3、Broker 收到消費者拉取訊息請求後,從儲存中查詢出訊息資料,然後返回給消費者;
4、消費者的網路通訊層會執行拉取回撥函數相關邏輯,首先會將訊息資料儲存在佇列消費快照 processQueue 裡;
消費快照使用紅黑樹 msgTreeMap 儲存拉取服務拉取到的訊息 。
5、回撥函數將消費請求提交到訊息消費服務 ,而訊息消費服務會非同步的消費這些訊息;
6、回撥函數會將處理中佇列的拉取請放入到定時任務中;
7、定時任務再次將訊息拉取請求放入到佇列 pullRequestQueue 中,形成了閉環:負載均衡後的佇列總會有任務執行拉取訊息請求,不會中斷。
細心的同學肯定有疑問:既然消費端是拉取訊息,為什麼是長輪詢呢 ?
雖然拉模式的主動權在消費者這一側,但是缺點很明顯。
因為消費者並不知曉 Broker 端什麼時候有新的訊息 ,所以會不停地去 Broker 端拉取訊息,但拉取頻率過高, Broker 端壓力就會很大,頻率過低則會導致訊息延遲。
所以要想消費訊息的延遲低,伺服器端的推播必不可少。
下圖展示了 RocketMQ 如何通過長輪詢減小拉取訊息的延遲。
核心流程如下:
1、Broker 端接收到消費者的拉取訊息請求後,拉取訊息處理器開始處理請求,根據拉取請求查詢訊息儲存 ;
2、從訊息儲存中獲取訊息資料 ,若存在新訊息 ,則將訊息資料通過網路返回給消費者。若無新訊息,則將拉取請求放入到拉取請求表 pullRequestTable 。
3、長輪詢請求管理服務 pullRequestHoldService 每隔 5 秒從拉取請求表中判斷拉取訊息請求的佇列是否有新的訊息。
判定標準是:拉取訊息請求的偏移量是否小於當前消費佇列最大偏移量,如果條件成立則說明有新訊息了。
若存在新的訊息 , 長輪詢請求管理服務會觸發拉取訊息處理器重新處理該拉取訊息請求。
4、當 commitlog 中新增了新的訊息,訊息分發服務會構建消費檔案和索引檔案,並且會通知長輪詢請求管理服務,觸發拉取訊息處理器重新處理該拉取訊息請求。
在拉取訊息的流程裡, Broker 端返回訊息資料,消費者的通訊框架層會執行回撥函數。
回撥執行緒會將資料儲存在佇列消費快照 processQueue(內部使用紅黑樹 msgTreeMap)裡,然後將訊息提交到消費訊息服務,消費訊息服務會非同步消費這些訊息。
訊息消費服務有兩種型別:並行消費服務和順序消費服務 。
並行消費是指消費者將並行消費訊息,消費的時候可能是無序的。
消費訊息並行服務啟動後,會初始化三個元件:消費執行緒池、清理過期訊息定時任務、處理失敗訊息定時任務。
核心流程如下:
0、通訊框架回撥執行緒會將資料儲存在消費快照裡,然後將訊息列表 msgList 提交到消費訊息服務
1、 訊息列表 msgList 組裝成消費物件
2、將消費物件提交到消費執行緒池
我們看到10 條訊息被組裝成三個消費請求物件,不同的消費執行緒會執行不同的消費請求物件。
3、消費執行緒執行訊息監聽器
執行完消費監聽器,會返回消費結果。
4、處理異常訊息
當消費異常時,異常訊息將重新發回 Broker 端的重試佇列( RocketMQ 會為每個 topic 建立一個重試佇列,以 %RETRY% 開頭),達到重試時間後將訊息投遞到重試佇列中進行消費重試。
我們將在重試機制這一節重點講解 RocketMQ 如何實現延遲消費功能 。
假如異常的訊息傳送到 Broker 端失敗,則重新將這些失敗訊息通過處理失敗訊息定時任務重新提交到訊息消費服務。
5、更新本地消費進度
消費者消費一批訊息完成之後,需要儲存消費進度到進度管理器的本地記憶體。
首先我們會從佇列消費快照 processQueue 中移除訊息,返回消費快照 msgTreeMap 第一個偏移量 ,然後呼叫消費訊息進度管理器 offsetStore 更新消費進度。
待更新的偏移量是如何計算的呢?
場景1:快照中1001(訊息1)到1010(訊息10)消費了,快照中沒有了訊息,返回已消費的訊息最大偏移量 + 1 也就是1011。
場景2:快照中1001(訊息1)到1008(訊息8)消費了,快照中只剩下兩條訊息了,返回最小的偏移量 1009。
在場景3,RocketMQ 為了保證訊息肯定被消費成功,消費進度只能維持在1001(訊息1),直到1001也被消費完,原生的消費進度才會一下子更新到1011。
假設1001(訊息1)還沒有消費完成,消費者範例突然退出(機器斷電,或者被 kill ),就存在重複消費的風險。
因為佇列的消費進度還是維持在1001,當佇列重新被分配給新的消費者範例的時候,新的範例從 Broker 上拿到的消費進度還是維持在1001,這時候就會又從1001開始消費,1001-1010這批訊息實際上已經被消費過還是會投遞一次。
所以業務必須要保證訊息消費的冪等性。
寫到這裡,我們會有一個疑問:假設1001(訊息1)因為加鎖或者消費監聽器邏輯非常耗時,導致極長時間沒有消費完成,那麼消費進度就會一直卡住 ,怎麼解決呢 ?
RocketMQ 提供兩種方式一起配合解決:
拉取服務根據並行消費間隔設定限流
拉取訊息服務在拉取訊息時候,會判斷當前佇列的 processQueue 消費快照裡訊息的最大偏移量 - 訊息的最小偏移量大於消費並行間隔(2000)的時候 , 就會觸發流控 , 這樣就可以避免消費者無限迴圈的拉取新的訊息。
清理過期訊息
消費訊息並行服務啟動後,會定期掃描所有消費的訊息,若當前時間減去開始消費的時間大於消費超時時間,首先會將過期訊息傳送 sendMessageBack 命令傳送到 Broker ,然後從快照中刪除該訊息。
順序訊息是指對於一個指定的 Topic ,訊息嚴格按照先進先出(FIFO)的原則進行訊息釋出和消費,即先發布的訊息先消費,後釋出的訊息後消費。
順序訊息分為分割區順序訊息和全域性順序訊息。
1、分割區順序訊息
對於指定的一個 Topic ,所有訊息根據 Sharding Key 進行區塊分割區,同一個分割區內的訊息按照嚴格的先進先出(FIFO)原則進行釋出和消費。同一分割區內的訊息保證順序,不同分割區之間的訊息順序不做要求。
2、全域性順序訊息
對於指定的一個 Topic ,所有訊息按照嚴格的先入先出(FIFO)的順序來發布和消費。
全域性順序訊息實際上是一種特殊的分割區順序訊息,即 Topic 中只有一個分割區,因此全域性順序和分割區順序的實現原理相同。
因為分割區順序訊息有多個分割區,所以分割區順序訊息比全域性順序訊息的並行度和效能更高。
訊息的順序需要由兩個階段保證:
訊息傳送
如上圖所示,A1、B1、A2、A3、B2、B3 是訂單 A 和訂單 B 的訊息產生的順序,業務上要求同一訂單的訊息保持順序,例如訂單 A 的訊息傳送和消費都按照 A1、A2、A3 的順序。
如果是普通訊息,訂單A 的訊息可能會被輪詢傳送到不同的佇列中,不同佇列的訊息將無法保持順序,而順序訊息傳送時 RocketMQ 支援將 Sharding Key 相同(例如同一訂單號)的訊息序路由到同一個佇列中。
下圖是生產者傳送順序訊息的封裝,原理是傳送訊息時,實現 MessageQueueSelector 介面, 根據 Sharding Key 使用 Hash 取模法來選擇待傳送的佇列。
訊息消費
消費者消費訊息時,需要保證單執行緒消費每個佇列的訊息資料,從而實現消費順序和釋出順序的一致。
順序消費服務的類是 ConsumeMessageOrderlyService ,在負載均衡階段,並行消費和順序消費並沒有什麼大的差別。
最大的差別在於:順序消費會向 Borker 申請鎖 。消費者根據分配的佇列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會拉取訊息,如果失敗,則定時任務每隔20秒會重新嘗試。
順序消費核心流程如下:
1、 組裝成消費物件
2、 將請求物件提交到消費執行緒池
和並行消費不同的是,這裡的消費請求包含消費快照 processQueue ,訊息佇列 messageQueue 兩個物件,並不對訊息列表做任何處理。
3、 消費執行緒內,對消費佇列加鎖
順序消費也是通過執行緒池消費的,synchronized 鎖用來保證同一時刻對於同一個佇列只有一個執行緒去消費它
4、 從消費快照中取得待消費的訊息列表
消費快照 processQueue 物件裡,建立了一個紅黑樹物件 consumingMsgOrderlyTreeMap 用於臨時儲存的待消費的訊息。
5、 執行訊息監聽器
消費快照的消費鎖 consumeLock 的作用是:防止負載均衡執行緒把當前消費的 MessageQueue 物件移除掉。
6、 處理消費結果
消費成功時,首先計算需要提交的偏移量,然後更新本地消費進度。
消費失敗時,分兩種場景:
我們做一個關於順序消費的總結 :
RocketMQ 消費者消費完一批資料後, 會將佇列的進度儲存在本地記憶體,但還需要將佇列的消費進度持久化。
1、 叢集模式
叢集模式下,分兩種場景:
Broker 的這兩個處理器都呼叫消費者進度管理器 consumerOffsetManager 的 commitOffset 方法,定時任務非同步將消費進度持久化到消費進度檔案 consumerOffset.json 中。
2、 廣播模式
廣播模式消費進度儲存在消費者本地,定時任務每隔 5 秒通過 LocalFileOffsetStore 持久化到本地檔案offsets.json
,資料格式為 MessageQueue:Offset
。
廣播模式下,消費進度和消費組沒有關係,本地檔案 offsets.json
儲存在設定的目錄,檔案中包含訂閱主題中所有的佇列以及佇列的消費進度。
叢集消費下,重試機制的本質是 RocketMQ 的延遲訊息功能。
消費訊息失敗後,消費者範例會通過 CONSUMER_SEND_MSG_BACK 請求,將失敗訊息發回到 Broker 端。
Broker 端會為每個 topic 建立一個重試佇列 ,佇列名稱是:%RETRY% + 消費者組名 ,達到重試時間後將訊息投遞到重試佇列中進行消費重試(消費者組會自動訂閱重試 Topic)。最多重試消費 16 次,重試的時間間隔逐漸變長,若達到最大重試次數後訊息還沒有成功被消費,則訊息將被投遞至死信佇列。
第幾次重試 | 與上次重試的間隔時間 | 第幾次重試 | 與上次重試的間隔時間 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分鐘 |
2 | 30 秒 | 10 | 8 分鐘 |
3 | 1 分鐘 | 11 | 9 分鐘 |
4 | 2 分鐘 | 12 | 10 分鐘 |
5 | 3 分鐘 | 13 | 20 分鐘 |
6 | 4 分鐘 | 14 | 30 分鐘 |
7 | 5 分鐘 | 15 | 1 小時 |
8 | 6 分鐘 | 16 | 2 小時 |
開源 RocketMQ 4.X 支援延遲訊息,預設支援18 個 level 的延遲訊息,這是通過 broker 端的 messageDelayLevel 設定項確定的,如下:
Broker 在啟動時,內部會建立一個內部主題:SCHEDULE_TOPIC_XXXX,根據延遲 level 的個數,建立對應數量的佇列,也就是說18個 level 對應了18個佇列。
我們先梳理下延遲訊息的實現機制。
1、生產者傳送延遲訊息
Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//設定延遲level為5,對應延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);
2、Broker端儲存延遲訊息
延遲訊息在 RocketMQ Broker 端的流轉如下圖所示:
第一步:修改訊息 Topic 名稱和佇列資訊
Broker 端接收到生產者的寫入訊息請求後,首先都會將訊息寫到 commitlog 中。假如是正常非延遲訊息,MessageStore 會根據訊息中的 Topic 資訊和佇列資訊,將其轉發到目標 Topic 的指定佇列 consumequeue 中。
但由於訊息一旦儲存到 consumequeue 中,消費者就能消費到,而延遲訊息不能被立即消費,所以 RocketMQ 將 Topic 的名稱修改為SCHEDULE_TOPIC_XXXX,並根據延遲級別確定要投遞到哪個佇列下。
同時,還會將訊息原來要傳送到的目標 Topic 和佇列資訊儲存到訊息的屬性中。
第二步:構建 consumequeue 檔案時,計算並儲存投遞時間
上圖是 consumequeue 檔案一條訊息的格式,最後 8 個位元組儲存 Tag 的雜湊值,此時儲存訊息的投遞時間。
第三步:定時排程服務啟動
ScheduleMessageService 類是一個定時排程服務,讀取 SCHEDULE_TOPIC_XXXX 佇列的訊息,並將訊息投遞到目標 Topic 中。
定時排程服務啟動時,建立一個定時排程執行緒池 ,並根據延遲級別的個數,啟動對應數量的 HandlePutResultTask ,每個 HandlePutResultTask 負責一個延遲級別的消費與投遞。
第四步:投遞時間到了,將訊息資料重新寫入到 commitlog
訊息到期後,需要投遞到目標 Topic 。第一步已經記錄了原來的 Topic 和佇列資訊,這裡需要重新設定,再儲存到 commitlog 中。
第五步:將訊息投遞到目標 Topic 中
Broker 端的後臺服務執行緒會不停地分發請求並非同步構建 consumequeue(消費檔案)和 indexfile(索引檔案)。因此訊息會直接投遞到目標 Topic 的 consumequeue 中,之後消費者就可以消費到這條訊息。
回顧了延遲訊息的機制,消費訊息失敗後,消費者範例會通過 CONSUMER_SEND_MSG_BACK 請求,將失敗訊息發回到 Broker 端。
Broker 端 SendMessageProcessor 處理器會呼叫 asyncConsumerSendMsgBack 方法。
首先判斷訊息的當前重試次數是否大於等於最大重試次數,如果達到最大重試次數,或者設定的重試級別小於0,則重新建立 Topic ,規則是 %DLQ% + consumerGroup,後續處理訊息傳送到死信佇列。
正常的訊息會進入 else 分支,對於首次重試的訊息,預設的 delayLevel 是 0 ,RocketMQ 會將 delayLevel + 3,也就是加到 3 ,這就是說,如果沒有顯示的設定延時級別,訊息消費重試首次,是延遲了第三個級別發起的重試,也就是距離首次傳送 10s 後重試,其主題的預設規則是 %RETRY% + consumerGroup。
當延時級別設定完成,重新整理訊息的重試次數為當前次數加 1 ,Broker 端將該訊息刷盤,邏輯如下:
延遲訊息寫入到 commitlog 裡 ,這裡其實和延遲訊息機制的第一步類似,後面按照延遲訊息機制的流程執行即可(第二步到第六步)。
下圖展示了叢集模式下消費者並行消費流程 :
核心流程如下:
pullRequest
, 拉取請求儲存一個處理佇列 processQueue
,內部是紅黑樹(TreeMap
),用來儲存拉取到的訊息 ;pullRequestQueue
中彈出拉取訊息,執行拉取任務 ,拉取請求是非同步回撥模式,將拉取到的訊息放入到處理佇列;pullRequestQueue
中 ;consumeMessageService
的 submitConsumeRequest
方法 ,消費訊息服務內部有一個消費執行緒池;listener.consumeMessage
;updateOffset
,先更新到記憶體 offsetTable
,定時上報到 Broker ;若消費失敗,則將失敗消費傳送到 Broker 。commitOffset
方法修改記憶體的消費進度,定時刷盤到 consumerOffset.json
。RocketMQ 4.X 的消費邏輯有兩個非常明顯的特點:
如果我的文章對你有所幫助,還請幫忙點贊、在看、轉發一下,你的支援會激勵我輸出更高質量的文章,非常感謝!