1.5萬字 + 25張圖盤點RocketMQ 11種訊息型別,你知道幾種?

2023-12-12 15:00:33

大家好,我是三友~~

故事的開頭是這樣的

最近有個兄弟私信了我一張截圖

我一看截圖內容,好傢伙,原來是我一年多前立的flag

倒不是我忘了這件事,我後來也的確寫了一篇的關於RocketMQ執行的原理的文章

只不過這篇文章是從上帝的視角去看待RocektMQ一條訊息整個生命週期的過程

所以就沒有具體的分析事務和延遲訊息的實現原理,也算是留下了一個小小的坑吧

不過,既然現在有兄弟問了,那麼今天我這就來把這個坑填上

並且,索性咱就直接把這個坑填得滿滿的,直接盤點RocketMQ支援的11種訊息型別以及背後的實現原理

本文是基於RocketMQ 4.9版本講解

前置知識

為了幫助大家更好地理解這些訊息底層的實現原理,這裡我就通過三個問題來講一講RocketMQ最最基本的原理

如果有什麼不解的,可以看看我之前寫的RocketMQ訊息短暫而又精彩的一生這篇文章

1、生產者如何傳送訊息

在RocketMQ中有兩個重要的角色

  • NameServer:就相當於一個註冊中心
  • Broker:RocketMQ伺服器端

當RocketMQ伺服器端,也就是Broker在啟動的時候,會往NameServer註冊自己的資訊

這些資訊其中就包括

  • 當前Broker所在機器的ip和埠
  • 當前Broker管理的Topic的名稱以及每個Topic有幾個佇列

當生產者和消費者啟動的時候,就會從NameServer拉取這些資訊,這樣生產者和消費者就可以通過NameServer中獲取到Broker的ip和埠,跟Broker通訊了

而Topic我們也都知道,是訊息佇列中一個很重要的概念,代表了一類訊息的集合

在RocketMQ中,每個Topic預設都會有4個佇列,並且每個佇列都有一個id,預設從0開始,依次遞增

生產者傳送訊息的時候,就會從訊息所在Topic的佇列中,根據一定的演演算法選擇一個,然後攜帶這個佇列的id(queueId),再傳送給Broker

攜帶的佇列的id就代表了這條訊息屬於這個佇列的

所以從更細化的來說,訊息雖然是在Topic底下,但是真正是分佈在不同的佇列上的,每個佇列會有這個Topic下的部分訊息。

2、訊息存在哪

當訊息被Broker接收到的時候,Broker會將訊息存到原生的磁碟檔案中,保證Broker重啟之後訊息也不丟失

RocketMQ給這個存訊息的檔案起了一個高大上的名字:CommitLog

由於訊息會很多,所以為了防止檔案過大,CommitLog在物理磁碟檔案上被分為多個磁碟檔案,每個檔案預設的固定大小是1G

訊息在寫入到檔案時,除了包含訊息本身的內容資料,也還會包含其它資訊,比如

  • 訊息的Topic
  • 訊息所在佇列的id,前面提到過
  • 訊息生產者的ip和埠
  • ...

這些資料會和訊息本身按照一定的順序同時寫到CommitLog檔案中

上圖中黃色排列順序和實際的存的內容並非實際情況,我只是舉個例子

3、消費者如何消費訊息

消費者是如何拉取訊息的

在RocketMQ中,訊息的消費單元是以佇列來的

所以RocketMQ為了方便快速的查詢和消費訊息,會為每個Topic的每個佇列也單獨建立一個檔案

RocketMQ給這個檔案也起了一個高大上的名字:ConsumeQueue

當訊息被存到CommitLog之後,其實還會往這條訊息所在佇列的ConsumeQueue檔案中插一條資料

每個佇列的ConsumeQueue也是由多個檔案組成,每個檔案預設是存30萬條資料

插入ConsumeQueue中的每條資料由20個位元組組成,包含3部分資訊

  • 訊息在CommitLog的起始位置(8個位元組)
  • 訊息在CommitLog儲存的長度(8個位元組)
  • 訊息tag的hashCode(4個位元組)

每條資料也有自己的編號(offset),預設從0開始,依次遞增

當消費者拉取訊息的時候,會告訴伺服器端自己消費哪個佇列(queueId),哪個位置的訊息(offset)的訊息

伺服器端接收到訊息之後,會找到queueId對應的ConsumeQueue,然後找到offset位置的資料,最後根據這條資料到CommitLog檔案查詢真正的訊息內容

所以,從這可以看出,ConsumeQueue其實就相當於是一個索引檔案,方便我們快速查詢在CommitLog中的訊息

所以,記住下面這個非常重要的結論,有助於後面的文章內容的理解

要想查詢到某個Topic下的訊息,那麼一定是先找這個Topic佇列對應的ConsumeQueue,之後再通過ConsumeQueue中的資料去CommitLog檔案查詢真正的訊息內容

消費者組和消費模式

在RocketMQ,消費者是有個消費者組的概念,在啟動消費者的時候會指定該消費者屬於哪個消費者組。

//建立一個消費者,指定消費者組的名稱為sanyouConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");

一個消費者組中可以有多個消費者,不同消費者組之間消費訊息是互不干擾的

在同一個消費者組中,訊息消費有兩種模式

  • 叢集模式
  • 廣播模式

同一條訊息在同一個消費者組底下只會被消費一次,這就叫叢集模式

叢集消費的實現就是將佇列按照一定的演演算法分配給消費者,預設是按照平均分配的

廣播模式剛好相反,同一條訊息能被同一個消費者組底下所有的消費者消費一次

RocketMQ預設是叢集模式,如果你想用廣播模式,只需設定一下即可

consumer.setMessageModel(MessageModel.BROADCASTING);

好了,到這就講完了前置知識,這些前置知識後面或多或少都有提到

如果你覺得看的不過癮,更詳細的文章奉上RocketMQ訊息短暫而又精彩的一生

普通訊息

普通訊息其實就很簡單,如下面程式碼所示,就是傳送一條普通的訊息

public class Producer {
    public static void main(String[] args) throws Exception {
        //建立一個生產者,指定生產者組為 sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 啟動生產者
        producer.start();

        //建立一條訊息 topic為 sanyouTopic 訊息內容為 三友的java日記
        Message msg = new Message("sanyouTopic""三友的java日記".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 傳送訊息並得到訊息的傳送結果,然後列印
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        // 關閉生產者
        producer.shutdown();
    }

}

構建的訊息的topic為sanyouTopic,內容為三友的java日記,這就是一條很普通的訊息

批次訊息

批次訊息從名字也可以看出來,就是將多個訊息同時發過去,減少網路請求的次數

public class Producer {
    public static void main(String[] args) throws Exception {
        //建立一個生產者,指定生產者組為 sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 啟動生產者
        producer.start();

        //用以及集合儲存多個訊息
        List<Message> messages = new ArrayList<>();
        messages.add(new Message("sanyouTopic""三友的java日記 0".getBytes()));
        messages.add(new Message("sanyouTopic""三友的java日記 1".getBytes()));
        messages.add(new Message("sanyouTopic""三友的java日記 2".getBytes()));
        // 傳送訊息並得到訊息的傳送結果,然後列印
        SendResult sendResult = producer.send(messages);
        System.out.printf("%s%n", sendResult);

        // 關閉生產者
        producer.shutdown();
    }

}

多個普通訊息同時傳送,這就是批次訊息

不過在使用批次訊息的時候,需要注意以下兩點

  • 每條訊息的Topic必須都得是一樣的
  • 不支援延遲訊息和事務訊息

普通訊息和批次訊息比較簡單,沒有複雜的邏輯,就是將訊息傳送過去,在ConsumeQueue和CommitLog存上對應的資料就可以了

順序訊息

所謂的順序訊息就是指

生產者傳送訊息的順序跟消費者消費訊息的順序是一致的

RocketMQ可以保證同一個佇列的訊息絕對順序,先進入佇列的訊息會先被消費者拉取到,但是無法保證一個Topic內訊息的絕對順序

所以要想通過RocketMQ實現順序消費,需要保證兩點

  • 生產者將需要保證順序的訊息傳送到同一個佇列
  • 消費者按照順序消費拉取到的訊息

那麼,第一個問題,如何訊息傳送到同一個佇列

前面有提到,RocketMQ傳送訊息的時候會選擇一個佇列進行傳送

而RocketMQ預設是通過輪詢演演算法來選擇佇列的,這就無法保證需要順序消費的訊息會存到同一個佇列底下

所以,預設情況下是不行了,我們需要自定義佇列的選擇演演算法,才能保證訊息都在同一個佇列中

RocketMQ提供了自定義佇列選擇的介面MessageQueueSelector

比如我們可以實現這個介面,保證相同訂單id的訊息都選擇同一個佇列,在訊息傳送的時候指定一下就可以了

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        //可以根據業務的id從mqs中選擇一個佇列
        return null;
    }
}, new Object());

保證訊息順序傳送之後,第二個問題,消費者怎麼按照順序消費拉取到的訊息?

這個問題RocketMQ已經考慮到了,看看RocketMQ多麼地貼心

RocketMQ在消費訊息的時候,提供了兩種方式:

  • 並行消費
  • 順序消費

並行消費,多個執行緒同時處理同一個佇列拉取到的訊息

順序消費,同一時間只有一個執行緒會處理同一個佇列拉取到的訊息

至於是並行消費還是順序消費,需要我們自己去指定

對於順序處理,只需要實現MessageListenerOrderly介面,處理訊息就可以了

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 建立一個消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");
        // 指定NameServer的地址
        consumer.setNamesrvAddr("192.168.200.143:9876");

        // 訂閱sanyouTopic這個topic下的所有的訊息
        consumer.subscribe("sanyouTopic""*");
        // 註冊一個消費的監聽器,當有訊息的時候,會回撥這個監聽器來消費訊息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("消費訊息:%s", new String(msg.getBody()) + "\n");
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // 啟動消費者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

如果想並行消費,換成實現MessageListenerConcurrently即可

到這你可能會有一個疑問

並行消費和順序消費跟前面提到的叢集消費和廣播消費有什麼區別?

叢集消費和廣播消費指的是一個消費者組裡的每個消費者是去拉取全部佇列的訊息還是部分佇列的訊息,也就是選擇需要拉取的佇列

而並行和順序消費的意思是,是對已經拉到的同一個佇列的訊息,是並行處理還是按照訊息的順序去處理

延遲訊息

延遲訊息就是指生產者傳送訊息之後,訊息不會立馬被消費,而是等待一定的時間之後再被訊息

RocketMQ的延遲訊息用起來非常簡單,只需要在建立訊息的時候指定延遲級別,之後這條訊息就成為延遲訊息了

Message message = new Message("sanyouTopic""三友的java日記 0".getBytes());
//延遲級別
message.setDelayTimeLevel(1);

雖然用起來簡單,但是背後的實現原理還是有點意思,我們接著往下看

RocketMQ延遲訊息的延遲時間預設有18個級別,不同的延遲級別對應的延遲時間不同

RocketMQ內部有一個Topic,專門用來表示是延遲訊息的,叫SCHEDULE_TOPIC_XXXX,XXXX不是預留位置,就是XXXX

RocketMQ會根據延遲級別的個數為SCHEDULE_TOPIC_XXXX這個Topic建立相對應數量的佇列

比如預設延遲級別是18,那麼SCHEDULE_TOPIC_XXXX就有18個佇列,佇列的id從0開始,所以延遲級別為1時,對應的佇列id就是0,為2時對應的就是1,依次類推

SCHEDULE_TOPIC_XXXX這個Topic有什麼作用呢?

這就得從訊息儲存時的一波偷樑換柱的騷操作了說起了

當伺服器端接收到訊息的時候,判斷延遲級別大於0的時候,說明是延遲訊息,此時會幹下面三件事:

  • 將訊息的Topic改成SCHEDULE_TOPIC_XXXX
  • 將訊息的佇列id設定為延遲級別對應的佇列id
  • 將訊息真正的Topic和佇列id存到前面提到的訊息儲存時的額外資訊中

之後訊息就按照正常儲存的步驟存到CommitLog檔案中

由於訊息存到的是SCHEDULE_TOPIC_XXXX這個Topic中,而不是訊息真正的目標Topic中,所以消費者此時是消費不到訊息的

舉個例子,比如有條訊息,Topic為sanyou,所在的佇列id = 1,延遲級別 = 1,那麼偷樑換柱之後的結果如下圖所示

程式碼如下

所以從上分析可以得出一個結論

所有RocketMQ的延遲訊息,最終都會儲存到SCHEDULE_TOPIC_XXXX這個Topic中,並且同一個延遲級別的訊息在同一個佇列中

在存訊息偷樑換柱之後,實現延遲消費的最關鍵的一個步驟來了

BocketMQ在啟動的時候,除了為每個延遲級別建立一個佇列之後,還會為每個延遲級別建立一個延遲任務,也就相當於一個定時任務,每隔100ms執行一次

這個延遲任務會去檢查這個佇列中的訊息有沒有到達延遲時間,也就是不是可以消費了

前面的結論,每個佇列都有一個ConsumeQueue檔案,可以通過ConsumeQueue找到這個佇列中的訊息

一旦發現到達延遲時間,可以消費了,此時就會從這條訊息額外儲存的訊息中拿到真正的Topic和佇列id,重新構建一條新的訊息,將新的訊息的Topic和佇列id設定成真正的Topic和佇列id,內容還是原來訊息的內容

之後再一次將新構建的訊息儲存到CommitLog中

由於新訊息的Topic變成訊息真正的Topic了,所以之後消費者就能夠消費到這條訊息了

所以,從整體來說,RocketMQ延遲訊息的實現本質上就是最開始訊息是存在SCHEDULE_TOPIC_XXXX這個中轉的Topic中

然後會有一個類似定時任務的東西,不停地去找到這個Topic中的訊息

一旦發現這個訊息達到了延遲任務,說明可以消費了,那麼就重新構建一條訊息,這條訊息的Topic和佇列id都是實際上的Topic和佇列id,然後存到CommitLog

之後消費者就能夠在目標的Topic獲取到訊息了

事務訊息

事務訊息用起來也比較簡單,如下所示:

public class TransactionMessageDemo {

    public static void main(String[] args) throws Exception {
        TransactionMQProducer transactionMQProducer = new TransactionMQProducer("sanyouProducer");
        transactionMQProducer.setNamesrvAddr("192.168.200.143:9876");

        //設定事務監聽器
        transactionMQProducer.setTransactionListener(new TransactionListener() {

            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                //處理本次事務
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                //檢查本地事務
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        transactionMQProducer.start();

        Message message = new Message("sanyouTopic""三友的java日記".getBytes());

        //傳送訊息
        transactionMQProducer.sendMessageInTransaction(message, new Object());
    }

}

事務訊息傳送相對於前面的例子主要有以下不同:

  • 將前面的DefaultMQProducer換成TransactionMQProducer
  • 需要設定事務的監聽器TransactionListener,來執行本地事務
  • 傳送方法改成 sendMessageInTransaction

為什麼要這麼改,接下來我們來講講背後的實現原理

上一節在說延遲訊息的時候提到,RocketMQ使用到了SCHEDULE_TOPIC_XXXX這個中轉Topic,來偷樑換柱實現延遲訊息

不僅僅是延遲訊息,事務訊息其實也是這麼幹的,它也會進行偷樑換柱,將訊息先存在RMQ_SYS_TRANS_HALF_TOPIC這個Topic下,同時也會將訊息真正的Topic和佇列id存到額外資訊中,操作都是一樣滴

由於訊息不在真正目標的Topic下,所以這條訊息消費者也是消費不到滴

當訊息成功儲存之後,伺服器端會向生產者響應,告訴生產者我訊息儲存成功了,你可以執行本地事務了

之後生產者就會執行本地執行事務,也就是執行如下方法

TransactionListener#executeLocalTransaction

當本地事務執行完之後,會將執行的結果傳送給伺服器端

伺服器端會根據事務的執行狀態來執行對應的處理結果

  • commit:提交事務訊息,跟延遲訊息一樣,重新構建一條訊息,Topic和佇列id都設定成訊息真正的Topic和佇列id,然後重新存到CommitLog檔案,這樣消費者就可以消費到訊息了
  • rollback:回滾訊息,其實並沒有實際的操作,因為訊息本身就不在真正的Topic下,所以消費者壓根就消費不到,什麼都不做就可以了
  • unknown:本地事務執行異常時就是這個狀態,這個狀態下會幹一些事,咱們後面再說

所以在正常情況下,事務訊息整個執行流程如下圖所示

既然有正常情況下,那麼就有非正常情況下

比如前面提到的拋異常導致unknown,又或者什麼亂七八糟的原因,導致無法正常提交本地事務的執行狀態,那麼此時該怎麼辦呢?

RocketMQ當然也想到了,他有自己的一套補償機制

RocketMQ內部會起動一個執行緒,預設每隔1分鐘去檢查沒有被commit或者rollback的事務訊息

RocketMQ內部有一套機制,可以找出哪些事務訊息沒有commit或者rollback,這裡就不細說了

當發現這條訊息超過6s沒有提交事務狀態,那麼此時就會向生產者傳送一個請求,讓生產者去檢查一下原生的事務執行的狀態,就是執行下面這行程式碼

TransactionListener#checkLocalTransaction

之後會將這個方法返回的事務狀態提交給伺服器端,伺服器端就可以知道事務的執行狀態了

這裡有一個細節需要注意,事務訊息檢查次數不是無限的,預設最大為15次,一旦超過15次,那麼就不會再被檢查了,而是會直接把這個訊息存到TRANS_CHECK_MAX_TIME_TOPIC

所以你可以從這個Topic讀取那些無法正常提交事務的訊息

這就是RocketMQ事務訊息的原理

小總結

RocketMQ事務訊息的實現主要是先將訊息存到RMQ_SYS_TRANS_HALF_TOPIC這個中間Topic,有些資料會把這個訊息稱為半訊息(half訊息),這是因為這個訊息不能被消費

之後會執行原生的事務,提交本地事務的執行狀態

RocketMQ會根據事務的執行狀態去判斷commit或者是rollback訊息,也就是是不是可以讓消費者消費這條訊息的意思

在一些異常情況下,生產者無法及時正確提交事務執行狀態

RocketMQ會向生產者傳送訊息,讓生產者去檢查原生的事務,之後再提交事務狀態

當然,這個檢查次數預設不超過15次,如果超過15次還未成功提交事務狀態,RocketMQ就會直接把這個訊息存到TRANS_CHECK_MAX_TIME_TOPIC

請求-應答訊息

這個訊息型別比較有意思,類似一種RPC的模式

生產者傳送訊息之後可以阻塞等待消費者消費這個訊息的之後返回的結果

生產者通過過呼叫request方法傳送訊息,接收回復訊息

public class Producer {
    public static void main(String[] args) throws Exception {
        //建立一個生產者,指定生產者組為 sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 啟動生產者
        producer.start();


        Message message = new Message("sanyouTopic""三友的java日記".getBytes());
        
        //傳送訊息,拿到響應結果, 3000代表超時時間,3s內未拿到響應結果,就超時,會丟擲RequestTimeoutException異常
        Message result = producer.request(message, 3000);
        System.out.println("接收到響應訊息:" + result);

        // 關閉生產者
        producer.shutdown();
    }

}

而對於消費者來著,當消費完訊息之後,也要作為生產者,將響應的訊息傳送出去

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        //建立一個生產者,指定生產者組為 sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 啟動生產者
        producer.start();


        // 通過push模式消費訊息,指定消費者組
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");
        // 指定NameServer的地址
        consumer.setNamesrvAddr("192.168.200.143:9876");

        // 訂閱這個topic下的所有的訊息
        consumer.subscribe("sanyouTopic""*");
        // 註冊一個消費的監聽器,當有訊息的時候,會回撥這個監聽器來消費訊息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context)
 
{
                for (MessageExt msg : msgs) {
                    System.out.printf("消費訊息:%s"new String(msg.getBody()) + "\n");

                    try {
                        // 用RocketMQ自帶的工具類建立響應訊息
                        Message replyMessage = MessageUtil.createReplyMessage(msg, "這是響應訊息內容".getBytes(StandardCharsets.UTF_8));
                        // 將響應訊息傳送出去,拿到傳送結果
                        SendResult replyResult = producer.send(replyMessage, 3000);
                        System.out.println("響應訊息的結果 = " + replyResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 啟動消費者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

這種請求-應答訊息實現原理也比較簡單,如下圖所示

生產者和消費者,會跟RocketMQ伺服器端進行網路連線

所以他們都是通過這個連線來傳送和拉取訊息的

當伺服器端接收到回覆訊息之後,有個專門處理回覆訊息的類

這個類就會直接找到傳送訊息的生產者的連線,之後會通過這個連線將回復訊息傳送給生產者

RocketMQ底層是基於Netty通訊的,所以如果你有用過Netty的話,應該都知道,就是通過Channel來傳送的

重試訊息

重試訊息並不是我們業務中主動傳送的,而是指當消費者消費訊息失敗之後,會間隔一段時間之後再次消費這條訊息

重試的機制在並行消費模式和順序消費模式下實現的原理並不相同

並行消費模式重試實現原理

RocetMQ會為每個消費者組建立一個重試訊息所在的Topic,名字格式為

%RETRY% + 消費者組名稱

舉個例子,假設消費者組為sanyouConsumer,那麼重試Topic的名稱為:%RETRY%sanyouConsumer

當訊息消費失敗後,RocketMQ會把訊息存到這個Topic底下

消費者在啟動的時候會主動去訂閱這個Topic,那麼自然而然就能消費到消費失敗的訊息了

為什麼要為每個消費者組建立一個重試Topic呢?

其實我前面已經說過,每個消費者組的消費是隔離的,互不影響

所以,每個消費者組消費失敗的訊息可能就不一樣,自然要放到不同的Topic下了

重試訊息是如何實現間隔一段時間來消費呢?

說到間隔一段時間消費,你有沒有覺得似曾相識?

不錯,間隔一段時間消費說白了不就是延遲消費麼!

所以,並行消費模式下間隔一段時間底層就是使用的延遲訊息來實現的

RocetMQ會為重試訊息設定一個延遲級別

並且延遲級別與重試次數的關係為

delayLevel = 3 + 已經重試次數

比如第一次消費失敗,那麼已經重試次數就是0,那麼此時延遲級別就是3

對應的預設的延遲時間就是10s,也就是一次訊息重試消費間隔時間是10s

隨著重試次數越多,延遲級別也越來越高,重試的間隔也就越來越長,但是最大也是最大延遲級別的時間

不過需要注意的是,在並行消費模式下,只有叢集消費才支援訊息重試,對於廣播消費模式來說,是不支援訊息重試的,消費失敗就失敗了,不會管

順序消費模式重試實現原理

順序消費模式下重試就比較簡單了

當消費失敗的時候,他並不會將訊息傳送到伺服器端,而是直接在本地等1s鍾之後重試

在這個等待的期間其它訊息是不能被消費的

這是因為保證訊息消費的順序性,即使前面的訊息消費失敗了,它也需要等待前面的訊息處理完畢才能處理後面的訊息

順序消費模式下,並行消費和叢集消費均支援重試訊息

死信訊息

死信訊息就是指如果訊息最終無法被正常消費,那麼這條訊息就會成為死信訊息

RocketMQ中,訊息會變成死信訊息有兩種情況

第一種就是訊息重試次數已經達到了最大重試次數

最大重試次數取決於並行消費還是順序消費

  • 順序消費,預設最大重試次數就是 Integer.MAX_VALUE,基本上就是無限次重試,所以預設情況下順序消費的訊息幾乎不可能成為死信訊息
  • 並行消費的話,那麼最大重試次數預設就是16次

當然可以通過如下的方法來設定最大重試次數

DefaultMQPushConsumer#setMaxReconsumeTimes

除了上面的情況之外,當在並行消費模式下,你可以在訊息消費失敗之後手動指定,直接讓訊息變成死信訊息

在並行消費訊息的模式下,處理訊息的方法有這麼一個引數

ConsumeConcurrentlyContext

這個類中有這麼一個屬性

這個引數值有三種情況,註釋也有寫:

  • 小於0,那麼直接會把訊息放到死信佇列,成為死信訊息。註釋寫的是=-1,其實只要小於0就可以成為死信訊息,不一定非得是-1
  • 0,預設就是0,這個代表訊息重試消費,並且重試的時間間隔(也就是延遲級別)由伺服器端決定,也即是前面重試訊息提到的 delayLevel = 3 + 已經重試次數
  • 大於0,此時就表示使用者端指定訊息重試的時間間隔,是幾就代表延遲級別為幾,比如設定成1,那麼延遲級別就為1

所以,在並行消費模式下,可以通過設定這個引數值為-1,直接讓處理失敗的訊息成為死信訊息

當訊息成為死信訊息之後,訊息並不會丟失

RocketMQ會將死信訊息儲存在死信Topic底下,Topic格式為

%DLQ% + 消費者組名稱

跟重試Topic的格式有點像,只是將%RETRY%換成了%DLQ%

如果你想知道有哪些死信訊息,只需要訂閱這個Topic即可獲得

小總結

所以總的來說,兩種情況會讓訊息成為死信訊息:

  • 訊息重試次數超過最大次數,跟訊息的處理方式有關,預設情況下順序處理最大次數是幾乎是無限次,也就是幾乎不可能成為死信訊息;並行處理的情況下,最大重試次數預設就是16次。最大重試次數是可以設定的。
  • 在並行處理的情況下,通過ConsumeConcurrentlyContextdelayLevelWhenNextConsume屬性設定成-1,讓訊息直接變成死信訊息

當訊息成為死信訊息的時候,會被存到%DLQ% + 消費者組名稱這個Topic下

使用者可以通過這個Topic獲取到死信訊息,手動干預處理這些訊息

同步訊息

同步訊息是指,當生產者傳送訊息的時候,需要阻塞等待伺服器端響應訊息儲存的結果

同步訊息跟前面提到的訊息型別並不是互斥的

比如前面說的普通訊息時舉的例子,他就是同步傳送的,那麼它也是一個同步訊息

這種模式用於對資料一致性要求較高的場景中,但是等待也會消耗一定的時間

非同步訊息

既然有了同步訊息,那麼相對應的就有非同步訊息

非同步訊息就是指生產者傳送訊息後,不需要阻塞等待伺服器端儲存訊息的結果

所以非同步訊息的好處就是可以減少等待響應過程消耗的時間

如果你想知道有沒有傳送成功,可以在傳送訊息的時候傳個回撥的介面SendCallback的實現

Message message = new Message("sanyouTopic""三友的java日記".getBytes());

//非同步傳送訊息
producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("訊息傳送結果 = " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("訊息傳送異常 = " + e.getMessage());
            }
        }
);

當訊息傳送之後收到傳送結果或者出現異常的時候,RocektMQ就會回撥這個SendCallback實現類,你就可以知道訊息傳送的結果了

單向訊息

所謂的單向訊息就是指,生產者傳送訊息給伺服器端之後,就直接不管了

所以對於生產者來說,他是不會去care訊息傳送的結果了,即使傳送失敗了,對於生產者來說也是無所謂的

所以這種方式的主要應用於那種能夠忍受丟訊息的操作場景

比如像紀錄檔收集就比較適合使用這種方式

單向訊息的傳送是通過sendOneway來呼叫的

Message message = new Message("sanyouTopic""三友的java日記".getBytes());

//傳送單向訊息
producer.sendOneway(message);

總的來說,同步訊息、非同步訊息、單向訊息代表的是訊息的傳送方式,主要是針對訊息的傳送方來說,對訊息的儲存之類是的沒有任何影響的

最後

ok,到這本文就結束了

本文又又是一篇非常非常肝的文章,不知道你是否堅持看到這裡

我在寫的過程中也是不斷地死磕原始碼,儘可能避免出現錯誤的內容

同時也在嘗試爭取把我所看到的原始碼以一種最簡單的方式說出來

所以如果你堅持看到這裡,並覺得文章內容還不錯,歡迎點贊、在看、收藏、轉發分享給其他需要的人

你的支援就是我更新文章最大的動力,非常地感謝!

哦,最後差點忘了,如果有對RocketMQ原始碼感興趣的小夥伴可以從下面這個倉庫fork一下原始碼,我在原始碼中加了中文註釋,並且後面我還會持續更新註釋

https://github.com/sanyou3/rocketmq.git

往期熱門文章推薦

如何去閱讀原始碼,我總結了18條心法

如何寫出漂亮程式碼,我總結了45個小技巧

三萬字盤點Spring/Boot的那些常用擴充套件點

三萬字盤點Spring 9大核心基礎功能

萬字+20張圖剖析Spring啟動時12個核心步驟

1.5萬字+30張圖盤點索引常見的11個知識點

兩萬字盤點那些被玩爛了的設計模式

掃碼或者搜尋關注公眾號 三友的java日記 ,及時乾貨不錯過,公眾號致力於通過畫圖加上通俗易懂的語言講解技術,讓技術更加容易學習,回覆 面試 即可獲得一套面試真題。