MQ全稱爲Message Queue,訊息佇列是應用程式和應用程式之間的通訊方法。
爲什麼使用MQ
在專案中,可將一些無需即時返回且耗時的操作提取出來,進行非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。
開發中訊息佇列通常有如下應用場景:
1、任務非同步處理
將不需要同步處理的並且耗時長的操作由訊息佇列通知訊息接收方進行非同步處理。提高了應用程式的響應時間。
2、應用程式解耦合
MQ相當於一箇中介,生產方通過MQ與消費方互動,它將應用程式進行解耦合。
3、削峯填谷
如訂單系統,在下單的時候就會往數據庫寫數據。但是數據庫只能支撐每秒1000左右的併發寫入,併發量再高就容易宕機。低峯期的時候併發也就100多個,但是在高峯期時候,併發量會突然激增到5000以上,這個時候數據庫肯定卡死了。
訊息被MQ儲存起來了,然後系統就可以按照自己的消費能力來消費,比如每秒1000個數據,這樣慢慢寫入數據庫,這樣就不會卡死數據庫了。
但是使用了MQ之後,限制消費訊息的速度爲1000,但是這樣一來,高峯期產生的數據勢必會被積壓在MQ中,高峯就被「削」掉了。但是因爲訊息積壓,在高峯期過後的一段時間內,消費訊息的速度還是會維持在1000QPS,直到消費完積壓的訊息,這就叫做「填谷」
MQ是訊息通訊的模型;實現MQ的大致有兩種主流方式:AMQP、JMS。
AMQP是一種協定,更準確的說是一種binary wire-level protocol(鏈接協定)。這是其和JMS的本質差別,AMQP不從API層進行限定,而是直接定義網路交換的數據格式。
AMQP,即 Advanced Message Queuing Protocol(高階訊息佇列協定),是一個網路協定,是應用層協定的一個開放標準,爲訊息導向中介層設計。基於此協定的用戶端與訊息中介軟體可傳遞訊息,並不受用戶端/中介軟體不同產品,不同的開發語言等條件的限制。2006年,AMQP 規範發佈。類比HTTP。
JMS即Java訊息服務(JavaMessage Service)應用程式介面,是一個Java平臺中關於訊息導向中介軟體(MOM)的API,用於在兩個應用程式之間,或分佈式系統中發送訊息,進行非同步通訊。
市場上常見的訊息佇列有如下:
RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高階訊息佇列協定)協定實現的訊息佇列,它是一種應用程式之間的通訊方法,訊息佇列在分佈式系統開發中應用非常廣泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6種模式(現在更新爲7種了):簡單模式,work模式,Publish/Subscribe發佈與訂閱模式,Routing路由模式,Topics主題模式,RPC遠端呼叫模式(遠端呼叫,不太算MQ;暫不作介紹);
官網對應模式介紹:https://www.rabbitmq.com/getstarted.html
Broker:接收和分發訊息的應用,RabbitMQ Server就是 Message Broker
Virtual host:出於多租戶和安全因素設計的,把 AMQP 的基本元件劃分到一個虛擬的分組中,類似於網路中的 namespace 概念。當多個不同的使用者使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個使用者在自己的 vhost 建立 exchange/queue 等
Connection:publisher/consumer 和 broker 之間的 TCP 連線
Channel:如果每一次存取 RabbitMQ 都建立一個 Connection,在訊息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel 是在 connection 內部建立的邏輯連線,如果應用程式支援多執行緒,通常每個thread建立單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助用戶端和message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作爲輕量級的 Connection 極大減少了操作系統建立 TCP connection 的開銷
Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發訊息到queue 中去。常用的型別有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:訊息最終被送到這裏等待 consumer 取走
Binding:exchange 和 queue 之間的虛擬連線,binding 中可以包含 routing key。Binding 資訊被儲存到 exchange 中的查詢表中,用於 message 的分發依據
準備工作:
建立maven工程
在
pom.xml檔案中新增如下依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
模型圖:
在上圖的模型中,有以下概念:
public class HelloWorld_Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//宣告(建立)佇列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive,
* boolean autoDelete, Map<String, Object> arguments)
*參數:
* 1.queue:佇列名稱
* 2.durable:是否持久化,當mq重新啓動之後,還在
* 3.exclusive:是否獨佔。只能有一個消費監聽這個佇列
* 當Connection關閉時,是否刪除佇列
* 4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
* 5.arguments:參數
*
*/
//如果沒有一個名字叫做hello_word的佇列,則會建立該佇列,如果有則不會建立
channel.queueDeclare("hello_world",true,false,false,null);
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*
* 1.exchange:交換機的名字
* 2.routingKey:路由名稱
* 3.props:設定資訊
* 4.body:發送的訊息數據
*/
String body="hello rabbitmq~";
//發送訊息
channel.basicPublish("","hello_world",null,body.getBytes());
//釋放資源
channel.close();
connection.close();
}
}
public class HelloWorld_Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//宣告(建立)佇列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive,
* boolean autoDelete, Map<String, Object> arguments)
*參數:
* 1.queue:佇列名稱
* 2.durable:是否持久化,當mq重新啓動之後,還在
* 3.exclusive:是否獨佔。只能有一個消費監聽這個佇列
* 當Connection關閉時,是否刪除佇列
* 4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
* 5.arguments:參數
*
*/
//如果沒有一個名字叫做hello_word的佇列,則會建立該佇列,如果有則不會建立
channel.queueDeclare("hello_world",true,false,false,null);
/**
*
* basicConsume(String queue, boolean autoAck, Consumer callback)
*
* 1.queue:佇列的名稱
* 2.autoAck:是否自動確認
* 3.callback:回撥物件
*
*/
//接收訊息
Consumer consumer=new DefaultConsumer(channel){
/**
* 回撥方法,當收到訊息之後,會自動執行該方法
*
* @param consumerTag: 標識
* @param envelope:獲取一些資訊,交換機,路由key
* @param properties:設定資訊
* @param body:數據
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag======"+consumerTag);
System.out.println("交換機的一些資訊====="+envelope.getExchange());
System.out.println("路由key的一些訊息====="+envelope.getRoutingKey());
System.out.println("properties===="+properties);
System.out.println("body========="+new String(body));
}
};
channel.basicConsume("hello_world",true,consumer);
//關閉資源? 不要關閉資源,保持監聽即可
}
}
測試結果:
模型圖:
Work Queues
與入門程式的簡單模式
相比,多了一個或一些消費端,多個消費端共同消費同一個佇列中的訊息。
應用場景:對於 任務過重或任務較多情況使用工作佇列可以提高任務處理的速度。
public class WorkQueues_Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//宣告(建立)佇列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive,
* boolean autoDelete, Map<String, Object> arguments)
*參數:
* 1.queue:佇列名稱
* 2.durable:是否持久化,當mq重新啓動之後,還在
* 3.exclusive:是否獨佔。只能有一個消費監聽這個佇列
* 當Connection關閉時,是否刪除佇列
* 4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
* 5.arguments:參數
*
*/
//如果沒有一個名字叫做hello_word的佇列,則會建立該佇列,如果有則不會建立
channel.queueDeclare("work_queues",true,false,false,null);
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*
* 1.exchange:交換機的名字
* 2.routingKey:路由名稱
* 3.props:設定資訊
* 4.body:發送的訊息數據
*/
for (int i=1;i<=10;i++){
String body="序號"+i+"--work!!! rabbitmq~";
//發送訊息
channel.basicPublish("","work_queues",null,body.getBytes());
}
//釋放資源
channel.close();
connection.close();
}
}
public class WorkQueues1_Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//宣告(建立)佇列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive,
* boolean autoDelete, Map<String, Object> arguments)
*參數:
* 1.queue:佇列名稱
* 2.durable:是否持久化,當mq重新啓動之後,還在
* 3.exclusive:是否獨佔。只能有一個消費監聽這個佇列
* 當Connection關閉時,是否刪除佇列
* 4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
* 5.arguments:參數
*
*/
//如果沒有一個名字叫做hello_word的佇列,則會建立該佇列,如果有則不會建立
channel.queueDeclare("work_queues",true,false,false,null);
/**
*
* basicConsume(String queue, boolean autoAck, Consumer callback)
*
* 1.佇列的名稱
* 2.是否自動確認
* 3.回撥物件
*
*/
//接收訊息
Consumer consumer=new DefaultConsumer(channel){
/**
* 回撥方法,當收到訊息之後,會自動執行該方法
*
* @param consumerTag: 標識
* @param envelope:獲取一些資訊,交換機,路由key
* @param properties:設定資訊
* @param body:數據
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body========="+new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
//關閉資源? 不要關閉資源
}
}
public class WorkQueues2_Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//宣告(建立)佇列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive,
* boolean autoDelete, Map<String, Object> arguments)
*參數:
* 1.queue:佇列名稱
* 2.durable:是否持久化,當mq重新啓動之後,還在
* 3.exclusive:是否獨佔。只能有一個消費監聽這個佇列
* 當Connection關閉時,是否刪除佇列
* 4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
* 5.arguments:參數
*
*/
//如果沒有一個名字叫做hello_word的佇列,則會建立該佇列,如果有則不會建立
channel.queueDeclare("work_queues",true,false,false,null);
/**
*
* basicConsume(String queue, boolean autoAck, Consumer callback)
*
* 1.佇列的名稱
* 2.是否自動確認
* 3.回撥物件
*
*/
//接收訊息
Consumer consumer=new DefaultConsumer(channel){
/**
* 回撥方法,當收到訊息之後,會自動執行該方法
*
* @param consumerTag: 標識
* @param envelope:獲取一些資訊,交換機,路由key
* @param properties:設定資訊
* @param body:數據
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body========="+new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
//關閉資源? 不要關閉資源
}
}
消費者1:
消費者2:
我們通過測試結果可以發現:
在一個佇列中如果有多個消費者,那麼消費者之間對於同一個訊息的關係是競爭的關係,但是他們是交替消費。
範例圖:
前面2個案例中,只有3個角色:
而在訂閱模型中,多了一個exchange角色,而且過程略有變化:
Exchange(交換機)只負責轉發訊息,不具備儲存訊息的能力,因此如果沒有任何佇列與Exchange系結,或者沒有符合路由規則的佇列,那麼訊息會丟失!
發佈訂閱模式:
1、每個消費者監聽自己的佇列。
2、生產者將訊息發給broker,由交換機將訊息轉發到系結此交換機的每個佇列,每個系結交換機的佇列都將接收
到訊息
public class PubSub_Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//建立交換機
/**
*
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable,
* boolean autoDelete, boolean internal, Map<String, Object> arguments)
*
* 1.exchange:交換機的名稱
* 2.type:交換機型別
* DIRECT("direct"), 定向
* FANOUT("fanout"), 扇形(廣播),發送訊息到每一個與之系結的佇列
* TOPIC("topic"), 萬用字元的方式
* HEADERS("headers"); 參數匹配
*
* 3.durable:是否持久化
* 4.autoDelete:自動刪除
* 5.internal:內部使用。一般爲false
* 6.arguments:參數
*/
String exchangeName="test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//建立佇列
String queue1Name="test_fanout_queue1";
String queue2Name="test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//系結佇列和交換機
/**
* queueBind(String queue, String exchange, String routingKey)
* 1.queue:佇列名稱
* 2.exchange:交換機名稱
* 3.routingKey:路由鍵,系結規則
* 如果交換機型別爲fanout,routingKey設定爲"",代表之後
* 會將訊息分發給每一個與之系結的佇列
*
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body="日誌資訊: 李四呼叫了findAll方法----日誌級別info----";
//發送訊息
channel.basicPublish(exchangeName,"",null , body.getBytes());
//釋放資源
channel.close();
connection.close();
}
}
public class PubSub1_Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queue1Name="test_fanout_queue1";
String queue2Name="test_fanout_queue2";
/**
*
* basicConsume(String queue, boolean autoAck, Consumer callback)
*
* 1.佇列的名稱
* 2.是否自動確認
* 3.回撥物件
*
*/
//接收訊息
Consumer consumer=new DefaultConsumer(channel){
/**
* 回撥方法,當收到訊息之後,會自動執行該方法
*
* @param consumerTag: 標識
* @param envelope:獲取一些資訊,交換機,路由key
* @param properties:設定資訊
* @param body:數據
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body========="+new String(body));
System.out.println("=====將日誌資訊列印到控制檯=====");
}
};
channel.basicConsume(queue1Name,true,consumer);
//關閉資源? 不要關閉資源
}
}
public class PubSub2_Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queue1Name="test_fanout_queue1";
String queue2Name="test_fanout_queue2";
/**
*
* basicConsume(String queue, boolean autoAck, Consumer callback)
*
* 1.佇列的名稱
* 2.是否自動確認
* 3.回撥物件
*
*/
//接收訊息
Consumer consumer=new DefaultConsumer(channel){
/**
* 回撥方法,當收到訊息之後,會自動執行該方法
*
* @param consumerTag: 標識
* @param envelope:獲取一些資訊,交換機,路由key
* @param properties:設定資訊
* @param body:數據
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body========="+new String(body));
System.out.println("=====將日誌資訊存入數據庫=====");
}
};
channel.basicConsume(queue2Name,true,consumer);
//關閉資源? 不要關閉資源
}
}
我們可以看到該交換機系結了兩個佇列,且沒有routing Key
消費者1:
消費者2:
路由模式特點:
RoutingKey
(路由key)RoutingKey
。Routing Key
進行判斷,只有佇列的Routingkey
與訊息的 Routing key
完全一致,纔會接收到訊息圖解:
比較:
在編碼上與 Publish/Subscribe發佈與訂閱模式
的區別是交換機的型別爲:Direct,還有佇列系結交換機的時候需要指定routing key。
public class Routing_Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//建立交換機
/**
*
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable,
* boolean autoDelete, boolean internal, Map<String, Object> arguments)
*
* 1.exchange:交換機的名稱
* 2.type:交換機型別
* DIRECT("direct"), 定向
* FANOUT("fanout"), 扇形(廣播),發送訊息到每一個與之系結的佇列
* TOPIC("topic"), 萬用字元的方式
* HEADERS("headers"); 參數匹配
*
* 3.durable:是否持久化
* 4.autoDelete:自動刪除
* 5.internal:內部使用。一般爲false
* 6.arguments:參數
*/
//路由模式的交換機的模式爲: direct
String exchangeName="test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//建立佇列
String queue1Name="test_direct_queue1";
String queue2Name="test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//系結佇列和交換機
/**
* queueBind(String queue, String exchange, String routingKey)
* 1.queue:佇列名稱
* 2.exchange:交換機名稱
* 3.routingKey:路由鍵,系結規則
* 如果交換機型別爲fanout,routingKey設定爲"",代表之後
* 會將訊息分發給每一個與之系結的佇列
*
*/
//佇列1系結error
channel.queueBind(queue1Name,exchangeName,"error");
//佇列2系結info、error、warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
String body="日誌資訊: 李四呼叫了findAll方法----日誌級別info----";
//發送訊息
channel.basicPublish(exchangeName,"info",null , body.getBytes());
//釋放資源
channel.close();
connection.close();
}
}
public class Routing1_Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queue1Name="test_direct_queue1";
String queue2Name="test_direct_queue2";
/**
*
* basicConsume(String queue, boolean autoAck, Consumer callback)
*
* 1.佇列的名稱
* 2.是否自動確認
* 3.回撥物件
*
*/
//接收訊息
Consumer consumer=new DefaultConsumer(channel){
/**
* 回撥方法,當收到訊息之後,會自動執行該方法
*
* @param consumerTag: 標識
* @param envelope:獲取一些資訊,交換機,路由key
* @param properties:設定資訊
* @param body:數據
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body========="+new String(body));
System.out.println("=====將日誌資訊存入數據庫=====");
}
};
channel.basicConsume(queue1Name,true,consumer);
//關閉資源? 不要關閉資源
}
}
public class Routing2_Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queue1Name="test_direct_queue1";
String queue2Name="test_direct_queue2";
/**
*
* basicConsume(String queue, boolean autoAck, Consumer callback)
*
* 1.佇列的名稱
* 2.是否自動確認
* 3.回撥物件
*
*/
//接收訊息
Consumer consumer=new DefaultConsumer(channel){
/**
* 回撥方法,當收到訊息之後,會自動執行該方法
*
* @param consumerTag: 標識
* @param envelope:獲取一些資訊,交換機,路由key
* @param properties:設定資訊
* @param body:數據
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body========="+new String(body));
System.out.println("=====將日誌資訊列印到控制檯=====");
}
};
channel.basicConsume(queue2Name,true,consumer);
//關閉資源? 不要關閉資源
}
}
路由模式下交換機切爲direct
我們將佇列1與交換機系結時的routingKey設爲error
我們將佇列2與交換機系結時的routingKey設有info、error、warning
開始測試:
我們把生產者發送訊息時的routingKey設定爲 info
結果:
消費者1:
消費者2:
我們把生產者發送訊息時的routingKey設定爲 error
結果:
消費者1:
消費者2:
Routing模式要求佇列在系結交換機時要指定routing key,訊息會轉發到符合routing key的佇列。
Topic
型別與Direct
相比,都是可以根據RoutingKey
把訊息路由到不同的佇列。只不過Topic
型別Exchange
可以讓佇列在系結Routing key
的時候使用萬用字元!
Routingkey
一般都是有一個或多個單詞組成,多個單詞之間以」.」分割,例如: savior.insert
萬用字元規則:
#
:匹配一個或多個詞
*
:匹配不多不少恰好1個詞
舉例:
savior.#
:能夠匹配savior.insert.abc
或者 savior.insert
savior.*
:只能匹配savior.insert
圖解:
usa.#
,因此凡是以 usa.
開頭的routing key
都會被匹配到#.news
,因此凡是以 .news
結尾的 routing key
都會被匹配
public class Topic_Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//建立交換機
/**
*
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable,
* boolean autoDelete, boolean internal, Map<String, Object> arguments)
*
* 1.exchange:交換機的名稱
* 2.type:交換機型別
* DIRECT("direct"), 定向
* FANOUT("fanout"), 扇形(廣播),發送訊息到每一個與之系結的佇列
* TOPIC("topic"), 萬用字元的方式
* HEADERS("headers"); 參數匹配
*
* 3.durable:是否持久化
* 4.autoDelete:自動刪除
* 5.internal:內部使用。一般爲false
* 6.arguments:參數
*/
//topic 萬用字元模式下交換機爲 topic
String exchangeName="test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//建立佇列
String queue1Name="test_topic_queue1";
String queue2Name="test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//系結佇列和交換機
/**
* queueBind(String queue, String exchange, String routingKey)
* 1.queue:佇列名稱
* 2.exchange:交換機名稱
* 3.routingKey:路由鍵,系結規則
* 如果交換機型別爲fanout,routingKey設定爲"",代表之後
* 會將訊息分發給每一個與之系結的佇列
*
*/
//佇列1 系結的routingKey爲 #.error 、order.*
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
//佇列2 系結的routingKey爲 *.*
channel.queueBind(queue2Name,exchangeName,"*.*");
String body="日誌資訊: 李四呼叫了delete方法----日誌級別error----";
//發送訊息
channel.basicPublish(exchangeName,"order.error",null , body.getBytes());
//釋放資源
channel.close();
connection.close();
}
}
public class Topic1_Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queue1Name="test_topic_queue1";
String queue2Name="test_topic_queue2";
/**
*
* basicConsume(String queue, boolean autoAck, Consumer callback)
*
* 1.佇列的名稱
* 2.是否自動確認
* 3.回撥物件
*
*/
//接收訊息
Consumer consumer=new DefaultConsumer(channel){
/**
* 回撥方法,當收到訊息之後,會自動執行該方法
*
* @param consumerTag: 標識
* @param envelope:獲取一些資訊,交換機,路由key
* @param properties:設定資訊
* @param body:數據
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body========="+new String(body));
System.out.println("=====將日誌資訊存入數據庫=====");
}
};
channel.basicConsume(queue1Name,true,consumer);
//關閉資源? 不要關閉資源
}
}
public class Topic2_Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連線工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//主機地址;預設爲 localhost
connectionFactory.setHost("192.168.237.128");
//連線埠;預設爲 5672
connectionFactory.setPort(5672);
//虛擬主機名稱;預設爲 /
connectionFactory.setVirtualHost("/savior");
//連線使用者名稱;預設爲guest
connectionFactory.setUsername("root");
//連線密碼;預設爲guest
connectionFactory.setPassword("root");
//建立新連線
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queue1Name="test_topic_queue1";
String queue2Name="test_topic_queue2";
/**
*
* basicConsume(String queue, boolean autoAck, Consumer callback)
*
* 1.佇列的名稱
* 2.是否自動確認
* 3.回撥物件
*
*/
//接收訊息
Consumer consumer=new DefaultConsumer(channel){
/**
* 回撥方法,當收到訊息之後,會自動執行該方法
*
* @param consumerTag: 標識
* @param envelope:獲取一些資訊,交換機,路由key
* @param properties:設定資訊
* @param body:數據
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body========="+new String(body));
System.out.println("=====將日誌資訊列印到控制檯=====");
}
};
channel.basicConsume(queue2Name,true,consumer);
//關閉資源? 不要關閉資源
}
}
將發送訊息時的將路由鍵routingKey設爲 order.error
佇列1 系結的routingKey爲 #.error 、order.*
佇列2 系結的routingKey爲 .
消費者1:
消費者2:
Topic主題模式可以實現 Publish/Subscribe發佈與訂閱模式
和 Routing路由模式
的功能;只是Topic在設定routing key 的時候可以使用萬用字元,顯得更加靈活。
RabbitMQ工作模式:
1、簡單模式 HelloWorld
一個生產者、一個消費者,不需要設定交換機(使用預設的交換機)
2、工作佇列模式 Work Queue
一個生產者、多個消費者(競爭關係),不需要設定交換機(使用預設的交換機)
3、發佈訂閱模式 Publish/subscribe
需要設定型別爲fanout的交換機,並且交換機和佇列進行系結,當發送訊息到交換機後,交換機會將訊息發送到系結的佇列
4、路由模式 Routing
需要設定型別爲direct的交換機,交換機和佇列進行系結,並且指定routing key,當發送訊息到交換機後,交換機會根據routing key將訊息發送到對應的佇列
5、萬用字元模式 Topic
需要設定型別爲topic的交換機,交換機和佇列進行系結,並且指定萬用字元方式的routing key,當發送訊息到交換機後,交換機會根據routing key將訊息發送到對應的佇列
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spring-rabbitmq-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
在resource資料夾下建立rabbitmq.properties連線參數等組態檔;
rabbitmq.host=192.168.237.128
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.virtual-host=/savior
在resource資料夾下建立spring-rabbitmq-producer.xml 的spring組態檔;
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--載入組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定義管理交換機、佇列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定義持久化佇列,不存在則自動建立;不系結到交換機則系結到預設交換機
預設交換機型別爲direct,名字爲:"",路由鍵爲佇列的名稱
-->
<!--
id:bean的名稱
name:queue的名稱
auto-declare:自動建立
auto-delete:自動刪除。 最後一個消費者和該佇列斷開連線後,自動刪除佇列
exclusive:是否獨佔
durable:是否持久化
-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~廣播;所有佇列都能收到訊息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<!--定義廣播型別交換機;並系結上述兩個佇列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1" />
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--<rabbit:direct-exchange name="aa" >
<rabbit:bindings>
<!–direct 型別的交換機系結佇列 key :路由key queue:佇列名稱–>
<rabbit:binding queue="spring_queue" key="xxx"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>-->
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~萬用字元;*匹配一個單詞,#匹配多個單詞 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="savior.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="savior.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="savior.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定義rabbitTemplate物件操作可以在程式碼中方便發送訊息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
建立測試類,在裏面建立發送訊息
利用rabbitTemplate呼叫convertAndSend根據不同模式的需求選取不同參數的過載方法
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
RabbitTemplate rabbitTemplate;
//簡單模式下的發訊息
@Test
public void testHelloWorld(){
rabbitTemplate.convertAndSend("spring_queue","hello world spring~~~~");
}
//發佈訂閱模式下 fanout 的發訊息
@Test
public void testFanout(){
rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout~~~~");
}
//萬用字元模式下 topic 的發訊息
@Test
public void testTopic(){
rabbitTemplate.convertAndSend("spring_topic_exchange","savior.niu.bi","savior come on from spring~~`");
}
}
新增依賴、建立組態檔rabbitmq.properties 這兩個同上生產者
建立spring-rabbitmq-consumer.xml (spring組態檔)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--載入組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<bean id="springQueueListener" class="com.duanping.rabbitmq.listener.SpringQueueListener"/>
<bean id="fanoutListener1" class="com.duanping.rabbitmq.listener.FanoutListener1"/>
<!-- <bean id="fanoutListener2" class="com.duanping.rabbitmq.listener.FanoutListener2"/>-->
<!-- <bean id="topicListenerStar" class="com.duanping.rabbitmq.listener.TopicListenerStar"/>
<bean id="topicListenerWell" class="com.duanping.rabbitmq.listener.TopicListenerWell"/>
<bean id="topicListenerWell2" class="com.duanping.rabbitmq.listener.TopicListenerWell2"/>-->
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/><!--
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
<rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
<rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
<rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>-->
</rabbit:listener-container>
</beans>
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println(new String(message.getBody()));
}
}
public class FanoutListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println(new String(message.getBody()));
}
}
在Spring專案中,可以使用Spring-Rabbit去操作RabbitMQ
https://github.com/spring-projects/spring-amqp
尤其是在spring boot專案中只需要引入對應的amqp啓動器依賴即可,方便的使用RabbitTemplate發送訊息,使用註解接收訊息。
一般在開發過程中:
生產者工程:
application.yml檔案設定RabbitMQ相關資訊;
在生產者工程中編寫設定類,用於建立交換機和佇列,並進行系結
注入RabbitTemplate物件,通過RabbitTemplate物件發送訊息到交換機
消費者工程:
application.yml檔案設定RabbitMQ相關資訊
建立訊息處理類,用於接收佇列中的訊息並進行處理
利用springboot快速構建工程
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.237.128
port: 5672
username: root
password: root
virtual-host: /savior
建立RabbitMQ佇列與交換機系結的設定類
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME="boot_topic_exchange";
public static final String QUEUE_NAME="boot_queue";
//交換機
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//佇列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//佇列和交換機系結關係 Binding
/*
1.知道哪個佇列
2.知道哪個交換機
3.routing Key
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
@SpringBootTest
class DemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test1(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.nice","springboot rabbitMQ~~~");
}
}
新增依賴和生產者相同
建立application.yml
spring:
rabbitmq:
host: 192.168.237.128
port: 5672
username: root
password: root
virtual-host: /savior
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
System.out.println(new String(message.getBody()));
}
}