系列目錄
前言:
上一篇介紹了訊息如何確保不丟失,本篇簡單介紹如何保障訊息不重複消費的處理方式。
對於接收端而言,對訊息的確認來往是需要時間的。
如果同一個佇列,同時存在多個使用者端監聽,那麼多個使用者端有一定概率能收到相同的資訊。
這時候就會產生訊息重複的問題的:
1、處理訊息重複消費的幾種方式:
網上人們說的主要兩種方式:
方式一、Redis:用setnx命令,做訊息id判斷。
方式三、資料庫:做唯一索引,訊息id能插入就可以處理,所以重複消費就會失敗。
優缺點說明:
方式一:Redis setnx 分散式鎖:
不推薦使用:一種不太靠譜的方式,如果專案對資料允許出錯,可以嘗試使用。
為啥不靠譜,可以參考文章:https://zhuanlan.zhihu.com/p/418268774
方式二:資料庫 的鎖:
推薦使用:可以用唯一索引,也可以用事務鎖,使用起來成熟又簡單。
方式三:獨佔檔案、剪貼版
推薦場景:單機場景下
1、可以通過對檔案的獨佔開啟和釋放,來達到程序間的鎖。
2、可以通過對剪貼版的讀寫,來達到程序間的鎖。
前面幾種方式,都有一個要素,就是需要訊息的唯一ID。
2、如何產生訊息的唯一ID:
A:對訊息進行hash,取hash值做為唯一ID(無法避免,有一定概念產生相同的hash)。
B:對每一個傳送的訊息,都產生一個GUID,這樣在獲取訊息的時候,就可以拿到訊息對應的唯一ID。
下面看程式碼演示:
3、傳送訊息時,帶唯一ID:
using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{
channel.BasicReturn += (sender, e) =>
{
//通過交換機傳送過去,但沒傳送到指定的佇列,資料丟失
//do .....
Console.WriteLine("訊息沒發到指定佇列:" + Encoding.UTF8.GetString(e.Body.ToArray()));
};
channel.ConfirmSelect();
channel.QueueDeclare("FirstQueue", false, false, false);
var pro = channel.CreateBasicProperties();
pro.MessageId = Guid.NewGuid().ToString();
channel.BasicPublish("", "FirstQueue", true, pro, Encoding.UTF8.GetBytes("這是要傳送的內容"));
if (channel.WaitForConfirms(TimeSpan.FromSeconds(10)))
{
//傳送確認成功
}
else
{
//超時或失敗,需要處理是否重發訊息。
}
}
在訊息傳送前,生成一個屬性,這個屬性可以附帶很多資訊,其中一個是MessageId。
然後傳送的時候,第4個引數,把屬性帶過去即可。
4、接收訊息:獲取訊息唯一ID:
var channel = Rabbit.Instance.DefaultConnection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine("收到預設訊息 {0}", message);
Console.WriteLine("收到預設訊息GUID {0}", ea.BasicProperties.MessageId);
try
{
channel.BasicAck(ea.DeliveryTag, false);
}
catch (Exception err)
{
//處理確認失敗的情況。
}
};
channel.BasicConsume(queue: "FirstQueue",
autoAck: false,
consumer: consumer);
總結:
本篇介紹如何保障訊息不重複消費以及如何產生訊息的唯一ID,除了網上的基本兩種方式,個人還奉獻了單機版的場景方式。