訊息中介軟體RabbitMQ

2020-08-13 15:56:22

1. RabbitMQ

1.1. 現實問題

目前我們已經完成了商品和搜尋系統的開發。我們思考一下,是否存在問題?

  • 商品的原始數據儲存在數據庫中,增刪改查都在數據庫中完成。
  • 搜尋服務數據來源是索引庫,如果數據庫商品發生變化,索引庫數據不能及時更新。

如果我們在後台修改了商品的價格,搜尋頁面依然是舊的價格,這樣顯然不對。該如何解決?

這裏有兩種解決方案:

  • 方案1:每當後臺對商品做增刪改操作,同時要修改索引庫數據
  • 方案2:搜尋服務對外提供操作介面,後臺在商品增刪改後,呼叫介面

以上兩種方式都有同一個嚴重問題:就是程式碼耦合,後臺服務中需要嵌入搜尋和商品頁面服務,違背了微服務的獨立原則。

所以,我們會通過另外一種方式來解決這個問題:訊息佇列

1.2. 訊息佇列(MQ)

1.2.1. 什麼是訊息佇列

訊息佇列,即MQ,Message Queue。

在这里插入图片描述

訊息佇列是典型的:生產者、消費者模型。生產者不斷向訊息佇列中生產訊息,消費者不斷的從佇列中獲取訊息。因爲訊息的生產和消費都是非同步的,而且只關心訊息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。

結合前面所說的問題:

  • 商品服務對商品增刪改以後,無需去操作索引庫,只是發送一條訊息,也不關心訊息被誰接收。
  • 搜尋服務服務接收訊息,去處理索引庫。

如果以後有其它系統也依賴商品服務的數據,同樣監聽訊息即可,商品服務無需任何程式碼修改。

1.2.2. AMQP和JMS

MQ是訊息通訊的模型,並不是具體實現。現在實現MQ的有兩種主流方式:AMQP、JMS。

在这里插入图片描述
在这里插入图片描述

兩者間的區別和聯繫:

  • JMS是定義了統一的介面,來對訊息操作進行統一;AMQP是通過規定協定來統一數據互動的格式
  • JMS限定了必須使用Java語言;AMQP只是協定,不規定實現方式,因此是跨語言的。
  • JMS規定了兩種訊息模型;而AMQP的訊息模型更加豐富

1.2.3. 常見MQ產品

  • ActiveMQ:基於JMS
  • RabbitMQ:基於AMQP協定,erlang語言開發,穩定性好
  • RocketMQ:基於JMS,阿裡巴巴產品,目前交由Apache基金會
  • Kafka:分佈式訊息系統,高吞吐量

1.2.4. RabbitMQ

RabbitMQ是基於AMQP的一款訊息管理系統

官網: http://www.rabbitmq.com/

官方教學:http://www.rabbitmq.com/getstarted.html

1.3. 下載和安裝

1.3.1. 下載

官網下載地址:http://www.rabbitmq.com/download.html

1.3.2. 安裝

下載映象:docker pull rabbitmq:management

建立範例並啓動:

docker run -d --name rabbitmq --publish 5671:5671 \
--publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \
rabbitmq:management

注:
4369 – erlang發現口
5672 --client端通訊口

15672 – 管理介面ui埠
25672 – server間內部通訊口

1.3.3. 測試

在web瀏覽器中輸入地址:http://虛擬機器ip:15672/

輸入預設賬號: guest : guest

overview:概覽

connections:無論生產者還是消費者,都需要與RabbitMQ建立連線後纔可以完成訊息的生產和消費,在這裏可以檢視連線情況

channels:通道,建立連線後,會形成通道,訊息的投遞獲取依賴通道。

Exchanges:交換機,用來實現訊息的路由

Queues:佇列,即訊息佇列,訊息存放在佇列中,等待消費,消費後被移除佇列。

埠:

5672: rabbitMq的程式語言用戶端連線埠

15672:rabbitMq管理介面埠

25672:rabbitMq叢集的埠

1.4. 管理介面

1.4.1. 新增使用者

如果不使用guest,我們也可以自己建立一個使用者:

1、 超級管理員(administrator)

可登陸管理控制檯,可檢視所有的資訊,並且可以對使用者,策略(policy)進行操作。

2、 監控者(monitoring)

可登陸管理控制檯,同時可以檢視rabbitmq節點的相關資訊(進程數,記憶體使用情況,磁碟使用情況等)

3、 策略制定者(policymaker)

可登陸管理控制檯, 同時可以對policy進行管理。但無法檢視節點的相關資訊(上圖紅框標識的部分)。

4、 普通管理者(management)

僅可登陸管理控制檯,無法看到節點資訊,也無法對策略進行管理。

5、 其他

無法登陸管理控制檯,通常就是普通的生產者和消費者。

1.4.2. 建立Virtual Hosts

虛擬主機:類似於mysql中的database。他們都是以「/」開頭

1.4.3. 設定許可權

2. 五種訊息模型

RabbitMQ提供了6種訊息模型,但是第6種其實是RPC,並不是MQ,因此不予學習。那麼也就剩下5種。

但是其實3、4、5這三種都屬於訂閱模型,只不過進行路由的方式不同。

我們通過一個demo工程來了解下RabbitMQ的工作方式,匯入工程:

依賴:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>cn.itcast.rabbitmq</groupId>
	<artifactId>itcast-rabbitmq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.2.RELEASE</version>
	</parent>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>3.3.2</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
		</dependency>
	</dependencies>
</project>

我們抽取一個建立RabbitMQ連線的工具類,方便其他程式獲取連線:

public class ConnectionUtil {
    /**
     * 建立與RabbitMQ的連線
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定義連線工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定服務地址
        factory.setHost("192.168.77.168");
        //埠
        factory.setPort(5672);
        //設定賬號資訊,使用者名稱、密碼、vhost
        factory.setVirtualHost("/wjzcc");
        factory.setUsername("wjzcc");
        factory.setPassword("wjzcc");
        // 通過工程獲取連線
        Connection connection = factory.newConnection();
        return connection;
    }
}

2.1. 基本訊息模型

官方介紹:

RabbitMQ是一個訊息代理:它接受和轉發訊息。 你可以把它想象成一個郵局:當你把郵件放在郵箱裏時,你可以確定郵差先生最終會把郵件發送給你的收件人。 在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。

RabbitMQ與郵局的主要區別是它不處理紙張,而是接受,儲存和轉發數據訊息的二進制數據塊。

P(producer/ publisher):生產者,一個發送訊息的使用者應用程式。

C(consumer):消費者,消費和接收有類似的意思,消費者是一個主要用來等待接收訊息的使用者應用程式

佇列(紅色區域):rabbitmq內部類似於郵箱的一個概念。雖然訊息流經rabbitmq和你的應用程式,但是它們只能儲存在佇列中。佇列只受主機的記憶體和磁碟限制,實質上是一個大的訊息緩衝區。許多生產者可以發送訊息到一個佇列,許多消費者可以嘗試從一個佇列接收數據。

總之:

生產者將訊息發送到佇列,消費者從佇列中獲取訊息,佇列是儲存訊息的緩衝區。

我們將用Java編寫兩個程式;發送單個訊息的生產者,以及接收訊息並將其列印出來的消費者。我們將詳細介紹Java API中的一些細節,這是一個訊息傳遞的「Hello World」。

我們將呼叫我們的訊息發佈者(發送者)Send和我們的訊息消費者(接收者)Recv。發佈者將連線到RabbitMQ,發送一條訊息,然後退出。

2.1.1. 生產者發送訊息

public class Send {

    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 從連線中建立通道,這是完成大部分API的地方。
        Channel channel = connection.createChannel();

        // 宣告(建立)佇列,必須宣告佇列才能 纔能夠發送訊息,我們可以把訊息發送到佇列中。
        // 宣告一個佇列是冪等的 - 只有當它不存在時纔會被建立
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 訊息內容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //關閉通道和連線
        channel.close();
        connection.close();
    }
}

控制檯:

2.1.2. 管理工具中檢視訊息

進入佇列頁面,可以看到新建了一個佇列:simple_queue

點選佇列名稱,進入詳情頁,可以檢視訊息:

在控制檯檢視訊息並不會將訊息消費,所以訊息還在。

2.1.3. 消費者獲取訊息

public class Recv {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 建立通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
            }
        };
        // 監聽佇列,第二個參數:是否自動進行訊息確認。
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

控制檯:

這個時候,佇列中的訊息就沒了:

我們發現,消費者已經獲取了訊息,但是程式沒有停止,一直在監聽佇列中是否有新的訊息。一旦有新的訊息進入佇列,就會立即列印.

2.1.4. 生產者訊息確認機制 機製

面試題:如何避免訊息丟失?

訊息的丟失,在MQ角度考慮,一般有三種途徑:

  1. 生產者確認發送到MQ伺服器(生產者確認機制 機製)
  2. MQ伺服器不丟數據(訊息持久化)
  3. 消費者確認消費掉訊息(消費者確認機制 機製)

生產者/消費者保證訊息不丟失有兩種實現方式:

  1. 開啓事務模式
  2. 訊息確認模式

開啓事務會大幅降低訊息發送及接收效率,使用的相對較少,因此我們生產環境一般都採取訊息確認模式,我們只是講解訊息確認模式及訊息持久化

  1. 生產者的ACK機制 機製。有時,業務處理成功,訊息也發了,但是我們並不知道訊息是否成功到達了rabbitmq,例如:由於網路等原因導致業務成功而訊息發送失敗,此時可以使用rabbitmq的發送確認功能,要求rabbitmq顯式告知我們訊息是否已成功發送。
  2. 消費者的ACK機制 機製。可以防止消費者丟失訊息。

生產者確認機制 機製有很嚴重的效能問題,如果每秒鐘只有幾百的訊息量,可以使用。所以,我們主要講了消費者的訊息確認機制 機製。

生產者確認

// 開啓訊息確認機制 機製
channel.confirmSelect();
// 訊息是否正常發送到交換機
channel.addConfirmListener((long deliveryTag, boolean multiple) -> {
    System.out.println("訊息發送成功!");
}, (long deliveryTag, boolean multiple) -> {
    // 此種情況無法演示
    System.out.println("訊息發送失敗!");
});

2.1.5. 消費者訊息確認機制 機製(ACK)

通過剛纔的案例可以看出,訊息一旦被消費者接收,佇列中的訊息就會被刪除。

那麼問題來了:RabbitMQ怎麼知道訊息被接收了呢?

如果消費者領取訊息後,還沒執行操作就掛掉了呢?或者拋出了異常?訊息消費失敗,但是RabbitMQ無從得知,這樣訊息就丟失了!

因此,RabbitMQ有一個ACK機制 機製。當消費者獲取訊息後,會向RabbitMQ發送回執ACK,告知訊息已經被接收。不過這種回執ACK分兩種情況:

  • 自動ACK:訊息一旦被接收,消費者自動發送ACK
  • 手動ACK:訊息接收後,不會發送ACK,需要手動呼叫

大家覺得哪種更好呢?

這需要看訊息的重要性:

  • 如果訊息不太重要,丟失也沒有影響,那麼自動ACK會比較方便
  • 如果訊息非常重要,不容丟失。那麼最好在消費完成後手動ACK,否則接收訊息後就自動ACK,RabbitMQ就會把訊息從佇列中刪除。如果此時消費者宕機,那麼訊息就丟失了。

我們之前的測試都是自動ACK的,如果要手動ACK,需要改動我們的程式碼:

public class Recv2 {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 建立通道
        final Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
                // 手動進行ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 監聽佇列,第二個參數false,手動進行ACK
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

注意到最後一行程式碼:

channel.basicConsume(QUEUE_NAME, false, consumer);

如果第二個參數爲true,則會自動進行ACK;如果爲false,則需要手動ACK。方法的宣告:

2.1.5.1. 自動ACK存在的問題

修改消費者,新增異常,如下:

生產者不做任何修改,直接執行,訊息發送成功:

執行消費者,程式拋出異常。但是訊息依然被消費:

管理介面:

2.1.5.2. 演示手動ACK

修改消費者,把自動改成手動(去掉之前製造的異常)

生產者不變,再次執行:

執行消費者

但是,檢視管理介面,發現:

停掉消費者的程式,發現:

這是因爲雖然我們設定了手動ACK,但是程式碼中並沒有進行訊息確認!所以訊息並未被真正消費掉。

當我們關掉這個消費者,訊息的狀態再次稱爲Ready

修改程式碼手動ACK:

執行:

訊息消費成功!

2.2. work訊息模型

工作佇列或者競爭消費者模式

在第一篇教學中,我們編寫了一個程式,從一個命名佇列中發送並接受訊息。在這裏,我們將建立一個工作佇列,在多個工作者之間分配耗時任務。

工作佇列,又稱任務佇列。主要思想就是避免執行資源密集型任務時,必須等待它執行完成。相反我們稍後完成任務,我們將任務封裝爲訊息並將其發送到佇列。 在後台執行的工作進程將獲取任務並最終執行作業。當你執行許多消費者時,任務將在他們之間共用,但是一個訊息只能被一個消費者獲取

這個概念在Web應用程式中特別有用,因爲在短的HTTP請求視窗中無法處理複雜的任務。

接下來我們來模擬這個流程:

P:生產者:任務的發佈者

C1:消費者,領取任務並且完成任務,假設完成速度較快

C2:消費者2:領取任務並完成任務,假設完成速度慢

面試題:避免訊息堆積?

1)採用workqueue,多個消費者監聽同一佇列。

2)接收到訊息以後,而是通過執行緒池,非同步消費。

2.2.1. 生產者

生產者與案例1中的幾乎一樣:

public class Send {
    private final static String QUEUE_NAME = "test_work_queue";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 回圈發佈任務
        for (int i = 0; i < 50; i++) {
            // 訊息內容
            String message = "task .. " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 2);
        }
        // 關閉通道和連線
        channel.close();
        connection.close();
    }
}

不過這裏我們是回圈發送50條訊息。

2.2.2. 消費者1

2.2.3. 消費者2

與消費者1基本類似,就是沒有設定消費耗時時間。

這裏是模擬有些消費者快,有些比較慢。

接下來,兩個消費者一同啓動,然後發送50條訊息:

可以發現,兩個消費者各自消費了25條訊息,而且各不相同,這就實現了任務的分發。

2.2.4. 能者多勞

剛纔的實現有問題嗎?

  • 消費者1比消費者2的效率要低,一次任務的耗時較長
  • 然而兩人最終消費的訊息數量是一樣的
  • 消費者2大量時間處於空閒狀態,消費者1一直忙碌

現在的狀態屬於是把任務平均分配,正確的做法應該是消費越快的人,消費的越多。

怎麼實現呢?

我們可以使用basicQos方法和prefetchCount = 1設定。 這告訴RabbitMQ一次不要向工作人員發送多於一條訊息。 或者換句話說,不要向工作人員發送新訊息,直到它處理並確認了前一個訊息。 相反,它會將其分派給不是仍然忙碌的下一個工作人員。

再次測試:

2.3. 訂閱模型分類

在之前的模式中,我們建立了一個工作佇列。 工作佇列背後的假設是:每個任務只被傳遞給一個工作人員。 在這一部分,我們將做一些完全不同的事情 - 我們將會傳遞一個資訊給多個消費者。 這種模式被稱爲「發佈/訂閱」。

訂閱模型示意圖:

解讀:

1、1個生產者,多個消費者

2、每一個消費者都有自己的一個佇列

3、生產者沒有將訊息直接發送到佇列,而是發送到了交換機

4、每個佇列都要系結到交換機

5、生產者發送的訊息,經過交換機到達佇列,實現一個訊息被多個消費者獲取的目的

X(Exchanges):交換機一方面:接收生產者發送的訊息。另一方面:知道如何處理訊息,例如遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄。到底如何操作,取決於Exchange的型別。

Exchange型別有以下幾種:

Fanout:廣播,將訊息交給所有系結到交換機的佇列

Direct:定向,把訊息交給符合指定routing key 的佇列 

Topic:萬用字元,把訊息交給符合routing pattern(路由模式) 的佇列

我們這裏先學習

Fanout:即廣播模式

Exchange(交換機)只負責轉發訊息,不具備儲存訊息的能力,因此如果沒有任何佇列與Exchange系結,或者沒有符合路由規則的佇列,那麼訊息會丟失!

2.4. 訂閱模型-Fanout

Fanout,也稱爲廣播。

流程圖:

在廣播模式下,訊息發送流程是這樣的:

  • 1) 可以有多個消費者
  • 2) 每個消費者有自己的queue(佇列)
  • 3) 每個佇列都要系結到Exchange(交換機)
  • 4) 生產者發送的訊息,只能發送到交換機,交換機來決定要發給哪個佇列,生產者無法決定。
  • 5) 交換機把訊息發送給系結過的所有佇列
  • 6) 佇列的消費者都能拿到訊息。實現一條訊息被多個消費者消費

2.4.1. 生產者

兩個變化:

  • 1) 宣告Exchange,不再宣告Queue
  • 2) 發送訊息到Exchange,不再發送到Queue
public class Send {

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        
        // 宣告exchange,指定型別爲fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        
        // 訊息內容
        String message = "Hello everyone";
        // 發佈訊息到Exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [生產者] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

2.4.2. 消費者1

public class Recv {
    private final static String QUEUE_NAME = "fanout_exchange_queue_1";

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 系結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者1] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

要注意程式碼中:佇列需要和交換機系結

2.4.3. 消費者2

public class Recv2 {
    private final static String QUEUE_NAME = "fanout_exchange_queue_2";

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 系結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        
        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者2] received : " + msg + "!");
            }
        };
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.4.4. 測試

我們執行兩個消費者,然後發送1條訊息:

2.5. 訂閱模型-Direct

有選擇性的接收訊息

在訂閱模式中,生產者發佈訊息,所有消費者都可以獲取所有訊息。

在路由模式中,我們將新增一個功能 - 我們將只能訂閱一部分訊息。 例如,我們只能將重要的錯誤訊息引導到日誌檔案(以節省磁碟空間),同時仍然能夠在控制檯上列印所有日誌訊息。

但是,在某些場景下,我們希望不同的訊息被不同的佇列消費。這時就要用到Direct型別的Exchange。

在Direct模型下,佇列與交換機的系結,不能是任意系結了,而是要指定一個RoutingKey(路由key)

訊息的發送方在向Exchange發送訊息時,也必須指定訊息的routing key。

P:生產者,向Exchange發送訊息,發送訊息時,會指定一個routing key。

X:Exchange(交換機),接收生產者的訊息,然後把訊息遞交給 與routing key完全匹配的佇列

C1:消費者,其所在佇列指定了需要routing key 爲 error 的訊息

C2:消費者,其所在佇列指定了需要routing key 爲 info、error、warning 的訊息

2.5.1. 生產者

此處我們模擬商品的增刪改,發送訊息的RoutingKey分別是:insert、update、delete

public class Send {
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告exchange,指定型別爲direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 訊息內容
        String message = "商品新增了, id = 1001";
        // 發送訊息,並且指定routing key 爲:insert ,代表新增商品
        channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
        System.out.println(" [商品服務:] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

2.5.2. 消費者1

我們此處假設消費者1只接收兩種型別的訊息:更新商品和刪除商品。

public class Recv {
    private final static String QUEUE_NAME = "direct_exchange_queue_1";
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 系結佇列到交換機,同時指定需要訂閱的routing key。假設此處需要update和delete訊息
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者1] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.5.3. 消費者2

我們此處假設消費者2接收所有型別的訊息:新增商品,更新商品和刪除商品。

public class Recv2 {
    private final static String QUEUE_NAME = "direct_exchange_queue_2";
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 系結佇列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者2] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.5.4. 測試

我們分別發送增、刪、改的RoutingKey,發現結果:

2.6. 訂閱模型-Topic

Topic型別的ExchangeDirect相比,都是可以根據RoutingKey把訊息路由到不同的佇列。只不過Topic型別Exchange可以讓佇列在系結Routing key 的時候使用萬用字元!

Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以」.」分割,例如: item.insert

萬用字元規則:

`#`:匹配一個或多個詞

`*`:匹配不多不少恰好1個詞

舉例:

`audit.#`:能夠匹配`audit.irs.corporate` 或者 `audit.irs`

`audit.*`:只能匹配`audit.irs`

在這個例子中,我們將發送所有描述動物的訊息。訊息將使用由三個字(兩個點)組成的routing key發送。路由關鍵字中的第一個單詞將描述速度,第二個顏色和第三個種類:「..」。

我們建立了三個系結:Q1系結了系結鍵「* .orange.」,Q2系結了「.*.rabbit」和「lazy.#」。

Q1匹配所有的橙色動物。

Q2匹配關於兔子以及懶惰動物的訊息。

練習,生產者發送如下訊息,會進入那個佇列:

quick.orange.rabbit Q1 Q2

lazy.orange.elephant

quick.orange.fox

lazy.pink.rabbit

quick.brown.fox

quick.orange.male.rabbit

orange

2.6.1. 生產者

使用topic型別的Exchange,發送訊息的routing key有3種: item.isnertitem.updateitem.delete

public class Send {
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告exchange,指定型別爲topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 訊息內容
        String message = "新增商品 : id = 1001";
        // 發送訊息,並且指定routing key 爲:insert ,代表新增商品
        channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
        System.out.println(" [商品服務:] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

2.6.2. 消費者1

我們此處假設消費者1只接收兩種型別的訊息:更新商品和刪除商品

public class Recv {
    private final static String QUEUE_NAME = "topic_exchange_queue_1";
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 系結佇列到交換機,同時指定需要訂閱的routing key。需要 update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者1] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.6.3. 消費者2

我們此處假設消費者2接收所有型別的訊息:新增商品,更新商品和刪除商品。

/**
 * 消費者2
 */
public class Recv2 {
    private final static String QUEUE_NAME = "topic_exchange_queue_2";
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線
        Connection connection = ConnectionUtil.getConnection();
        // 獲取通道
        Channel channel = connection.createChannel();
        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 系結佇列到交換機,同時指定需要訂閱的routing key。訂閱 insert、update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");

        // 定義佇列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 獲取訊息,並且處理,這個方法類似事件監聽,如果有訊息的時候,會被自動呼叫
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即訊息體
                String msg = new String(body);
                System.out.println(" [消費者2] received : " + msg + "!");
            }
        };
        // 監聽佇列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

2.7. 持久化

要將訊息持久化,前提是:佇列、Exchange都持久化

2.7.1. 交換機持久化

2.7.2. 佇列持久化

2.7.3. 訊息持久化

3. Spring AMQP

3.1. 簡介

Sprin有很多不同的專案,其中就有對AMQP的支援:

Spring AMQP的頁面:http://spring.io/projects/spring-amqp

注意這裏一段描述:

     Spring-amqp是對AMQP協定的抽象實現,而spring-rabbit 是對協定的具體實現,也是目前的唯一實現。底層使用的就是RabbitMQ。

3.2. 入門程式

3.2.1. 依賴和設定

新增AMQP的啓動器:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml中新增RabbitMQ地址:

spring:
  rabbitmq:
    host: 192.168.56.101
    username: fengge
    password: fengge
    virtual-host: /fengge

3.2.2. 監聽者(消費者)

在SpringAmqp中,對訊息的消費者進行了封裝和抽象,一個普通的JavaBean中的普通方法,只要通過簡單的註解,就可以成爲一個消費者。

@Component
public class Listener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "spring.test.queue", durable = "true"),
            exchange = @Exchange(
                    value = "spring.test.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"a.#"}))
    public void listen(String msg){
        System.out.println("接收到訊息:" + msg);
    }
}

  • @Componet:類上的註解,註冊到Spring容器
  • @RabbitListener:方法上的註解,宣告這個方法是一個消費者方法,需要指定下面 下麪的屬性:
    • bindings:指定系結關係,可以有多個。值是@QueueBinding的陣列。@QueueBinding包含下面 下麪屬性:
      • value:這個消費者關聯的佇列。值是@Queue,代表一個佇列
      • exchange:佇列所系結的交換機,值是@Exchange型別
      • key:佇列和交換機系結的RoutingKey

類似listen這樣的方法在一個類中可以寫多個,就代表多個消費者。

3.2.3. AmqpTemplate(生產者)

Spring最擅長的事情就是封裝,把他人的框架進行封裝和整合。

Spring爲AMQP提供了統一的訊息處理模板:AmqpTemplate,非常方便的發送訊息,其發送方法:

紅框圈起來的是比較常用的3個方法,分別是:

  • 指定交換機、RoutingKey和訊息體
  • 指定訊息
  • 指定RoutingKey和訊息,會向預設的交換機發送訊息

3.2.4. 測試程式碼

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testSend() throws InterruptedException {
        String msg = "hello, Spring boot amqp";
        this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
        // 等待10秒後再結束
        Thread.sleep(10000);
    }
}

執行後檢視日誌:

3.3. 生產者確認

內容如下:

/**
 * @Description 訊息發送確認
 * <p>
 * ConfirmCallback  只確認訊息是否正確到達 Exchange 中
 * ReturnCallback   訊息沒有正確到達佇列時觸發回撥,如果正確到達佇列不執行
 * <p>
 * 1. 如果訊息沒有到exchange,則confirm回撥,ack=false
 * 2. 如果訊息到達exchange,則confirm回撥,ack=true
 * 3. exchange到queue成功,則不回撥return
 * 4. exchange到queue失敗,則回撥return
 * @Author qy
 */
@Configuration
@Slf4j
public class ProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
        rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback
    }

    /**
     * 確認訊息是否正確到達 Exchange 中
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("訊息發送成功:" + JSON.toJSONString(correlationData));
        } else {
            log.info("訊息發送失敗:{} 數據:{}", cause, JSON.toJSONString(correlationData));
        }
    }

    /**
     * 訊息沒有正確到達佇列時觸發回撥,如果正確到達佇列不執行
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化物件輸出
        System.out.println("訊息主體: " + new String(message.getBody()));
        System.out.println("應答碼: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("訊息使用的交換器 exchange : " + exchange);
        System.out.println("訊息使用的路由鍵 routing : " + routingKey);
    }
}

測試1:訊息正常發送,正常消費

測試2:訊息到達交換機,沒有達到佇列

測試3:訊息不能到達交換機

3.4. 消費者確認

springboot-rabbit提供了三種訊息確認模式:

  • AcknowledgeMode.NONE:不確認模式(不管程式是否異常只要執行了監聽方法,訊息即被消費。相當於rabbitmq中的自動確認,這種方式不推薦使用)
  • AcknowledgeMode.AUTO:自動確認模式(預設,消費者沒有異常會自動確認,有異常則不確認,無限重試,導致程式死回圈。不要和rabbit中的自動確認混淆)
  • AcknowledgeMode.MANUAL:手動確認模式(需要手動呼叫channel.basicAck確認,可以捕獲異常控制重試次數,甚至可以控制失敗訊息的處理方式)

設定方法:

spring.rabbitmq.listener.simple.acknowledge-mode=manual/none/auto

3.4.1. 自動確認模式

在消費者中製造一個異常:

可以看到mq將無限重試,消費訊息:

訊息將無法消費:

停掉應用訊息回到Ready狀態,訊息不會丟失!

3.4.2. 不確認模式

在application.yml修改確認模式爲none:

保留消費者中的int i = 1/0異常,再次執行,程式報錯:

訊息已經被消費:

3.4.3. 手動確認模式

確認訊息:

// 參數二:是否批次確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

拒絕訊息:

// 參數二:是否重新入隊,false時訊息不再重發,如果設定了死信佇列則進入死信佇列,沒有死信就會被丟棄
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

不確認訊息:

// 參數二:是否批次; 參數三:是否重新回到佇列,true重新入隊
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

改造消費者監聽器程式碼如下:

@Component
public class Listener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "spring.test.queue", durable = "true"),
            exchange = @Exchange(
                    value = "spring.test.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"a.*"}))
    public void listen(String msg, Channel channel, Message message) throws IOException {
        try {
            System.out.println("接收到訊息:" + msg);

            int i = 1 / 0;
            // 確認收到訊息,false只確認當前consumer一個訊息收到,true確認所有consumer獲得的訊息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                System.out.println("訊息重試後依然失敗,拒絕再次接收");
                // 拒絕訊息,不再重新入隊(如果系結了死信佇列訊息會進入死信佇列,沒有系結死信佇列則訊息被丟棄,也可以把失敗訊息記錄到redis或者mysql中),也可以設定爲true再重試。
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                System.out.println("訊息消費時出現異常,即將再次返回佇列處理");
                // Nack訊息,重新入隊(重試一次)
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            e.printStackTrace();
        }
    }
}

輸出日誌如下:

接收到訊息:hello, Spring boot amqp
訊息消費時出現異常,即將再次返回佇列處理
java.lang.ArithmeticException: / by zero
	at com.atuigu.rabbitmq.spring.Listener.listen(Listener.java:31)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	........................
2020-03-29 16:56:20.432  INFO 23432 --- [16.116.100:5672] c.a.rabbitmq.spring.ProducerAckConfig    : 訊息發送成功:null


接收到訊息:hello, Spring boot amqp
訊息重試後依然失敗,拒絕再次接收
java.lang.ArithmeticException: / by zero
	at com.atuigu.rabbitmq.spring.Listener.listen(Listener.java:31)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	.........................

3.5. 死信佇列

死信,在官網中對應的單詞爲「Dead Letter」,可以看出翻譯確實非常的簡單粗暴。那麼死信是個什麼東西呢?

「死信」是RabbitMQ中的一種訊息機制 機製,當你在消費訊息時,如果佇列裡的訊息出現以下情況:

  1. 訊息被否定確認,使用 channel.basicNackchannel.basicReject ,並且此時requeue 屬性被設定爲false
  2. 訊息在佇列的存活時間超過設定的TTL時間。
  3. 訊息佇列的訊息數量已經超過最大佇列長度。

那麼該訊息將成爲「死信」。

「死信」訊息會被RabbitMQ進行特殊處理,如果設定了死信佇列資訊,那麼該訊息將會被丟進死信佇列中,如果沒有設定,則該訊息將會被丟棄。

死信的佇列的使用,大概可以分爲以下步驟:

  1. 設定業務佇列,系結到業務交換機上
  2. 爲業務佇列設定死信交換機(DLX)和路由key
  3. 爲死信交換機設定死信佇列(DLQ)

在設定類中增加設定:

內容如下:

/**
     * 宣告業務交換機
     *
     * @return
     */
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("spring.test.exchange", true, false);
}

/**
     * 宣告業務佇列
     * 並把死信交換機系結到業務佇列
     * @return
     */
@Bean
public Queue queue() {
    Map<String, Object> arguments = new HashMap<>();
    //         x-dead-letter-exchange    這裏宣告當前佇列系結的死信交換機
    arguments.put("x-dead-letter-exchange", "dead-exchange");
    //         x-dead-letter-routing-key  這裏宣告當前佇列的死信路由key
    arguments.put("x-dead-letter-routing-key", "msg.dead");
    return new Queue("spring.test.queue", true, false, false, arguments);
}

/**
     * 業務佇列系結到業務交換機
     *
     * @return
     */
@Bean
public Binding binding() {
    return new Binding("spring.test.queue", Binding.DestinationType.QUEUE, "spring.test.exchange", "a.b", null);
}

/**
     * 宣告死信交換機
     * @return
     */
@Bean
public TopicExchange deadExchange(){
    return new TopicExchange("dead-exchange", true, false);
}

/**
     * 宣告死信佇列
     * @return
     */
@Bean
public Queue deadQueue(){
    return new Queue("dead-queue", true, false, false);
}

/**
     * 把死信佇列系結到死信交換機
     * @return
     */
@Bean
public Binding deadBinding() {
    return new Binding("dead-queue", Binding.DestinationType.QUEUE, "dead-exchange", "msg.dead", null);
}

改造消費者監聽器:

注意:測試前,需要把專案停掉,並在rabbitmq瀏覽器控制檯刪除之前宣告好的交換機及佇列

執行測試後:

可以看到spring.test.queue有了系結死信交換機,死信訊息已經進入死信佇列。

3.6. 延時佇列

延時佇列,最重要的特性就體現在它的延時屬性上,跟普通的佇列不一樣的是,普通佇列中的元素總是等着希望被早點取出處理,而延時佇列中的元素則是希望被在指定時間得到取出和處理,所以延時佇列中的元素是都是帶時間屬性的,通常來說是需要被處理的訊息或者任務。

簡單來說,延時佇列就是用來存放需要在指定時間被處理的元素的佇列。

3.6.1. 使用場景

那麼什麼時候需要用延時佇列呢?考慮一下以下場景:

  1. 訂單在十分鐘之內未支付則自動取消。
  2. 新建立的店鋪,如果在十天內都沒有上傳過商品,則自動發送訊息提醒。
  3. 賬單在一週內未支付,則自動結算。
  4. 使用者註冊成功後,如果三天內沒有登陸則進行簡訊提醒。
  5. 使用者發起退款,如果三天內沒有得到處理則通知相關運營人員。
  6. 預定會議後,需要在預定的時間點前十分鐘通知各個與會人員參加會議。

這些任務看起來似乎可以使用定時任務,一直輪詢數據,每秒查一次,取出需要被處理的數據,然後處理不就完事了嗎?如果數據量比較少,確實可以這樣做,比如:對於「如果賬單一週內未支付則進行自動結算」這樣的需求,如果對於時間不是嚴格限制,而是寬鬆意義上的一週,那麼每天晚上跑個定時任務檢查一下所有未支付的賬單,確實也是一個可行的方案。但對於數據量比較大,並且時效性較強的場景,如:「訂單十分鐘內未支付則關閉「,短期內未支付的訂單數據可能會有很多,活動期間甚至會達到百萬甚至千萬級別,對這麼龐大的數據量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內無法完成所有訂單的檢查,同時會給數據庫帶來很大壓力,無法滿足業務要求而且效能低下。

更重要的一點是,不!優!雅!

3.6.2. 怎麼宣告

延時佇列需要設定TTL,那麼什麼時TTL呢?

訊息的TTL(Time To Live)就是訊息的存活時間,單位是毫秒。我們可以對佇列或者訊息設定TTL,訊息如果在TTL設定的時間內沒有被消費,則會成爲「死信」。如果同時設定了佇列的TTL和訊息的TTL,那麼較小的那個值將會被使用。

佇列設定TTL:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

訊息設定TTL:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

這樣這條訊息的過期時間也被設定成了6s。

但這兩種方式是有區別的,如果設定了佇列的TTL屬性,那麼一旦訊息過期,就會被佇列丟棄,而第二種方式,訊息即使過期,也不一定會被馬上丟棄,因爲訊息是否過期是在即將投遞到消費者之前判定的,如果當前佇列有嚴重的訊息積壓情況,則已過期的訊息也許還能存活較長時間。

另外,還需要注意的一點是,如果不設定TTL,表示訊息永遠不會過期,如果將TTL設定爲0,則表示除非此時可以直接投遞該訊息到消費者,否則該訊息將會被丟棄。

3.6.3. 如何使用

實現如下:

設定延時佇列及死信佇列:

@Configuration
public class TTLQueueConfig {

    /**
     * 交換機
     * @return
     */
    @Bean
    public Exchange exchange(){

        return new TopicExchange("ORDER-EXCHANGE", true, false, null);
    }

    /**
     * 延時佇列
     * @return
     */
    @Bean("ORDER-TTL-QUEUE")
    public Queue ttlQueue(){

        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "ORDER-EXCHANGE");
        arguments.put("x-dead-letter-routing-key", "order.close");
        arguments.put("x-message-ttl", 120000); // 僅僅用於測試,實際根據需求,通常30分鐘或者15分鐘
        return new Queue("ORDER-TTL-QUEUE", true, false, false, arguments);
    }

    /**
     * 延時佇列系結到交換機
     * rountingKey:order.create
     * @return
     */
    @Bean("ORDER-TTL-BINDING")
    public Binding ttlBinding(){

        return new Binding("ORDER-TTL-QUEUE", Binding.DestinationType.QUEUE, "ORDER-EXCHANGE", "order.create", null);
    }

    /**
     * 死信佇列
     * @return
     */
    @Bean("ORDER-CLOSE-QUEUE")
    public Queue queue(){

        return new Queue("ORDER-CLOSE-QUEUE", true, false, false, null);
    }

    /**
     * 死信佇列系結到交換機
     * routingKey:order.close
     * @return
     */
    @Bean("ORDER-CLOSE-BINDING")
    public Binding closeBinding(){

        return new Binding("ORDER-CLOSE-QUEUE", Binding.DestinationType.QUEUE, "ORDER-EXCHANGE", "order.close", null);
    }
}

在MqDemo測試類中新增發送訊息的測試用例:

@Test
public void testTTL() throws IOException {

    this.rabbitTemplate.convertAndSend("ORDER-EXCHANGE", "order.create", "hello world!");
    System.in.read();
}

新增消費者,消費死信訊息:

@Component
public class DeadListener {

    @RabbitListener(queues = "ORDER-CLOSE-QUEUE")
    public void testDead(String msg){
        System.out.println(msg);
    }
}