MQ是一箇中介軟體
(1)提升系統的響應速度、非同步操作
(2)提升系統的穩定性
(3)服務之間的解耦
(4)佇列FIFO
(5)消除峯值
Rocket和Kafka適合在大型專案中使用
中小型專案我們選擇ActiveMQ和RabbitMQ
單論功能功能齊全,可以將訊息持久化到數據庫中
但是RabbitMQ高併發支援比較好(得益於erlang語言)
基於AMQP協定的,跨語言存取的
雖然ActiveMQ功能強大,任何語言也可以存取,但是Java最優
RabbitMQ的優點:
1、使用簡單,功能強大。
2、基於AMQP協定。
3、社羣活躍,文件完善。
4、高併發效能好,這主要得益於Erlang語言。
5、Spring Boot預設已整合RabbitMQ
因爲RabbitMQ是基於erlang語言編寫的,所以要想使用RabbitMQ,我們首先要在電腦上安裝erlang的環境
一鍵安裝安裝包,將安裝包下的bin設定到系統環境變數的path中
檢查是否成功設定環境
erl -version
一鍵安裝用戶端
在快速啓動欄找到start啓動
在RabbitMQ的Sbin檔案下開啓cmd,啓用命令安裝RabbitMQ的視覺化外掛
rabbitmq-plugins.bat enable rabbitmq_management
輸入網址檢視是否安裝
http://127.0.0.1:15672/
使用者名稱和密碼都輸入預設的guest
java操作RabbitMQ一共有7種模型,我們這裏只介紹5種比較常見的。
一個生產者,預設轉換機,一個佇列,一個消費者
一個生產者,一個佇列,多個消費者
一條訊息只能在一個消費者中進行消費
預設是平均分配訊息,我們可以做到按勞分配
之前都是一個訊息被一個消費者消費
某些業務場景下,一個訊息要被多個消費者消費(羣聊)
通過設定不同的訂閱模型實現
之前訊息直接發到佇列中
現在要通過交換機Exchange進行路由,只要滿足路由條件,都被分發到對應的佇列中
佇列都有系結的消費者,這時候就可以實現一個訊息多個消費者消費
關鍵點:
(1)交換機
Fanout:廣播 發送給所有系結到這個交換機的佇列
Direct:根據指定的routing key(路由),將訊息發送到指定的一個或者多個佇列中
Topic:和Direct幾乎一樣,只是routing key支援萬用字元
# 匹配任意多個單詞 * 匹配任意一個單詞
(2)路由的設定
生產者發送一條訊息到交換機,每一條佇列都可以通過交換機拉取這條訊息,然後分別傳遞給與佇列系結的消費者
4.Direct路由
路由模式
一個生產者,預設交換機,一個消費者
1.建立普通maven專案
2.導包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.1</version>
</dependency>
3.編寫連線到RabbitEQ的工具類
public class ConnectionUtil {
/**
* 建立與RabbitMQ的連線的工具類
*/
public static Connection getConnection() throws Exception{
//定義連線工廠
ConnectionFactory connectionFactory=new ConnectionFactory();
//設定服務地址
connectionFactory.setHost("127.0.0.1");
//設定埠
connectionFactory.setPort(5672);
//設定使用者名稱,密碼,VHost
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
//通過工廠獲取連線
Connection connection=connectionFactory.newConnection();
return connection;
}
}
4.編寫生產者類
/**
* 該模組實現的是Hello World模型的互動方式。
* 該類的作用是模擬一個生產者,連線到RabbitMQ,向佇列中提供一條Message;
*/
public class producer {
//定義一個常數
private static final String QUEUE_NAME="queue_hello";
public static void main(String[] args) throws Exception{
//獲取連線
Connection connection= ConnectionUtil.getConnection();
//建立通道
Channel channel=connection.createChannel();
/**
* 宣告一個佇列
* @param queue 佇列名稱
* @param durable 是否持久化佇列
* @param exclusive 專有佇列,佇列是否獨佔此連線
* @param autoDelete 是否自動刪除此佇列(沒有在使用的時候)
* @param arguments 佇列的其他參數
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//模擬一條訊息
String message = "helloworld小明" + System.currentTimeMillis();
/**
* 發佈一條訊息
* @param exchange 發佈到哪個交換機
* @param routingKey 路由的key
* @param props 訊息的其他屬性 - routing headers 等等
* @param body 訊息內容
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
//釋放資源
channel.close();
connection.close();
}
}
5.編寫消費者類
public class Consumer {
//定義一個常數,要與生產者中的值一樣,用來作爲標識
private static final String QUEUE_NAME="queue_hello";
public static void main(String[] args) throws Exception{
//建立連線
Connection connection= ConnectionUtil.getConnection();
//建立通道
Channel channel=connection.createChannel();
/**
* 宣告佇列,佇列一旦在MQ中宣告瞭,不需要反覆 反復宣告
*/
DefaultConsumer consumer=new DefaultConsumer(channel){
/**
* 訊息接收的時候自動呼叫
* @param consumerTag 消費者的標籤
* @param envelope 訊息的封裝,訊息的相關資訊都封裝到了這個物件中
* @param properties
* @param body 訊息內容
*/
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//將封裝的訊息body用message接收
String message = new String(body);
System.out.println(message);
}
};
/**
* 消費訊息
* @param queue 佇列名稱
* @param autoAck 自動回執
* @param callback 回撥 監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
*/
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
如果訊息在消費過程中(拋出異常)出現問題,導致訊息沒有正常消費,然而訊息佇列中訊息已經刪除,導致訊息丟失。所以我們可以通過訊息確認機制 機製防止這種事情發生,只有訊息正常消費,向佇列回執成功的資訊,佇列接收到回執,再刪除訊息,否則訊息一直存在。
關閉自動回執
//channel.basicConsume(QUEUE_NAME,true,consumer);
channel.basicConsume(QUEUE_NAME,false,consumer);
手動回執
//將封裝的訊息body用message接收
String message = new String(body);
System.out.println(message);
//第一個參數:訊息的標籤 第二個參數:true回執所有訊息,false僅回執當前訊息
channel.basicAck(envelope.getDeliveryTag(),false);
一個生產者,一個佇列,多個消費者
public class Consumer2 {
private static final String QUEUE_NAME = "queue_work";
public static void main(String[] args)throws Exception {
//建立連線
Connection connection = ConnectionUtil.getConnection();
//建立通道
Channel channel = connection.createChannel();
//宣告佇列 佇列一旦在MQ中存中了,不需要重複宣告的
//當前消費者只能預先拉取一條訊息進行消費,只有消費完了才能 纔能拉取另一條
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 訊息接收的時候自動呼叫
* @param consumerTag 消費者的標籤
* @param envelope 訊息的封裝,訊息的相關資訊都封裝到了這個物件中
* @param properties
* @param body 訊息內容
*/
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//模擬500毫秒,時間越短,效能越高
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body);
//int i = 1/0;//異常
System.out.println("Consumer2:"+message);
//手動回執 第一個參數 訊息的標籤 第二個參數 true回執所有訊息 false 僅僅回執當前訊息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
/**
* 消費訊息
* @param queue 佇列名稱
* @param autoAck 自動回執
* @param callback 回撥 監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
*/
//channel.basicConsume(QUEUE_NAME,true,consumer);
//關閉訊息的自動確認
channel.basicConsume(QUEUE_NAME,false,consumer);
}
生產者向交換機傳遞一條訊息,訊息通過交換機系結到單個佇列,多個消費者通過輪詢的方式分別從佇列中獲取訊息。也可以通過在訊息中設定單次只能拉取一條訊息,必須等訊息消費完畢了過後才能 纔能拉取另一條,然後關閉訊息 自動確認,加入手動回執,這樣就能按效能、執行速度分配。我們這裏使用了一個執行緒來模擬消費者的效能。
生產者:
public class producer {
//定義一個常數,指定交換機的名稱
private static final String EXCHANGE_NAME="exchange_fanout";
private static final String QUEUE_NAME ="queue_fanout_1";
public static void main(String[] args)throws Exception {
//建立連線
Connection connection= ConnectionUtil.getConnection();
//建立通道
Channel channel=connection.createChannel();
/**
* 定義一個交換機
* @param exchange 交換機的名稱
* @param type 交換機的型別
*/
channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT);
//定義一條訊息
String message="今天天氣真真好!";
/**
* 發送一條訊息
*
* 訊息發送給交換機,佇列是儲存訊息的地方
* 如果我只定義了交換機,交換機並沒有系結任何的佇列
* 現在我向交換機中發送訊息,訊息丟失
*
* @param exchange 交換機名稱
* @param routingKey 路由
* @param props 訊息其他的屬性 - routing headers 等
* @param body 訊息內容
*/
channel.basicPublish("exchange_fanout","",null,message.getBytes());
System.out.println("發送了一條訊息:"+message);
//釋放資源
channel.close();
connection.close();
}
後面消費者定義的佇列必須要和這個交換機系結,不然訊息就會丟失。
public class consumer {
//定義交換機的名字
private static final String EXCHANGE_NAME="exchange_fanout";
//定義佇列的名字
private static final String QUEUE_NAME ="queue_fanout_1";
public static void main(String[] args) throws Exception{
//建立連線
Connection connection= ConnectionUtil.getConnection();
//建立通道
final Channel channel = connection.createChannel();
//宣告一個佇列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 系結佇列到交換機
* @param queue 佇列名稱
* @param exchange 交換機名稱
* @param routingKey 路由
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//當前消費者只能預先拉取一條訊息進行消費,只有消費完了才能 纔能拉取另一條
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 訊息接收的時候自動呼叫
* @param consumerTag 消費者的標籤
* @param envelope 訊息的封裝,訊息的相關資訊都封裝到了這個物件中
* @param properties
* @param body 訊息內容
*/
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//模擬500毫秒,時間越短,效能越高
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body);
//int i = 1/0;//異常
System.out.println("Consumer1:"+message);
//手動回執 第一個參數 訊息的標籤 第二個參數 true回執所有訊息 false 僅僅回執當前訊息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
/**
* 消費訊息
* @param queue 佇列名稱
* @param autoAck 自動回執
* @param callback 回撥 監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
*/
//channel.basicConsume(QUEUE_NAME,true,consumer);
//關閉訊息的自動確認
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
生產者系結交換機,交換機中設定路由,消費者中系結的佇列只有與交換機中路由相對應纔可以接受到訊息
生產者:
public class Producer {
private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args)throws Exception {
//獲取連線
Connection connection = ConnectionUtil.getConnection();
//建立通道
Channel channel = connection.createChannel();
/**
* 定義一個交換機
* @param exchange 交換機的名稱
* @param type 交換機的型別
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String message = "小明註冊成功!----" + System.currentTimeMillis() ;
/**
* 發送一條訊息
*
* 訊息發送給交換機,佇列是儲存訊息的地方
* 如果我只定義了交換機,交換機並沒有系結任何的佇列
* 現在我向交換機中發送訊息,訊息丟失
*
* @param exchange 交換機名稱
* @param routingKey 路由
* @param props 訊息其他的屬性 - routing headers 等
* @param body 訊息內容
*/
channel.basicPublish(EXCHANGE_NAME,"register.sms",null,message.getBytes());
System.out.println("發送了一條訊息:"+message);
//釋放資源
channel.close();
connection.close();
}
消費者:
public class Consumer2 {
private static final String QUEUE_NAME = "queue_direct_sms";
private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args)throws Exception {
//建立連線
Connection connection = ConnectionUtil.getConnection();
//建立通道
final Channel channel = connection.createChannel();
//宣告佇列 佇列一旦在MQ中存中了,不需要重複宣告的
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 系結佇列到交換機
* @param queue 佇列名稱
* @param exchange 交換機名稱
* @param routingKey 路由
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"register.sms");
//當前消費者只能預先拉取一條訊息進行消費,只有消費完了才能 纔能拉取另一條
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 訊息接收的時候自動呼叫
* @param consumerTag 消費者的標籤
* @param envelope 訊息的封裝,訊息的相關資訊都封裝到了這個物件中
* @param properties
* @param body 訊息內容
*/
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//模擬500毫秒,時間越短,效能越高
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body);
//int i = 1/0;//異常
System.out.println("sms:"+message);
//手動回執 第一個參數 訊息的標籤 第二個參數 true回執所有訊息 false 僅僅回執當前訊息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
/**
* 消費訊息
* @param queue 佇列名稱
* @param autoAck 自動回執
* @param callback 回撥 監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
*/
//channel.basicConsume(QUEUE_NAME,true,consumer);
//關閉訊息的自動確認
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
與Direct路由類似,但是使用了萬用字元來實現多個佇列存取同一個交換機,其中,*可以匹配一個單詞,#可以匹配任意多個單詞
生產者:
public class Producer {
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args)throws Exception {
//獲取連線
Connection connection = ConnectionUtil.getConnection();
//建立通道
Channel channel = connection.createChannel();
/**
* 定義一個交換機
* @param exchange 交換機的名稱
* @param type 交換機的型別
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message = "小明註冊成功!----" + System.currentTimeMillis() ;
/**
* 發送一條訊息
*
* 訊息發送給交換機,佇列是儲存訊息的地方
* 如果我只定義了交換機,交換機並沒有系結任何的佇列
* 現在我向交換機中發送訊息,訊息丟失
*
* @param exchange 交換機名稱
* @param routingKey 路由
* @param props 訊息其他的屬性 - routing headers 等
* @param body 訊息內容
*/
channel.basicPublish(EXCHANGE_NAME,"register.employee.user.email",null,message.getBytes());
System.out.println("發送了一條訊息:"+message);
//釋放資源
channel.close();
connection.close();
}
消費者:
public class Consumer {
private static final String QUEUE_NAME = "queue_topic_email";
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args)throws Exception {
//建立連線
Connection connection = ConnectionUtil.getConnection();
//建立通道
final Channel channel = connection.createChannel();
//宣告佇列 佇列一旦在MQ中存中了,不需要重複宣告的
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 系結佇列到交換機
* @param queue 佇列名稱
* @param exchange 交換機名稱
* @param routingKey 路由
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"register.*.email");
//當前消費者只能預先拉取一條訊息進行消費,只有消費完了才能 纔能拉取另一條
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 訊息接收的時候自動呼叫
* @param consumerTag 消費者的標籤
* @param envelope 訊息的封裝,訊息的相關資訊都封裝到了這個物件中
* @param properties
* @param body 訊息內容
*/
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//模擬500毫秒,時間越短,效能越高
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body);
//int i = 1/0;//異常
System.out.println("email:"+message);
//手動回執 第一個參數 訊息的標籤 第二個參數 true回執所有訊息 false 僅僅回執當前訊息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
/**
* 消費訊息
* @param queue 佇列名稱
* @param autoAck 自動回執
* @param callback 回撥 監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
*/
//channel.basicConsume(QUEUE_NAME,true,consumer);
//關閉訊息的自動確認
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
e.printStackTrace();
}
String message = new String(body);
//int i = 1/0;//異常
System.out.println("email:"+message);
//手動回執 第一個參數 訊息的標籤 第二個參數 true回執所有訊息 false 僅僅回執當前訊息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
/**
* 消費訊息
* @param queue 佇列名稱
* @param autoAck 自動回執
* @param callback 回撥 監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
*/
//channel.basicConsume(QUEUE_NAME,true,consumer);
//關閉訊息的自動確認
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}