RabbitMQ的六種工作模式

2022-07-31 06:01:44

一、普通佇列模式

1.  一個消費者,一個佇列,一個消費者。
2.  訊息產生訊息放入佇列,訊息的消費者(consumer) 監聽(while) 訊息佇列,如果佇列中有訊息,就消費掉,訊息被拿走後,自動從佇列中刪除(隱患 訊息可能沒有被消費者正確處理,已經從佇列中消失了,造成訊息的丟失)應用場景:聊天(中間有一個過度的伺服器;p端,c端)


22.cnblogs.com/blog/1913282/202207/1913282-20220730231124618-1368574550.png)

  • 獲取RabbitMQ連線幫助類

    後面程式碼,這部分建立連線共用

     public class RabbitMQHelper
        {
            /// <summary>
            /// 獲取RabbitMQ連線
            /// </summary>
            /// <returns></returns>
            public static IConnection GetConnection()
            {
                //範例化連線工廠
                var factory = new ConnectionFactory
                {
                    HostName = "127.0.0.1", //ip
                    Port = 5672, // 埠
                    UserName = "Admin", // 賬戶
                    Password = "Admin", // 密碼
                    VirtualHost = "/"   // 虛擬主機
                };
    
                return factory.CreateConnection();
            }
        }
    
  • 生產者

    public class Send
    {
    
        public static void SendMessage()
        {
            string queueName = "normal";
    
            //1.建立連結
            using (var connection = RabbitMQHelper.GetConnection())
            {
                // 2.建立通道
                using(var channel = connection.CreateModel())
                {
                    // 3.宣告佇列
                    channel.QueueDeclare(queueName, false, false, false, null);
                    // 沒有繫結交換機,怎麼找到路由佇列的呢?
                    for (int i = 1; i <= 30; i++)
                    {
                        //4.構建Byte訊息封包
                        string message =$"第{i}條訊息";
                        var body = Encoding.UTF8.GetBytes(message);//訊息以二進位制形式傳輸
    
                        // 傳送訊息到rabbitmq,使用rabbitmq中預設提供交換機路由,預設的路由Key和佇列名稱完全一致
                        //5.傳送封包
                        channel.BasicPublish(exchange: "", routingKey: queueName, null, body);
                        Thread.Sleep(1000);//新增延遲
                        Console.WriteLine("生產:" + message);
                    }
                }
            }
    
        } 
    }
    
  • 消費者

    public class Receive
    {
        public static void ReceiveMessage()
        {
            // 消費者消費是佇列中訊息
            string queueName = "normal";
            //1.建立連結連結
            var connection = RabbitMQHelper.GetConnection();
            {
                //2.建立通道
                var channel = connection.CreateModel();
                {
                    //3.宣告佇列:如果你先啟動是消費端就會異常
                    channel.QueueDeclare(queueName, false, false, false, null);
                    //4.建立一個消費者範例
                    var consumer = new EventingBasicConsumer(channel);
                    //5.繫結訊息接收後的事件委託
                    consumer.Received +=(model, ea) => {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Thread.Sleep(1000);
                        Console.WriteLine(" Normal Received => {0}", message);
                    }; 
                    //6.啟動消費者
                    channel.BasicConsume( queue: queueName, autoAck:true, consumer);//開始消費
                }
    
            }
    
        } 
    }
    

二、工作佇列模式

  1. 一個消費者,一個佇列,多個消費者。但多個消費者中只會有一個會成功地消費訊息

  2. 訊息產生者將訊息放入佇列消費者可以有多個,消費者1,消費者2,同時監聽同一個佇列,訊息被消費?C1 C2共同爭搶當前的訊息佇列內容,誰先拿到誰負責消費訊息(隱患,高並行情況下,預設會產生某一個訊息被多個消費者共同使用。

  3. 應用場景:紅包;大專案中的資源排程(任務分配系統不需知道哪一個任務執行系統在空閒,直接將任務扔到訊息佇列中,空閒的系統自動爭搶)

  • 生產者

     public class WorkerSend
        {
    
            public static void SendMessage()
            {
                string queueName = "Worker_Queue";
    
                using (var connection = RabbitMQHelper.GetConnection())
                {
                    using(var channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queueName, false, false, false, null);
                        for (int i = 0; i < 30; i++)
                        {
                            string message = $"RabbitMQ Worker {i + 1} Message";
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish("", queueName, null, body);
                            Console.WriteLine("send Task {0} message",i + 1);
                        }
                       
                    }
                }
                
            } 
        }
    
  • 消費者

      public class WorkerReceive
        {
            public static void ReceiveMessage()
            {
                string queueName = "Worker_Queue";
                var connection = RabbitMQHelper.GetConnection();
                {
                    var channel = connection.CreateModel();
                    {
                        channel.QueueDeclare(queueName, false, false, false, null);
                        var consumer = new EventingBasicConsumer(channel);
                        //設定prefetchCount : 1來告知RabbitMQ,在未收到消費端的訊息確認時,不再分發訊息,也就確保了當消費端處於忙碌狀態時,不再分配任務。
                        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                        consumer.Received +=(model, ea) => {
                            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                            Console.WriteLine(" Worker Queue Received => {0}", message);
                        }; 
                        channel.BasicConsume(queueName,true, consumer);
                    }
                   
                }
              
            } 
        }
    

三、扇形佇列模式(釋出/訂閱模式)

  1. 一個訊息生產者,一個交換機,多個佇列,多個訊息消費者。每個消費佇列中訊息一致,且每個訊息消費者都從自己的訊息佇列的第一個訊息開始消費,直到最後。

  2. 交換機為rabbitMQ中內部元件。訊息生產者將訊息傳送給rabbitMQ後,rabbitMQ會根據訂閱的消費者個數,生成對應數目的訊息佇列,這樣每個消費者都能獲取生產者傳送的全部訊息。

  3. 一旦消費者斷開與rabbitMQ的連線,佇列就會消失。如果消費者數目很多,對於rabbitMQ而言,也是個重大負擔,訂閱模式是個長連線,佔用並行數,且每個消費者一個佇列會佔用大量空間

  4. 相關應用場景:郵件群發,群聊,廣播

  • 生產者
 public static void SendMessage()
        {
            //1.建立連線
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //2.建立通道
                using(var channel = connection.CreateModel())
                {
                    // 3.宣告交換機物件
                    channel.ExchangeDeclare("fanout_exchange", "fanout");
                   
                    // 4.建立佇列
                    string queueName1 = "fanout_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "fanout_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "fanout_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);
                    
                    // 5.繫結到互動機
                    // fanout_exchange 繫結了 3個佇列 
                    channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");//指定交換機
                    channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");

                    for (int i = 0; i < 10; i++)
                    {
                        //6.構建訊息byte陣列
                        string message = $"RabbitMQ Fanout {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        //7.傳送訊息
                        channel.BasicPublish("fanout_exchange", "", null, body);//同時把訊息傳送到訂閱的三個佇列
                        Console.WriteLine("Send Fanout {0} message",i + 1);
                    }
                }
            }
            
        } 
    }
  • 消費者
 public class FanoutConsumer
    {
        public static void ConsumerMessage()
        {
            //1.建立連線
            var connection = RabbitMQHelper.GetConnection();
            {
                //2,。建立通道
                var channel = connection.CreateModel();
                {
                    //3.申明exchange
                    channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
                    
                    // 4.建立佇列
                    string queueName1 = "fanout_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "fanout_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "fanout_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);
                    
                    // 5.繫結到互動機
                    channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");

                    Console.WriteLine("[*] Waitting for fanout logs.");

                    //6.申明consumer
                    var consumer = new EventingBasicConsumer(channel);
                    //繫結訊息接收後的事件委託
                    consumer.Received += (model, ea) => {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine("[x] {0}", message);

                    };
                    //7.啟動消費者
                    channel.BasicConsume(queue: queueName1, autoAck: true, consumer: consumer);//只會消費佇列queueName1中的訊息,其他佇列中訂閱的訊息仍然存在
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }

四、直接佇列模式(Routing路由模式)

  1. 一個訊息生產者,一個交換機,多個佇列,多個訊息消費者。一個交換機繫結多個訊息佇列,每個訊息佇列都有自己唯一的Routekey,每一個訊息佇列有一個消費者監聽。

  2. 訊息生產者將訊息傳送給交換機,交換機按照路由判斷,將路由到的RouteKey的訊息,推播與之繫結的佇列,交換機根據路由的key,只能匹配上路由key對應的訊息佇列,對應的消費者才能消費訊息;

  • 生產者:
public static void SendMessage()
        {
            //1.建立連線
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //2.建立通道
                using(var channel = connection.CreateModel())
                {
                    // 3.宣告Direct交換機
                    channel.ExchangeDeclare("direct_exchange", "direct");

                    // 4.建立佇列
                    string queueName1 = "direct_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "direct_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "direct_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);

                    // 5.繫結到互動機 指定routingKey
                    channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
                    channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
                    channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");

                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Direct {i + 1} Message =>green";
                        var body = Encoding.UTF8.GetBytes(message);
                        // 傳送訊息的時候需要指定routingKey傳送
                        channel.BasicPublish(exchange: "direct_exchange", routingKey: "green", null, body);//只發布到RouteKey:green的佇列
                        Console.WriteLine("Send Direct {0} message",i + 1);
                    }
                }
            }
            
        } 
    }
  • 消費者
   public class DirectConsumer
    {
        public static void ConsumerMessage()
        {
            //1.建立連線
            var connection = RabbitMQHelper.GetConnection();
            //2.建立通訊
            var channel = connection.CreateModel();
            //3.宣告交換機
            channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
            //4.繫結交換機
            var queueName = "direct_queue2";//佇列direct_queue3繫結有red,yellow,green共3個RouteKey
            channel.QueueDeclare(queueName, false, false, false, null);
            //此處消費通訊沒有必要繫結所有的RouteKey,根據前生產者通訊的路由規則,每個佇列中只會路由到一種訊息
            channel.QueueBind(queue: queueName,
                                      exchange: "direct_exchange",
                                      routingKey: "red");
            channel.QueueBind(queue: queueName,
                                      exchange: "direct_exchange",
                                      routingKey: "yellow");
            channel.QueueBind(queue: queueName,
                                      exchange: "direct_exchange",
                                      routingKey: "green");

            Console.WriteLine(" [*] Waiting for messages.");

            //5.範例化消費者
            var consumer = new EventingBasicConsumer(channel);
            //6.為消費者繫結消費委託事件
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);

                // 消費完成後需要手動簽收訊息,如果不寫該程式碼就容易導致重複消費問題
                //7.手動確認簽收訊息
                channel.BasicAck(ea.DeliveryTag, true); // 可以降低每次簽收效能損耗
            };

            // 訊息簽收模式
            // 手動簽收 保證正確消費,不會丟訊息(基於使用者端而已)
            // 自動簽收 容易丟訊息 
            // 簽收:意味著訊息從佇列中刪除
            channel.BasicConsume(queue: queueName,
                                 autoAck: false,
                                 consumer: consumer);//設定為不自動簽收,進行手動簽收

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

五、模糊匹配佇列模式(Topic 主題模式)

  1. 一個訊息生產者,一個交換機,多個佇列,多個訊息消費者。一個交換機繫結多個訊息佇列,每個訊息佇列都有自己唯一的Routekey,每一個訊息佇列有一個消費者監聽。

  2. 此時的自己唯一的Routekey,不是一個確定值,像我們熟悉的正規表示式對應的匹配規則。

  3. 生產者產生訊息,把訊息交給交換機,交換機根據RouteKey的模糊匹配到對應的佇列,由佇列監聽消費者消費訊息。

  4. 規則:

    和* 都是萬用字元,命名規則是多個單詞用頓號(.)分隔開

    代表代表一個單詞

    *代表多個單詞

  • 生產者:
      public static void SendMessage()
        {
            //1.建立連線
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //2.建立通道
                using (var channel = connection.CreateModel())
                {
                    //3.宣告交換機
                    channel.ExchangeDeclare("topic_exchange", "topic");
                    //4.宣告佇列
                    string queueName1 = "topic_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "topic_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "topic_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);
                    //5.繫結到互動機
                    channel.QueueBind(queue: queueName1, exchange: "topic_exchange", routingKey: "user.data.*");
                    channel.QueueBind(queue: queueName2, exchange: "topic_exchange", routingKey: "user.data.delete");
                    channel.QueueBind(queue: queueName3, exchange: "topic_exchange", routingKey: "user.data.update");
                
                    for (int i = 0; i < 10; i++)
                    {
                        //6.準備傳送位元組陣列
                        string message = $"RabbitMQ Topic {i + 1} Delete Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        //7.根據RouteKey釋出訊息
                        channel.BasicPublish("topic_exchange", "user.data.delete", null, body);//會發布到queueName1,queueName2
                        Console.WriteLine("Send Topic {0} message", i + 1);
                    }
                }
            }

        }
  • 消費者:
 public static void ConsumerMessage()
        {
            //1.建立連線
            var connection = RabbitMQHelper.GetConnection();
            //2.建立通訊
            var channel = connection.CreateModel();
            //3.宣告交換機
            channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
            //4.宣告佇列
            var queueName = "topic_queue3";
            channel.QueueDeclare(queueName, false, false, false, null);
            //5.繫結交換機
            channel.QueueBind(queue: queueName,
                                      exchange: "topic_exchange",
                                      routingKey: "user.data.*");

            Console.WriteLine(" [*] Waiting for messages.");
            //6.建立消費者
            var consumer = new EventingBasicConsumer(channel);
            //7.繫結消費委託事件
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
            };

            //8.啟動消費
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

六、RPC 模式(瞭解)

RPC即使用者端遠端呼叫伺服器端的方法 ,使用MQ可以實現RPC的非同步呼叫,基於Direct交換機實現,流程如下:

1、使用者端即是生產者也是消費者,向RPC請求佇列傳送RPC呼叫訊息,同時監聽RPC響應佇列。

2、伺服器端監聽RPC請求佇列的訊息,收到訊息後執行伺服器端的方法,得到方法返回的結果。

3、伺服器端將RPC方法 的結果傳送到RPC響應佇列。

4、使用者端(RPC呼叫方)監聽RPC響應佇列,接收到RPC呼叫結果。

原始碼地址