延遲佇列實現訂單超時自動取消

2023-03-03 12:03:20

在上一篇 Java 實現訂單未支付超時自動取消,使用Java自帶的定時任務TimeTask實現訂單超時取消,但是有小夥伴提出這種實現,會有以下幾個問題:

  • 線上服務掛了,導致服務下所有的定時任務失效。
  • 服務重啟,定時任務也會失效。
  • 服務上線需要釋出新的服務,原來服務也會關閉。

針對上述服務掛了、或者服務重啟導致訊息失效的問題,需要使用獨立於專案的服務,比如訊息中介軟體,比如Redis或者RabbitMQ。本文主要講解訊息佇列RabbitMQ

實現效果

建立一個訂單,超時30分鐘未支付就取消訂單。

RabbitMQ本身是不支援延遲佇列的,但可以利用RabbitMQ存活時間 + 死信佇列來實現訊息延遲。

TTL + DLX

存活時間 TTL

TTL全稱為:time to live,意思為存活時間,當訊息沒有設定消費者,訊息就一直停留在佇列中,停留時間超過存活時間後,訊息會被自動刪除

RabbitMQ支援兩種TTL設定:

  • 對訊息本身設定存活時間,每條訊息的存活時間可以靈活設定為不同的存活時間。
  • 對傳遞的佇列設定存活時間,每條傳到到佇列的過期時間都一致。

當訊息過期還沒有被消費,此時訊息會變成死信訊息dead letter這是實現延遲佇列的關鍵

訊息變為死信的條件:

  • 訊息被拒絕basic.reject/basic.nack,並且requeue=false
  • 訊息的過期時間到期了。
  • 佇列達到最大長度。

死信交換機 DLX

當上面的訊息變成死信訊息之後,它不會立即被刪除,首先它要看有沒有對應的死信交換機,如果有繫結的死信交換機,訊息就會從傳送到對應的死信交換機上。

DLX全程為Dead Letter Exchanges,意思為死信交換機。

死信交換機和普通交換機沒什麼區別,不同的是死信交換機會繫結在其他佇列上,當佇列的訊息變成死信訊息後,死信訊息會傳送到死信交換上。

佇列繫結死信交換機需要兩個引數:

  • x-dead-letter-exchange: 繫結的死信交換機名稱。
  • x-dead-letter-routing-key: 繫結的死信交換機routingKey

死信交換機和普通交換機的區別就是死信交換機的ExchangeroutingKey作為繫結引數,繫結在其他佇列上。

專案實戰

訊息傳送的流程圖:

  • 生產者將帶有TTL的訊息傳送給交換機,由交換機路由到佇列中。
  • 佇列由於沒有消費,訊息一直停留在佇列中,一直等到訊息超時,變成死信訊息。
  • 死信訊息轉發到死信交換機在路由到死信佇列上,最後給消費者消費。

建立死信佇列

@Configuration
public class DelayQueueRabbitConfig {
  // 下面是死信佇列
	/**
	 * 死信佇列
	 */
	public static final String DLX_QUEUE = "queue.dlx";

	/**
	 * 死信交換機
	 */
	public static final String DLX_EXCHANGE = "exchange.dlx";

	/**
	 * 死信routing-key
	 */
	public static final String DLX_ROUTING_KEY = "routingKey.dlx";


	/**
	 * 死信佇列
	 * @return
	 */
	@Bean
	public Queue dlxQueue() {
		return new Queue(DLX_QUEUE,true);
	}

	/**
	 * 死信交換機
	 * @return
	 */
	@Bean
	public DirectExchange dlxExchange() {
		return new DirectExchange(DLX_EXCHANGE,true,false);
	}

	/**
	 * 死信佇列和交換機繫結
	 * @return
	 */
	@Bean
	public Binding bindingDLX() {
		return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
	}
}

建立延遲佇列,並繫結死信佇列

  // 下面的是延遲佇列
	/**
	 * 訂單延遲佇列
	 */
	public static final String ORDER_QUEUE = "queue.order";

	/**
	 * 訂單交換機
	 */
	public static final String ORDER_EXCHANGE = "exchange.order";

	/**
	 * 訂單routing-key
	 */
	public static final String ORDER_ROUTING_KEY = "routingkey.order";


	/**
	 * 訂單延遲佇列
	 * @return
	 */
	@Bean
	public Queue orderQueue() {
		Map<String,Object> params = new HashMap<>();
		params.put("x-dead-letter-exchange", DLX_EXCHANGE);
		params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
		return new Queue(ORDER_QUEUE, true, false, false, params);
	}

	/**
	 * 訂單交換機
	 * @return
	 */
	@Bean
	public DirectExchange orderExchange() {
		return new DirectExchange(ORDER_EXCHANGE,true,false);
	}

	/**
	 * 訂單佇列和交換機繫結
	 * @return
	 */
	@Bean
	public Binding orderBinding() {
		return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
	}

繫結死信交換通過新增x-dead-letter-exchangex-dead-letter-routing-key引數指定對應的交換機和路由。

傳送訊息

設定五秒超時時間

@RestController
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("/dlx")
    public String dlx() {
        String date = DateUtil.dateFormat(new Date());
        String delayTime = "5000";
        System.out.println("【傳送訊息】延遲 5 秒 傳送時間 " + date);
        rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setExpiration(delayTime);
                    return message1;
                });
       return "ok";         
    }
    
    class DateUtil{
       public static String dateFormat(Date date) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        return sdf.format(date);
      }
    } 
}    

消費訊息

@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)
public void delayPrecss(String msg,Channel channel,Message message){
    System.out.println("【接收訊息】" + msg + " 接收時間" + DateUtil.dateFormat(new Date()));
}
    

控制檯輸出

【傳送訊息】延遲5 秒 傳送時間 21:32:15
【接收訊息】延遲5 秒 傳送時間 21:32:15 接收時間21:32:20

傳送訊息,5秒之後消費者後會收到訊息。說明延遲成功。

佇列都有先進先出的特點,如果佇列前面的訊息延遲比後的訊息延遲更長,會出現什麼情況。

訊息時序問題

傳送三條訊息,延遲分別是10s2s5s

@RestController
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("/dlx")
    public String dlx() {
         dlxSend("延遲10秒","10000");
         dlxSend("延遲2 秒","2000");
         dlxSend("延遲5 秒","5000");
         return "ok";
    }
    
    private void dlxSend(String message,String delayTime) {
         System.out.println("【傳送訊息】" + message +  "當前時間" + DateUtil.dateFormat(new Date()));
         rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setExpiration(delayTime);
                    return message1;
                });
    }

控制檯輸出:

【傳送訊息】延遲10秒當前時間21:54:36
【傳送訊息】延遲2 秒當前時間21:54:36
【傳送訊息】延遲5 秒當前時間21:54:36
【接收訊息】延遲10秒 當前時間21:54:46
【接收訊息】延遲2 秒 當前時間21:54:46
【接收訊息】延遲5 秒 當前時間21:54:46

所有的訊息都要等10s的訊息消費完才能消費,當10s訊息未被消費,其他訊息也會阻塞,即使訊息設定了更短的延遲。因為佇列有先進先出的特徵,當佇列有多條訊息,延遲時間就沒用作用了,前面的訊息消費後,後的訊息才能被消費,不然會被阻塞到佇列中。

外掛實現解決訊息時序問題

針對上面訊息的時序問題,RabbitMQ開發一個延遲訊息的外掛delayed_message_exchange,延遲訊息交換機。使用該外掛可以解決上面時序的問題。

Github官網找到對應的版本,我選擇的是3.8.17

將檔案下載下來放到伺服器的/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.9/plugins目錄下,執行以下命令,啟動外掛:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

啟動外掛,交換機會有新的型別x-delayed-message:

x-delayed-message型別的交換機,支援延遲投遞訊息。傳送訊息給x-delayed-message型別的交換流程圖:

  • x-delayed-message型別的交換機接收訊息投遞後,並未將直接路由到佇列中,而是儲存到mnesia(一個分散式資料系統),該系統會檢測訊息延遲時間。
  • 訊息達到可投遞時間,訊息會被投遞到目標佇列。

設定延遲佇列

@Configuration
public class XDelayedMessageConfig {
  /**
	 * 佇列
	 */
	public static final String DIRECT_QUEUE = "queue.delayed";

	/**
	 * 延遲交換機
	 */
	public static final String DELAYED_EXCHANGE = "exchange.delayed";

	/**
	 * 繫結的routing key
	 */
	public static final String ROUTING_KEY = "routingKey.bind";

	@Bean
	public Queue directQueue() {
		return new Queue(DIRECT_QUEUE,true);
	}

	/**
	 * 定義延遲交換機
	 * 交換機的型別為 x-delayed-message
	 * @return
	 */
	@Bean
	public CustomExchange delayedExchange() {
		Map<String,Object> map = new HashMap<>();
		map.put("x-delayed-type","direct");
		return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);
	}

	@Bean
	public Binding delayOrderBinding() {
		return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
	}

}

傳送訊息:

    @GetMapping("/delay")
    public String delay() {
	    delaySend("延遲佇列10 秒",10000);
	    delaySend("延遲佇列5 秒",5000);
	    delaySend("延遲佇列2 秒",2000);
        return "ok";
    }
    
    private void delaySend(String message,Integer delayTime) {
        message = message + " " + DateUtil.dateFormat(new Date());
        System.out.println("【傳送訊息】" + message);
        rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setDelay(delayTime);
                    //message1.getMessageProperties().setHeader("x-delay",delayTime);
                    return message1;
                });
    }    

消費訊息:

    @RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)
    public void delayProcess(String msg,Channel channel, Message message) {
        System.out.println("【接收訊息】" + msg + " 當前時間" + DateUtil.dateFormat(new Date()));
   }

控制檯輸出:

【傳送訊息】延遲佇列10 秒 22:00:01
【傳送訊息】延遲佇列5 秒 22:00:01
【傳送訊息】延遲佇列2 秒 22:00:01
【接收訊息】延遲佇列2 秒 22:00:01 當前時間22:00:03
【接收訊息】延遲佇列5 秒 22:00:01 當前時間22:00:05
【接收訊息】延遲佇列10 秒 22:00:01 當前時間22:00:10

解決了訊息的時序問題。

總結

  • 使用Java自帶的延遲訊息,系統重啟或者掛了之後,訊息就無法傳送,不適於用在生產環境上。
  • RabbitMQ本身不支援延遲佇列,可以使用存活時間ttl + 死信佇列dlx實現訊息延遲。
    • 傳送的訊息設定ttl,所在的佇列不設定消費者。
    • 佇列繫結死信佇列,訊息超時之後,變成死信訊息,再傳送給死信佇列,最後傳送給消費者。
  • 傳送多條不同延遲時間訊息,前面訊息沒有到延遲時間,會阻塞後面延遲更低的訊息,因為佇列有先進先出的特性。
  • RabbitMQx-delay-message外掛可以解決訊息時序問題。
    • 帶有ttl的訊息傳送x-delayed-message型別的交換機,訊息不會直接路由到佇列中。而且儲存到分散式資料系統中,該系統會檢測訊息延遲時間。
    • 訊息到達延遲時間,訊息才能會投遞到佇列中,最後傳送給消費者。

Github 原始碼

參考