MQ系列14:MQ如何做到訊息延時處理

2023-09-06 15:00:46

MQ系列1:訊息中介軟體執行原理
MQ系列2:訊息中介軟體的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ訊息的傳送模式
MQ系列6:訊息的消費
MQ系列7:訊息通訊,追求極致效能
MQ系列8:資料儲存,訊息佇列的高可用保障
MQ系列9:高可用架構分析
MQ系列10:如何保證訊息冪等性消費
MQ系列11:如何保證訊息可靠性傳輸
MQ系列12:如何保證訊息順序性
MQ系列13:訊息大量堆積如何解決

1 背景

在網際網路業務的實際應用場景中,訊息的延時處理是非常必要的。例如,在金融交易系統中,某些交易的確認可能需要一段時間才能完成。又如,在物流跟蹤系統中,貨物的運輸狀態需要一段時間才能更新。而MQ作為中介軟體的角色專門來處理訊息媒介,實際也具備了使用訊息的延時處理來保證資訊的及時性的能力。
這邊舉兩個具體的例子:

  • 火車票訂購,提交了訂單就把車票給佔位了,這時候可以傳送一個延時確認的訊息,15m 未付款,就要把該車票釋放,這樣其他人就可以購買了。
  • 購買電影票,可以傳送一個核銷檢查訊息,在電影開場前15分鐘就無法退票了。

既然訊息延遲處理的使用場景這麼常見,那我們就要詳細來分析下怎麼使用MQ來實現,這邊以RocketMQ為技術選型。

2 訊息延時處理原理

RocketMQ的訊息延時處理是通過預定義的訊息延時級別和延時佇列來實現的。在傳送訊息時,生產者可以設定一個延時級別,該訊息將會被延遲一段時間後才能被消費者消費。RocketMQ預設提供了18個延時級別,每個級別對應不同的延遲時間。

所以延時時間並不是隨意指定的,Rocket原始碼中指定的18種等級如下:

// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  • RocketMq不支援任意時間延時,需設定固定的延時等級,從1s到2h分別對應著等級1到18
  • 可以使用setDelayTimeLevel(int level) 方法設定延時等級,level 從 0 開始

在RocketMQ中,每個Broker都設定了一個延時佇列,用於儲存延時訊息。當訊息的延時時間到達時,該訊息將會被自動轉移到普通的訊息佇列中,等待消費者的消費。這種方式可以有效地避免因為網路延遲或者消費者處理速度慢而導致訊息的延遲。

3 訊息延時處理實戰

使用RocketMQ的訊息延時處理非常簡單。在傳送訊息時,生產者只需要設定一個延時級別,然後將訊息傳送到RocketMQ即可。例如:

public class DelayProducerApplication {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException {
        // 1、建立生產者producer,並指定生產者組名為 example_group_name
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");  
        // 2、指定NameServer的地址,以獲取Broker路由地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 3、啟動生產者producer
        producer.start();
        // 4、建立訊息,並指定Topic,Tag和訊息體
        Message msg = new Message("example_topic","example_key", "試一試延遲30s傳送的訊息".getBytes("UTF-8"));
        // 5、設定延時等級3,對應30s,所以這個訊息在30秒之後傳送
        msg.setDelayTimeLevel(3);
        // 6、傳送訊息到一個Broker
        SendResult sendResult = producer.send(msg);
        // 7、通過sendResult返回訊息是否成功送達
        System.out.printf("%s%n", sendResult);
        // 8、如果不再傳送訊息,關閉生產者Producer
        producer.shutdown();
    }
}

在上述程式碼中,我們首先建立了一個生產者,然後指定了NameServer的地址,並啟動了生產者。接著,我們建立了一個延時級別為3的訊息,即該訊息將會被延遲30秒後才能傳送並被消費者消費。最後,我們傳送了該訊息,並關閉了生產者。

4 訊息延時的優化

雖然RocketMQ的訊息延時處理功能已經非常強大,但是在實際應用中,我們可能還需要根據自己的業務需求進行一些優化。以下是一些可能的優化方式:

  • 調整延時佇列的大小。在RocketMQ中,每個Broker都只有一個延時佇列,佇列太小可能導致一些延時訊息被miss。可以根據實際需求調整延時佇列的大小。
  • 使用多個消費者來消費同一主題的訊息。在RocketMQ中,可能有批次執行被設定了同樣的延遲時間,這個就存在了一些風險,類似快取的批次過期一樣,稍有不慎,可能會擊穿資料庫。如果只有一個消費者來消費該主題的訊息,可能會導致該消費者的處理速度不夠快,從而影響到訊息的及時性。我們可以根據實際需求增加消費者數量,以提高訊息的處理速度。
  • 調整RocketMQ的設定引數。RocketMQ提供了一些設定引數,可以用來調整其效能和可靠性。我們可以根據實際需求調整這些引數,以優化訊息的延時處理效果。

總之,RocketMQ的訊息延時處理功能非常強大,可以滿足許多實際應用場景的需求。在實際應用中,我們可以根據自己的業務需求進行一些優化,以進一步提高訊息的及時性和可靠性。

5 總結

本文我們介紹了RocketMQ如何使用訊息延時來處理特殊的業務場景。除了上述的方法之外,我們還有一些其他方法,比如:

  • 定時傳送訊息。在定時傳送中,生產者可以指定一個未來的時間戳,在該時間戳到達時,該訊息將會被傳送到Broker。RocketMQ內部會維護一個定時任務,每隔一段時間檢查一次待定時傳送的訊息,並判斷是否到達了指定的時間戳。如果到達了指定的時間戳,該訊息將會被傳送到Broker。
  • 自建環形佇列來實現「延時訊息」 ,參考這篇:1分鐘實現「延遲訊息」功能,寫的不錯