RabbitMQ詳解(下)

2023-04-24 12:00:15

一:序

  通過《RabbitMQ詳解(上)》一文中,我們可以知道RabbitMQ的一些基本的原生用法,如交換機的建立及訊息的投遞,但是在企業中我們大部分都是把RabbitMQ整合到SpringBoot中的,所以原生的方式我們則不怎麼使用到,下面我將和大家一起走入SpringBoot整合RabbitMQ的世界。

  下面全部案例程式碼:UseSpringBootIntegrateRabbitMQ

二:一個簡單的案例

  為了不那麼麻煩,我就以一個SpringBoot來處理訊息的傳送和接收;通過Postman傳送請求到Controller,再由Controller呼叫生產者,生產者把訊息推播到Brock(交換機),再由交換機具體路由到指定佇列,然後由指定的消費者監聽佇列獲取訊息;這就是一個我要實現的完整流程,以直接交換機來舉例。

如下是SpringBoot整合RabbitMQ的基本案例,實現直接交換機模式,具體流程如上圖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!--注意:spring-boot-starter-parent版本若達到3.0.0 則只支援JDK17,而2.7.10是支援JDK8的最後一個版本-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.10</version>
    </parent>

    <groupId>cn.xw</groupId>
    <artifactId>HelloWorld</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>HelloWorld</name>
    <description>HelloWorld</description>

    <dependencies>
        <!--Spring Boot的核心啟動器,包含了自動設定、紀錄檔和YAML-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <!--去掉logback設定,要不然衝突-->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- 引入SpringBoot的log4j2依賴啟動座標;這座標包含具體的log4j2的座標和連線Slf4j的介面卡 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <!--SpringBootWeb啟動依賴座標-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--RabbitMQ啟動依賴座標-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- JSON格式化座標-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.26</version>
        </dependency>
        <!--Lombok座標匯入-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
            <scope>provided</scope>
        </dependency>
        <!-- RabbitMQ測試座標 -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <version>3.0.3</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--設定maven編譯版本-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source><!--原始碼使用的JDK-->
                    <target>1.8</target><!--target需要生成的目標class檔案的編譯版本-->
                    <encoding>UTF-8</encoding><!--字元集編碼,防止中文亂碼-->
                    <failOnError>true</failOnError><!--指示即使存在編譯錯誤,構建是否仍將繼續-->
                    <failOnWarning>false</failOnWarning><!--指示即使存在編譯警告,構建是否仍將繼續-->
                    <showDeprecation>false</showDeprecation><!--設定是否顯示使用不推薦API的源位置-->
                    <showWarnings>false</showWarnings><!--設為true若要顯示編譯警告,請執行以下操作-->
                    <meminitial>128M</meminitial><!--編譯器使用的初始化記憶體-->
                    <maxmem>512M</maxmem><!--編譯器使用的最大記憶體-->
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
pom.xml座標檔案
<?xml version="1.0" encoding="UTF-8" ?>
<!--monitorInterval屬性值(秒數)為一個非零值來讓Log4j每隔指定的秒數來重新讀取組態檔,可以用來動態應用Log4j設定-->
<Configuration status="info" monitorInterval="30">
    <!--用來自定義一些變數-->
    <Properties>
        <!--變數定義-->
        <Property name="myPattern" value="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        <Property name="dir_url">./logs</Property>
    </Properties>
    <!--使用Appenders元素可以將紀錄檔事件資料寫到各種目標位置-->
    <Appenders>
        <!-- 預設列印到控制檯 -->
        <Console name="ConsoleAppend" target="SYSTEM_OUT">
            <!-- 預設列印格式 -->
            <PatternLayout pattern="${myPattern}"/>
        </Console>
        <!-- 列印到紀錄檔檔案上 -->
        <File name="FileAppend" fileName="${dir_url}/fileLog.log" bufferedIO="true" immediateFlush="true">
            <PatternLayout>
                <pattern>${myPattern}</pattern>
            </PatternLayout>
        </File>
    </Appenders>
    <!--定義logger,只有定義了logger並引入的appender,appender才會生效-->
    <Loggers>
        <!-- 預設列印紀錄檔級別為 error -->
        <Root level="INFO">
            <AppenderRef ref="ConsoleAppend"/>
            <AppenderRef ref="FileAppend"/>
        </Root>
    </Loggers>
</Configuration>
log4j2.xml紀錄檔組態檔
server:
  port: 8081
spring:
  ## 具體RabbitMQ設定請參考:org.springframework.boot.autoconfigure.amqp.RabbitProperties
  rabbitmq:
    host: 49.235.99.193
    port: 5672
    username: admin
    password: 123
    virtual-host: test
application.yml組態檔
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-14 11:49
 * 用來測試RabbitMQ的生產者傳送訊息(物件)到消費者中的一系列傳輸
 */
@Data
public class MessageSendDTO implements Serializable {

    private static final long serialVersionUID = 5905249092659173678L;

    private Integer msgID;          // 訊息ID
    private String msgType;         // 訊息型別
    private Object msgBody;         // 訊息體
}
MessageSendDTO實體物件
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-13 17:55
 * RabbitMQ設定類
 */
@Configuration
public class RabbitMQConfig {

    //定義1:簡單的直接交換機名稱;定義2:簡單佇列名稱;定義3:路由key
    public static final String SIMPLE_DIRECT_EXCHANGE = "simpleDirectExchange";
    public static final String SIMPLE_QUEUE_NAME = "simpleQueueName";
    public static final String SIMPLE_KEY = "simpleKey";

    /***
     * 建立交換機資訊
     * @return Exchange
     */
    @Bean("simpleDirectExchange")   //注:Bean物件可以不寫名稱,預設就是方法名
    public Exchange simpleDirectExchange() {
        //這個ExchangeBuilder就是我們當初使用的如下方式一樣:
        // channel.exchangeDeclare("交換機名稱", "交換機型別",true, false, false, null);
        return ExchangeBuilder.directExchange(SIMPLE_DIRECT_EXCHANGE).durable(true).build();
    }

    /***
     * 建立佇列資訊
     * @return Queue
     */
    @Bean("simpleQueueName")
    public Queue simpleQueueName() {
        //這個QueueBuilder就是我們當初使用的如下方式一樣:
        // channel.queueDeclare("佇列名稱", true, false, false, null);
        return QueueBuilder.durable(SIMPLE_QUEUE_NAME).build();
    }

    /***
     * 佇列資訊系結到交換機上
     * @param simpleDirectExchange 簡單的直接交換機
     * @param simpleQueueName 簡單的佇列
     * @return Binding
     */
    @Bean("simpleQueueBindSimpleExchange")
    public Binding simpleQueueBindSimpleExchange(@Qualifier(value = "simpleDirectExchange") Exchange simpleDirectExchange,
                                                 @Qualifier(value = "simpleQueueName") Queue simpleQueueName) {
        //這個BindingBuilder就是我們當初使用的如下方式一樣:
        // channel.queueBind("佇列名稱", "交換機名稱", "路由key");
        return BindingBuilder.bind(simpleQueueName).to(simpleDirectExchange).with(SIMPLE_KEY).noargs();
    }
}
RabbitMQConfig設定類,設定交換機、佇列及繫結關係
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-13 23:52
 */
@Slf4j  //使用lombok自帶的紀錄檔註解,具體實現是slf4j+log4j2
@RestController
@RequestMapping("/simple")
@RequiredArgsConstructor
public class SimpleController {

    //使用SLF4J來獲取Logger物件;(注意導包:import org.slf4j.Logger; import org.slf4j.LoggerFactory;)
    //Logger logger = LoggerFactory.getLogger(this.getClass());

    //注入生產者物件
    private final SimpleProducer simpleProducer;

    /***
     * 基本的get請求,用來接收訊息,並把訊息交給生產者,並由生產者推播到指定交換機,由交換機分發訊息
     * @param msg 請求訊息
     * @return String
     */
    @PostMapping("/produce")
    public String produce(@RequestBody MessageSendDTO msg) {
        log.info("Controller接收到請求並把請求的資訊交由生產者:{}", msg);
        //傳送訊息到生產者
        simpleProducer.productionSimpleMessage(msg);
        return "請求傳送成功,並已接收";
    }
}
SimpleController用來接收postman傳送的資料
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-13 23:30
 * 生產者
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class SimpleProducer {

    //注入RabbitTemplate物件
    private final RabbitTemplate rabbitTemplate;

    /***
     * 生產者方法
     * @param msg 生產者傳送的訊息
     */
    public void productionSimpleMessage(MessageSendDTO msg) {
        log.info("生產者接收到訊息,並行送到Brock的交換機....");
        //訊息轉換為JSON格式傳送,並行送到Brock的交換機
        byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
        //convertAndSend("交換機名稱","路由key","傳送訊息內容"),其實和原生的:
        // channel.basicPublish("交換機名稱","路由key","其它引數","訊息");
        // 使用convertAndSend預設訊息是持久化的,如我們當初原生設定的 其它引數:MessageProperties.PERSISTENT_TEXT_PLAIN
        rabbitTemplate.convertAndSend(RabbitMQConfig.SIMPLE_DIRECT_EXCHANGE, RabbitMQConfig.SIMPLE_KEY, bytes);
    }
}

//----------------------------------------------------------------------

/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-13 23:29
 * 這是一個簡單的消費者
 */
@Slf4j
@Component
public class SimpleConsumer {

    /***
     * 簡單訊息處理(監聽)
     * @param msgData 傳遞的具體訊息,最好是生產者傳送使用什麼型別,這裡接收就用什麼型別
     * @param message 這個就類似我們原生的message
     * @param channel 這個就類似我們原生的channel
     */
    @RabbitListener(queues = {RabbitMQConfig.SIMPLE_QUEUE_NAME}) //只需要監聽佇列即可,多個則在{}裡面逗號分割
    public void messageSimpleHandle(String msgData, Message message, Channel channel) {
        //獲取到佇列訊息,因為傳送是JSON格式,我們要解析物件格式
        MessageSendDTO msg = JSONObject.parseObject(message.getBody(), MessageSendDTO.class);
        log.info("訊息由消費者消費:{},並消費完成", msg);
    }
}
SimpleProducer生產者和SimpleConsumer消費者

1:整合RabbitMQ的常用設定資訊

## 具體RabbitMQ設定請參考:org.springframework.boot.autoconfigure.amqp.RabbitProperties,以實際版本為主
# base
spring.rabbitmq.host: 服務Host
spring.rabbitmq.port: 伺服器埠
spring.rabbitmq.username: 登陸使用者名稱
spring.rabbitmq.password: 登陸密碼
spring.rabbitmq.virtual-host: 連線到rabbitMQ的vhost
spring.rabbitmq.addresses: 指定client連線到的server的地址,多個以逗號分隔(優先取addresses,然後再取host)
spring.rabbitmq.requested-heartbeat: 指定心跳超時,單位秒,0為不指定;預設60s
spring.rabbitmq.publisher-confirm-type: 是否啟用【釋出確認】
spring.rabbitmq.publisher-returns: 是否啟用【釋出返回】
spring.rabbitmq.connection-timeout: 連線超時,單位毫秒,0表示無窮大,不超時
spring.rabbitmq.parsed-addresses:
# ssl
spring.rabbitmq.ssl.enabled: 是否支援ssl
spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路徑
spring.rabbitmq.ssl.key-store-password: 指定存取key store的密碼
spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store
spring.rabbitmq.ssl.trust-store-password: 指定存取trust store的密碼
spring.rabbitmq.ssl.algorithm: ssl使用的演演算法,例如,TLSv1.1
# cache
spring.rabbitmq.cache.channel.size: 快取中保持的channel數量
spring.rabbitmq.cache.channel.checkout-timeout: 當快取數量被設定時,從快取中獲取一個channel的超時時間,單位毫秒;如果為0,則總是建立一個新channel
spring.rabbitmq.cache.connection.size: 快取的連線數,只有是CONNECTION模式時生效
spring.rabbitmq.cache.connection.mode: 連線工廠快取模式:CHANNEL 和 CONNECTION
# listener
spring.rabbitmq.listener.simple.auto-startup: 是否啟動時自動啟動容器
spring.rabbitmq.listener.simple.acknowledge-mode: 表示訊息確認方式,其有三種設定方式,分別是none、manual和auto;預設auto
spring.rabbitmq.listener.simple.concurrency: 最小的消費者數量
spring.rabbitmq.listener.simple.max-concurrency: 最大的消費者數量
spring.rabbitmq.listener.simple.prefetch: 指定一個請求能處理多少個訊息,如果有事務的話,必須大於等於transaction數量.
spring.rabbitmq.listener.simple.transaction-size: 指定一個事務處理的訊息數量,最好是小於等於prefetch的數量.
spring.rabbitmq.listener.simple.default-requeue-rejected: 決定被拒絕的訊息是否重新入隊;預設是true(與引數acknowledge-mode有關係)
spring.rabbitmq.listener.simple.idle-event-interval: 多少長時間釋出空閒容器時間,單位毫秒
spring.rabbitmq.listener.simple.retry.enabled: 監聽重試是否可用
spring.rabbitmq.listener.simple.retry.max-attempts: 最大重試次數
spring.rabbitmq.listener.simple.retry.initial-interval: 第一次和第二次嘗試釋出或傳遞訊息之間的間隔
spring.rabbitmq.listener.simple.retry.multiplier: 應用於上一重試間隔的乘數
spring.rabbitmq.listener.simple.retry.max-interval: 最大重試時間間隔
spring.rabbitmq.listener.simple.retry.stateless: 重試是有狀態or無狀態
# template
spring.rabbitmq.template.mandatory: 啟用強制資訊;預設false
spring.rabbitmq.template.receive-timeout: receive() 操作的超時時間
spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超時時間
spring.rabbitmq.template.retry.enabled: 傳送重試是否可用
spring.rabbitmq.template.retry.max-attempts: 最大重試次數
spring.rabbitmq.template.retry.initial-interval: 第一次和第二次嘗試釋出或傳遞訊息之間的間隔
spring.rabbitmq.template.retry.multiplier: 應用於上一重試間隔的乘數
spring.rabbitmq.template.retry.max-interval: 最大重試時間間隔
RabbitProperties設定資訊

三:工作佇列+訊息應答+訊息分發

  這裡的工作佇列我就使用生產者傳送訊息到直接交換機,再由直接交換機通過路由分發到佇列中,再由消費者(多個)來消費佇列的訊息,具體的流程圖如下(在下面的每小節完成訊息應答和訊息分發):

1:普通的工作佇列

  因為具體的其它程式碼(組態檔等)在第一章已經給出了,下面我就主要粘出具體的程式碼

/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-14 17:43
 * RabbitMQ設定類
 */
@Configuration
public class RabbitMQConfig {
    //直接交換機名稱
    public static final String DIRECT_EXCHANGE = "directExchange";
    //佇列名稱A
    public static final String QUEUE_A_NAME = "queueAName";
    //路由key
    public static final String ROUTE_KEY = "routeKeyName";

    /***
     * 建立交換機資訊
     * @return Exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build();
    }

    /***
     * 建立佇列A資訊
     * @return Queue
     */
    @Bean("queueAName")
    public Queue queueAName() {
        return QueueBuilder.durable(QUEUE_A_NAME).build();
    }

    /***
     * 佇列繫結到交換機上,通過路由key
     * @param directExchange 交換機資訊
     * @param queueAName A佇列繫結
     * @return Binding
     */
    @Bean("directExchangeBindAQueue")
    public Binding directExchangeBindAQueue(@Qualifier(value = "directExchange") Exchange directExchange,
                                            @Qualifier(value = "queueAName") Queue queueAName) {
        return BindingBuilder.bind(queueAName).to(directExchange).with(ROUTE_KEY).noargs();
    }
}
RabbitMQConfig設定類,設定交換機、佇列及繫結關係
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-14 21:39
 * 測試生產者
 */
@Component
@RequiredArgsConstructor
public class TestProducer {

    //注入rabbitTemplate物件
    private final RabbitTemplate rabbitTemplate;

    /***
     * 生產者方法
     * @param msg 訊息
     */
    public void producerSendMsg(MessageSendDTO msg) {
        //訊息轉換為JSON格式並轉為位元組陣列
        byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
        //傳送訊息
        rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.ROUTE_KEY, bytes);
    }
}

//----------------------------------------------------------------


/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-13 23:29
 * 這是監聽佇列A的消費者(A、B)
 */
@Slf4j
@Component
public class QueueConsumer {

    /***
     * 消費者A(監聽)佇列queueAName
     * @param msgData 傳遞的具體訊息,最好是生產者傳送使用什麼型別,這裡接收就用什麼型別
     * @param message 這個就類似我們原生的message
     * @param channel 這個就類似我們原生的channel
     */
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_A_NAME}) //只需要監聽佇列即可,多個則在{}裡面逗號分割
    public void messageSimpleHandleA(@Payload String msgData, //這個是生產者傳送的JSON訊息
                                     Message message,
                                     Channel channel) throws InterruptedException, IOException {
        //獲取到佇列訊息,因為傳送是JSON格式,我們要解析物件格式
        MessageSendDTO msg = JSONObject.parseObject(message.getBody(), MessageSendDTO.class);
        //假設消費者A處理訊息慢,每8秒處理一條
        Thread.sleep(8000);
        log.info("A:訊息由消費者A消費:{},並消費完成", msg);
    }

    /***
     * 消費者B(監聽)佇列queueAName
     * @param msgData 傳遞的具體訊息,最好是生產者傳送使用什麼型別,這裡接收就用什麼型別
     * @param message 這個就類似我們原生的message
     * @param channel 這個就類似我們原生的channel
     */
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_A_NAME}) //只需要監聽佇列即可,多個則在{}裡面逗號分割
    public void messageSimpleHandleB(@Payload String msgData, //這個是生產者傳送的JSON訊息
                                     Message message,
                                     Channel channel) throws InterruptedException, IOException {
        //獲取到佇列訊息,因為傳送是JSON格式,我們要解析物件格式
        MessageSendDTO msg = JSONObject.parseObject(message.getBody(), MessageSendDTO.class);
        //假設消費者B處理訊息快,每2秒處理一條
        Thread.sleep(2000);
        log.info("B:訊息由消費者B消費:{},並消費完成", msg);
    }
}
TestProducer生產者和QueueConsumer消費者
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-14 21:37
 */
@Slf4j  //使用lombok自帶的紀錄檔註解,具體實現是slf4j+log4j2
@RestController
@RequestMapping("/test")
@RequiredArgsConstructor
public class TestController {

    //使用SLF4J來獲取Logger物件;(注意導包:import org.slf4j.Logger; import org.slf4j.LoggerFactory;)
    //Logger logger = LoggerFactory.getLogger(this.getClass());

    //注入生產者物件
    private final TestProducer testProducer;

    /***
     * 基本的get請求,用來接收訊息,並把訊息交給生產者,並由生產者推播到指定交換機,由交換機分發訊息
     * @param msg 請求訊息
     * @return String
     */
    @PostMapping("/produce")
    public String msgSend(@RequestBody MessageSendDTO msg) {
        log.info("Controller接收到請求並把請求的資訊交由生產者:{}", msg);

        //迴圈傳送訊息
        for (int i = 97; i <= 106; i++) {
            MessageSendDTO build = MessageSendDTO.builder().msgID(i)
                    .msgType("testType")
                    .msgBody(msg.getMsgBody() + new String(new char[]{(char) i, (char) i})).build();
            //傳送訊息
            testProducer.producerSendMsg(build);
        }
        return "請求傳送成功,並已接收";
    }
}
TestController用來接收postman傳送的資料

  我們執行後可以發現,生產者傳送10條資訊到佇列,然後就由2個消費者監聽獲取佇列的資訊,其中消費者A每隔8秒消費一條訊息,消費者B每隔2秒消費一條訊息,所以能者多勞才對,但是恰恰相法,預設使用的是輪詢的方式,每個消費者都會消費5條訊息;其實這種是不行的,其中消費者B大部分屬於空閒時間,應該執行更多資訊才對,下面我就來優化程式碼,使用不公平分發(其實這個就是我在上篇說到的預取值那一節

2:訊息分發(預取值)

  具體在上篇的預取值一節介紹是啥了,這裡我就這樣使用了,我們對上面的程式碼進行一些簡單的處理:

server:
  port: 8081
spring:
  ## 具體RabbitMQ設定請參考:org.springframework.boot.autoconfigure.amqp.RabbitProperties
  rabbitmq:
    host: 49.235.99.193
    port: 5672
    username: admin
    password: 123
    virtual-host: test
    listener:
      simple:
        prefetch: 1 # 設定預取值為1(當為1時也被稱為不公平分發)
#消費者每次從佇列獲取的訊息數量 (預設一次250個)
#通過檢視後臺管理器中queue的unacked數量

3:訊息接收應答

  消費者處理完訊息後向佇列傳送訊息處理完成請求,那麼我們就得開啟訊息接收應答方式;非SpringBoot整合RabbitMQ的預設方式是訊息一旦傳送給消費者就代表應答了,然後佇列就刪除已傳送的訊息了;但是我們不希望這樣,因為在處理的過程中出現問題後,那條訊息就沒了,我們希望訊息處理完成後再給佇列傳送應答成功;這就得修改設定和消費者了:

server:
  port: 8081
spring:
  ## 具體RabbitMQ設定請參考:org.springframework.boot.autoconfigure.amqp.RabbitProperties
  rabbitmq:
    host: 49.235.99.193
    port: 5672
    username: admin
    password: 123
    virtual-host: test
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1

# 訊息確認模式:
# acknowledge-mode: none
    # 自動確認,則不需要我們手動確認訊息,而是訊息一旦傳送給消費者就代表完成確認了
# acknowledge-mode: auto
    # 根據情況確認(預設值),若程式出現異常則不確認,若成功執行完成則確認
# acknowledge-mode: manual
    # 手動確認,需要我們寫確認方法

然後我們需要在消費者上面編寫手動確認程式碼:

@Slf4j
@Component
public class QueueConsumer {

    /***
     * 消費者A(監聽)佇列queueAName
     * @param msgData 傳遞的具體訊息,最好是生產者傳送使用什麼型別,這裡接收就用什麼型別
     * @param message 這個就類似我們原生的message
     * @param channel 這個就類似我們原生的channel
     */
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_A_NAME}, ackMode = "MANUAL")//監聽佇列即可,多個則在{}裡面逗號分割
    public void messageSimpleHandleA(@Payload String msgData, //這個是生產者傳送的JSON訊息
                                     @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, //處理訊息的編號
                                     Message message,
                                     Channel channel) throws InterruptedException, IOException {
        //獲取到佇列訊息,因為傳送是JSON格式,我們要解析物件格式
        MessageSendDTO msg = JSONObject.parseObject(message.getBody(), MessageSendDTO.class);
        //假設消費者A處理訊息慢,每8秒處理一條
        Thread.sleep(8000);
        log.info("A:訊息由消費者A消費:{},並消費完成", msg);
        //手動確認,注:這個deliveryTag可以通過message.getMessageProperties().getDeliveryTag()拿到
        channel.basicAck(deliveryTag, false);
    }

    /***
     * 消費者B(監聽)佇列queueAName
     * @param msgData 傳遞的具體訊息,最好是生產者傳送使用什麼型別,這裡接收就用什麼型別
     * @param message 這個就類似我們原生的message
     * @param channel 這個就類似我們原生的channel
     */
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_A_NAME}, ackMode = "MANUAL") //監聽佇列即可,多個則在{}裡面逗號分割
    public void messageSimpleHandleB(@Payload String msgData, //這個是生產者傳送的JSON訊息
                                     @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, //處理訊息的編號
                                     Message message,
                                     Channel channel) throws InterruptedException, IOException {
        //獲取到佇列訊息,因為傳送是JSON格式,我們要解析物件格式
        MessageSendDTO msg = JSONObject.parseObject(message.getBody(), MessageSendDTO.class);
        //假設消費者B處理訊息快,每2秒處理一條
        Thread.sleep(2000);
        //模擬判斷我是否需要手動確認(若隨機不是2則確認消費,否則拒絕,繼續交由佇列)
        if (Math.ceil(Math.random() * 4) != 2) {
            log.info("B:訊息由消費者B消費:{},並消費完成", msg);
            //手動確認
            channel.basicAck(deliveryTag, false);
        } else {
            log.info("B:訊息由消費者B消費:{},並消費失敗,丟回佇列", msg);
       // 訊息編號我們也可以通過message取出來,不用deliveryTag,在message可以獲取更多的資訊 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true
); } } }
我們需要新增ackMode = "MANUAL",並且編寫指定的手動確認程式碼,消費者B為了可以更好的模擬,我可能會隨機執行不確認,並且丟回佇列
具體的手動確認的程式碼和方法請參考我之前上篇的:手動應答

四:扇出交換機(Fanout 釋出訂閱)

  前面我們已經使用過直接交換機了,下面將說明一下如何在SpringBoot裡整合RabbitMQ來實現扇出交換機使用,具體的扇出交換機的介紹我在上篇已經詳細介紹了,下面直接放出程式碼:

server:
  port: 8081
spring:
  ## 具體RabbitMQ設定請參考:org.springframework.boot.autoconfigure.amqp.RabbitProperties
  rabbitmq:
    host: 49.235.99.193
    port: 5672
    username: admin
    password: 123
    virtual-host: test
application.yml組態檔
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-16 14:05
 * 扇出交換機設定
 */
@Configuration
public class RabbitMQConfig {

    //扇出交換機名稱
    public static final String EXCHANGE_NAME = "fanoutDemo";
    //建立兩個訊息佇列
    public static final String QUEUE_A = "queueA";
    public static final String QUEUE_B = "queueB";

    /***
     * 建立交換機資訊
     * @return Exchange
     */
    @Bean("fanoutDemo")
    public Exchange fanoutDemo() {
        return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();
    }

    /***
     * 建立佇列A
     * @return Queue
     */
    @Bean("queueA")
    public Queue queueA() {
        return QueueBuilder.durable(QUEUE_A).build();
    }

    /***
     * 建立佇列B
     * @return Queue
     */
    @Bean("queueB")
    public Queue queueB() {
        return QueueBuilder.durable(QUEUE_B).build();
    }

    /***
     * 佇列A繫結到扇出交換機
     * @param fanoutDemo 交換機名稱
     * @param queueA 佇列A
     * @return Binding
     */
    @Bean("fanoutBindQueueA")
    public Binding fanoutBindQueueA(@Qualifier(value = "fanoutDemo") Exchange fanoutDemo,
                                    @Qualifier(value = "queueA") Queue queueA) {
        return BindingBuilder.bind(queueA).to(fanoutDemo).with("").noargs();
    }

    /***
     * 佇列B繫結到扇出交換機
     * @param fanoutDemo 交換機名稱
     * @param queueB 佇列B
     * @return Binding
     */
    @Bean("fanoutBindQueueB")
    public Binding fanoutBindQueueB(@Qualifier(value = "fanoutDemo") Exchange fanoutDemo,
                                    @Qualifier(value = "queueB") Queue queueB) {
        return BindingBuilder.bind(queueB).to(fanoutDemo).with("").noargs();
    }
}
RabbitMQConfig設定類,設定交換機、佇列及繫結關係
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-16 15:01
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TestProducer {

    private final RabbitTemplate rabbitTemplate;

    /***
     * 生產者方法
     * @param msg 訊息
     */
    public void producerSendMsg(MessageSendDTO msg) {
        //訊息轉換為JSON格式並轉為位元組陣列
        byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
        //傳送訊息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "", bytes);
        log.info("生產者傳送資訊完成,已經交由給交換機.....");
    }
}

//======================================
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-16 15:04
 * 消費者資訊
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class QueueConsumer {

    /***
     * 消費者A
     */
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_A})
    public void testConsumerA(@Payload String msgData, //這個是生產者傳送的JSON訊息
                              Message message,
                              Channel channel) {
        log.info("接收到佇列A資訊......;資訊為:{}", JSONObject.parseObject(msgData, MessageSendDTO.class));

    }

    /***
     * 消費者B
     */
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_B})
    public void testConsumerB(@Payload String msgData, //這個是生產者傳送的JSON訊息
                              Message message,
                              Channel channel) {
        //注:若消費失敗(報錯)會自動手動不確認,並且把訊息放到佇列中,然後又被這個佇列消費,最終死迴圈
        int a = 1 / 0;
        log.info("接收到佇列B資訊......;資訊為:{}", JSONObject.parseObject(msgData, MessageSendDTO.class));
    }
}
生產者和消費者程式碼
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-14 21:37
 */
@Slf4j
@RestController
@RequestMapping("/test")
@RequiredArgsConstructor
public class TestController {

    //注入生產者物件
    private final TestProducer testProducer;

    /***
     * 基本的get請求,用來接收訊息,並把訊息交給生產者,並由生產者推播到指定交換機,由交換機分發訊息
     * @param msg 請求訊息
     * @return String
     */
    @PostMapping("/produce")
    public String msgSend(@RequestBody MessageSendDTO msg) {
        log.info("Controller接收到請求並把請求的資訊交由生產者:{}", msg);
        //傳送訊息
        testProducer.producerSendMsg(msg);
        return "請求傳送成功,並已接收";
    }
}
TestController用來接收postman傳送的資料

  上面的案例我採用了一個生產者傳送訊息到扇出交換機,再由扇出交換機發布訊息到每個繫結過這個扇出交換機的佇列,然後再由消費者消費每個佇列的訊息。

五:主題交換機(Topics 匹配模式)

  其實主題交換機就是比之前的交換機靈活,它可以按照匹配的方式路由訊息到佇列,具體的主題交換機的介紹在上篇已經給出介紹了,在這我只對之前的使用原生方式實現的,再使用SpringBoot整合RabbitMQ來實現一下,按照上篇的圖示實現:

/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-16 23:10
 * 主題交換機設定類
 */
@Configuration
public class RabbitMQConfig {

    //交換機名稱
    public static final String TOPIC_EXCHANGE = "TopicExchange";
    //佇列Q1名稱
    public static final String Q1 = "Q1Queue";
    //佇列Q2名稱
    public static final String Q2 = "Q2Queue";
    //路由繫結關係 Routing Key
    public static final String Q1_KEY = "*.orange.*";
    //路由繫結關係 Routing Key 1
    public static final String Q2_KEY_A = "*.*.rabbit";
    //路由繫結關係 Routing Key 2
    public static final String Q2_KEY_B = "lazy.#";

    /***
     * 主題交換機
     * @return Exchange
     */
    @Bean("topicExchange")
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE).durable(true).build();
    }

    /***
     * 佇列1資訊
     * @return Queue
     */
    @Bean("q1Queue")
    public Queue q1Queue() {
        return QueueBuilder.durable(Q1).build();
    }

    /***
     * 佇列2資訊
     * @return Queue
     */
    @Bean("q2Queue")
    public Queue q2Queue() {
        return QueueBuilder.durable(Q2).build();
    }

    /***
     * 繫結關係,Q1Queue佇列繫結的匹配路由為*.orange.*
     * @param topicExchange 交換機
     * @param q1Queue 佇列1
     * @return Binding
     */
    @Bean("bindingA")
    public Binding bindingA(@Qualifier("topicExchange") Exchange topicExchange,
                            @Qualifier("q1Queue") Queue q1Queue) {
        return BindingBuilder.bind(q1Queue).to(topicExchange).with(Q1_KEY).noargs();
    }

    /***
     * 繫結關係,Q2Queue佇列繫結的匹配路由為*.*.rabbit
     * @param topicExchange 交換機
     * @param q2Queue 佇列2
     * @return Binding
     */
    @Bean("bindingB1")
    public Binding bindingB1(@Qualifier("topicExchange") Exchange topicExchange,
                             @Qualifier("q2Queue") Queue q2Queue) {
        return BindingBuilder.bind(q2Queue).to(topicExchange).with(Q2_KEY_A).noargs();
    }

    /***
     * 繫結關係,Q2Queue佇列繫結的匹配路由為lazy.#
     * @param topicExchange 交換機
     * @param q2Queue 佇列2
     * @return Binding
     */
    @Bean("bindingB2")
    public Binding bindingB2(@Qualifier("topicExchange") Exchange topicExchange,
                             @Qualifier("q2Queue") Queue q2Queue) {
        return BindingBuilder.bind(q2Queue).to(topicExchange).with(Q2_KEY_B).noargs();
    }
}
RabbitMQConfig設定類,設定交換機、佇列及繫結關係
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-16 15:01
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TestProducer {

    private final RabbitTemplate rabbitTemplate;

    /***
     * 生產者方法
     */
    public void producerSendMsg() {
        //訊息任務準備
        HashMap<String, String> sendMsg = new HashMap<>();
        sendMsg.put("quick.orange.rabbit", "被佇列 Q1 Q2 接收到");
        sendMsg.put("lazy.orange.elephant", "被佇列 Q1 Q2 接收到");
        sendMsg.put("quick.orange.fox", "被佇列 Q1 接收到");
        sendMsg.put("lazy.brown.fox", "被佇列 Q2 接收到");
        sendMsg.put("lazy.pink.rabbit", "雖然滿足兩個繫結規則但兩個規則都是在Q2佇列,所有隻要Q2接收一次");
        sendMsg.put("quick.brown.fox", "不匹配任何繫結不會被任何佇列接收到會被丟棄");
        sendMsg.put("quick.orange.male.rabbit", "是四個單詞不匹配任何繫結會被丟棄");
        sendMsg.put("lazy.orange.male.rabbit", "是四個單詞但匹配 Q2");
        //迴圈傳送訊息任務
        for (Map.Entry<String, String> msg : sendMsg.entrySet()) {
            String routKey = msg.getKey();  //主題路由key
            String message = msg.getValue();//訊息任務
            //建立物件
            MessageSendDTO build = MessageSendDTO.builder().msgBody("基本資訊:" + message + " 路由資訊:" + routKey).build();
            //訊息轉換為JSON格式並轉為位元組陣列
            byte[] bytes = JSONObject.toJSONString(build).getBytes(StandardCharsets.UTF_8);
            //傳送訊息
            rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, routKey, bytes);
        }
        log.info("生產者傳送資訊完成,已經交由給交換機.....");
    }
}

//+++++++++++++++++++++++++++++++++++++++
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-16 15:04
 * 消費者資訊
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class QueueConsumer {

    /***
     * 消費者1
     */
    @RabbitListener(queues = {RabbitMQConfig.Q1})
    public void testConsumerA(@Payload String msgData, //這個是生產者傳送的JSON訊息
                              Message message,
                              Channel channel) {
        log.info("接收到佇列1資訊;資訊為:{}", JSONObject.parseObject(msgData, MessageSendDTO.class));

    }

    /***
     * 消費者2
     */
    @RabbitListener(queues = {RabbitMQConfig.Q2})
    public void testConsumerB(@Payload String msgData, //這個是生產者傳送的JSON訊息
                              Message message,
                              Channel channel) {
        log.info("接收到佇列2資訊......;資訊為:{}", JSONObject.parseObject(msgData, MessageSendDTO.class));
    }
}
生產者和消費者程式碼
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-14 21:37
 */
@Slf4j
@RestController
@RequestMapping("/test")
@RequiredArgsConstructor
public class TestController {

    //注入生產者物件
    private final TestProducer testProducer;

    /***
     * 基本的get請求,用來接收訊息,並把訊息交給生產者,並由生產者推播到指定交換機,由交換機分發訊息
     * @return String
     */
    @PostMapping("/produce")
    public String msgSend() {
        log.info("Controller接收到請求並把請求的資訊交由生產者");
        //傳送訊息
        testProducer.producerSendMsg();
        return "請求傳送成功,並已接收";
    }
}
TestController用來接收postman傳送的資料

六:死信佇列(重要)

  在上篇我們詳細介紹了死信佇列,並使用最原生的方式實現死信佇列,這裡我將在SpringBoot裡面整合RabbitMQ來實現死信佇列,具體的開之前的接介紹,下面我只編寫具體的程式碼,部分程式碼在上面已經給出:

server:
  port: 8081
spring:
  ## 具體RabbitMQ設定請參考:org.springframework.boot.autoconfigure.amqp.RabbitProperties
  rabbitmq:
    host: 49.235.99.193
    port: 5672
    username: admin
    password: 123
    virtual-host: test
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1
application.yml組態檔
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-17 10:56
 */
@Configuration
public class RabbitMQConfig {

    //直接交換機名稱
    public static final String EXCHANGE_NAME = "MsgHandleExchange";
    //佇列名稱
    public static final String QUEUE_NAME = "MsgHandleQueue";
    //路由key
    public static final String ROUTING_KEY = "MsgHandleKey";
    //宣告死信交換機名稱
    public static final String DLX_EXCHANGE = "DLXExchange";
    //宣告死信佇列名稱
    public static final String DLX_QUEUE = "DLXQueue";
    //宣告路由繫結關係 Routing Key 死信交換機到死信佇列
    public static final String DLX_KEY = "DLXKey";


    //+++++++++++++++++設定了直連交換機和佇列的關係

    /***
     * 一個普通的直連交換機
     * @return Exchange
     */
    @Bean("msgHandleExchange")
    public Exchange msgHandleExchange() {
        //第一種方式:使用new的方式,是什麼型別交換機我們就建立什麼xxxExchange
        //引數1:exchange: 交換機名稱
        //引數2:durable: 是否需要持久化
        //引數3:autoDelete: 當最後一個繫結到Exchange上的佇列刪除後,自動刪除該Exchange
        //引數4:arguments: 擴充套件引數,用於擴充套件AMQP協定客製化化使用
        //Exchange directExchange = new DirectExchange(EXCHANGE_NAME,true,false);
        //當前Exchange是否用於RabbitMQ內部使用,預設為False
        //使用預設即可directExchange.isInternal();

        //第二種方式:使用Builder方式
        return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
    }

    /***
     * 一個普通的佇列
     * @return Queue
     */
    @Bean("msgHandleQueue")
    public Queue msgHandleQueue() {
        //第一種方式:使用new的方式;這種和原生建立一樣
        //引數一:佇列名稱
        //引數二:佇列裡的訊息是否持久化,預設訊息儲存在記憶體中,預設false
        //引數三:該佇列是否只供一個消費者進行消費的獨佔佇列,則為 true(僅限於此連線),false(預設,可以多個消費者消費)
        //引數四:是否自動刪除 最後一個消費者斷開連線以後 該佇列是否自動刪除 true 自動刪除,預設false
        //引數五:構建佇列的其它屬性,看下面擴充套件引數
        //Queue queue = new Queue(QUEUE_NAME,true,false,false,null);
        //原生: channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //第二種方式:使用Builder方式
        //~~~~~~~~~~~~~~~~~~~~~~~~~~設定死信引數Start
        //繫結死信佇列(引數設定)
        Map<String, Object> arguments = new HashMap<>();
        //正常佇列設定死信交換機 引數key是固定值;(就是說死去的訊息傳送到哪個交換機)
        arguments.put("x-dead-letter-exchange", DLX_EXCHANGE);
        //正常佇列設定死信交換機到死信佇列繫結Routing Key 引數key是固定值(就是說死去的訊息在交換機裡通過什麼路由傳送到死信佇列)
        arguments.put("x-dead-letter-routing-key", DLX_KEY);
        //設定正常佇列的長度限制 為3
        //arguments.put("x-max-length",3);
        //佇列設定訊息過期時間 60 秒
        //arguments.put("x-message-ttl", 60 * 1000);
        //~~~~~~~~~~~~~~~~~~~~~~~~~~設定死信引數End
        return QueueBuilder.durable(QUEUE_NAME).withArguments(arguments).build();
    }

    /***
     * 佇列繫結到交換機
     * @param msgHandleExchange 交換機名稱
     * @param msgHandleQueue 佇列名稱
     * @return Binding
     */
    @Bean("queueBindDirectExchange")
    public Binding queueBindDirectExchange(@Qualifier("msgHandleExchange") Exchange msgHandleExchange,
                                           @Qualifier("msgHandleQueue") Queue msgHandleQueue) {
        return BindingBuilder.bind(msgHandleQueue).to(msgHandleExchange).with(ROUTING_KEY).noargs();
    }

    //+++++++++++++++++設定死信交換機的一系列資訊

    /***
     * 死信交換機
     * @return Exchange
     */
    @Bean("DLXExchange")
    public Exchange dLXExchange() {
        return ExchangeBuilder.directExchange(DLX_EXCHANGE).durable(true).build();
    }

    /***
     * 死信佇列
     * @return Queue
     */
    @Bean("DLXQueue")
    public Queue dLXQueue() {
        return QueueBuilder.durable(DLX_QUEUE).build();
    }

    /***
     * 死信交換機上面繫結死信佇列
     * @return Binding
     */
    @Bean("dlxQueueBindDlxExchange")
    public Binding dlxQueueBindDlxExchange(@Qualifier("DLXExchange") Exchange dLXExchange,
                                           @Qualifier("DLXQueue") Queue dLXQueue) {
        //死信交換機上面繫結死信佇列
        return BindingBuilder.bind(dLXQueue).to(dLXExchange).with(DLX_KEY).noargs();
    }
}
RabbitMQConfig設定類,設定交換機、佇列及繫結關係(還有死信的設定)
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-14 21:37
 */
@Slf4j
@RestController
@RequestMapping("/test")
@RequiredArgsConstructor
public class TestController {

    //注入生產者物件
    private final TestProducer testProducer;

    /***
     * 基本的get請求,用來接收訊息,並把訊息交給生產者,並由生產者推播到指定交換機,由交換機分發訊息
     * @param msg 請求訊息
     * @return String
     */
    @PostMapping("/produce")
    public String msgSend(@RequestBody MessageSendDTO msg) {
        log.info("Controller接收到請求並把請求的資訊交由生產者:{}", msg);
        //傳送訊息
        testProducer.producerSendMsg(msg);
        return "請求傳送成功,並已接收";
    }
}
TestController用來接收postman傳送的資料
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-17 14:01
 * 死信消費者
 */
@Slf4j
@Component
public class DLXConsumer {
    /***
     * 死信消費者
     */
    @RabbitListener(queues = {RabbitMQConfig.DLX_QUEUE}, ackMode = "MANUAL")
    public void dlxConsumerTest(@Payload String msgData, //這個是生產者傳送的JSON訊息
                                @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, //處理訊息的編號
                                @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey,
                                Message message,
                                Channel channel) throws IOException {

        //把接收過來的JSON資訊轉換為物件
        MessageSendDTO messageSendDTO = JSONObject.parseObject(msgData, MessageSendDTO.class);
        //死信佇列名稱
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        //死信交換機名稱
        String receivedExchange = message.getMessageProperties().getReceivedExchange();
        //路由key
        String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info("死信消費者從死信佇列:{} 獲取死信訊息:{},並處理完成手動確認", consumerQueue, messageSendDTO);
        channel.basicAck(deliveryTag, false);
    }
}

//---------------------------------------------

/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-16 15:04
 * 消費者資訊
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class QueueConsumer {

    /***
     * 消費者
     * @param msgData 具體的訊息
     * @param deliveryTag 處理訊息的編號
     * @param routingKey 當前的路由key
     * @param message message物件
     * @param channel 通道物件
     */
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_NAME}, ackMode = "MANUAL")  // MANUAL必須大寫
    public void testConsumerA(@Payload String msgData, //這個是生產者傳送的JSON訊息
                              @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, //處理訊息的編號
                              @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey,
                              Message message,
                              Channel channel) throws IOException {
        //把接收JSON的資料轉換為物件
        MessageSendDTO messageSendDTO = JSONObject.parseObject(msgData, MessageSendDTO.class);
        //模擬判斷我是否需要手動確認(若隨機不是2則確認消費,否則拒絕,交由死信佇列)
        if (Math.ceil(Math.random() * 4) != 2) {
            log.info("處理完成接收到的佇列資訊為:{},從{}路由過來的資料", messageSendDTO, routingKey);
            //手動確認
            channel.basicAck(deliveryTag, false);
        } else {
            log.info("未處理完成接收到的佇列資訊為:{},從{}路由過來的資料", messageSendDTO, routingKey);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}
DLXConsumer死信消費者和QueueConsumer普通消費者
@Slf4j
@Component
@RequiredArgsConstructor
public class TestProducer {

    private final RabbitTemplate rabbitTemplate;

    /***
     * 生產者方法
     * @param msg 訊息
     */
    public void producerSendMsg(MessageSendDTO msg) {
        //訊息轉換為JSON格式並轉為位元組陣列
        byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
        //傳送訊息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, bytes);
        log.info("生產者傳送資訊完成,已經交由給交換機.....");
    }
}
TestProducer生產者

七:延遲佇列(基於死信)

1:在佇列上設定TTL

  針對延遲佇列我上篇已經有了基本介紹,其實延遲佇列就是基於TTL過期時間來完成的,把訊息推播到普通的佇列裡並不會被消費,而是等待這條訊息的TTL時間到期才會被丟棄到死信佇列中,其實那個具體的死信佇列才是我們將來要真實要處理的訊息,訊息TTL過期到死信佇列,最終有死信消費者完成最終的消費;說白了就是藉助普通佇列的延遲時間達到延遲消費。

server:
  port: 8081
spring:
  ## 具體RabbitMQ設定請參考:org.springframework.boot.autoconfigure.amqp.RabbitProperties
  rabbitmq:
    host: 49.235.99.193
    port: 5672
    username: admin
    password: 123
    virtual-host: test
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1
application.yml組態檔
@Configuration
public class RabbitMQConfig {

    //延遲直接交換機
    public static final String DELAY_DIRECT_EXCHANGE = "delayDirectExchange";
    //延遲佇列
    public static final String DELAY_TTL_QUEUE = "delayTTLQueue";
    //延遲佇列連線延遲交換機路由key
    public static final String DELAY_ROUTING_KEY = "delayRoutingKey";
    //死信交換機
    public static final String DEAD_EXCHANGE = "deadExchange";
    //死信佇列
    public static final String DEAD_QUEUE = "deadQueue";
    //死信佇列繫結死信交換機路由key
    public static final String DEAD_ROUTING_KEY = "deadRoutingKey";

    //編寫延遲佇列和延遲交換機一些列設定

    /***
     * 延遲交換機
     * @return Exchange
     */
    @Bean("delayDirectExchange")
    public Exchange delayDirectExchange() {
        return ExchangeBuilder.directExchange(DELAY_DIRECT_EXCHANGE).durable(true).build();
    }

    /***
     * 延遲普通佇列
     * @return Queue
     */
    @Bean("delayTTLQueue")
    public Queue delayTTLQueue() {
        //繫結死信佇列(引數設定)
        Map<String, Object> arguments = new HashMap<>();
        //正常佇列設定死信交換機 引數key是固定值;(就是說死去的訊息傳送到哪個交換機)
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常佇列設定死信交換機到死信佇列繫結Routing Key 引數key是固定值(就是說死去的訊息在交換機裡通過什麼路由傳送到死信佇列)
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        //設定正常佇列的長度限制 為3
        //arguments.put("x-max-length",3);
        //佇列設定訊息過期時間 20 秒
        arguments.put("x-message-ttl", 20 * 1000);
        return QueueBuilder.durable(DELAY_TTL_QUEUE).withArguments(arguments).build();
    }

    /***
     * 延遲佇列繫結到延遲交換機上
     * @param delayDirectExchange 延遲交換機
     * @param delayTTLQueue 延遲佇列
     * @return Binding
     */
    @Bean("delayQueueBindDelayExchange")
    public Binding delayQueueBindDelayExchange(@Qualifier("delayDirectExchange") Exchange delayDirectExchange,
                                               @Qualifier("delayTTLQueue") Queue delayTTLQueue) {
        return BindingBuilder.bind(delayTTLQueue).to(delayDirectExchange).with(DELAY_ROUTING_KEY).noargs();

    }

    //編寫死信佇列和死信交換機,和它們繫結關係的設定

    /***
     * 死信交換機
     * @return Exchange
     */
    @Bean("deadExchange")
    public Exchange deadExchange() {
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
    }

    /***
     * 死信佇列
     * @return Queue
     */
    @Bean("deadQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    /***
     * 死信佇列繫結死信交換機
     * @param deadExchange 死信交換機
     * @param deadQueue 死信佇列
     * @return Binding
     */
    @Bean("deadQueueBindDeadExchange")
    public Binding deadQueueBindDeadExchange(@Qualifier("deadExchange") Exchange deadExchange,
                                             @Qualifier("deadQueue") Queue deadQueue) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }
}
RabbitMQConfig設定類,設定交換機、佇列及繫結關係(還有死信的設定)
@Slf4j
@Component
@RequiredArgsConstructor
public class TestProducer {

    private final RabbitTemplate rabbitTemplate;

    /***
     * 生產者方法
     * @param msg 訊息
     */
    public void producerSendMsg(MessageSendDTO msg) {
        //訊息轉換為JSON格式並轉為位元組陣列
        byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
        //傳送訊息
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_DIRECT_EXCHANGE,RabbitMQConfig.DELAY_ROUTING_KEY,bytes);
        log.info("生產者傳送資訊完成,已經交由給延遲直接交換機.....");
    }
}
//==========================
@Slf4j
@Component
public class DeadConsumer {
    /***
     * 死信消費者
     */
    @RabbitListener(queues = {RabbitMQConfig.DEAD_QUEUE}, ackMode = "MANUAL")
    public void dlxConsumerTest(@Payload String msgData, //這個是生產者傳送的JSON訊息
                                @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, //處理訊息的編號
                                @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey,
                                Message message,
                                Channel channel) throws IOException {

        //把接收過來的JSON資訊轉換為物件
        MessageSendDTO messageSendDTO = JSONObject.parseObject(msgData, MessageSendDTO.class);
        //死信佇列名稱
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        //死信交換機名稱
        String receivedExchange = message.getMessageProperties().getReceivedExchange();
        //路由key
        String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info("死信消費者從死信佇列:{} 獲取死信訊息:{},並處理完成手動確認", consumerQueue, messageSendDTO);
        channel.basicAck(deliveryTag, false);
    }
}
生產者TestProducer和消費者DeadConsumer(死信佇列消費者)
@Slf4j
@RestController
@RequestMapping("/test")
@RequiredArgsConstructor
public class TestController {

    //注入生產者物件
    private final TestProducer testProducer;

    /***
     * 基本的get請求,用來接收訊息,並把訊息交給生產者,並由生產者推播到指定交換機,由交換機分發訊息
     * @param msg 請求訊息
     * @return String
     */
    @PostMapping("/produce")
    public String msgSend(@RequestBody MessageSendDTO msg) {
        log.info("Controller接收到請求並把請求的資訊交由生產者:{}", msg);
        //傳送訊息
        testProducer.producerSendMsg(msg);
        return "請求傳送成功,並已接收";
    }
}
TestController用來接收postman傳送的資料

  通過上面的案例我們可以實現延遲的效果,把訊息傳送到延遲佇列中,並不會對這部分訊息進行消費,而是等待過期後自帶丟到死信交換機中,並由死信交換機轉發到死信佇列中,並進行消費者消費;之前案例我們設定了一個延遲20秒的延遲佇列,但是現在若有一個需求,對一些資料進行10秒的延遲,這時我們就需要手動去建立一個佇列,並設定延遲時間為10秒;這時我們就發現並不靈活,每次延遲不同時間都需要新增一個延遲佇列,所以在佇列級別上設定延遲是不靈活的,所以不推薦;下面將進行一個優化。

2:對每條訊息設定延遲TTL

  上面說了對佇列設定TTL延遲來實現延遲訊息不是太靈活,所以下面將對之前的程式碼進行一個簡單的優化,建立一個全新的沒有延遲的佇列,只是對傳送的每條訊息進行一個延遲(在生產者設定訊息延遲並行送),這樣就可以實現一個佇列裡面的每條訊息的延遲時間不一樣,說幹就幹,參考下圖:

原本程式碼不變,只是對其部分類進行了優化,新增部分程式碼:

第一步:在RabbitMQConfig新增這部分程式碼:
    //簡單佇列,無延遲
    public static final String SIMPLE_QUEUE = "simpleQueue";
    //簡單佇列連線延遲交換機路由key
    public static final String SIMPLE_ROUTING_KEY = "simpleRoutingKey";

    /***
     * 一個簡單無延遲的佇列
     * @return Queue
     */
    @Bean("simpleQueue")
    public Queue simpleQueue() {
        //繫結死信佇列(引數設定)
        Map<String, Object> arguments = new HashMap<>();
        //正常佇列設定死信交換機 引數key是固定值;(就是說死去的訊息傳送到哪個交換機)
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常佇列設定死信交換機到死信佇列繫結Routing Key 引數key是固定值(就是說死去的訊息在交換機裡通過什麼路由傳送到死信佇列)
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        //設定正常佇列的長度限制 為3
        //arguments.put("x-max-length",3);
        //佇列設定訊息過期時間 20 秒
        //arguments.put("x-message-ttl", 20 * 1000);
        return QueueBuilder.durable(SIMPLE_QUEUE).withArguments(arguments).build();
    }

    /***
     * 簡單佇列繫結到延遲交換機上
     * @param delayDirectExchange 延遲交換機
     * @param simpleQueue 簡單佇列
     * @return Binding
     */
    @Bean("simpleQueueBindDelayExchange")
    public Binding simpleQueueBindDelayExchange(@Qualifier("delayDirectExchange") Exchange delayDirectExchange,
                                                @Qualifier("simpleQueue") Queue simpleQueue) {
        return BindingBuilder.bind(simpleQueue).to(delayDirectExchange).with(SIMPLE_ROUTING_KEY).noargs();
    }
第二步:在TestController新增這一個資源請求:
    /***
     * 基本的POST請求,用來接收訊息,並把訊息交給生產者,並由生產者推播到指定交換機,由交換機分發訊息
     * @param msg 請求訊息
     * @param ttl 過期時間
     * @return String
     */
    @PostMapping("/produceA/{ttl}")
    public String msgSendSimple(@RequestBody MessageSendDTO msg, @PathVariable(value = "ttl") Integer ttl) {
        log.info("Controller接收到請求並把請求的資訊交由生產者:{},其中訊息的過期時間為:{} s", msg, ttl);
        //傳送訊息
        testProducer.producerSendMsgSimple(msg, ttl);
        return "請求傳送成功,並已接收";
    }
第三步:在TestProducer生產者類新增生產者方法:
    /***
     * 生產者方法
     * @param msg 訊息
     * @param ttl 過期時間
     */
    public void producerSendMsgSimple(MessageSendDTO msg, Integer ttl) {
        //訊息轉換為JSON格式並轉為位元組陣列
        byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
        //傳送訊息
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_DIRECT_EXCHANGE, RabbitMQConfig.SIMPLE_ROUTING_KEY, bytes, message -> {
            //這條訊息的過期時間也被設定成了ttl秒 , 超過ttl秒未處理則執行到此訊息後被丟棄(記住是執行到此訊息後被丟棄,後面說明)
            message.getMessageProperties().setExpiration(String.valueOf(ttl * 1000));
            //設定好了一定要返回
            return message;
        });
    }
部分程式碼優化(在原基礎上新增程式碼,無刪減)

  得出結論:上面的程式不是特別完美(有問題),雖然設定訊息過期時間,但是在佇列中,不管後進的延遲訊息多短,都得等前面的延遲訊息過期或消費,否則後面的延遲訊息會一直等待,即使延遲訊息過了也會等待(最終等到這條訊息執行時,會先判斷是否過期,沒過期繼續等待,阻塞後面的訊息,過期或者被消費則下一個執行);可以看看佇列先進先出的原則。

八:延遲佇列(基於外掛)

  上面基於死信佇列實現延遲存在問題,在這我將使用外掛的方式來解決上面遺留的問題,這種方式得需要我們下載外掛,安裝外掛後,交換機就會多出來一個」x-delayed-message「的型別,我們建立時選擇這種型別;

  我們需要下載外掛:rabbitmq_delayed_message_exchange-3.11.1.ez  具體的版本可以測試,我用的當前時間最新的。

  安裝方式:把下載的外掛複製到RabbitMQ的plugins目錄裡:

    如我這具體路徑是:/usr/local/rabbitmq_server-3.11.13/plugins;複製完成後就可以進行安裝操作,

    執行:rabbitmq-plugins enable rabbitmq_delayed_message_exchange;不用指定版本號。

    直接執行成功會列印:....started 1 plugins.(程式碼安裝成功一個外掛)

  現在我們安裝完外掛後想實現延遲訊息則不用那麼麻煩了,也不用寫死信交換機了,只需要正常的佇列建立和交換機建立即可,下面是基本的流程圖:

/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-17 17:10
 * RabbitMQ設定類
 */
@Configuration
public class RabbitMQConfig {

    //直接延遲交換機名稱
    public static final String DELAYED_EXCHANGE = "delayedExchange";
    //延遲佇列名稱(但不是在佇列中延遲)
    public static final String DELAYED_QUEUE = "delayedQueue";
    //繫結路由key
    public static final String DELAYED_ROUTING_KEY = "delayedRoutingKey";

    /***
     * 建立交換機訊息
     * @return Exchange
     */
    @Bean("delayedExchange")
    public CustomExchange delayedExchange() {
        //因為通過ExchangeBuilder沒有那個延遲交換機的型別,所以我們使用其它交換機
        //其它引數
        Map<String, Object> args = new HashMap<>();
        //自定義交換機的型別;(雖然設定的是延遲交換機,但是具體四大型別還是得有)
        args.put("x-delayed-type", "direct");
        //引數:交換機名稱、交換機型別、是否持久化交換機、是否斷開自動刪除、其它引數
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
    }

    /***
     * 佇列名稱
     * @return Queue
     */
    @Bean("delayedQueue")
    public Queue delayedQueue() {
        return QueueBuilder.durable(DELAYED_QUEUE).build();
    }

    /***
     * 繫結關係
     * @param delayedExchange 交換機訊息
     * @param delayedQueue 佇列訊息
     * @return Binding
     */
    @Bean("delayedQueueBindDelayedExchange")
    public Binding delayedQueueBindDelayedExchange(@Qualifier(value = "delayedExchange") CustomExchange delayedExchange,
                                                   @Qualifier(value = "delayedQueue") Queue delayedQueue) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}
RabbitMQConfig設定類,設定交換機、佇列及繫結關係
@Slf4j
@Component
@RequiredArgsConstructor
public class TestProducer {

    private final RabbitTemplate rabbitTemplate;

    /***
     * 生產者方法
     * @param msg 訊息
     * @param delayTime 延遲時間
     */
    public void producerSendMsgDelay(MessageSendDTO msg, Integer delayTime) {
        //訊息轉換為JSON格式並轉為位元組陣列
        byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
        //傳送訊息
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE, RabbitMQConfig.DELAYED_ROUTING_KEY, bytes, message -> {
            //這條訊息的過期時間被設定過期delayTime秒(在交換機中被延遲,時間一到則被路由到佇列)
            message.getMessageProperties().setDelay(delayTime * 1000);
            //設定好了一定要返回
            return message;
        });
    }
}

// --------------------------------------------------------------
@Slf4j
@Component
public class ConsumerA {
    /***
     * 消費者
     */
    @RabbitListener(queues = {RabbitMQConfig.DELAYED_QUEUE}, ackMode = "MANUAL")
    public void dlxConsumerTest(@Payload String msgData, //這個是生產者傳送的JSON訊息
                                @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, //處理訊息的編號
                                @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey,
                                Message message,
                                Channel channel) throws IOException {
        //把接收過來的JSON資訊轉換為物件
        MessageSendDTO messageSendDTO = JSONObject.parseObject(msgData, MessageSendDTO.class);
        //佇列名稱
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        //交換機名稱
        String receivedExchange = message.getMessageProperties().getReceivedExchange();
        //路由key
        String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info("消費者從佇列:{} 獲取訊息:{},並處理完成手動確認", consumerQueue, messageSendDTO);
        channel.basicAck(deliveryTag, false);
    }
}
TestProducer生產者和ConsumerA消費者
@Slf4j
@RestController
@RequestMapping("/test")
@RequiredArgsConstructor
public class TestController {

    //注入生產者物件
    private final TestProducer testProducer;

    /***
     * 基本的POST請求,用來接收訊息,並把訊息交給生產者,並由生產者推播到指定交換機,由交換機分發訊息
     * @param msg 請求訊息
     * @param delayTime 延遲時間
     * @return String
     */
    @PostMapping("/produceMsg/{delayTime}")
    public String msgSendSimple(@RequestBody MessageSendDTO msg, @PathVariable(value = "delayTime") Integer delayTime) {
        log.info("Controller接收到請求並把請求的資訊交由生產者:{},其中訊息的過期時間為:{} s", msg, delayTime);
        //傳送訊息
        testProducer.producerSendMsgDelay(msg, delayTime);
        return "請求傳送成功,並已接收";
    }
}
TestController用來接收postman傳送的資料

九:訊息釋出確認

  之前介紹了訊息應答,它保證了消費者和佇列之間的關係達到訊息不丟失;但是現在要如何保證生產者投遞訊息到交換機以及交換機到佇列的資料不丟失呢?這我們就使用到了訊息的釋出確認,看過上篇的人會知道,之前我使用原生方式實現了單個釋出確認、批次釋出確認和非同步批次確認,但現在我要以SpringBoot整合RabbitMQ的方式來完成訊息的釋出確認。

1:一個簡單的程式碼準備

  這裡我先編寫一個沒有實現訊息的釋出確認程式碼,針對這個程式碼後面做一些列的優化:

server:
  port: 8081
spring:
  ## 具體RabbitMQ設定請參考:org.springframework.boot.autoconfigure.amqp.RabbitProperties
  rabbitmq:
    host: 49.235.99.193
    port: 5672
    username: admin
    password: 123
    virtual-host: test
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1
application.yml組態檔
@Configuration
public class RabbitMQConfig {
    
    //直接交換機
    public static final String CONFIRM_EXCHANGE = "confirmExchange";
    //佇列
    public static final String CONFIRM_QUEUE = "confirmQueue";
    //繫結路由key
    public static final String CONFIRM_ROUTING_KEY = "confirmRoutingKey";

    /***
     * 建立交換機訊息
     * @return Exchange
     */
    @Bean("confirmExchange")
    public Exchange confirmExchange() {
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).build();
    }

    /***
     * 佇列名稱
     * @return Queue
     */
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    /***
     * 繫結關係
     * @param confirmExchange 交換機訊息
     * @param confirmQueue 佇列訊息
     * @return Binding
     */
    @Bean("delayedQueueBindDelayedExchange")
    public Binding delayedQueueBindDelayedExchange(@Qualifier(value = "confirmExchange") Exchange confirmExchange,
                                                   @Qualifier(value = "confirmQueue") Queue confirmQueue) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY).noargs();
    }
}
RabbitMQConfig設定類,設定交換機、佇列及繫結關係
@Slf4j
@Component
@RequiredArgsConstructor
public class TestProducer {

    private final RabbitTemplate rabbitTemplate;

    /***
     * 生產者方法
     * @param msg 訊息
     */
    public void producerSendMsgDelay(MessageSendDTO msg) {
        //訊息轉換為JSON格式並轉為位元組陣列
        byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
        //傳送訊息
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE, RabbitMQConfig.CONFIRM_ROUTING_KEY, bytes);
    }
}
//=======================================
@Slf4j
@Component
public class ConsumerA {
    /***
     * 消費者
     */
    @RabbitListener(queues = {RabbitMQConfig.CONFIRM_QUEUE}, ackMode = "MANUAL")
    public void dlxConsumerTest(@Payload String msgData, //這個是生產者傳送的JSON訊息
                                @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, //處理訊息的編號
                                @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey,
                                Message message,
                                Channel channel) throws IOException {
        //把接收過來的JSON資訊轉換為物件
        MessageSendDTO messageSendDTO = JSONObject.parseObject(msgData, MessageSendDTO.class);
        //佇列名稱
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        log.info("消費者從佇列:{} 獲取訊息:{},並處理完成手動確認", consumerQueue, messageSendDTO);
        channel.basicAck(deliveryTag, false);
    }
}
TestProducer生產者和ConsumerA消費者
@Slf4j
@RestController
@RequestMapping("/test")
@RequiredArgsConstructor
public class TestController {

    //注入生產者物件
    private final TestProducer testProducer;

    /***
     * 基本的POST請求,用來接收訊息,並把訊息交給生產者,並由生產者推播到指定交換機,由交換機分發訊息
     * @param msg 請求訊息
     * @return String
     */
    @PostMapping("/produce")
    public String msgSendSimple(@RequestBody MessageSendDTO msg) {
        log.info("Controller接收到請求並把請求的資訊交由生產者:{}", msg);
        //傳送訊息
        testProducer.producerSendMsgDelay(msg);
        return "請求傳送成功,並已接收";
    }
}
TestController用來接收postman傳送的資料

這裡我們就完成了一個普通的訊息傳送到消費者消費程式碼了,其實也就是文章開頭的簡單的Demo,不過我們需要對其優化。

2:釋出確認

  這裡說的是生產者傳送訊息到交換機失敗的情況下回撥:

Ⅰ:我們得編寫一個RabbitMQMyCallBack的回撥類,具體如下:
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class RabbitMQMyCallBack implements RabbitTemplate.ConfirmCallback {

        //注入rabbitTemplate物件
        private final RabbitTemplate rabbitTemplate;

        /***
         * 物件範例化完成(物件建立和屬性注入)後呼叫此方法
         */
        @PostConstruct
        public void init() {
       //必須設定 rabbitTemplate.setMandatory(
true); //設定釋出確認資訊RabbitTemplate.ConfirmCallback confirmCallback; rabbitTemplate.setConfirmCallback(this); } /*** * 是ConfirmCallback的抽象方法,用來確認訊息是否到達exchange(交換器),不保證訊息是否可以路由到正確的queue; * 它的抽象方法機制只確認,但需要設定部分引數: * publisher-confirm-type: correlated * 對於部分Springboot老版本需要設定:publisher-confirms: true * 交換機確認回撥方法(成功和失敗): * 傳送訊息 --> 交換機接收到訊息 --> 回撥 * 1:correlationData 儲存回撥訊息的ID及相關資訊 * 2:交換機收到訊息 ack = true * 3:呼叫回撥confirm方法,對應ack=true , cause=null * 傳送訊息 --> 交換機接收訊息失敗 --> 回撥 * 1:correlationData 儲存回撥訊息的ID及相關資訊 * 2:交換機收到訊息 ack = false * 3:呼叫回撥confirm方法,對應ack=false , cause="異常資訊" * @param correlationData 回撥的相關資料 * @param ack 訊息是否成功傳送給交換機,true成功,false失敗 * @param cause 對於ack為false時會有對應的失敗原因,否則為空 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //獲取對應的ID訊息,因為不確認是否有ID被傳入,所以取值需要判空 String id = correlationData == null ? "" : correlationData.getId(); //校驗是否成功傳送 if (ack) { log.info("訊息已經成功交給了交換機,對應訊息ID為:{}", id); } else { log.info("訊息未能成功傳送給交換機,對應訊息ID為:{},異常原因:{}", id, cause); } } } Ⅱ:編寫完成過後,我們需要在傳送者TestProducer來設定一些訊息,用來回撥用處: @Slf4j @Component @RequiredArgsConstructor public class TestProducer { private final RabbitTemplate rabbitTemplate; /*** * 生產者方法 * @param msg 訊息 */ public void producerSendMsgDelay(MessageSendDTO msg) { //訊息轉換為JSON格式並轉為位元組陣列 byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8); //其它的一些資訊,用來回撥用處 CorrelationData correlationData = new CorrelationData(); //設定id資訊,其實預設就是UUID,我們其實可以根據自己設定指定ID資訊 //correlationData.setId(String.valueOf(UUID.randomUUID())); //傳送訊息 rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE, RabbitMQConfig.CONFIRM_ROUTING_KEY, bytes, correlationData); } } Ⅲ:還差最後一步了,設定組態檔(一定要加): spring.rabbitmq.publisher-confirm-type: correlated 取值有: none:禁用釋出確認模式,是預設值 correlated:釋出訊息成功到交換器後會觸發回撥方法(推薦 simple:存在兩種效果,第一種和correlated效果一樣,但是也可以使用rabbitTemplate呼叫waitForConfirms或 waitForConfirmsOrDie 方法等待 broker 節點返回傳送結果,根據返回結果來判定下一步的邏輯,要注意的點是 waitForConfirmsOrDie 方法如果返回 false 則會關閉 channel,則接下來無法傳送訊息到 broker

測試呼叫失敗和成功現狀(故意寫錯傳送到不存在的交換機):

生產者一次性傳送2個訊息:
     //傳送訊息A(可以成功傳送) rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE, RabbitMQConfig.CONFIRM_ROUTING_KEY, bytes, correlationData); //傳送訊息B(不可以成功傳送,交換機不存在) rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE+"test", RabbitMQConfig.CONFIRM_ROUTING_KEY, bytes, correlationData);

3:回退訊息

  這裡說的是交換機路由傳送到佇列時,佇列不存在則訊息預設就沒了,但是我們設定回退訊息時,路由到的佇列不存在則會執行個回撥方法:

  Mandatory引數:在僅開啟了生產者確認機制的情況下,交換機接收到訊息後,會直接給訊息生產者傳送確認訊息,如果發現該訊息不可路由,那麼訊息會被直接丟棄,此時生產者是不知道訊息被丟棄這個事件的。

Ⅰ:我們得編寫一個RabbitMQMyCallBack的回撥類,具體如下:
   @Slf4j
   @Component
   @RequiredArgsConstructor
   public class RabbitMQMyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

       //注入rabbitTemplate物件
       private final RabbitTemplate rabbitTemplate;

       /***
        * 物件範例化完成(物件建立和屬性注入)後呼叫此方法
        */
       @PostConstruct
       public void init() {
           //設定釋出確認資訊回撥類RabbitTemplate.ConfirmCallback confirmCallback;
           rabbitTemplate.setConfirmCallback(this);
           //設定回退訊息回撥類ReturnsCallback.ReturnsCallback returnsCallback;
           rabbitTemplate.setReturnsCallback(this);
           //true:交換機無法將訊息進行路由時,會將該訊息返回給生產者;false:如果發現訊息無法進行路由,則直接丟棄
           rabbitTemplate.setMandatory(true);  // 或使用設定   spring.rabbitmq.template.mandatory: true
       }

       @Override
       public void confirm(CorrelationData correlationData, boolean ack, String cause) {...}

       /***
        * 當訊息無法被路由時執行當前回撥
        * @param returned 被退回的訊息資訊
        */
       @Override
       public void returnedMessage(ReturnedMessage returned) {
           // 傳送的訊息
           Message message = returned.getMessage();
           // 傳送到哪個交換機
           String exchange = returned.getExchange();
           // 交換機到佇列的路由key
           String routingKey = returned.getRoutingKey();
           // 退回原因
           String replyText = returned.getReplyText();
           // 退回原因狀態碼
           int replyCode = returned.getReplyCode();
           //訊息列印
           log.info("資訊被回退,從交換機:{} 路由:{} 傳送到佇列失敗,傳送資訊為:{},退回狀態碼:{} 和原因:{}",
                   exchange, routingKey, message, replyCode, replyText);
           //我們可以在這後面對傳送失敗的訊息進行處理
       }
   }
Ⅱ:編寫完成過後,我們需要在傳送者TestProducer來設定一些訊息,用來回撥用處:
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class TestProducer {

        private final RabbitTemplate rabbitTemplate;

        /***
         * 生產者方法
         * @param msg 訊息
         */
           public void producerSendMsgDelay(MessageSendDTO msg) {
               //省略....
               //傳送訊息一次(不可以成功傳送,路由key不存在)
               rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE,
                       RabbitMQConfig.CONFIRM_ROUTING_KEY + "test", bytes, correlationData);
           }
    }
還差最後一步了,設定組態檔加紅的(這也是一個比較全的設定了):
    server:
      port: 8081
    spring:
      ## 具體RabbitMQ設定請參考:org.springframework.boot.autoconfigure.amqp.RabbitProperties
      rabbitmq:
        host: 49.235.99.193
        port: 5672
        username: admin
        password: 123
        virtual-host: test
        # 開啟訊息確認模式
        publisher-confirm-type: correlated  # 訊息釋出確認(生產者->交換機是否成功)
        publisher-returns: true             # 訊息回退(交換機路由->佇列是否成功)
        template:
          mandatory: true                  # 訊息路由傳送失敗返回到佇列中, 相當手動設定 rabbitTemplate.setMandatory(true);
        # 主要針對消費者的一些設定
        listener:
          simple:
            acknowledge-mode: manual        # 訊息應答ACK方式
            prefetch: 1                     # 限制每次傳送1條資料到佇列(只可堆積1條到消費者)(預取值)
            concurrency: 1                  # 最少需要一個消費者來監聽同一佇列
            max-concurrency: 2              # 最大隻能擁有2個消費者來監聽同一佇列
            default-requeue-rejected: true  # 決定被拒絕的訊息是否重新入隊;預設是true(與引數acknowledge-mode有關係)

  到這裡的釋出確認已經完成了

十:備份交換機

  有了mandatory引數和回退訊息,我們就可以對那些無法被投遞的訊息有著回撥功能,這樣就可以對無法投遞的訊息進行處理;但有時候,我們並不知道該如何處理這些無法路由的訊息,最多打個紀錄檔,然後觸發報警,再來手動處理。而通過紀錄檔來處理這些無法路由的訊息是很不優雅的做法,特別是在多臺伺服器上都存在生產者的時候,我們就需要手動去每臺伺服器上檢視生產的紀錄檔檔案,分析問題,其實這樣很容易看花眼,而且設定 mandatory 引數會增加生產者的複雜性,需要新增處理這些被退回的訊息的邏輯。如果既不想丟失訊息,又不想增加生產者的複雜性,該怎麼做呢?前面在設定死信佇列的文章中,我們提到,可以為佇列設定死信交換機來儲存那些處理失敗的訊息,可是這些不可路由訊息根本沒有機會進入到佇列,因此無法使用死信佇列來儲存訊息。

  在RabbitMQ中,有一種備份交換機的機制存在,可以很好的應對這個問題。備份交換機可以理解為 RabbitMQ 中交換機的「備胎」,當我們為某一個交換機宣告一個對應的備份交換機時,就是為它建立一個備胎,當交換機接收到一條不可路由訊息時,將會把這條訊息轉發到備份交換機中,由備份交換機來進行轉發和處理,通常備份交換機的型別為Fanout,這樣就能把所有訊息都投遞到與其繫結的佇列中,然後我們在備份交換機下繫結一個佇列,這樣原交換機無法被路由的訊息,就會都進入這個佇列了。當然,我們還可以建立一個報警佇列,用獨立的消費者來進行監測和報警。

基本的流程圖:

server:
  port: 8081
spring:
  ## 具體RabbitMQ設定請參考:org.springframework.boot.autoconfigure.amqp.RabbitProperties
  rabbitmq:
    host: 49.235.99.193
    port: 5672
    username: admin
    password: 123
    virtual-host: test
    # 開啟訊息確認模式
    publisher-confirm-type: correlated  # 訊息釋出確認(生產者->交換機是否成功)
    publisher-returns: true             # 訊息回退(交換機路由->佇列是否成功)
    template:
      mandatory: true                  # 訊息路由傳送失敗返回到佇列中, 相當手動設定 rabbitTemplate.setMandatory(true);
    # 主要針對消費者的一些設定
    listener:
      simple:
        acknowledge-mode: manual        # 訊息應答ACK方式
        prefetch: 1                     # 限制每次傳送1條資料到佇列(只可堆積1條到消費者)(預取值)
        concurrency: 1                  # 最少需要一個消費者來監聽同一佇列
        max-concurrency: 2              # 最大隻能擁有2個消費者來監聽同一佇列
        default-requeue-rejected: true  # 決定被拒絕的訊息是否重新入隊;預設是true(與引數acknowledge-mode有關係)
application.yml組態檔
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-19 0:25
 * 交換機和佇列的設定
 */
@Configuration
public class RabbitMQConfig {

    /* 一些簡單的交換機-->路由-->佇列的方式設定*/
    //直接交換機
    public static final String CONFIRM_EXCHANGE = "confirmExchange";
    //佇列
    public static final String CONFIRM_QUEUE = "confirmQueue";
    //繫結路由key
    public static final String CONFIRM_ROUTING_KEY = "confirmRoutingKey";
    /*針對一些備份交換機的設定*/
    //備份交換機
    public static final String BACKUP_EXCHANGE = "backupExchange";
    //備份佇列(把一些無法被路由的訊息進行備份)
    public static final String BACKUP_QUEUE = "backupQueue";
    //報警佇列(報警佇列用來傳送報警訊息,告知消費者處理,以達到讓管理員知道有資料處理不了)
    public static final String WARNING_QUEUE = "warningQueue";

    //一些簡單的佇列和交換機的宣告
    /***
     * 建立交換機訊息
     * @return Exchange
     */
    @Bean("confirmExchange")
    public Exchange confirmExchange() {
        //一些其它引數
        Map<String, Object> arguments = new HashMap<>();
        //設定備份交換機資訊,將來傳送的訊息無法被路由,就會傳送到備份交換機
        arguments.put("alternate-exchange", BACKUP_EXCHANGE);
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArguments(arguments).build();
    }

    /***
     * 佇列名稱
     * @return Queue
     */
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    /***
     * 繫結關係
     * @param confirmExchange 交換機訊息
     * @param confirmQueue 佇列訊息
     * @return Binding
     */
    @Bean("delayedQueueBindDelayedExchange")
    public Binding delayedQueueBindDelayedExchange(@Qualifier(value = "confirmExchange") Exchange confirmExchange,
                                                   @Qualifier(value = "confirmQueue") Queue confirmQueue) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY).noargs();
    }

    //備份交換機的建立
    /***
     * 建立備份交換機訊息
     * @return Exchange
     */
    @Bean("backupExchange")
    public Exchange backupExchange() {
        return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE).durable(true).build();
    }

    /***
     * 備份佇列名稱
     * @return Queue
     */
    @Bean("backupQueue")
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE).build();
    }

    /***
     * 報警佇列名稱
     * @return Queue
     */
    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE).build();
    }

    /***
     * 繫結關係(備份佇列繫結到備份交換機上)
     * @param backupExchange 備份交換機訊息
     * @param backupQueue 備份佇列訊息
     * @return Binding
     */
    @Bean("backupQueueBindBackupExchange")
    public Binding backupQueueBindBackupExchange(@Qualifier(value = "backupExchange") Exchange backupExchange,
                                                 @Qualifier(value = "backupQueue") Queue backupQueue) {
        return BindingBuilder.bind(backupQueue).to(backupExchange).with("").noargs();
    }

    /***
     * 繫結關係(報警佇列繫結到備份交換機上)
     * @param backupExchange 備份交換機訊息
     * @param warningQueue 報警佇列訊息
     * @return Binding
     */
    @Bean("warningQueueBindBackupExchange")
    public Binding warningQueueBindBackupExchange(@Qualifier(value = "backupExchange") Exchange backupExchange,
                                                  @Qualifier(value = "warningQueue") Queue warningQueue) {
        return BindingBuilder.bind(warningQueue).to(backupExchange).with("").noargs();
    }
}
RabbitMQConfig設定類,設定交換機、佇列及繫結關係
@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMQMyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    //注入rabbitTemplate物件
    private final RabbitTemplate rabbitTemplate;

    /***
     * 物件範例化完成(物件建立和屬性注入)後呼叫此方法
     */
    @PostConstruct
    public void init() {
        //設定釋出確認資訊回撥類RabbitTemplate.ConfirmCallback confirmCallback;
        rabbitTemplate.setConfirmCallback(this);
        //設定回退訊息回撥類ReturnsCallback.ReturnsCallback returnsCallback;
        rabbitTemplate.setReturnsCallback(this);
        //true:交換機無法將訊息進行路由時,會將該訊息返回給生產者;false:如果發現訊息無法進行路由,則直接丟棄
        rabbitTemplate.setMandatory(true);  // 或使用設定   spring.rabbitmq.template.mandatory: true
    }

    /***
     * 釋出確認(生產者-->交換機的確認)
     * @param correlationData 回撥的相關資料
     * @param ack 訊息是否成功傳送給交換機,true成功,false失敗
     * @param cause 對於ack為false時會有對應的失敗原因,否則為空
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //獲取對應的ID訊息,因為不確認是否有ID被傳入,所以取值需要判空
        String id = correlationData == null ? "" : correlationData.getId();
        //校驗是否成功傳送
        if (ack) {
            log.info("訊息已經成功交給了交換機,對應訊息ID為:{}", id);
        } else {
            log.info("訊息未能成功傳送給交換機,對應訊息ID為:{},異常原因:{}", id, cause);
        }
    }

    /***
     * 當訊息無法被路由時執行當前回撥
     * @param returned 被退回的訊息資訊
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        // 傳送的訊息
        Message message = returned.getMessage();
        // 傳送到哪個交換機
        String exchange = returned.getExchange();
        // 交換機到佇列的路由key
        String routingKey = returned.getRoutingKey();
        // 退回原因
        String replyText = returned.getReplyText();
        // 退回原因狀態碼
        int replyCode = returned.getReplyCode();
        //訊息列印
        log.info("資訊被回退,從交換機:{} 路由:{} 傳送到佇列失敗,傳送資訊為:{},退回狀態碼:{} 和原因:{}",
                exchange, routingKey, message, replyCode, replyText);
        //我們可以在這後面對傳送失敗的訊息進行處理
    }
}
RabbitMQMyCallBack用來實現釋出確認的回撥
@Slf4j
@Component
@RequiredArgsConstructor
public class TestProducer {

    private final RabbitTemplate rabbitTemplate;

    /***
     * 生產者方法
     * @param msg 訊息
     */
    public void producerSendMsg(MessageSendDTO msg) {
        //訊息轉換為JSON格式並轉為位元組陣列
        byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
        //其它的一些資訊,用來回撥用處
        CorrelationData correlationData = new CorrelationData();
        //設定id資訊,其實預設就是UUID,我們其實可以根據自己設定指定ID資訊
        //correlationData.setId(String.valueOf(UUID.randomUUID()));
        //傳送訊息(不可以成功傳送,路由key不存在)
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE,
                RabbitMQConfig.CONFIRM_ROUTING_KEY + "test", bytes, correlationData);
    }
}
TestProducer生產者
@Slf4j
@Component
public class ConsumerA {
    /***
     * 消費者
     */
    @RabbitListener(queues = {RabbitMQConfig.CONFIRM_QUEUE}, ackMode = "MANUAL")
    public void dlxConsumerTest(@Payload String msgData, //這個是生產者傳送的JSON訊息
                                @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, //處理訊息的編號
                                @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey,
                                Message message,
                                Channel channel) throws IOException {
        //把接收過來的JSON資訊轉換為物件
        MessageSendDTO messageSendDTO = JSONObject.parseObject(msgData, MessageSendDTO.class);
        //佇列名稱
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        log.info("消費者從佇列:{} 獲取訊息:{},並處理完成手動確認", consumerQueue, messageSendDTO);
        channel.basicAck(deliveryTag, false);
    }
}
//==============================
@Slf4j
@Component
public class BackupConsumer {

    /***
     * 備份佇列消費者接收的訊息監聽
     */
    @RabbitListener(queues = {RabbitMQConfig.BACKUP_QUEUE}, ackMode = "MANUAL")
    public void backupConsumerTest(@Payload String msgData, Message message, Channel channel) throws IOException {
        //把接收過來的JSON資訊轉換為物件
        MessageSendDTO messageSendDTO = JSONObject.parseObject(msgData, MessageSendDTO.class);
        log.info("備份佇列:{},監聽傳送過來的資料並處理:{}",
                message.getMessageProperties().getConsumerQueue(), messageSendDTO);
        //手動確認
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}
//==============================
@Slf4j
@Component
public class WarningConsumer {
    /***
     * 報警佇列消費者接收的訊息監聽
     */
    @RabbitListener(queues = {RabbitMQConfig.WARNING_QUEUE}, ackMode = "MANUAL")
    public void warningConsumerTest(@Payload String msgData, Message message, Channel channel) throws IOException {
        //把接收過來的JSON資訊轉換為物件
        MessageSendDTO messageSendDTO = JSONObject.parseObject(msgData, MessageSendDTO.class);
        log.info("報警佇列:{},監聽傳送過來的資料並處理:{}",
                message.getMessageProperties().getConsumerQueue(), messageSendDTO);
        //手動確認
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
ConsumerA(消費者)、BackupConsumer(備份消費者)、WarningConsumer(報警消費者)
@Slf4j
@RestController
@RequestMapping("/test")
@RequiredArgsConstructor
public class TestController {

    //注入生產者物件
    private final TestProducer testProducer;

    /***
     * 基本的POST請求,用來接收訊息,並把訊息交給生產者,並由生產者推播到指定交換機,由交換機分發訊息
     * @param msg 請求訊息
     * @return String
     */
    @PostMapping("/produce")
    public String msgSendSimple(@RequestBody MessageSendDTO msg) {
        log.info("Controller接收到請求並把請求的資訊交由生產者傳送:{}", msg);
        //傳送訊息
        testProducer.producerSendMsg(msg);
        return "請求傳送成功,並已接收";
    }
}
TestController用來接收postman傳送的資料

  可以好奇的是,為什麼上面沒有出現交換機路由傳送給佇列出異常的回退訊息呢?並且也是設定mandatory引數的(和回退訊息搭配使用);如果回退訊息和備份交換機兩者同時開啟,經過上面結果顯示答案是備份交換機優先順序比回退訊息的優先順序高

十一:RabbitMQ重複消費問題(冪等性)

  為什麼RabbitMQ會出現生產者多次投遞或者消費者多次消費呢?這裡其實會有很多因素的,按道理正常處理不報錯一般不會出現問題,但是不能一定保證,下面我就來和大家來探討如何解決重複消費問題:

產生問題的幾種情況:
    生產者:如Controller介面被呼叫了兩次,而Controller執行時會呼叫生產者,在沒有處理冪等性問題時,這無形中呼叫2次生產者。
    MQ:在消費者消費完準備響應ACK確認完成時,這時MQ突然宕機,所以消費者響應ACK就無法傳送給MQ了,恰巧這時MQ修復完成啟動了,
        但MQ以為消費者還未消費該資料,MQ也沒接收到ACK確認,所以MQ會覅後會再次推播該條訊息,導致重複消費。
    消費者:消費者已經消費完成,正準備ACK確認時,突然網路波動導致無法確認,連線中斷,或者消費者宕機了,也無法ACK確認,消費
        者重啟後,MQ會再次推播原來的訊息給消費者
解決方式:
    最好的方式使用Redis快取來完成,簡單快捷方便,而且會定期清理歷史消費完成的訊息記錄(快取裡,不是實際消費的具體資料被清理)
注:解決方式千千萬,具體看自己專案

  我解決的方式是使用Redis;首先需要對傳遞的訊息類設定一個唯一標識(業務拼接的唯一ID或者UUID),這時要確保每次傳遞相同的訊息UUID是相同的,在生產者投遞前,先把訊息的UUID設定到快取裡,並設定過期時間(這裡的過期時間是,投遞成功後,過期時間內可能會造成二次重複投遞,但是過期時間之外不會存在再次投遞,主要就是清理以前的快取記錄);投遞成功就等待過期時間結束自己清理,投遞失敗則需要把這次設定的快取UUID刪除,方便下次投遞;在消費者那邊,消費時首先對UUID快取,代表當前已經消費成功或者消費中,但也要設定過期時間,和上面的過期時間一樣用途,但是消費失敗異常後也需要刪除快取,方便下次繼續消費設定快取;具體圖:

 

/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-14 21:39
 * 測試生產者
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TestProducer {

    //注入rabbitTemplate物件
    private final RabbitTemplate rabbitTemplate;
    //注入StringRedisTemplate物件
    private final StringRedisTemplate redisTemplate;

    /***
     * 生產者方法
     * @param msg 訊息
     */
    public void producerSendMsg(MessageSendDTO msg) {
        ValueOperations<String, String> forValue = redisTemplate.opsForValue();
        try {
            //防止重複提交
            //若設定成功則為True,設定不成功或者設定的值已經存在則返回False
            //這裡設定20秒代表自動過期,一旦設定這個鍵值,訊息被成功投遞則不刪除(防止20秒內重複提交),但是投遞失敗
            //以後我需要刪除這個鍵值,方便下次繼續設定投遞;;具體按照實際設定過期時間
            Boolean result = forValue.setIfAbsent(msg.getUUID() + "-delivery", String.valueOf(msg.getMsgID()),
                    20, TimeUnit.SECONDS);
            //判斷設定成功則傳送訊息(否則這個訊息可能多次傳送給消費者)
            if (Boolean.TRUE.equals(result)) {
                //訊息轉換為JSON格式並轉為位元組陣列
                byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8);
                //傳送訊息
                rabbitTemplate.convertAndSend(RabbitMQConfig.ORDINARY_DIRECT_EXCHANGE, 
            RabbitMQConfig.ROUTE_KEY, bytes); }
else { log.info("訊息已經由生產者傳送投遞了,請忽重複投遞!"); } } catch (Exception e) { //若生產者投遞出現問題則代表投遞不成功,刪除這次快取 redisTemplate.delete(msg.getUUID() + "-delivery"); } } }
/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-13 23:29
 * 消費者
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class QueueConsumer {

    //注入StringRedisTemplate物件
    private final StringRedisTemplate redisTemplate;

    /***
     * 消費者(監聽佇列ordinaryQueue)
     * @param msgData 傳遞的具體訊息,最好是生產者傳送使用什麼型別,這裡接收就用什麼型別
     * @param message 這個就類似我們原生的message
     * @param channel 這個就類似我們原生的channel
     */
    @RabbitListener(queues = {RabbitMQConfig.ORDINARY_QUEUE}, ackMode = "MANUAL") 
    public void ordinaryQueueTest(@Payload String msgData, //這個是生產者傳送的JSON訊息
                                  @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, //處理訊息的編號
                                  Message message,
                                  Channel channel) throws InterruptedException, IOException {

        ValueOperations<String, String> forValue = redisTemplate.opsForValue();
        //獲取到佇列訊息,因為傳送是JSON格式,我們要解析物件格式
        MessageSendDTO msg = JSONObject.parseObject(message.getBody(), MessageSendDTO.class);
        try {
            //判斷訊息有沒有被消費過,沒有消費過則設定(代表有消費者準備消費了)
            Boolean result = forValue.setIfAbsent(msg.getUUID(), String.valueOf(msg.getMsgID()),
                    1, TimeUnit.DAYS);
            //判斷,若設定成功代表可以消費此條訊息
            if (Boolean.TRUE.equals(result)) {
                log.info("A:訊息由消費者A消費:{},並消費完成", msg);
                //手動確認,注:這個deliveryTag可以通過message.getMessageProperties().getDeliveryTag()拿到
                channel.basicAck(deliveryTag, false);
            } else {
                log.info("消費者當前消費的訊息被別的消費者已經消費過了,或者正在消費:{}", msg);
                //重複傳送的也得手動確認掉,但是不處理
                channel.basicAck(deliveryTag, false);
            }
        } catch (Exception e) {
            //若消費失敗則刪除之前的鎖定(快取),下次佇列投遞給消費者的時候可以繼續消費
            redisTemplate.delete(msg.getUUID());
        }
    }
}

十二:優先順序佇列

  其實我們傳送到佇列的訊息是按照先進先出的順序執行的,不能說後面的訊息直接先出佇列了;但是在日常開發中會遇到一些問題,比如生產者的投遞速度是特別快的,而消費者消費特別慢,這時生產者投遞訊息有幾千個,假設這時生產者投遞了一個需要緊急處理的訊息,這沒辦法,只能等前面幾千個訊息消費完成後才能等到那個緊急訊息執行,這是顯然不行的,所以就引出了優先順序佇列的解決方案。

RabbitMQConfig設定類修改:
  /*** * 建立普通佇列(並且新增優先順序的設定) * @return Queue */ @Bean("ordinaryQueue") public Queue ordinaryQueue() { //其它引數 Map<String, Object> arguments = new HashMap<>(); //設定優先佇列的值範圍,官方允許0~255,設定10代表優先順序範圍為10,設定過大,後面排序耗費資源和CPU //代表後期生產者在投遞訊息時需要設定訊息的0~10的優先順序,越大越先執行 arguments.put("x-max-priority", 10); //構建佇列 return QueueBuilder.durable(ORDINARY_QUEUE).withArguments(arguments).build(); }
生產者程式碼修改:
/**
* * 生產者方法 * @param msg 訊息 */ public void producerSendMsg(MessageSendDTO msg) { //迴圈傳送投遞訊息 for (int i = 1001; i <= 1010; i++) { msg.setMsgID(i); //訊息轉換為JSON格式並轉為位元組陣列 byte[] bytes = JSONObject.toJSONString(msg).getBytes(StandardCharsets.UTF_8); //若i為1005則代表讓它的訊息優先順序高,其它都是預設0 if (i == 1005) { //設定其它引數(如這裡需要設定優先順序)(2種引數方式) //AMQP.BasicProperties properties = new AMQP.BasicProperties(); //properties.builder().priority(5).build(); MessageProperties messageProperties = new MessageProperties(); messageProperties.setPriority(5); //傳送訊息 Message message = new Message(bytes, messageProperties); rabbitTemplate.convertAndSend(RabbitMQConfig.ORDINARY_DIRECT_EXCHANGE, RabbitMQConfig.ROUTE_KEY, message); } else { //傳送訊息 rabbitTemplate.convertAndSend(RabbitMQConfig.ORDINARY_DIRECT_EXCHANGE, RabbitMQConfig.ROUTE_KEY, bytes); } } }
注意:我們設定佇列優先順序以後,那麼在傳送訊息則需要設定優先順序引數

十三:惰性佇列

  RabbitMQ 從 3.6.0 版本開始引入了惰性佇列的概念。惰性佇列會盡可能的將訊息存入磁碟中,而在消費者消費到相應的訊息時才會被載入到記憶體中,它的一個重要的設計目標是能夠支援更長的佇列,即支援更多的訊息儲存。當消費者由於各種各樣的原因(比如消費者下線、宕機亦或者是由於維護而關閉等)而致使長時間內不能消費訊息造成堆積時,惰性佇列就很有必要了。預設情況下,當生產者將訊息傳送到 RabbitMQ 的時候,佇列中的訊息會盡可能的儲存在記憶體之中,這樣可以更加快速的將訊息傳送給消費者。即使是持久化的訊息,在被寫入磁碟的同時也會在記憶體中駐留一份備份。當 RabbitMQ 需要釋放記憶體的時候,會將記憶體中的訊息換頁至磁碟中,這個操作會耗費較長的時間,也會阻塞佇列的操作,進而無法接收新的訊息。雖然 RabbitMQ 的開發者們一直在升級相關的演演算法,但是效果始終不太理想,尤其是在訊息量特別大的時候。

    佇列具備兩種模式:default 和 lazy。預設的為default 模式,在3.6.0 之前的版本無需做任何變更。lazy模式即為惰性佇列的模式,
可以通過呼叫 channel.queueDeclare 方法的時候在引數中設定,也可以通過Policy 的方式設定,如果一個佇列同時使用這兩種方式設定的
話,那麼Policy(這不具體展開說了)的方式具備更高的優先順序。如果要通過宣告的方式改變已有佇列的模式的話,那麼只能先刪除佇列,然後再
重新宣告一個新的。在佇列宣告的時候可以通過「x-queue-mode」引數來設定佇列的模式,取值為「default
」和「lazy」。 下面範例中演示了一個惰性佇列的宣告細節: @Bean("ordinaryQueue") public Queue ordinaryQueue() { //其它引數 Map<String, Object> arguments = new HashMap<>(); //設定優先佇列的值範圍,官方允許0~255,設定10代表優先順序範圍為10,設定過大,後面排序耗費資源和CPU //代表後期生產者在投遞訊息時需要設定訊息的0~10的優先順序,越大越先執行 //arguments.put("x-max-priority", 10); //設定惰性佇列 arguments.put("x-queue-mode", "lazy"); //構建佇列 return QueueBuilder.durable(ORDINARY_QUEUE).withArguments(arguments).build();}

 .