SpringBoot 整合 RabbitMQ 實現訊息可靠傳輸

2022-05-25 09:00:31

訊息的可靠傳輸是面試必問的問題之一,保證訊息的可靠傳輸主要在生產端開啟 comfirm 模式,RabbitMQ 開啟持久化,消費端關閉自動 ack 模式。

環境設定

SpringBoot 整合 RabbitMQ 實現訊息的傳送。

  • 新增 maven 依賴
       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 新增 application.yml 組態檔
spring:
  rabbitmq:
    host: 192.168.3.19
    port: 5672
    username: admin
    password: xxxx
  • 設定交換機、佇列以及繫結
    @Bean
    public DirectExchange myExchange() {
        DirectExchange directExchange = new DirectExchange("myExchange");
        return directExchange;
    }

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue");
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
    }
  • 生產傳送訊息
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String send(String message) {
        rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
        System.out.println("【傳送訊息】" + message)
        return "【send message】" + message;
    }
  • 消費者接收訊息
    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    public void process(String msg, Channel channel, Message message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date();
        String time = sdf.format(date);
        System.out.println("【接收資訊】" + msg + " 當前時間" + time);
  • 呼叫生產端傳送訊息 hello,控制檯輸出:
【傳送訊息】hello
【接收資訊】hello 當前時間2022-05-12 10:21:14

說明訊息已經被成功接收。

訊息丟失分析

一條訊息的從生產到消費,訊息丟失可能發生在以下幾個階段:

  • 生產端丟失: 生產者無法傳輸到 RabbitMQ
  • 儲存端丟失: RabbitMQ 儲存自身掛了
  • 消費端丟失:儲存由於網路問題,無法傳送到消費端,或者消費掛了,無法傳送正常消費

RabbitMQ 從生產端、儲存端、消費端都對可靠性傳輸做很好的支援。

生產階段

生產階段通過請求確認機制,來確保訊息的可靠傳輸。當傳送訊息到 RabbitMQ 伺服器 之後,RabbitMQ 收到訊息之後,給傳送返回一個請求確認,表示RabbitMQ 伺服器已成功的接收到了訊息。

  • 設定 application.yml
spring:
  rabbitmq:
    # 訊息確認機制 生產者 -> 交換機
    publisher-confirms: true
    # 訊息返回機制  交換機 -> 佇列
    publisher-returns: true

設定

@Configuration
@Slf4j
public class RabbitConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("【correlationData】:" + correlationData);
                log.info("【ack】" + ack);
                log.info("【cause】" + cause);
                if (ack) {
                    log.info("【傳送成功】");
                } else {
                    log.info("【傳送失敗】correlationData:" + correlationData + " cause:" + cause);
                }
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.warn("【訊息傳送失敗】");
                log.info("【message】" + message);
                log.info("【replyCode】" + replyCode);
            }
        });

        return rabbitTemplate;
    }
}

訊息從 生產者交換機, 有confirmCallback 確認模式。傳送訊息成功後訊息會呼叫方法confirm(CorrelationData correlationData, boolean ack, String cause),根據 ack 判斷訊息是否傳送成功。

訊息從 交換機佇列,有returnCallback 退回模式。

傳送訊息 product message 控制檯輸出如下:

【傳送訊息】product message
【接收資訊】product message 當前時間2022-05-12 11:27:56
【correlationData】:null
【ack】true
【cause】null
【傳送成功】

生產端模擬訊息丟失

這裡有兩個方案:

  1. 傳送訊息後立馬關閉 broke,後者把網路關閉,但是broker關閉之後控制檯一直就會報錯,傳送訊息也報500錯誤。
  2. 傳送不存在的交換機:
// myExchange 修改成 myExchangexxxxx
rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);

結果:

【correlationData】:null
【ack】false
【cause】channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
【傳送失敗】

當傳送失敗可以對訊息進行重試

  1. 交換機正確,傳送不存在的佇列:

交換機接收到訊息,返回成功通知,控制檯輸出:

【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
【ack】true
【cause】null
【傳送成功】

交換機沒有找到佇列,返回失敗資訊:

【訊息傳送失敗】
【message】product message
【replyCode】312

RabbitMQ

開啟佇列持久化,建立的佇列和交換機預設設定是持久化的。首先把佇列和交換機設定正確,修改消費監聽的佇列,使得訊息存放在佇列裡

修改佇列的持久化,修改成非持久化:

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue",false);
        return queue;
    }

傳送訊息之後,訊息存放在佇列中,然後重啟 RabbitMQ,訊息不存在了。
設定佇列持久化:

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue",true);
        return queue;
    }

重啟之後,佇列的訊息還存在。

消費端

消費端預設開始 ack 自動確認模式,當佇列訊息被消費者接收,不管有沒有被消費端訊息,都自動刪除佇列中的訊息。所以為了確保消費端能成功消費訊息,將自動模式改成手動確認模式:

修改 application.yml 檔案

spring:
  rabbitmq:
    # 手動訊息確認
    listener:
      simple:
        acknowledge-mode: manual

消費接收訊息之後需要手動確認:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    public void process(String msg, Channel channel, Message message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date();
        String time = sdf.format(date);
        System.out.println("【接收資訊】" + msg + " 當前時間" + time);
        System.out.println(message.getMessageProperties().getDeliveryTag());
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

如果不新增:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

傳送兩條訊息
訊息被接收後,沒有確認,重新放到佇列中:

重啟專案,之後,佇列的訊息會傳送到消費者,但是沒有 ack 確認,還是繼續會放回佇列中。

加上 channel.basicAck 之後,再重啟專案:

佇列訊息就被刪除了

basicAck 方法最後一個引數 multiple 表示是刪除之前的佇列。

multiple 設定成 true,把後面的佇列都清理掉了:

原始碼

https://github.com/jeremylai7/springboot-learning/tree/master/spring-rabbitmq

如果覺得文章對你有幫助的話,請點個推薦吧!