目前我們已經完成了商品和搜尋系統的開發。我們思考一下,是否存在問題?
如果我們在後台修改了商品的價格,搜尋頁面依然是舊的價格,這樣顯然不對。該如何解決?
這裏有兩種解決方案:
以上兩種方式都有同一個嚴重問題:就是程式碼耦合,後臺服務中需要嵌入搜尋和商品頁面服務,違背了微服務的獨立
原則。
所以,我們會通過另外一種方式來解決這個問題:訊息佇列
訊息佇列,即MQ,Message Queue。
訊息佇列是典型的:生產者、消費者模型。生產者不斷向訊息佇列中生產訊息,消費者不斷的從佇列中獲取訊息。因爲訊息的生產和消費都是非同步的,而且只關心訊息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。
結合前面所說的問題:
如果以後有其它系統也依賴商品服務的數據,同樣監聽訊息即可,商品服務無需任何程式碼修改。
MQ是訊息通訊的模型,並不是具體實現。現在實現MQ的有兩種主流方式:AMQP、JMS。
兩者間的區別和聯繫:
RabbitMQ是基於AMQP的一款訊息管理系統
官網: http://www.rabbitmq.com/
官方教學:http://www.rabbitmq.com/getstarted.html
官網下載地址:http://www.rabbitmq.com/download.html
下載映象:docker pull rabbitmq:management
建立範例並啓動:
docker run -d --name rabbitmq --publish 5671:5671 \
--publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \
rabbitmq:management
注:
4369 – erlang發現口
5672 --client端通訊口
15672 – 管理介面ui埠
25672 – server間內部通訊口
在web瀏覽器中輸入地址:http://虛擬機器ip:15672/
輸入預設賬號: guest : guest
overview:概覽
connections:無論生產者還是消費者,都需要與RabbitMQ建立連線後纔可以完成訊息的生產和消費,在這裏可以檢視連線情況
channels:通道,建立連線後,會形成通道,訊息的投遞獲取依賴通道。
Exchanges:交換機,用來實現訊息的路由
Queues:佇列,即訊息佇列,訊息存放在佇列中,等待消費,消費後被移除佇列。
埠:
5672: rabbitMq的程式語言用戶端連線埠
15672:rabbitMq管理介面埠
25672:rabbitMq叢集的埠
如果不使用guest,我們也可以自己建立一個使用者:
1、 超級管理員(administrator)
可登陸管理控制檯,可檢視所有的資訊,並且可以對使用者,策略(policy)進行操作。
2、 監控者(monitoring)
可登陸管理控制檯,同時可以檢視rabbitmq節點的相關資訊(進程數,記憶體使用情況,磁碟使用情況等)
3、 策略制定者(policymaker)
可登陸管理控制檯, 同時可以對policy進行管理。但無法檢視節點的相關資訊(上圖紅框標識的部分)。
4、 普通管理者(management)
僅可登陸管理控制檯,無法看到節點資訊,也無法對策略進行管理。
5、 其他
無法登陸管理控制檯,通常就是普通的生產者和消費者。
虛擬主機:類似於mysql中的database。他們都是以「/」開頭
RabbitMQ提供了6種訊息模型,但是第6種其實是RPC,並不是MQ,因此不予學習。那麼也就剩下5種。
但是其實3、4、5這三種都屬於訂閱模型,只不過進行路由的方式不同。
我們通過一個demo工程來了解下RabbitMQ的工作方式,匯入工程:
依賴:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.rabbitmq</groupId>
<artifactId>itcast-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
我們抽取一個建立RabbitMQ連線的工具類,方便其他程式獲取連線:
public class ConnectionUtil {
/**
* 建立與RabbitMQ的連線
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定義連線工廠
ConnectionFactory factory = new ConnectionFactory();
//設定服務地址
factory.setHost("192.168.77.168");
//埠
factory.setPort(5672);
//設定賬號資訊,使用者名稱、密碼、vhost
factory.setVirtualHost("/wjzcc");
factory.setUsername("wjzcc");
factory.setPassword("wjzcc");
// 通過工程獲取連線
Connection connection = factory.newConnection();
return connection;
}
}
官方介紹:
RabbitMQ是一個訊息代理:它接受和轉發訊息。 你可以把它想象成一個郵局:當你把郵件放在郵箱裏時,你可以確定郵差先生最終會把郵件發送給你的收件人。 在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。
RabbitMQ與郵局的主要區別是它不處理紙張,而是接受,儲存和轉發數據訊息的二進制數據塊。
P(producer/ publisher):生產者,一個發送訊息的使用者應用程式。
C(consumer):消費者,消費和接收有類似的意思,消費者是一個主要用來等待接收訊息的使用者應用程式
佇列(紅色區域):rabbitmq內部類似於郵箱的一個概念。雖然訊息流經rabbitmq和你的應用程式,但是它們只能儲存在佇列中。佇列只受主機的記憶體和磁碟限制,實質上是一個大的訊息緩衝區。許多生產者可以發送訊息到一個佇列,許多消費者可以嘗試從一個佇列接收數據。
總之:
生產者將訊息發送到佇列,消費者從佇列中獲取訊息,佇列是儲存訊息的緩衝區。
我們將用Java編寫兩個程式;發送單個訊息的生產者,以及接收訊息並將其列印出來的消費者。我們將詳細介紹Java API中的一些細節,這是一個訊息傳遞的「Hello World」。
我們將呼叫我們的訊息發佈者(發送者)Send和我們的訊息消費者(接收者)Recv。發佈者將連線到RabbitMQ,發送一條訊息,然後退出。
public class Send {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 從連線中建立通道,這是完成大部分API的地方。
Channel channel = connection.createChannel();
// 宣告(建立)佇列,必須宣告佇列才能 纔能夠發送訊息,我們可以把訊息發送到佇列中。
// 宣告一個佇列是冪等的 - 只有當它不存在時纔會被建立
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 訊息內容
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//關閉通道和連線
channel.close();
connection.close();
}
}
控制檯:
進入佇列頁面,可以看到新建了一個佇列:simple_queue
點選佇列名稱,進入詳情頁,可以檢視訊息:
在控制檯檢視訊息並不會將訊息消費,所以訊息還在。
public class Recv {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 建立通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [x] received : " + msg + "!");
}
};
// 監聽佇列,第二個參數:是否自動進行訊息確認。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
控制檯:
這個時候,佇列中的訊息就沒了:
我們發現,消費者已經獲取了訊息,但是程式沒有停止,一直在監聽佇列中是否有新的訊息。一旦有新的訊息進入佇列,就會立即列印.
面試題:如何避免訊息丟失?
訊息的丟失,在MQ角度考慮,一般有三種途徑:
生產者/消費者保證訊息不丟失有兩種實現方式:
開啓事務會大幅降低訊息發送及接收效率,使用的相對較少,因此我們生產環境一般都採取訊息確認模式,我們只是講解訊息確認模式及訊息持久化
生產者確認機制 機製有很嚴重的效能問題,如果每秒鐘只有幾百的訊息量,可以使用。所以,我們主要講了消費者的訊息確認機制 機製。
生產者確認
// 開啓訊息確認機制 機製
channel.confirmSelect();
// 訊息是否正常發送到交換機
channel.addConfirmListener((long deliveryTag, boolean multiple) -> {
System.out.println("訊息發送成功!");
}, (long deliveryTag, boolean multiple) -> {
// 此種情況無法演示
System.out.println("訊息發送失敗!");
});
通過剛纔的案例可以看出,訊息一旦被消費者接收,佇列中的訊息就會被刪除。
那麼問題來了:RabbitMQ怎麼知道訊息被接收了呢?
如果消費者領取訊息後,還沒執行操作就掛掉了呢?或者拋出了異常?訊息消費失敗,但是RabbitMQ無從得知,這樣訊息就丟失了!
因此,RabbitMQ有一個ACK機制 機製。當消費者獲取訊息後,會向RabbitMQ發送回執ACK,告知訊息已經被接收。不過這種回執ACK分兩種情況:
大家覺得哪種更好呢?
這需要看訊息的重要性:
我們之前的測試都是自動ACK的,如果要手動ACK,需要改動我們的程式碼:
public class Recv2 {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 建立通道
final Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [x] received : " + msg + "!");
// 手動進行ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 監聽佇列,第二個參數false,手動進行ACK
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
注意到最後一行程式碼:
channel.basicConsume(QUEUE_NAME, false, consumer);
如果第二個參數爲true,則會自動進行ACK;如果爲false,則需要手動ACK。方法的宣告:
修改消費者,新增異常,如下:
生產者不做任何修改,直接執行,訊息發送成功:
執行消費者,程式拋出異常。但是訊息依然被消費:
管理介面:
修改消費者,把自動改成手動(去掉之前製造的異常)
生產者不變,再次執行:
執行消費者
但是,檢視管理介面,發現:
停掉消費者的程式,發現:
這是因爲雖然我們設定了手動ACK,但是程式碼中並沒有進行訊息確認!所以訊息並未被真正消費掉。
當我們關掉這個消費者,訊息的狀態再次稱爲Ready
修改程式碼手動ACK:
執行:
訊息消費成功!
工作佇列或者競爭消費者模式
在第一篇教學中,我們編寫了一個程式,從一個命名佇列中發送並接受訊息。在這裏,我們將建立一個工作佇列,在多個工作者之間分配耗時任務。
工作佇列,又稱任務佇列。主要思想就是避免執行資源密集型任務時,必須等待它執行完成。相反我們稍後完成任務,我們將任務封裝爲訊息並將其發送到佇列。 在後台執行的工作進程將獲取任務並最終執行作業。當你執行許多消費者時,任務將在他們之間共用,但是一個訊息只能被一個消費者獲取。
這個概念在Web應用程式中特別有用,因爲在短的HTTP請求視窗中無法處理複雜的任務。
接下來我們來模擬這個流程:
P:生產者:任務的發佈者
C1:消費者,領取任務並且完成任務,假設完成速度較快
C2:消費者2:領取任務並完成任務,假設完成速度慢
面試題:避免訊息堆積?
1)採用workqueue,多個消費者監聽同一佇列。
2)接收到訊息以後,而是通過執行緒池,非同步消費。
生產者與案例1中的幾乎一樣:
public class Send {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 回圈發佈任務
for (int i = 0; i < 50; i++) {
// 訊息內容
String message = "task .. " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 2);
}
// 關閉通道和連線
channel.close();
connection.close();
}
}
不過這裏我們是回圈發送50條訊息。
與消費者1基本類似,就是沒有設定消費耗時時間。
這裏是模擬有些消費者快,有些比較慢。
接下來,兩個消費者一同啓動,然後發送50條訊息:
可以發現,兩個消費者各自消費了25條訊息,而且各不相同,這就實現了任務的分發。
剛纔的實現有問題嗎?
現在的狀態屬於是把任務平均分配,正確的做法應該是消費越快的人,消費的越多。
怎麼實現呢?
我們可以使用basicQos方法和prefetchCount = 1設定。 這告訴RabbitMQ一次不要向工作人員發送多於一條訊息。 或者換句話說,不要向工作人員發送新訊息,直到它處理並確認了前一個訊息。 相反,它會將其分派給不是仍然忙碌的下一個工作人員。
再次測試:
在之前的模式中,我們建立了一個工作佇列。 工作佇列背後的假設是:每個任務只被傳遞給一個工作人員。 在這一部分,我們將做一些完全不同的事情 - 我們將會傳遞一個資訊給多個消費者。 這種模式被稱爲「發佈/訂閱」。
訂閱模型示意圖:
解讀:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個佇列
3、生產者沒有將訊息直接發送到佇列,而是發送到了交換機
4、每個佇列都要系結到交換機
5、生產者發送的訊息,經過交換機到達佇列,實現一個訊息被多個消費者獲取的目的
X(Exchanges):交換機一方面:接收生產者發送的訊息。另一方面:知道如何處理訊息,例如遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄。到底如何操作,取決於Exchange的型別。
Exchange型別有以下幾種:
Fanout:廣播,將訊息交給所有系結到交換機的佇列
Direct:定向,把訊息交給符合指定routing key 的佇列
Topic:萬用字元,把訊息交給符合routing pattern(路由模式) 的佇列
我們這裏先學習
Fanout:即廣播模式
Exchange(交換機)只負責轉發訊息,不具備儲存訊息的能力,因此如果沒有任何佇列與Exchange系結,或者沒有符合路由規則的佇列,那麼訊息會丟失!
Fanout,也稱爲廣播。
流程圖:
在廣播模式下,訊息發送流程是這樣的:
兩個變化:
public class Send {
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告exchange,指定型別爲fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 訊息內容
String message = "Hello everyone";
// 發佈訊息到Exchange
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生產者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
public class Recv {
private final static String QUEUE_NAME = "fanout_exchange_queue_1";
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 系結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者1] received : " + msg + "!");
}
};
// 監聽佇列,自動返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
要注意程式碼中:佇列需要和交換機系結
public class Recv2 {
private final static String QUEUE_NAME = "fanout_exchange_queue_2";
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 系結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者2] received : " + msg + "!");
}
};
// 監聽佇列,手動返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
我們執行兩個消費者,然後發送1條訊息:
有選擇性的接收訊息
在訂閱模式中,生產者發佈訊息,所有消費者都可以獲取所有訊息。
在路由模式中,我們將新增一個功能 - 我們將只能訂閱一部分訊息。 例如,我們只能將重要的錯誤訊息引導到日誌檔案(以節省磁碟空間),同時仍然能夠在控制檯上列印所有日誌訊息。
但是,在某些場景下,我們希望不同的訊息被不同的佇列消費。這時就要用到Direct型別的Exchange。
在Direct模型下,佇列與交換機的系結,不能是任意系結了,而是要指定一個RoutingKey(路由key)
訊息的發送方在向Exchange發送訊息時,也必須指定訊息的routing key。
P:生產者,向Exchange發送訊息,發送訊息時,會指定一個routing key。
X:Exchange(交換機),接收生產者的訊息,然後把訊息遞交給 與routing key完全匹配的佇列
C1:消費者,其所在佇列指定了需要routing key 爲 error 的訊息
C2:消費者,其所在佇列指定了需要routing key 爲 info、error、warning 的訊息
此處我們模擬商品的增刪改,發送訊息的RoutingKey分別是:insert、update、delete
public class Send {
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告exchange,指定型別爲direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 訊息內容
String message = "商品新增了, id = 1001";
// 發送訊息,並且指定routing key 爲:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
System.out.println(" [商品服務:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
我們此處假設消費者1只接收兩種型別的訊息:更新商品和刪除商品。
public class Recv {
private final static String QUEUE_NAME = "direct_exchange_queue_1";
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 系結佇列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete訊息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者1] received : " + msg + "!");
}
};
// 監聽佇列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
我們此處假設消費者2接收所有型別的訊息:新增商品,更新商品和刪除商品。
public class Recv2 {
private final static String QUEUE_NAME = "direct_exchange_queue_2";
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 系結佇列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者2] received : " + msg + "!");
}
};
// 監聽佇列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
我們分別發送增、刪、改的RoutingKey,發現結果:
Topic
型別的Exchange
與Direct
相比,都是可以根據RoutingKey
把訊息路由到不同的佇列。只不過Topic
型別Exchange
可以讓佇列在系結Routing key
的時候使用萬用字元!
Routingkey
一般都是有一個或多個單詞組成,多個單詞之間以」.」分割,例如: item.insert
萬用字元規則:
`#`:匹配一個或多個詞
`*`:匹配不多不少恰好1個詞
舉例:
`audit.#`:能夠匹配`audit.irs.corporate` 或者 `audit.irs`
`audit.*`:只能匹配`audit.irs`
在這個例子中,我們將發送所有描述動物的訊息。訊息將使用由三個字(兩個點)組成的routing key發送。路由關鍵字中的第一個單詞將描述速度,第二個顏色和第三個種類:「..」。
我們建立了三個系結:Q1系結了系結鍵「* .orange.」,Q2系結了「.*.rabbit」和「lazy.#」。
Q1匹配所有的橙色動物。
Q2匹配關於兔子以及懶惰動物的訊息。
練習,生產者發送如下訊息,會進入那個佇列:
quick.orange.rabbit Q1 Q2
lazy.orange.elephant
quick.orange.fox
lazy.pink.rabbit
quick.brown.fox
quick.orange.male.rabbit
orange
使用topic型別的Exchange,發送訊息的routing key有3種: item.isnert
、item.update
、item.delete
:
public class Send {
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告exchange,指定型別爲topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 訊息內容
String message = "新增商品 : id = 1001";
// 發送訊息,並且指定routing key 爲:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
System.out.println(" [商品服務:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
我們此處假設消費者1只接收兩種型別的訊息:更新商品和刪除商品
public class Recv {
private final static String QUEUE_NAME = "topic_exchange_queue_1";
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 系結佇列到交換機,同時指定需要訂閱的routing key。需要 update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者1] received : " + msg + "!");
}
};
// 監聽佇列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
我們此處假設消費者2接收所有型別的訊息:新增商品,更新商品和刪除商品。
/**
* 消費者2
*/
public class Recv2 {
private final static String QUEUE_NAME = "topic_exchange_queue_2";
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 獲取到連線
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 系結佇列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
// 定義佇列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即訊息體
String msg = new String(body);
System.out.println(" [消費者2] received : " + msg + "!");
}
};
// 監聽佇列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
要將訊息持久化,前提是:佇列、Exchange都持久化
Sprin有很多不同的專案,其中就有對AMQP的支援:
Spring AMQP的頁面:http://spring.io/projects/spring-amqp
注意這裏一段描述:
Spring-amqp是對AMQP協定的抽象實現,而spring-rabbit 是對協定的具體實現,也是目前的唯一實現。底層使用的就是RabbitMQ。
新增AMQP的啓動器:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml
中新增RabbitMQ地址:
spring:
rabbitmq:
host: 192.168.56.101
username: fengge
password: fengge
virtual-host: /fengge
在SpringAmqp中,對訊息的消費者進行了封裝和抽象,一個普通的JavaBean中的普通方法,只要通過簡單的註解,就可以成爲一個消費者。
@Component
public class Listener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.test.queue", durable = "true"),
exchange = @Exchange(
value = "spring.test.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"a.#"}))
public void listen(String msg){
System.out.println("接收到訊息:" + msg);
}
}
@Componet
:類上的註解,註冊到Spring容器@RabbitListener
:方法上的註解,宣告這個方法是一個消費者方法,需要指定下面 下麪的屬性:
bindings
:指定系結關係,可以有多個。值是@QueueBinding
的陣列。@QueueBinding
包含下面 下麪屬性:
value
:這個消費者關聯的佇列。值是@Queue
,代表一個佇列exchange
:佇列所系結的交換機,值是@Exchange
型別key
:佇列和交換機系結的RoutingKey
類似listen這樣的方法在一個類中可以寫多個,就代表多個消費者。
Spring最擅長的事情就是封裝,把他人的框架進行封裝和整合。
Spring爲AMQP提供了統一的訊息處理模板:AmqpTemplate,非常方便的發送訊息,其發送方法:
紅框圈起來的是比較常用的3個方法,分別是:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testSend() throws InterruptedException {
String msg = "hello, Spring boot amqp";
this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
// 等待10秒後再結束
Thread.sleep(10000);
}
}
執行後檢視日誌:
內容如下:
/**
* @Description 訊息發送確認
* <p>
* ConfirmCallback 只確認訊息是否正確到達 Exchange 中
* ReturnCallback 訊息沒有正確到達佇列時觸發回撥,如果正確到達佇列不執行
* <p>
* 1. 如果訊息沒有到exchange,則confirm回撥,ack=false
* 2. 如果訊息到達exchange,則confirm回撥,ack=true
* 3. exchange到queue成功,則不回撥return
* 4. exchange到queue失敗,則回撥return
* @Author qy
*/
@Configuration
@Slf4j
public class ProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback
}
/**
* 確認訊息是否正確到達 Exchange 中
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("訊息發送成功:" + JSON.toJSONString(correlationData));
} else {
log.info("訊息發送失敗:{} 數據:{}", cause, JSON.toJSONString(correlationData));
}
}
/**
* 訊息沒有正確到達佇列時觸發回撥,如果正確到達佇列不執行
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 反序列化物件輸出
System.out.println("訊息主體: " + new String(message.getBody()));
System.out.println("應答碼: " + replyCode);
System.out.println("描述:" + replyText);
System.out.println("訊息使用的交換器 exchange : " + exchange);
System.out.println("訊息使用的路由鍵 routing : " + routingKey);
}
}
測試1:訊息正常發送,正常消費
測試2:訊息到達交換機,沒有達到佇列
測試3:訊息不能到達交換機
springboot-rabbit提供了三種訊息確認模式:
設定方法:
spring.rabbitmq.listener.simple.acknowledge-mode=manual/none/auto
在消費者中製造一個異常:
可以看到mq將無限重試,消費訊息:
訊息將無法消費:
停掉應用訊息回到Ready狀態,訊息不會丟失!
在application.yml修改確認模式爲none:
保留消費者中的int i = 1/0異常,再次執行,程式報錯:
訊息已經被消費:
確認訊息:
// 參數二:是否批次確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
拒絕訊息:
// 參數二:是否重新入隊,false時訊息不再重發,如果設定了死信佇列則進入死信佇列,沒有死信就會被丟棄
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
不確認訊息:
// 參數二:是否批次; 參數三:是否重新回到佇列,true重新入隊
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
改造消費者監聽器程式碼如下:
@Component
public class Listener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.test.queue", durable = "true"),
exchange = @Exchange(
value = "spring.test.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"a.*"}))
public void listen(String msg, Channel channel, Message message) throws IOException {
try {
System.out.println("接收到訊息:" + msg);
int i = 1 / 0;
// 確認收到訊息,false只確認當前consumer一個訊息收到,true確認所有consumer獲得的訊息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("訊息重試後依然失敗,拒絕再次接收");
// 拒絕訊息,不再重新入隊(如果系結了死信佇列訊息會進入死信佇列,沒有系結死信佇列則訊息被丟棄,也可以把失敗訊息記錄到redis或者mysql中),也可以設定爲true再重試。
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("訊息消費時出現異常,即將再次返回佇列處理");
// Nack訊息,重新入隊(重試一次)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
e.printStackTrace();
}
}
}
輸出日誌如下:
接收到訊息:hello, Spring boot amqp
訊息消費時出現異常,即將再次返回佇列處理
java.lang.ArithmeticException: / by zero
at com.atuigu.rabbitmq.spring.Listener.listen(Listener.java:31)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
........................
2020-03-29 16:56:20.432 INFO 23432 --- [16.116.100:5672] c.a.rabbitmq.spring.ProducerAckConfig : 訊息發送成功:null
接收到訊息:hello, Spring boot amqp
訊息重試後依然失敗,拒絕再次接收
java.lang.ArithmeticException: / by zero
at com.atuigu.rabbitmq.spring.Listener.listen(Listener.java:31)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
.........................
死信,在官網中對應的單詞爲「Dead Letter」,可以看出翻譯確實非常的簡單粗暴。那麼死信是個什麼東西呢?
「死信」是RabbitMQ中的一種訊息機制 機製,當你在消費訊息時,如果佇列裡的訊息出現以下情況:
channel.basicNack
或 channel.basicReject
,並且此時requeue
屬性被設定爲false
。那麼該訊息將成爲「死信」。
「死信」訊息會被RabbitMQ進行特殊處理,如果設定了死信佇列資訊,那麼該訊息將會被丟進死信佇列中,如果沒有設定,則該訊息將會被丟棄。
死信的佇列的使用,大概可以分爲以下步驟:
在設定類中增加設定:
內容如下:
/**
* 宣告業務交換機
*
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("spring.test.exchange", true, false);
}
/**
* 宣告業務佇列
* 並把死信交換機系結到業務佇列
* @return
*/
@Bean
public Queue queue() {
Map<String, Object> arguments = new HashMap<>();
// x-dead-letter-exchange 這裏宣告當前佇列系結的死信交換機
arguments.put("x-dead-letter-exchange", "dead-exchange");
// x-dead-letter-routing-key 這裏宣告當前佇列的死信路由key
arguments.put("x-dead-letter-routing-key", "msg.dead");
return new Queue("spring.test.queue", true, false, false, arguments);
}
/**
* 業務佇列系結到業務交換機
*
* @return
*/
@Bean
public Binding binding() {
return new Binding("spring.test.queue", Binding.DestinationType.QUEUE, "spring.test.exchange", "a.b", null);
}
/**
* 宣告死信交換機
* @return
*/
@Bean
public TopicExchange deadExchange(){
return new TopicExchange("dead-exchange", true, false);
}
/**
* 宣告死信佇列
* @return
*/
@Bean
public Queue deadQueue(){
return new Queue("dead-queue", true, false, false);
}
/**
* 把死信佇列系結到死信交換機
* @return
*/
@Bean
public Binding deadBinding() {
return new Binding("dead-queue", Binding.DestinationType.QUEUE, "dead-exchange", "msg.dead", null);
}
改造消費者監聽器:
注意:測試前,需要把專案停掉,並在rabbitmq瀏覽器控制檯刪除之前宣告好的交換機及佇列
執行測試後:
可以看到spring.test.queue有了系結死信交換機,死信訊息已經進入死信佇列。
延時佇列
,最重要的特性就體現在它的延時
屬性上,跟普通的佇列不一樣的是,普通佇列中的元素總是等着希望被早點取出處理,而延時佇列中的元素則是希望被在指定時間得到取出和處理
,所以延時佇列中的元素是都是帶時間屬性的,通常來說是需要被處理的訊息或者任務。
簡單來說,延時佇列就是用來存放需要在指定時間被處理的元素的佇列。
那麼什麼時候需要用延時佇列呢?考慮一下以下場景:
這些任務看起來似乎可以使用定時任務,一直輪詢數據,每秒查一次,取出需要被處理的數據,然後處理不就完事了嗎?如果數據量比較少,確實可以這樣做,比如:對於「如果賬單一週內未支付則進行自動結算」這樣的需求,如果對於時間不是嚴格限制,而是寬鬆意義上的一週,那麼每天晚上跑個定時任務檢查一下所有未支付的賬單,確實也是一個可行的方案。但對於數據量比較大,並且時效性較強的場景,如:「訂單十分鐘內未支付則關閉「,短期內未支付的訂單數據可能會有很多,活動期間甚至會達到百萬甚至千萬級別,對這麼龐大的數據量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內無法完成所有訂單的檢查,同時會給數據庫帶來很大壓力,無法滿足業務要求而且效能低下。
更重要的一點是,不!優!雅!
延時佇列需要設定TTL,那麼什麼時TTL呢?
訊息的TTL(Time To Live)就是訊息的存活時間,單位是毫秒。我們可以對佇列或者訊息設定TTL,訊息如果在TTL設定的時間內沒有被消費,則會成爲「死信」。如果同時設定了佇列的TTL和訊息的TTL,那麼較小的那個值將會被使用。
佇列設定TTL:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
訊息設定TTL:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
這樣這條訊息的過期時間也被設定成了6s。
但這兩種方式是有區別的,如果設定了佇列的TTL屬性,那麼一旦訊息過期,就會被佇列丟棄,而第二種方式,訊息即使過期,也不一定會被馬上丟棄,因爲訊息是否過期是在即將投遞到消費者之前判定的,如果當前佇列有嚴重的訊息積壓情況,則已過期的訊息也許還能存活較長時間。
另外,還需要注意的一點是,如果不設定TTL,表示訊息永遠不會過期,如果將TTL設定爲0,則表示除非此時可以直接投遞該訊息到消費者,否則該訊息將會被丟棄。
實現如下:
設定延時佇列及死信佇列:
@Configuration
public class TTLQueueConfig {
/**
* 交換機
* @return
*/
@Bean
public Exchange exchange(){
return new TopicExchange("ORDER-EXCHANGE", true, false, null);
}
/**
* 延時佇列
* @return
*/
@Bean("ORDER-TTL-QUEUE")
public Queue ttlQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "ORDER-EXCHANGE");
arguments.put("x-dead-letter-routing-key", "order.close");
arguments.put("x-message-ttl", 120000); // 僅僅用於測試,實際根據需求,通常30分鐘或者15分鐘
return new Queue("ORDER-TTL-QUEUE", true, false, false, arguments);
}
/**
* 延時佇列系結到交換機
* rountingKey:order.create
* @return
*/
@Bean("ORDER-TTL-BINDING")
public Binding ttlBinding(){
return new Binding("ORDER-TTL-QUEUE", Binding.DestinationType.QUEUE, "ORDER-EXCHANGE", "order.create", null);
}
/**
* 死信佇列
* @return
*/
@Bean("ORDER-CLOSE-QUEUE")
public Queue queue(){
return new Queue("ORDER-CLOSE-QUEUE", true, false, false, null);
}
/**
* 死信佇列系結到交換機
* routingKey:order.close
* @return
*/
@Bean("ORDER-CLOSE-BINDING")
public Binding closeBinding(){
return new Binding("ORDER-CLOSE-QUEUE", Binding.DestinationType.QUEUE, "ORDER-EXCHANGE", "order.close", null);
}
}
在MqDemo測試類中新增發送訊息的測試用例:
@Test
public void testTTL() throws IOException {
this.rabbitTemplate.convertAndSend("ORDER-EXCHANGE", "order.create", "hello world!");
System.in.read();
}
新增消費者,消費死信訊息:
@Component
public class DeadListener {
@RabbitListener(queues = "ORDER-CLOSE-QUEUE")
public void testDead(String msg){
System.out.println(msg);
}
}