RabbitMQ 入門系列:10、擴充套件內容:延時佇列:延時佇列外掛及其有限的適用場景(系列大結局)。

2022-09-01 15:02:52

系列目錄

RabbitMQ 入門系列:1、MQ的應用場景的選擇與RabbitMQ安裝。

RabbitMQ 入門系列:2、基礎含義:連結、通道、佇列、交換機。

RabbitMQ 入門系列:3、基礎含義:持久化、排它性、自動刪除、強制性、路由鍵。

RabbitMQ 入門系列:4、基礎編碼:官方SDK使用:連結建立、單例改造、傳送訊息、接收訊息。

RabbitMQ 入門系列:5、基礎編碼:交換機的進階介紹及編碼方式。

RabbitMQ 入門系列:6、保障訊息:不丟失:傳送方、Rabbit儲存端、接收方。

RabbitMQ 入門系列:7、保障訊息:不重複消費:產生訊息的唯一ID。

RabbitMQ 入門系列:8、擴充套件內容:接收資訊時:可否根據RoutingKey過濾監聽資訊,答案是不能。

RabbitMQ 入門系列:9、擴充套件內容:死信佇列:真不適合當延時佇列。

RabbitMQ 入門系列:10、擴充套件內容:延時佇列:延時佇列外掛及其有限的適用場景。

前言:

延遲佇列用於事件發生後間隔一段時間後需要做特定處理的場景,如:

1、電商支付系統中,使用者下單後N分鐘不支付,自動取消訂單。
2、使用者瀏覽商品長時間後還沒下單,後續推播相關產品和優惠券。
3、使用者註冊或修改生日後:生日簡訊推播等。
4、7天后的自動確認收貨等。。。
......

對於這類應用,其實用訊息佇列,對個要求要求,是合適的,但對整個系統應用而言,它是不靠譜的。

訊息佇列的核心應用,是保持記憶體的佇列,不斷的產生並不斷的消耗,最佳狀態的保持系統的穩定和流暢。

而延遲佇列的核心,是積壓訊息,還是大量積壓,這明顯與訊息佇列的設計就不符合。

下面來看看網上官方的延時佇列插入:

1、延時佇列外掛:下載解壓與啟用

 1、下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

 

 

2、解壓到RabbitMQ安裝目錄下的plugins目錄下 

 

3、啟動:命令列執行以下命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

4、重啟RabbitMQ:

 

多了個x-delayed-message則為安裝成功。

2、延時佇列外掛:實現原理

3、延時佇列外掛:編碼實現

其實,佇列一直還是那個普通的佇列,只是多了一個交換機的型別是延時型別。

因此,原理就是把訊息,設定過期時間,發給延時交換機,它自己存著,到時間了,會轉發到佇列去。

程式碼如下:

using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{

    //定義佇列
    channel.QueueDeclare("dead");
   
   //定義延時路由 IDictionary
<string, object> dic = new Dictionary<string, object>(); dic.Add("x-delayed-type", "direct"); channel.ExchangeDeclare("ex-dead", "x-delayed-message", arguments: dic);
channel.QueueBind(
"dead", "ex-dead", "dead"); var p1 = channel.CreateBasicProperties(); IDictionary<string, object> header = new Dictionary<string, object>(); header.Add("x-delay", 6000); p1.Headers = header; var p2 = channel.CreateBasicProperties(); IDictionary<string, object> header2 = new Dictionary<string, object>(); header2.Add("x-delay", 16000); p2.Headers = header2; channel.BasicPublish("ex-dead", "dead", false, p1, Encoding.UTF8.GetBytes("6秒就過期了1。")); channel.BasicPublish("ex-dead", "dead", false, p2, Encoding.UTF8.GetBytes("16秒就過期了2。")); }

執行,等了16秒,終於等來了資料:

 

4、延時佇列外掛:適合應用場景

如果說,死信佇列的適合場景,是短時間的固定間隔時間。

那麼說,延時佇列外掛的適合場景,就是更進一步的短時間內的隨機時間。

劃重點的話,還是取決於積壓的資料量。

官方的吐槽說明,目前的設計,由於儲存資料庫設計不佳,該外掛推薦積壓資料在100萬條資料以下適合應用。

總結:

使用延時佇列,應該考量業務積壓的資料量,如果資料量小,那麼都不是問題。

如果資料量大,那麼建立佇列資料庫,按時間存檔要傳送的資料,定時掃描處理更合適。