1. 一個消費者,一個佇列,一個消費者。
2. 訊息產生訊息放入佇列,訊息的消費者(consumer) 監聽(while) 訊息佇列,如果佇列中有訊息,就消費掉,訊息被拿走後,自動從佇列中刪除(隱患 訊息可能沒有被消費者正確處理,已經從佇列中消失了,造成訊息的丟失)應用場景:聊天(中間有一個過度的伺服器;p端,c端)
22.cnblogs.com/blog/1913282/202207/1913282-20220730231124618-1368574550.png)
獲取RabbitMQ連線幫助類
後面程式碼,這部分建立連線共用
public class RabbitMQHelper
{
/// <summary>
/// 獲取RabbitMQ連線
/// </summary>
/// <returns></returns>
public static IConnection GetConnection()
{
//範例化連線工廠
var factory = new ConnectionFactory
{
HostName = "127.0.0.1", //ip
Port = 5672, // 埠
UserName = "Admin", // 賬戶
Password = "Admin", // 密碼
VirtualHost = "/" // 虛擬主機
};
return factory.CreateConnection();
}
}
生產者
public class Send
{
public static void SendMessage()
{
string queueName = "normal";
//1.建立連結
using (var connection = RabbitMQHelper.GetConnection())
{
// 2.建立通道
using(var channel = connection.CreateModel())
{
// 3.宣告佇列
channel.QueueDeclare(queueName, false, false, false, null);
// 沒有繫結交換機,怎麼找到路由佇列的呢?
for (int i = 1; i <= 30; i++)
{
//4.構建Byte訊息封包
string message =$"第{i}條訊息";
var body = Encoding.UTF8.GetBytes(message);//訊息以二進位制形式傳輸
// 傳送訊息到rabbitmq,使用rabbitmq中預設提供交換機路由,預設的路由Key和佇列名稱完全一致
//5.傳送封包
channel.BasicPublish(exchange: "", routingKey: queueName, null, body);
Thread.Sleep(1000);//新增延遲
Console.WriteLine("生產:" + message);
}
}
}
}
}
消費者
public class Receive
{
public static void ReceiveMessage()
{
// 消費者消費是佇列中訊息
string queueName = "normal";
//1.建立連結連結
var connection = RabbitMQHelper.GetConnection();
{
//2.建立通道
var channel = connection.CreateModel();
{
//3.宣告佇列:如果你先啟動是消費端就會異常
channel.QueueDeclare(queueName, false, false, false, null);
//4.建立一個消費者範例
var consumer = new EventingBasicConsumer(channel);
//5.繫結訊息接收後的事件委託
consumer.Received +=(model, ea) => {
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Thread.Sleep(1000);
Console.WriteLine(" Normal Received => {0}", message);
};
//6.啟動消費者
channel.BasicConsume( queue: queueName, autoAck:true, consumer);//開始消費
}
}
}
}
一個消費者,一個佇列,多個消費者。但多個消費者中只會有一個會成功地消費訊息
訊息產生者將訊息放入佇列消費者可以有多個,消費者1,消費者2,同時監聽同一個佇列,訊息被消費?C1 C2共同爭搶當前的訊息佇列內容,誰先拿到誰負責消費訊息(隱患,高並行情況下,預設會產生某一個訊息被多個消費者共同使用。
應用場景:紅包;大專案中的資源排程(任務分配系統不需知道哪一個任務執行系統在空閒,直接將任務扔到訊息佇列中,空閒的系統自動爭搶)
生產者
public class WorkerSend
{
public static void SendMessage()
{
string queueName = "Worker_Queue";
using (var connection = RabbitMQHelper.GetConnection())
{
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, false, false, false, null);
for (int i = 0; i < 30; i++)
{
string message = $"RabbitMQ Worker {i + 1} Message";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", queueName, null, body);
Console.WriteLine("send Task {0} message",i + 1);
}
}
}
}
}
消費者
public class WorkerReceive
{
public static void ReceiveMessage()
{
string queueName = "Worker_Queue";
var connection = RabbitMQHelper.GetConnection();
{
var channel = connection.CreateModel();
{
channel.QueueDeclare(queueName, false, false, false, null);
var consumer = new EventingBasicConsumer(channel);
//設定prefetchCount : 1來告知RabbitMQ,在未收到消費端的訊息確認時,不再分發訊息,也就確保了當消費端處於忙碌狀態時,不再分配任務。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
consumer.Received +=(model, ea) => {
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(" Worker Queue Received => {0}", message);
};
channel.BasicConsume(queueName,true, consumer);
}
}
}
}
一個訊息生產者,一個交換機,多個佇列,多個訊息消費者。每個消費佇列中訊息一致,且每個訊息消費者都從自己的訊息佇列的第一個訊息開始消費,直到最後。
交換機為rabbitMQ中內部元件。訊息生產者將訊息傳送給rabbitMQ後,rabbitMQ會根據訂閱的消費者個數,生成對應數目的訊息佇列,這樣每個消費者都能獲取生產者傳送的全部訊息。
一旦消費者斷開與rabbitMQ的連線,佇列就會消失。如果消費者數目很多,對於rabbitMQ而言,也是個重大負擔,訂閱模式是個長連線,佔用並行數,且每個消費者一個佇列會佔用大量空間
相關應用場景:郵件群發,群聊,廣播
public static void SendMessage()
{
//1.建立連線
using (var connection = RabbitMQHelper.GetConnection())
{
//2.建立通道
using(var channel = connection.CreateModel())
{
// 3.宣告交換機物件
channel.ExchangeDeclare("fanout_exchange", "fanout");
// 4.建立佇列
string queueName1 = "fanout_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "fanout_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "fanout_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 5.繫結到互動機
// fanout_exchange 繫結了 3個佇列
channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");//指定交換機
channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");
for (int i = 0; i < 10; i++)
{
//6.構建訊息byte陣列
string message = $"RabbitMQ Fanout {i + 1} Message";
var body = Encoding.UTF8.GetBytes(message);
//7.傳送訊息
channel.BasicPublish("fanout_exchange", "", null, body);//同時把訊息傳送到訂閱的三個佇列
Console.WriteLine("Send Fanout {0} message",i + 1);
}
}
}
}
}
public class FanoutConsumer
{
public static void ConsumerMessage()
{
//1.建立連線
var connection = RabbitMQHelper.GetConnection();
{
//2,。建立通道
var channel = connection.CreateModel();
{
//3.申明exchange
channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
// 4.建立佇列
string queueName1 = "fanout_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "fanout_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "fanout_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 5.繫結到互動機
channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");
Console.WriteLine("[*] Waitting for fanout logs.");
//6.申明consumer
var consumer = new EventingBasicConsumer(channel);
//繫結訊息接收後的事件委託
consumer.Received += (model, ea) => {
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine("[x] {0}", message);
};
//7.啟動消費者
channel.BasicConsume(queue: queueName1, autoAck: true, consumer: consumer);//只會消費佇列queueName1中的訊息,其他佇列中訂閱的訊息仍然存在
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
一個訊息生產者,一個交換機,多個佇列,多個訊息消費者。一個交換機繫結多個訊息佇列,每個訊息佇列都有自己唯一的Routekey,每一個訊息佇列有一個消費者監聽。
訊息生產者將訊息傳送給交換機,交換機按照路由判斷,將路由到的RouteKey的訊息,推播與之繫結的佇列,交換機根據路由的key,只能匹配上路由key對應的訊息佇列,對應的消費者才能消費訊息;
public static void SendMessage()
{
//1.建立連線
using (var connection = RabbitMQHelper.GetConnection())
{
//2.建立通道
using(var channel = connection.CreateModel())
{
// 3.宣告Direct交換機
channel.ExchangeDeclare("direct_exchange", "direct");
// 4.建立佇列
string queueName1 = "direct_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "direct_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "direct_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 5.繫結到互動機 指定routingKey
channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");
for (int i = 0; i < 10; i++)
{
string message = $"RabbitMQ Direct {i + 1} Message =>green";
var body = Encoding.UTF8.GetBytes(message);
// 傳送訊息的時候需要指定routingKey傳送
channel.BasicPublish(exchange: "direct_exchange", routingKey: "green", null, body);//只發布到RouteKey:green的佇列
Console.WriteLine("Send Direct {0} message",i + 1);
}
}
}
}
}
public class DirectConsumer
{
public static void ConsumerMessage()
{
//1.建立連線
var connection = RabbitMQHelper.GetConnection();
//2.建立通訊
var channel = connection.CreateModel();
//3.宣告交換機
channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
//4.繫結交換機
var queueName = "direct_queue2";//佇列direct_queue3繫結有red,yellow,green共3個RouteKey
channel.QueueDeclare(queueName, false, false, false, null);
//此處消費通訊沒有必要繫結所有的RouteKey,根據前生產者通訊的路由規則,每個佇列中只會路由到一種訊息
channel.QueueBind(queue: queueName,
exchange: "direct_exchange",
routingKey: "red");
channel.QueueBind(queue: queueName,
exchange: "direct_exchange",
routingKey: "yellow");
channel.QueueBind(queue: queueName,
exchange: "direct_exchange",
routingKey: "green");
Console.WriteLine(" [*] Waiting for messages.");
//5.範例化消費者
var consumer = new EventingBasicConsumer(channel);
//6.為消費者繫結消費委託事件
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
// 消費完成後需要手動簽收訊息,如果不寫該程式碼就容易導致重複消費問題
//7.手動確認簽收訊息
channel.BasicAck(ea.DeliveryTag, true); // 可以降低每次簽收效能損耗
};
// 訊息簽收模式
// 手動簽收 保證正確消費,不會丟訊息(基於使用者端而已)
// 自動簽收 容易丟訊息
// 簽收:意味著訊息從佇列中刪除
channel.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);//設定為不自動簽收,進行手動簽收
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
一個訊息生產者,一個交換機,多個佇列,多個訊息消費者。一個交換機繫結多個訊息佇列,每個訊息佇列都有自己唯一的Routekey,每一個訊息佇列有一個消費者監聽。
此時的自己唯一的Routekey,不是一個確定值,像我們熟悉的正規表示式對應的匹配規則。
生產者產生訊息,把訊息交給交換機,交換機根據RouteKey的模糊匹配到對應的佇列,由佇列監聽消費者消費訊息。
規則:
*代表多個單詞
public static void SendMessage()
{
//1.建立連線
using (var connection = RabbitMQHelper.GetConnection())
{
//2.建立通道
using (var channel = connection.CreateModel())
{
//3.宣告交換機
channel.ExchangeDeclare("topic_exchange", "topic");
//4.宣告佇列
string queueName1 = "topic_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "topic_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "topic_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
//5.繫結到互動機
channel.QueueBind(queue: queueName1, exchange: "topic_exchange", routingKey: "user.data.*");
channel.QueueBind(queue: queueName2, exchange: "topic_exchange", routingKey: "user.data.delete");
channel.QueueBind(queue: queueName3, exchange: "topic_exchange", routingKey: "user.data.update");
for (int i = 0; i < 10; i++)
{
//6.準備傳送位元組陣列
string message = $"RabbitMQ Topic {i + 1} Delete Message";
var body = Encoding.UTF8.GetBytes(message);
//7.根據RouteKey釋出訊息
channel.BasicPublish("topic_exchange", "user.data.delete", null, body);//會發布到queueName1,queueName2
Console.WriteLine("Send Topic {0} message", i + 1);
}
}
}
}
public static void ConsumerMessage()
{
//1.建立連線
var connection = RabbitMQHelper.GetConnection();
//2.建立通訊
var channel = connection.CreateModel();
//3.宣告交換機
channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
//4.宣告佇列
var queueName = "topic_queue3";
channel.QueueDeclare(queueName, false, false, false, null);
//5.繫結交換機
channel.QueueBind(queue: queueName,
exchange: "topic_exchange",
routingKey: "user.data.*");
Console.WriteLine(" [*] Waiting for messages.");
//6.建立消費者
var consumer = new EventingBasicConsumer(channel);
//7.繫結消費委託事件
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
};
//8.啟動消費
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
RPC即使用者端遠端呼叫伺服器端的方法 ,使用MQ可以實現RPC的非同步呼叫,基於Direct交換機實現,流程如下:
1、使用者端即是生產者也是消費者,向RPC請求佇列傳送RPC呼叫訊息,同時監聽RPC響應佇列。
2、伺服器端監聽RPC請求佇列的訊息,收到訊息後執行伺服器端的方法,得到方法返回的結果。
3、伺服器端將RPC方法 的結果傳送到RPC響應佇列。
4、使用者端(RPC呼叫方)監聽RPC響應佇列,接收到RPC呼叫結果。
本文來自部落格園,作者:農碼一生,轉載請註明原文連結:https://www.cnblogs.com/wml-it/p/16536123.html