現實生活中有一些場景需要延遲或在特定時間傳送訊息,例如智慧熱水器需要 30 分鐘後開啟,未支付的訂單或傳送簡訊、電子郵件和推播通知下午 2:00 開始的促銷活動。
RabbitMQ 本身沒有直接支援延遲佇列的功能,如果您搜尋「如何在 RabbitMQ 中使用延遲訊息」,您很可能會遇到兩種可能的解決方案。第一種解決方案是使用訊息 TTL 功能和死信功能的組合。第二種選擇是使用官方的 RabbitMQ 延遲訊息外掛。
本文詳細介紹了 RabbitMQ 延遲訊息。
RabbitMQ Assistant 是一款 RabbitMQ 視覺化管理與監控——深入瞭解您的佇列、訂閱與消費訊息,展示完整的訊息流圖以及壓力測試。
RabbitMQ 是一個開源訊息代理(也稱為訊息導向中介層),建立它是為了支援高階訊息佇列協定 (Advanced Message Queuing Protocol, AMQP)。此後,它通過外掛架構進行了擴充套件,以支援簡單(或流式)面向文字的訊息協定 (Text Oriented Message Protocol, STOMP)、訊息查詢遙測傳輸 (Message Query Telemetry Transport, MQTT) 等協定。
對於叢集和故障轉移,RabbitMQ 伺服器是用 Erlang 編寫的,並採用了開放電信平臺框架。用於與代理互動的使用者端庫可用於所有主要程式語言,原始碼可在 Mozilla 公共許可證下獲得。
簡單來說,RabbitMQ是一個訊息傳遞系統,可以在本地或雲端使用。並且支援多種訊息傳遞協定。
以下是 RabbitMQ 的一些特性:
很長一段時間以來,人們一直在尋找使用 RabbitMQ 實現延遲訊息傳遞的方法。 迄今為止,公認的解決方案是使用訊息的組合——TTL 和死信交換器。
RabbitMQ 延遲訊息外掛向 RabbitMQ 新增了一種新的交換型別,如果使用者願意,允許延遲通過該交換路由的訊息。 讓我們看看如何使用這兩種方法。
通過組合這些功能,我們可以將訊息釋出到佇列,該訊息將在 TTL 後過期,然後它被重新被傳送到另一個交換器中,這個交換器就是 DLX,繫結 DLX 的佇列就稱之為
死信佇列。
下面建立一個佇列,為其設定 TTL 和 DLX 等:
// 建立兩個交換器,一個為正常的交換器exchange.normal,另一個為死信交換器exchange.dlx
channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.exchangeDeclare("exchange.normal", "fanout", true);
//建立一個佇列queue.normal,並繫結到exchange.normal
Map<String, Object> args = new HashMap <>();
//設定佇列中訊息的過期時間
args.put("x-message-ttl", 10000);
//當queue.normal中的訊息過期時,將傳送到exchange.dlx
args.put("x-dead-letter-exchange", "exchange.dlx");
//也可以為這個 DLX 指定路由鍵,如果沒有特殊指定,則使用原佇列的路由鍵
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare("queue.normal", true, false, false, args);
channel.queueBind("queue.normal", "exchange.normal", "");
//建立死信佇列queue.dlx,並當到死信交換器exchange.dlx
channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");
//向exchange.normal釋出一條訊息
channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN , "dlx".getBytes());
參考下圖,生產者首先傳送一條攜帶路由鍵為「rk」的訊息,然後經過交換器 exchange.normal 順利地儲存到佇列queue.normal 中。由於佇列 queue.normal 設定了過期時間為 10s,在這 10s 內沒有消費者消費這條訊息,那麼判定這條訊息為過期。由於設定了 DLX,過期之時,訊息被丟給交換器 exchange.dlx 中,這時找到與 exchange.dlx 匹配的佇列 queue.dlx,最後訊息被儲存在 queue.dlx 這個死信佇列中。對於 RabbitMQ 來說,DLX 是一個非常有用的特性。它可以處理異常情況下,訊息不能夠被消費者正確消費(消費者呼叫了 Basic.Nack 或者 Basic.Reject)而被置入死信佇列中的情況,後續分析程式可以通過消費這個死信佇列中的內容來分析當時所遇到的異常情況,進而可以改善和優化系統。
在上圖中,不僅展示的是死信佇列的用法,也是延遲佇列的用法,對於 queue.dlx 這個死信佇列來說,同樣可以看
作延遲佇列。假設一個應用中需要將每條訊息都設定為 10 秒的延遲,生產者通過 exchange.normal 這個交換器將傳送的訊息儲存在 queue.normal 這個佇列中。消費者訂閱的並非是 queue.normal 這個佇列,而是 queue.dlx 這個佇列。當訊息從 queue.normal 這個佇列中過期之後被存入 queue.dlx 這個佇列中,消費者就恰巧消費到了延遲 10 秒的這條訊息。
在真實應用中,對於延遲佇列可以根據延遲時間的長短分為多個等級,一般分為 5 秒、10 秒、30 秒、1 分鐘、5 分
鍾、10 分鐘、30 分鐘、1 小時這幾個維度,當然也可以再細化一下。
參考下圖,為了簡化說明,這裡只設定了 5 秒、10 秒、30 秒、1 分鐘這四個等級。根據應用需求的不同,生產者在傳送訊息的時候通過設定不同的路由鍵,以此將訊息傳送到與交換器繫結的不同的佇列中。這裡佇列分別設定了過期時間為 5 秒、10 秒、30 秒、1 分鐘,同時也分別設定了 DLX 和相應的死信佇列。當相應的訊息過期時,就會轉存到相應的死信佇列(即延遲佇列)中,這樣消費者根據業務自身的情況,分別選擇不同延遲等級的延遲佇列進行消費。
從安裝外掛開始,但首先,讓我們看一下以下先決條件:
在Github下載外掛。將外掛複製到 RabbitMQ 的外掛資料夾,然後執行以下命令啟用它:
# 下載外掛
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
# 將外掛移動到plugins目錄下
mv rabbitmq_delayed_message_exchange-3.11.1.ez ./rabbitmq_server-3.11.1/plugins/
# 啟用外掛
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
要使用延遲訊息交換器,只需宣告一個型別為 x-delayed-message
的交換器,如下所示:
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
稍後,我們將解釋交換宣告中的特殊引數 x-delayed-type
的含義。
要延遲訊息,使用者必須使用 x-delay
檔頭髮布它,該檔頭接受一個整數,表示訊息應由 RabbitMQ 延遲的毫秒數。值得注意的是,在此上下文中的延遲表示著訊息路由到佇列或其他交換器的延遲。交換器沒有消費者的概念。
因此,一旦延遲過去,外掛將嘗試將訊息路由到與交換器的路由規則匹配的佇列。如果訊息無法路由到任何佇列,它將被丟棄。
// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
在上面的範例中,訊息在被外掛路由之前將延遲五秒鐘。
當我們在上面宣告交換時,我們使用了一個設定為 direct
的 x-delayed-type
引數。這告訴交換器我們希望它在路由訊息、建立繫結等時具有什麼樣的行為。
一旦我們在消費者端收到訊息,我們如何判斷訊息是否被延遲? x-delay
訊息頭由外掛保留。如果您以 5000 毫秒的延遲傳送訊息,消費者會發現 x-delay
檔頭設定為 5000。
參考資料: