當消費者端接收訊息處理業務時,如果出現異常或是拒收訊息將訊息又變更為等待投遞再次推播給消費者,這樣一來,則形成迴圈的條件。
生產者傳送100條訊息到RabbitMQ中,消費者設定讀取到第50條訊息時,設定拒收,同時設定是否還留存在當前佇列中(當requeue為false時,設定了死信佇列則進入死信佇列,否則移除訊息)。
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
Console.WriteLine("拒收");
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
當第50條訊息拒收,則仍在佇列中且處在佇列頭部,重新推播給消費者,再次拒收,再次推播,反反覆覆。
最終其他訊息全部消費完畢,僅剩第50條訊息往復間不斷消費,拒收,消費,這將可能導致RabbitMQ出現記憶體漏失問題。
RabbitMQ及AMQP協定本身沒有提供這類重試功能,但可以利用一些已有的功能來間接實現重試限定(以下只考慮基於手動確認模式情況)。此處只想到或是隻查到了如下幾種方案解決訊息迴圈消費問題。
訊息到達了消費端,可因某些原因消費失敗了,對外可以傳送Ack,而在內部走額外的方式去執行補償操作,比如將訊息轉發到內部的RabbitMQ或是其他處理方式,終歸是隻消費一次。
var queueName = "alwaysack_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
try
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
throw new Exception("模擬異常");
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
finally
{
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
}
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
當消費端收到訊息,處理時出現異常,可以另想辦法去處理,而對外保持著ack的返回,以避免訊息的迴圈消費。
在消費者端,因異常或是拒收訊息時,對requeue設定為false時,如果設定了死信佇列,則符合「訊息被拒絕且不重入佇列」這一進入死信佇列的情況,從而避免訊息反覆重試。如未設定死信佇列,則訊息被丟失。
此處假定接收100條訊息,在接收到第50條訊息時設定拒收,並且設定了requeue為false。
var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");
var queueName = "nackorreject_queue";
var arguments = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
Console.WriteLine("拒收");
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//關鍵在於requeue=false
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
如此一來,拒收訊息不會重入佇列,並且現有佇列繫結了死信交換機,因此,訊息進入到死信佇列中,如不繫結,則訊息丟失。
設定重試次數,限定迴圈消費的次數,允許短暫的迴圈,但最終打破迴圈。
在訊息頭中設定次數記錄作為標記,但是,消費端無法對接收到的訊息修改訊息頭然後將原訊息送回MQ,因此,需要將原訊息內容重新傳送訊息到MQ,具體步驟如下
此處假定接收10條訊息,在接收到第5條訊息時設定拒收, 當訊息頭中重試次數未超過設定的3次時,訊息可以重入佇列,再次被消費。
var queueName = "messageheaderretrycount_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("5"))
{
var maxRetryCount = 3;
Console.WriteLine($"拒收 {DateTime.Now}");
//初次消費
if (ea.BasicProperties.Headers == null)
{
//原訊息設定為不重入佇列
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
//傳送新訊息到佇列中
RetryPublishMessage(channel, queueName, message.ToArray(), 1);
return;
}
//獲取重試次數
var retryCount = ParseRetryCount(ea);
if (retryCount < maxRetryCount)
{
//原訊息設定為不重入佇列
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
//傳送新訊息到佇列中
RetryPublishMessage(channel, queueName, message.ToArray(), retryCount + 1);
return;
}
//到達最大次數,不再重試訊息
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
static void RetryPublishMessage(IModel channel, string queueName, byte[] body, int retryCount)
{
var basicProperties = channel.CreateBasicProperties();
basicProperties.Headers = new Dictionary<string, object>();
basicProperties.Headers.Add("retryCount", retryCount);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);
}
static int ParseRetryCount(BasicDeliverEventArgs ea)
{
var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount);
if (!existRetryRecord)
{
throw new Exception("沒有設定重試次數");
}
return (int)retryCount;
}
訊息被拒收後,再重新傳送訊息到原有交換機或是佇列下中,以使得訊息像是消費失敗回到了佇列中,如此來控制消費次數,但是這種場景下,新訊息排在了佇列的尾部,而不是原訊息排在佇列頭部。
在儲存服務中儲存訊息的唯一標識與對應重試次數,消費訊息前對訊息進行判斷是否存在。
與訊息頭判斷一致,只是訊息重試次數的儲存從訊息本身挪入儲存服務中了。需要注意的是,訊息傳送端需要設定訊息的唯一標識(MessageId屬性)
//模擬外部儲存服務
var MessageRetryCounts = new Dictionary<ulong, int>();
var queueName = "storageretrycount_queue";
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
var maxRetryCount = 3;
Console.WriteLine("拒收");
//重試次數判斷
var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);
if (!existRetryRecord)
{
//重入佇列,繼續重試
MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
return;
}
if (MessageRetryCounts[ea.BasicProperties.MessageId] < maxRetryCount)
{
//重入佇列,繼續重試
MessageRetryCounts[ea.BasicProperties.MessageId] = MessageRetryCounts[ea.BasicProperties.MessageId] + 1;
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
return;
}
//到達最大次數,不再重試訊息
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
除第一次拒收外,允許三次重試機會,三次重試完畢後,設定requeue為false,訊息丟失或進入死信佇列(如有設定的話)。
第一種和第二種分別是訊息自身、外部儲存服務來管理訊息重試次數,使用Quorum,由MQ來限定訊息的投遞次數,也就控制了重試次數。
設定佇列型別為quorum,設定投遞最大次數,當超過投遞次數後,訊息被丟棄。
var queueName = "quorumtype_queue";
var arguments = new Dictionary<string, object>()
{
{ "x-queue-type", "quorum"},
{ "x-delivery-limit", 3 }
};
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
Console.WriteLine($"拒收 {DateTime.Now}");
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
第一次消費被拒收重入佇列後,經最大三次投遞後,消費端不再收到訊息,如此一來也限制了訊息的迴圈消費。
當為訊息設定了過期時間時,當訊息沒有受到Ack,且還在佇列中,受到過期時間的限制,反覆消費但未能成功時,訊息將走向過期,進入死信佇列或是被丟棄。
聚焦於過期時間的限制,因此在消費者端,因異常或是拒收訊息時,需要對requeue設定為true,將訊息再次重入到原佇列中。
設定消費者端第五十條訊息會被拒收,且佇列的TTL設定為5秒。
//死信交換機和死信佇列
var dlxExchangeName = "dlx_exchange";
channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);
var dlxQueueName = "dlx_queue";
channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");
//常規佇列
var queueName = "normalmessage_queue";
var arguments = new Dictionary<string, object>
{
{ "x-message-ttl", 5000},
{ "x-dead-letter-exchange", dlxExchangeName }
};
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);
channel.BasicQos(0, 5, false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = ea.Body;
Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))
{
Console.WriteLine($"拒收 {DateTime.Now}");
((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);
return;
}
((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
當消費者端拒收訊息後訊息重入佇列,再次消費,反覆進行超過5秒後,訊息在佇列中達到了過期時間,則被挪入到死信佇列中。
從Web管理中死信佇列中可檢視該條過期的訊息。
2022-10-29,望技術有成後能回來看見自己的腳步