RocketMQ 順序消費機制

2023-05-29 18:00:22

順序訊息是指對於一個指定的 Topic ,訊息嚴格按照先進先出(FIFO)的原則進行訊息釋出和消費,即先發布的訊息先消費,後釋出的訊息後消費。

順序訊息分為分割區順序訊息全域性順序訊息

1、分割區順序訊息

對於指定的一個 Topic ,所有訊息根據 Sharding Key 進行區塊分割區,同一個分割區內的訊息按照嚴格的先進先出(FIFO)原則進行釋出和消費。同一分割區內的訊息保證順序,不同分割區之間的訊息順序不做要求。

  • 適用場景:適用於效能要求高,以 Sharding Key 作為分割區欄位,在同一個區塊中嚴格地按照先進先出(FIFO)原則進行訊息釋出和消費的場景。
  • 範例:電商的訂單建立,以訂單 ID 作為 Sharding Key ,那麼同一個訂單相關的建立訂單訊息、訂單支付訊息、訂單退款訊息、訂單物流訊息都會按照發布的先後順序來消費。

2、全域性順序訊息

對於指定的一個 Topic ,所有訊息按照嚴格的先入先出(FIFO)的順序來發布和消費。

  • 適用場景:適用於效能要求不高,所有的訊息嚴格按照 FIFO 原則來發布和消費的場景。
  • 範例:在證券處理中,以人民幣兌換美元為 Topic,在價格相同的情況下,先出價者優先處理,則可以按照 FIFO 的方式釋出和消費全域性順序訊息。

全域性順序訊息實際上是一種特殊的分割區順序訊息,即 Topic 中只有一個分割區,因此全域性順序和分割區順序的實現原理相同

因為分割區順序訊息有多個分割區,所以分割區順序訊息比全域性順序訊息的並行度和效能更高

訊息的順序需要由兩個階段保證:

  • 訊息傳送

    如上圖所示,A1、B1、A2、A3、B2、B3 是訂單 A 和訂單 B 的訊息產生的順序,業務上要求同一訂單的訊息保持順序,例如訂單 A 的訊息傳送和消費都按照 A1、A2、A3 的順序。

    如果是普通訊息,訂單A 的訊息可能會被輪詢傳送到不同的佇列中,不同佇列的訊息將無法保持順序,而順序訊息傳送時 RocketMQ 支援將 Sharding Key 相同(例如同一訂單號)的訊息序路由到一個佇列中。

  • 訊息消費

    消費者消費訊息時,使用單執行緒消費重平衡已分配的訊息佇列,保證訊息消費順序和儲存順序一致,最終實現消費順序和釋出順序的一致。

我們知道負載均衡服務是使用者端開始消費的起點。在負載均衡階段,並行消費和順序消費並沒有什麼大的差別,最大的差別在於:向 Borker 申請鎖

消費者根據分配的佇列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會拉取訊息,如果失敗,則定時任務每隔20秒會重新嘗試。

s

見上圖,順序消費核心流程如下:

1、 組裝成消費物件

2、 將請求物件提交到消費執行緒池

和並行消費不同的是,這裡的消費請求包含消費快照 processQueue ,訊息佇列 messageQueue 兩個物件,並不對訊息列表做任何處理。

3、 消費執行緒內,對消費佇列加鎖

4、 從消費快照中取得待消費的訊息列表

消費快照 processQueue 物件裡,建立了一個紅黑樹物件 consumingMsgOrderlyTreeMap 用於臨時儲存的待消費的訊息。

5、 執行訊息監聽器

執行監聽器邏輯容易理解,消費快照的消費鎖 consumeLock的作用是:防止 Rebalance 執行緒把當前消費的 MessageQueue 物件移除掉。

6、 處理消費結果

消費成功時,首先獲取需要提交的偏移量,然後更新本地消費進度。

消費失敗時,分兩種場景:

  • 假如已消費次數小於最大重試次數,則將放入物件 consumingMsgOrderlyTreeMap 用例臨時儲存的待消費的訊息,重新加入到消費快照紅黑樹 msgTreeMap中,然後使用定時任務嘗試重新消費。
  • 假如已消費次數大於等於最大重試次數,則將失敗訊息傳送到 Broker ,Broker 接收到訊息後,會加入到死信佇列裡 , 獲取需要提交的偏移量,然後更新本地消費進度。

我們做一個關於順序消費的總結:

  1. 順序消費需要由兩個階段訊息傳送訊息消費協同配合,底層支撐依靠的是 RocketMQ 的儲存模型;
  2. 順序消費服務啟動後,通過三把鎖的機制,使得消費者範例單執行緒的消費重平衡分配的消費佇列;
  3. 假如發生擴容,消費者重啟,或者 Broker 宕機 ,順序消費也會有一定機率較短時間內亂序,所以消費者的業務邏輯還是要保障冪等