.Net Core&RabbitMQ死信佇列

2022-08-30 06:02:10

過期時間

RabbitMQ可以為訊息和佇列設定過期時間Time To Live(TTL)。其目的即過期。

訊息過期時間

訊息儲存在佇列中時,如果想為其設定一個有限的生命週期,而不是一直儲存著,可以為其設定過期時間。比如,一條訊息,我想要三分鐘內有效,三分鐘後再接收到該訊息就算過時了,如果在佇列中儲存已經超過三分鐘,消費者再去接收就是過時了,那便沒有意義了。

為訊息設定過期時間可以從兩方面著手,一是為訊息本身設定過期時間,二是為訊息的承載體佇列設定過期時間。兩者同時設定情況下取最短生命週期。

為訊息設定

在BasicPublish方法中,可以設定BasicProperties中的Expiration來設定過期時間(單位為毫秒)。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "rabbitmqdemo@test",
    VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var queueName = "messagettl_queue";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

        while (true)
        {
            Console.WriteLine("訊息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            var basicProperties = channel.CreateBasicProperties();
            basicProperties.Expiration = "10000";//10秒
            channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);
            Console.WriteLine("訊息內容傳送完畢:" + message);
        }
    }
}

如此一來,設定訊息生命週期為10秒,當超過該時間後,再由消費者去獲取該訊息,則獲取不到,可以直接從Web介面看到,經過10秒後,訊息過期(未啟動消費者)。

為佇列設定

為每個訊息設定過期時間可能不符合一些特定的場景,當需要設定特定佇列中的訊息都是指定的過期時間時,可以為佇列中的訊息統一設定過期時間。

佇列宣告時可以指定引數,其中設定x-message-ttl引數。

var arguments = new Dictionary<string, object>
{
    { "x-message-ttl", 10000 }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);

如此一來,該佇列傳送訊息時,如訊息本身沒有設定過期時間,則使用佇列的過期時間。
生產者傳送的7條訊息過期時間都為10s,一段時間後,訊息全部過期。

當ttl設定為0時,僅當訊息能夠立即被消費,否則訊息立馬過期,Web面板中只能見到訊息傳送,佇列中沒有訊息,都被立馬過期了。

過期策略

為佇列中的訊息統一設定過期時間時,當超出了過期時間,訊息立馬過期挪出佇列。先傳送的訊息在佇列頭部,先過期的也在佇列頭部,因此可從頭部掃描清除過期訊息。

而為直接為訊息設定過期時間時,各個訊息的過期時間不盡相同,掃描時得佇列全域性掃描才能識別哪些訊息是過期的。

因此,設定在投遞給消費者前判斷是否過期,超出過期時間訊息仍在佇列中。

佇列過期時間

此處的佇列過期時間與訊息中的為佇列設定過期時間不同,此處是為佇列本身考慮,佇列自身沒有消費者超過一段時間內且沒有重新生命該佇列,則無需考慮存在。

佇列宣告時指定引數,其中設定x-expires引數,需大於0,單位毫秒。

var arguments = new Dictionary<string, object>
{
    { "x-expires", 20000 }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);

生產者啟動應用建立佇列,傳送訊息。

空閒20s不傳送訊息後佇列被刪除,再次傳送訊息匹配不到佇列,訊息被回退。


死信佇列

死信交換機即Dead-Letter-Exchange(DLX),和備份交換機一樣,沒有什麼特殊,只是屬性上標記了下,其繫結的佇列稱之為死信佇列。當訊息在一個佇列中變成死信之後,被重發到死信交換機,儲存到死信佇列中。

訊息變為死信情況

  • 訊息被拒絕且不重入佇列
  • 訊息超出過期時間
  • 佇列達到最大長度

設定死信佇列

宣告佇列時可以給定引數,其中設定x-dead-letter-exchange來指明該佇列對應的死信交換機。

//死信交換機和死信佇列
var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");

//常規佇列
var queueName = "nornalmessage_queue";
var arguments = new Dictionary<string, object>
{
    { "x-message-ttl", 10000},
    { "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);

如下可見到,生產者初期傳送的訊息過期後經死信交換機路由進入到死信佇列中,後期傳送的暫未過期的訊息仍在原佇列中。當有該部分過期訊息的需要時,消費者可以監聽死信佇列獲取訊息。

當死信交換機的型別為direct時,可以指定RoutingKey(不指定預設使用原佇列RoutingKey)。

可在宣告常規佇列的屬性中設定x-dead-letter-routing-key,以能夠匹配上死信交換機與死信佇列繫結時的routingkey。

//死信交換機和死信佇列
var dlxExchangeName = "dlxroutingkey_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "direct", durable: false, autoDelete: false, arguments: null);
var dlxQueueName1 = "dlx_queue1";
channel.QueueDeclare(queue: dlxQueueName1, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName1, exchange: dlxExchangeName, routingKey: "waring");
var dlxQueueName2 = "dlx_queue2";
channel.QueueDeclare(queue: dlxQueueName2, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName2, exchange: dlxExchangeName, routingKey: "info");
var dlxQueueName3 = "dlx_queue1";
channel.QueueDeclare(queue: dlxQueueName3, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName3, exchange: dlxExchangeName, routingKey: "error");

//常規佇列
var queueName = "normalmessage_queue";
var arguments = new Dictionary<string, object>
{
    { "x-message-ttl", 10000},
    { "x-dead-letter-exchange", dlxExchangeName },
    { "x-dead-letter-routing-key", "info" }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);

當訊息過期時,死信交換機根據常規佇列繫結的routingkey,匹配到相應的死信佇列儲存。先傳送的訊息過期後存入到對應佇列中,後續暫未過期訊息仍然保持在原佇列。


延遲佇列

延遲佇列算是死信佇列的一種應用場景,其本身並不在RabbitMQ或是AMQP協定中有所體現。當不想消費者立馬獲取到訊息,而是等待一段時間才讓消費者消費時,比如訂單限時支付,超出時間未支付則取消訂單,那麼可以使用到延遲佇列來處理這一場景。

訊息過期時間

傳送訊息時設定過期時間場景下,訊息可能並不會在過期後立馬從佇列中刪除,而是要等到消費時候才會判斷該訊息是否過時。當出現以下場景時則會有點問題。

第一個訊息的過期時間很長,而後續的訊息的過期時間很短,後續的訊息過期後不會立馬刪除,而是要等到第一個訊息過期刪除後才會被刪除,那麼對應延遲佇列來說會有點問題,時間超出了設定的延遲時間。

注:對佇列設定訊息過期時間不存在該問題。

解決方案

RabbitMQ提供了延遲佇列的外掛,提供延遲佇列型別交換機,其不會根據第一個訊息是否過期來判斷,解決了如上提到的第一個沒有過期,後續訊息過期的場景,不會受訊息先後順序的影響,而是關注過期時間,先過期的先傳送。

生產者程式碼

宣告延遲交換機時型別使用x-delayed-message,需要注意宣告交換機型別時需要給定引數x-delayed-type,至於值是哪種型別可依據匹配佇列的需要選擇。延遲交換機和延遲佇列都需要持久化。訊息傳送時需要使用訊息頭並設定x-delay來設定延遲時間。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "rabbitmqdemo@test",
    VirtualHost = "rabbitmqdemo"
};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        //延遲交換機
        var delayExchangeName = "delay_exchange";
        var delayArguments = new Dictionary<string, object>
        {
            { "x-delayed-type", "direct" } //x-delayed-type必須,否則啟動報錯
        };
        channel.ExchangeDeclare(exchange: delayExchangeName, type: "x-delayed-message", durable: true, autoDelete: false, arguments: delayArguments); //持久化必須,否則啟動報錯

        //延遲佇列
        var delayQueueName = "delay_queue";
        channel.QueueDeclare(delayQueueName, durable: true, exclusive: false, autoDelete: false);//持久化必須,否則啟動報錯
        channel.QueueBind(queue: delayQueueName, exchange: delayExchangeName, routingKey: delayQueueName);

        channel.BasicReturn += new EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>((sender, e) =>
        {
            var message = Encoding.UTF8.GetString(e.Body.ToArray());
            Console.WriteLine($"收到回退訊息:{message}");
        });

        while (true)
        {
            Console.WriteLine("訊息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            var basicProperties = channel.CreateBasicProperties();
            basicProperties.Headers = new Dictionary<string, object>
            {
                { "x-delay", message == "aaa" ? 30000 : 10000 }//延時時間從header賦值
            };
            channel.BasicPublish(exchange: delayExchangeName, routingKey: delayQueueName, mandatory: true, basicProperties: basicProperties, body: body);
            Console.WriteLine("訊息內容傳送完畢:" + message + $" {DateTime.Now}");
        }
    }
}

生產者傳送訊息,到延遲交換機,訊息將在設定的延遲時間後路由到相應的延遲佇列。

此處設定了mandatory為true,訊息最終到了延遲佇列,但又被回退到了生產者,實際上對於該延遲交換機外掛並不支援mandatory,官方不建議使用該引數


2022-08-29,望技術有成後能回來看見自己的腳步