.Net Core&RabbitMQ限制迴圈消費

2022-10-29 18:01:35

前言

當消費者端接收訊息處理業務時,如果出現異常或是拒收訊息將訊息又變更為等待投遞再次推播給消費者,這樣一來,則形成迴圈的條件。

迴圈場景

生產者傳送100條訊息到RabbitMQ中,消費者設定讀取到第50條訊息時,設定拒收,同時設定是否還留存在當前佇列中(當requeue為false時,設定了死信佇列則進入死信佇列,否則移除訊息)。

consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine("拒收");
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

當第50條訊息拒收,則仍在佇列中且處在佇列頭部,重新推播給消費者,再次拒收,再次推播,反反覆覆。

最終其他訊息全部消費完畢,僅剩第50條訊息往復間不斷消費,拒收,消費,這將可能導致RabbitMQ出現記憶體漏失問題。



解決方案

RabbitMQ及AMQP協定本身沒有提供這類重試功能,但可以利用一些已有的功能來間接實現重試限定(以下只考慮基於手動確認模式情況)。此處只想到或是隻查到了如下幾種方案解決訊息迴圈消費問題。

  • 一次消費
    • 無論成功與否,消費者都對外返回ack,將拒收原因或是異常資訊catch存入本地或是新佇列中另作重試。
    • 消費者拒絕訊息或是出現異常,返回Nack或Reject,訊息進入死信佇列或丟棄(requeue設定為false)。
  • 限定重試次數
    • 在訊息的頭中新增重試次數,並將訊息重新傳送出去,再每次重新消費時從頭中判斷重試次數,遞增或遞減該值,直到達到限制,requeue改為false,最終進入死信佇列或丟棄。
    • 可以在Redis、Memcache或其他儲存中儲存訊息唯一鍵(例如Guid、雪花Id等,但必須在釋出訊息時手動設定它),甚至在mysql中連同重試次數一起儲存,然後在每次重新消費時遞增/遞減該值,直到達到限制,requeue改為false,最終進入死信佇列或丟棄。
    • 佇列使用Quorum型別,限制投遞次數,超過次數訊息被刪除。
  • 佇列訊息過期
    • 設定過期時間,給佇列或是訊息設定TTL,重試一定次數訊息達到過期時間後進入死信佇列或丟棄(requeue設定為true)。
  • 也許還有更多好的方案...

一次消費

對外總是Ack

訊息到達了消費端,可因某些原因消費失敗了,對外可以傳送Ack,而在內部走額外的方式去執行補償操作,比如將訊息轉發到內部的RabbitMQ或是其他處理方式,終歸是隻消費一次。

var queueName = "alwaysack_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    try
    {
        var message = ea.Body;
        Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));

        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
        {
            throw new Exception("模擬異常");
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
    finally
    {
        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
    }
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

當消費端收到訊息,處理時出現異常,可以另想辦法去處理,而對外保持著ack的返回,以避免訊息的迴圈消費。

訊息不重入佇列

在消費者端,因異常或是拒收訊息時,對requeue設定為false時,如果設定了死信佇列,則符合「訊息被拒絕且不重入佇列」這一進入死信佇列的情況,從而避免訊息反覆重試。如未設定死信佇列,則訊息被丟失。

此處假定接收100條訊息,在接收到第50條訊息時設定拒收,並且設定了requeue為false。

var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");

var queueName = "nackorreject_queue";
var arguments = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine("拒收");
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//關鍵在於requeue=false
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

如此一來,拒收訊息不會重入佇列,並且現有佇列繫結了死信交換機,因此,訊息進入到死信佇列中,如不繫結,則訊息丟失。


限定重試次數

設定重試次數,限定迴圈消費的次數,允許短暫的迴圈,但最終打破迴圈。

訊息頭設定次數

在訊息頭中設定次數記錄作為標記,但是,消費端無法對接收到的訊息修改訊息頭然後將原訊息送回MQ,因此,需要將原訊息內容重新傳送訊息到MQ,具體步驟如下

  1. 原訊息設定不重入佇列。
  2. 再傳送新的訊息其內容與原訊息一致,可設定新訊息的訊息頭來攜帶重試次數。
  3. 消費端再次消費時,便可從訊息頭中檢視訊息被消費的次數。

此處假定接收10條訊息,在接收到第5條訊息時設定拒收, 當訊息頭中重試次數未超過設定的3次時,訊息可以重入佇列,再次被消費。

var queueName = "messageheaderretrycount_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("5"))
    {
        var maxRetryCount = 3;

        Console.WriteLine($"拒收 {DateTime.Now}");

        //初次消費
        if (ea.BasicProperties.Headers == null)
        {
            //原訊息設定為不重入佇列
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);

            //傳送新訊息到佇列中
            RetryPublishMessage(channel, queueName, message.ToArray(), 1);
            return;
        }

        //獲取重試次數
        var retryCount = ParseRetryCount(ea);
        if (retryCount < maxRetryCount)
        {
            //原訊息設定為不重入佇列
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);

            //傳送新訊息到佇列中
            RetryPublishMessage(channel, queueName, message.ToArray(), retryCount + 1);
            return;
        }

        //到達最大次數,不再重試訊息
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

static void RetryPublishMessage(IModel channel, string queueName, byte[] body, int retryCount)
{
    var basicProperties = channel.CreateBasicProperties();
    basicProperties.Headers = new Dictionary<string, object>();
    basicProperties.Headers.Add("retryCount", retryCount);
    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);
}

static int ParseRetryCount(BasicDeliverEventArgs ea)
{
    var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount);
    if (!existRetryRecord)
    {
        throw new Exception("沒有設定重試次數");
    }

    return (int)retryCount;
}

訊息被拒收後,再重新傳送訊息到原有交換機或是佇列下中,以使得訊息像是消費失敗回到了佇列中,如此來控制消費次數,但是這種場景下,新訊息排在了佇列的尾部,而不是原訊息排在佇列頭部。

儲存重試次數

在儲存服務中儲存訊息的唯一標識與對應重試次數,消費訊息前對訊息進行判斷是否存在。

與訊息頭判斷一致,只是訊息重試次數的儲存從訊息本身挪入儲存服務中了。需要注意的是,訊息傳送端需要設定訊息的唯一標識(MessageId屬性)

//模擬外部儲存服務
var MessageRetryCounts = new Dictionary<ulong, int>();

var queueName = "storageretrycount_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        var maxRetryCount = 3;
        Console.WriteLine("拒收");
    
        //重試次數判斷
        var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);
        if (!existRetryRecord)
        {
            //重入佇列,繼續重試
            MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
            return;
        }
    
        if (MessageRetryCounts[ea.BasicProperties.MessageId] < maxRetryCount)
        {
            //重入佇列,繼續重試
            MessageRetryCounts[ea.BasicProperties.MessageId] = MessageRetryCounts[ea.BasicProperties.MessageId] + 1;
            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
            return;
        }
    
        //到達最大次數,不再重試訊息
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

除第一次拒收外,允許三次重試機會,三次重試完畢後,設定requeue為false,訊息丟失或進入死信佇列(如有設定的話)。

佇列使用Quorum型別

第一種和第二種分別是訊息自身、外部儲存服務來管理訊息重試次數,使用Quorum,由MQ來限定訊息的投遞次數,也就控制了重試次數。

設定佇列型別為quorum,設定投遞最大次數,當超過投遞次數後,訊息被丟棄。

var queueName = "quorumtype_queue";
var arguments = new Dictionary<string, object>()
{
    { "x-queue-type", "quorum"},
    { "x-delivery-limit", 3 }
};
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine($"拒收 {DateTime.Now}");
        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

第一次消費被拒收重入佇列後,經最大三次投遞後,消費端不再收到訊息,如此一來也限制了訊息的迴圈消費。


佇列訊息過期

當為訊息設定了過期時間時,當訊息沒有受到Ack,且還在佇列中,受到過期時間的限制,反覆消費但未能成功時,訊息將走向過期,進入死信佇列或是被丟棄。

聚焦於過期時間的限制,因此在消費者端,因異常或是拒收訊息時,需要對requeue設定為true,將訊息再次重入到原佇列中。

設定消費者端第五十條訊息會被拒收,且佇列的TTL設定為5秒。

//死信交換機和死信佇列
var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");

//常規佇列
var queueName = "normalmessage_queue";
var arguments = new Dictionary<string, object>
{
    { "x-message-ttl", 5000},
    { "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = ea.Body;
    Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));

    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
    {
        Console.WriteLine($"拒收 {DateTime.Now}");

        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
        return;
    }

    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

當消費者端拒收訊息後訊息重入佇列,再次消費,反覆進行超過5秒後,訊息在佇列中達到了過期時間,則被挪入到死信佇列中。

從Web管理中死信佇列中可檢視該條過期的訊息。



參考資料

  1. https://www.jianshu.com/p/f77a0b10c140
  2. https://www.jianshu.com/p/4904c609632f
  3. https://stackoverflow.com/questions/23158310/how-do-i-set-a-number-of-retry-attempts-in-rabbitmq

2022-10-29,望技術有成後能回來看見自己的腳步