RabbitMQ 如何實現延遲佇列?

2023-09-05 18:00:48

延遲佇列是指當訊息被傳送以後,並不是立即執行,而是等待特定的時間後,消費者才會執行該訊息。
延遲佇列的使用場景有以下幾種:

  1. 未按時支付的訂單,30 分鐘過期之後取消訂單。
  2. 給活躍度比較低的使用者間隔 N 天之後推播訊息,提高活躍度。
  3. 新註冊會員的使用者,等待幾分鐘之後傳送歡迎郵件等。

1.如何實現延遲佇列?

延遲佇列有以下兩種實現方式:

  1. 通過訊息過期後進入死信交換器,再由交換器轉發到延遲消費佇列,實現延遲功能;
  2. 使用官方提供的延遲外掛實現延遲功能。

早期,大部分公司都會採用第一種方式,而隨著 RabbitMQ 3.5.7(2015 年底釋出)的延遲外掛的釋出,因為其使用更簡單、更方便,所以它現在才是大家普通會採用的,實現延遲佇列的方式,所以本文也只講第二種方式。

2.實現延遲佇列

2.1 安裝並啟動延遲佇列

2.1.1 下載延遲外掛

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

注意:需要根據你自己的 RabbitMQ 伺服器端版本選擇相同版本的延遲外掛,可以在 RabbitMQ 控制檯檢視:

2.1.2 將外掛放到外掛目錄

接下來,將上一步下載的外掛放到 RabbitMQ 伺服器安裝目錄,如果是 docker,使用一下命令複製:

docker cp 宿主機檔案 容器名稱或ID:容器目錄

如下圖所示:

之後,進入 docker 容器,檢視外掛中是否包含延遲佇列:

docker exec -it 容器名稱或ID /bin/bash
rabbitmq-plugins list

如下圖所示:

2.1.3 啟動外掛

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如下圖所示:

2.1.4 重啟RabbitMQ服務

安裝完 RabbitMQ 外掛之後,需要重啟 RabbitMQ 服務才能生效。
如果使用的是 Docker,只需要重啟 Docker 容器即可:

docker restart 容器名稱或ID

如下圖所示:

2.1.5 驗收結果

在 RabbitMQ 控制檯檢視,新建交換機時是否有延遲訊息選項,如果有就說明延遲訊息外掛已經正常執行了,如下圖所示:

2.1.6 手動建立延遲交換器(可選)

此步驟可選(非必須),因為某些版本下通過程式建立延遲交換器可能會出錯,如果出錯了,手動建立延遲佇列即可,如下圖所示:

2.2 編寫延遲訊息實現程式碼

2.2.1 設定交換器和佇列

import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

/**
 * 延遲交換器和佇列
 */
@Configuration
public class DelayedExchangeConfig {
    public static final String EXCHANGE_NAME = "myDelayedExchange";
    public static final String QUEUE_NAME = "delayed.queue";
    public static final String ROUTING_KEY = "delayed.routing.key";

    @Bean
    public CustomExchange delayedExchange() {
        return new CustomExchange(EXCHANGE_NAME,
                "x-delayed-message", // 訊息型別
                true, // 是否持久化
                false); // 是否自動刪除
    }

    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(QUEUE_NAME)
                .withArgument("x-delayed-type", "direct")
                .build();
    }

    @Bean
    public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
    }
}

2.1.2 定義訊息傳送方法

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(fixedDelay = 5000)
    public void sendDelayedMessage(String message) {
        rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME,
                DelayedExchangeConfig.ROUTING_KEY,
                message,
                messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setDelay(10000); // 設定延遲時間,單位毫秒
                    return messagePostProcessor;
                });
    }
}

2.1.3 傳送延遲訊息

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/delayed")
public class DelayedMessageController {
    @Autowired
    private DelayedMessageProducer delayedMessageProducer;

    @GetMapping("/send")
    public String sendDirectMessage(@RequestParam String message) {
        delayedMessageProducer.sendDelayedMessage(message);
        return "Delayed message sent to Exchange: " + message;
    }
}

2.1.4 接收延遲訊息

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class DelayedMessageConsumer {

    @RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME)
    public void receiveDelayedMessage(String message) {
        System.out.println("Received delayed message: " + message);
    }
}

PS:獲取本文延遲佇列的實現 Demo,請加我:GG_Stone【備註:延遲佇列】

小結

實現 RabbitMQ 延遲佇列目前主流的實現方式,是採用官方提供的延遲外掛來實現。而延遲外掛需要先下載外掛、然後設定並重啟 RabbitMQ 服務,之後就可以通過編寫程式碼的方式實現延遲佇列了。

本文已收錄到我的面試小站 www.javacn.site,其中包含的內容有:Redis、JVM、並行、並行、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、設計模式、訊息佇列等模組。