MQ系列6:訊息的消費

2022-10-09 15:00:16

MQ系列1:訊息中介軟體執行原理
MQ系列2:訊息中介軟體的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ訊息的傳送模式

在之前的文章中,我們學習了RocketMQ的原理;RocketMQ中 命名服務 ServiceName 的執行流程;以及訊息生產、傳送的原理和模式。這一篇,就讓我們從訊息消費的角度去進一步的學習。

1 訊息消費

訊息的消費主要是由如下幾個核心能力組成的:

  • 消費方式:Push(推) 或者 Pull(拉)
  • 消費模式:廣播模式和叢集模式
  • 訊息消費反饋
  • 流量控制(包括消費並行執行緒數設定)
  • 訊息的過濾(Tag, Key),過濾標籤 TagA||TagB||TagC

1.1 消費方式Push or Pull

RocketMQ訊息訂閱有方式:

  • Push方式(MQPushConsumer),MQ Server主動向消費端推播;
    這種模式不考慮消費端是否有能力處理消費資料,實時性比較高,能夠及時推播資料,適合大部分業務場景。但同時存在一個問題,如果遇到峰值期,瞬間推播過多訊息,會導致積壓,甚至使用者端雪崩。
  • Pull方式(MQPullConsumer),消費端在有需要時,主動從MQ Server拉取資料。
    消費端比較靈活,可以根據自己的吞吐能力,消費的節奏,主動安排訊息拉取。適合消費和計算耗時比較大的消費場景。
    缺點就是如何從程式碼層面精準地控制拉取的頻率,過短對消費端有壓力,並且有可能空拉照成資源拉菲;過長可能對訊息及時性有影響,可以採用長輪詢的方式進行處理。
  • Push模式與Pull模式的區別
    Push方式的做法是,Consumer封裝了長輪詢的操作,並註冊MessageListener監聽器,當MessageListener監聽到有新的訊息的時候,消費端便被喚醒,讀取訊息進行消費。從使用者角度上,訂閱訊息並消費感覺訊息是推過來的。
    Pull方式的做法是,消費端主動去拉取資料,獲取相應的Topic的,遍歷MessageQueue集合,取數,重新標記offset,再取數,直至消費完成。

1.2 消費模式 叢集 or 廣播

RocketMQ 目前支援叢集模式和廣播消費模式,其中叢集模式使用範圍比較大,即對等,訊息消費了即完成。

  • 叢集負載均衡消費模式(預設)
    叢集模式是一個主題下的單條訊息只允許被同一消費組內的一個消費者消費,消費完即完成,即P2P。
    在叢集模式下,訊息佇列負載的模式:一個MessageQueue集合同一個時間內只允許被同一消費組內的單個消費者消費一次(這種模式不允許重複消費,如付款,訂單提交),單個Consumer可以消費多個遍歷MessageQueue集合。
  • 廣播消費模式
    廣播模式指的是當前主題下的消費組所有消費者都可以消費並處理訊息一次,達到廣播的目的。很多業務場景,比如航班延遲的訊息通知,告知使用者端快取資訊過期需要重新拉起等。

1.3 消費進度反饋

RocketMQ使用者端消費資料之後,需要向Broker反饋訊息的消費進度,Broker獲取到消費進度記錄下來。這樣保證 佇列rebalance和使用者端消費者重啟動的時候,可以獲取到準確的消費進度。

訊息消費以及進度反饋的主步驟如下:

  • 消費執行緒池消費完資料之後,將訊息消費進度快取在記憶體中。
  • 定時排程任務 5s 一次將訊息隊裡的消費 offset 提交至Broker。
  • Broker接受到訊息之後,儲存在記憶體中,如果有新的過來,可以更行,同樣的每5s將offset持久化下來。
  • 消費使用者端從Broker拉取訊息時,同步將MessageQueue的消費偏移量提交到Broker。

綜合上面的內容,需要注意的點如下:

  • RocketMQ以Consumer Group(消費者小組)和 Queue(佇列)為標準對消費刻度進行管理的
  • Consumer Offset標記消費組在訊息佇列(Queue)上的消費進度。
  • 消費成功後,消費進度暫時更新到本地快取,排程任務會定時(預設5s)將進度同步到broker(需注意如果宕機,消費進度未提交則可能導致被重複消費),Broker最終將消費進度持久化到磁碟。
  • RocketMQ支援並行消費,所以是多個執行緒並行處理,每次記錄消費進度的時候,把執行緒中最小的offset值作為消費進度值,這樣避免了訊息丟失,但有重複消費的風險,業務中需保證操作冪等性。
  • offset儲存模式:叢集模式,訊息進度儲存於Broker上;廣播模式,訊息消費進度在消費端即可。

1.4 消費端流量控制

可以在DefaultMQPushConsumer 物件中設定各種屬性來對消費流量進行控制:

  • PullInterval: 設定消費端拉取MQ訊息的間隔時間。間隔時間是按照上次消費完成之後(比如rocketMQ收到Ack回覆訊息之後)。
    PullInterval=20s,比如上次rocketMq服務收到Ack訊息是12:15:15,則 12:15:35再去拉訊息。

  • PullBatchSize: 消費端每個佇列一次拉取多少個訊息,若該消費端分賠了N個監控佇列,每次拉取M個,那麼消費端每次去rocketMq拉取的訊息為N * M。
    消費端每次pull到訊息總數=PullBatchSize * 監聽佇列數,如 PullBatchSize = 2, 監聽佇列=5,則 訊息總數量 = 2 * 5 = 10。

  • ThreadMin和ThreadMax: 消費端消費pull到的訊息需要的執行緒數量。

    • ThreadMin:消費端拉取到訊息後分配消費的執行緒數
    • ThreadMax:最大消費執行緒,如果預設佇列滿了,則啟用新的執行緒
  • RocketMq 邏輯消費佇列數量的設定
    rocketMq 可以設定消費佇列,如 queue Read1 ,queue Read2,設定數量決定每次pull到的訊息總數。Rocket MQ 提供了讀寫佇列數量的設定。

  • 消費端節點部署數量
    多節點消費端執行緒數量要比單節點消費執行緒數量多,理論上消費速度大於單節點,分治思維。

1.5 訊息的過濾

在過濾訊息的時候,標籤模式簡單而是用,可以篩選出你需要的資料。如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupTest");
consumer.subscribe("testTopic", MessageSelector.byTag("Tag1  || Tag2 || Tag3").bySql("sex = 'male' AND name = 'brand'));

這種情況下,訊息中帶有 Tag1 、Tag2、Tag3 標籤就會被過濾出來,但是單個消限制息只能有一個標籤,這就遠遠滿足不了各種複雜的並交集場景的需要了。
這時候Rocket MQ可以在訊息中設定一些屬性,再使用SQL表示式篩選屬性來過濾出需要的資料。 如下

------------
| message  |
|----------|  sex = male AND name = 'brand' , Gotten
| name = 'brand' |  
| sex = 'male'|
| age = 21|
------------

------------
| message  |
|----------|   sex = male AND name = 'brand', Gotten , Missed
| name = 'Anny'    | 
| sex = 'female'|
| age = 20 |
------------

1.8 提高Consumer的處理能力 :看情況

  1. 提高消費並行度
    在同一個ConsumerGroup下(Clustering方式),可以通過增加Consumer範例的數量來提高並行度。
    通過加機器,或者在已有機器中啟動多個Consumer程序都可以增加Consumer範例數。
    注意:總的Consumer數量不要超過Topic下Read Queue數量,超過的Consumer範例接收不到訊息。
    此外,通過提高單個Consumer範例中的並行處理的執行緒數,可以在同一個Consumer內增加並行度來提高吞吐量(設定方法是修改consumeThreadMin和consumeThreadMax)。

  2. 以批次方式進行消費
    某些業務場景下,多條訊息同時處理的時間會大大小於逐個處理的時間總和,比如消費訊息中涉及update某個資料庫,一次update10條的時間會大大小於十次update1條資料的時間。
    可以通過批次方式消費來提高消費的吞吐量。實現方法是設定Consumer的consumeMessageBatchMaxSize這個引數,預設是1,如果設定為N,在訊息多的時候每次收到的是個長度為N的訊息連結串列。

  3. 檢測延時情況,跳過非重要訊息
    Consumer在消費的過程中,如果發現由於某種原因發生嚴重的訊息堆積,短時間無法消除堆積,這個時候可以選擇丟棄不重要的訊息,使Consumer儘快追上Producer的進度。

2 訊息消費的模式

2.1 基本資訊消費

消費者的基本實現,連線 NameServer的地址,指定Topic和Tag,讀取到需要消費的資料,然後輪詢並處理。

public class SimpleConsumerApplication {
    public static void main(String[] args) throws MQClientException {
        // 1.建立消費者Consumer,並指定消費者組名為 testConsumGroup
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
        // 2.指定NameServer的地址,以獲取Broker路由地址
        consumer.setNamesrvAddr("192.168.139.1:9876");
        // 3.指定Topic和Tag 資訊。* 代表所有
        consumer.subscribe("testTopic", "*");

        // 4.設定回撥函數,用來處理讀取到的訊息, MessageListenerOrderly 用單個執行緒處理處理佇列的資料
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgList) {
                    System.out.println("執行緒 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
                    // Todo,具體的業務邏輯
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 5.消費者開始執行消費任務
        consumer.start();
    }
}

2.2 順序消費

相比與基本消費,多了一個 ConsumeFromWhere的設定。代表消費者從哪個位置開始消費,列舉如下:

  • CONSUME_FROM_LAST_OFFSET:第一次啟動從佇列最後位置消費,非第一次啟動接著上次消費的進度繼續消費
  • CONSUME_FROM_FIRST_OFFSET:第一次啟動從佇列初始位置消費,非第一次啟動接著上次消費的進度繼續消費
  • CONSUME_FROM_TIMESTAMP:第一次啟動從指定時間點位置消費,非第一次啟動接著上次消費的進度繼續消費
    以上所說的第一次啟動是指從來沒有消費過的消費者,如果該消費者消費過,那麼會在broker端記錄該消費者的消費位置,消費者掛了再啟動,則從上次消費進度繼續執行。
public class SimpleOrderApplication {
    public static void main(String[] args) throws MQClientException {
        // 1.建立消費者Consumer,並指定消費者組名為 testConsumGroup
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
        // 2.指定NameServer的地址,以獲取Broker路由地址
        consumer.setNamesrvAddr("192.168.139.1:9876");

        /**
         * 設定Consumer第一次啟動是從佇列頭部、佇列尾部、還是指定時間戳節點開始消費
         * 非第一次啟動接著上次消費的進度繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 3.指定Topic和Tag 資訊。* 代表所有
        consumer.subscribe("testTopic", "*");

        // 4.設定回撥函數,用來處理讀取到的訊息, MessageListenerOrderly 用單個執行緒處理處理佇列的資料
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgList) {
                    System.out.println("執行緒 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
                    // Todo,具體的業務邏輯
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 5.消費者開始執行消費任務
        consumer.start();
    }
}

2.3 過濾訊息消費

可以使用MessageSelector.byTag來進行標籤篩選;或者使用MessageSelector.bySql 來進行訊息屬性篩選;或者混合使用。
參考下面程式碼,註釋說明的比較清楚。

public class FilterConsumerApplication {
    public static void main(String[] args) throws MQClientException {
        // 1.建立消費者Consumer,並指定消費者組名為 testConsumGroup
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
        // 2.指定NameServer的地址,以獲取Broker路由地址
        consumer.setNamesrvAddr("192.168.139.1:9876");

        // 3.指定Topic和Tag 資訊。只有訂閱的訊息有 sex 和 name 屬性, 並且年齡為 18 歲以上的男性
        // consumer.subscribe("testTopic", MessageSelector.byTag("userTag1 || userTag2"));
        consumer.subscribe("testTopic", MessageSelector.bySql("sex = 'male' AND age > 18"));

        // 4.設定回撥函數,用來處理讀取到的訊息, MessageListenerOrderly 用單個執行緒處理處理佇列的資料
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgList) {
                    System.out.println("執行緒 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
                    // Todo,具體的業務邏輯
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 5.消費者開始執行消費任務
        consumer.start();
    }
}

3 總結

  • 消費方式:Push(推) 或者 Pull(拉)
  • 消費模式:廣播模式和叢集模式
  • 訊息消費反饋
  • 流量控制(包括消費並行執行緒數設定)
  • 訊息的過濾(Tag, Key),過濾標籤 TagA||TagB||TagC