RabbitMQ 入門系列:7、保障訊息不重複消費:產生訊息的唯一ID。

2022-08-29 12:02:11

系列目錄

RabbitMQ 入門系列:1、MQ的應用場景的選擇與RabbitMQ安裝。

RabbitMQ 入門系列:2、基礎含義:連結、通道、佇列、交換機。

RabbitMQ 入門系列:3、基礎含義:持久化、排它性、自動刪除、強制性、路由鍵。

RabbitMQ 入門系列:4、基礎編碼:官方SDK使用:連結建立、單例改造、傳送訊息、接收訊息。

RabbitMQ 入門系列:5、基礎編碼:交換機的進階介紹及編碼方式。

RabbitMQ 入門系列:6、保障訊息:不丟失:傳送方、Rabbit儲存端、接收方。

RabbitMQ 入門系列:7、保障訊息:不重複消費:產生訊息的唯一ID。

RabbitMQ 入門系列:8、擴充套件內容:接收資訊時:可否根據RoutingKey過濾監聽資訊,答案是不能。

RabbitMQ 入門系列:9、擴充套件內容:死信佇列:真不適合當延時佇列。

RabbitMQ 入門系列:10、擴充套件內容:延時佇列:延時佇列外掛及其有限的適用場景。

前言:

上一篇介紹了訊息如何確保不丟失,本篇簡單介紹如何保障訊息不重複消費的處理方式。

對於接收端而言,對訊息的確認來往是需要時間的。

如果同一個佇列,同時存在多個使用者端監聽,那麼多個使用者端有一定概率能收到相同的資訊。

這時候就會產生訊息重複的問題的:

1、處理訊息重複消費的幾種方式:

網上人們說的主要兩種方式:

方式一、Redis:用setnx命令,做訊息id判斷。

方式三、資料庫:做唯一索引,訊息id能插入就可以處理,所以重複消費就會失敗。

優缺點說明:

方式一:Redis setnx 分散式鎖:

不推薦使用:一種不太靠譜的方式,如果專案對資料允許出錯,可以嘗試使用。

為啥不靠譜,可以參考文章:https://zhuanlan.zhihu.com/p/418268774

方式二:資料庫 的鎖:

推薦使用:可以用唯一索引,也可以用事務鎖,使用起來成熟又簡單。

方式三:獨佔檔案、剪貼版

推薦場景:單機場景下

1、可以通過對檔案的獨佔開啟和釋放,來達到程序間的鎖。

2、可以通過對剪貼版的讀寫,來達到程序間的鎖。

前面幾種方式,都有一個要素,就是需要訊息的唯一ID。

2、如何產生訊息的唯一ID:

A:對訊息進行hash,取hash值做為唯一ID(無法避免,有一定概念產生相同的hash)。

B:對每一個傳送的訊息,都產生一個GUID,這樣在獲取訊息的時候,就可以拿到訊息對應的唯一ID。

下面看程式碼演示:

3、傳送訊息時,帶唯一ID:

using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{
    channel.BasicReturn += (sender, e) =>
    {
        //通過交換機傳送過去,但沒傳送到指定的佇列,資料丟失
        //do .....
        Console.WriteLine("訊息沒發到指定佇列:" + Encoding.UTF8.GetString(e.Body.ToArray()));
    };
    channel.ConfirmSelect();
    channel.QueueDeclare("FirstQueue", false, false, false);
    var pro = channel.CreateBasicProperties();
    pro.MessageId = Guid.NewGuid().ToString();
    channel.BasicPublish("", "FirstQueue", true, pro, Encoding.UTF8.GetBytes("這是要傳送的內容"));
    if (channel.WaitForConfirms(TimeSpan.FromSeconds(10)))
    {
        //傳送確認成功
    }
    else
    {
        //超時或失敗,需要處理是否重發訊息。
    }
}

在訊息傳送前,生成一個屬性,這個屬性可以附帶很多資訊,其中一個是MessageId。

然後傳送的時候,第4個引數,把屬性帶過去即可。

4、接收訊息:獲取訊息唯一ID:

var channel = Rabbit.Instance.DefaultConnection.CreateModel();
 
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
   
    Console.WriteLine("收到預設訊息 {0}", message);
    Console.WriteLine("收到預設訊息GUID {0}", ea.BasicProperties.MessageId);
    try
    {
        channel.BasicAck(ea.DeliveryTag, false);
    }
    catch (Exception err)
    {

       //處理確認失敗的情況。
    }
   
   
};
channel.BasicConsume(queue: "FirstQueue",
                      autoAck: false,
                      consumer: consumer);

 

總結:

本篇介紹如何保障訊息不重複消費以及如何產生訊息的唯一ID,除了網上的基本兩種方式,個人還奉獻了單機版的場景方式。