在上一篇 Java 實現訂單未支付超時自動取消,使用Java
自帶的定時任務TimeTask
實現訂單超時取消,但是有小夥伴提出這種實現,會有以下幾個問題:
針對上述服務掛了、或者服務重啟導致訊息失效的問題,需要使用獨立於專案的服務,比如訊息中介軟體,比如Redis
或者RabbitMQ
。本文主要講解訊息佇列RabbitMQ
。
建立一個訂單,超時30分鐘未支付就取消訂單。
RabbitMQ
本身是不支援延遲佇列的,但可以利用RabbitMQ
的存活時間 + 死信佇列
來實現訊息延遲。
TTL
全稱為:time to live
,意思為存活時間,當訊息沒有設定消費者,訊息就一直停留在佇列中,停留時間超過存活時間後,訊息會被自動刪除
RabbitMQ
支援兩種TTL
設定:
當訊息過期還沒有被消費,此時訊息會變成死信訊息dead letter
,這是實現延遲佇列的關鍵。
訊息變為死信的條件:
basic.reject/basic.nack
,並且requeue=false
。當上面的訊息變成死信訊息之後,它不會立即被刪除,首先它要看有沒有對應的死信交換機,如果有繫結的死信交換機,訊息就會從傳送到對應的死信交換機上。
DLX
全程為Dead Letter Exchanges
,意思為死信交換機。
死信交換機和普通交換機沒什麼區別,不同的是死信交換機會繫結在其他佇列上,當佇列的訊息變成死信訊息後,死信訊息會傳送到死信交換上。
佇列繫結死信交換機需要兩個引數:
x-dead-letter-exchange
: 繫結的死信交換機名稱。x-dead-letter-routing-key
: 繫結的死信交換機routingKey
。死信交換機和普通交換機的區別就是死信交換機的
Exchange
和routingKey
作為繫結引數,繫結在其他佇列上。
訊息傳送的流程圖:
TTL
的訊息傳送給交換機,由交換機路由到佇列中。@Configuration
public class DelayQueueRabbitConfig {
// 下面是死信佇列
/**
* 死信佇列
*/
public static final String DLX_QUEUE = "queue.dlx";
/**
* 死信交換機
*/
public static final String DLX_EXCHANGE = "exchange.dlx";
/**
* 死信routing-key
*/
public static final String DLX_ROUTING_KEY = "routingKey.dlx";
/**
* 死信佇列
* @return
*/
@Bean
public Queue dlxQueue() {
return new Queue(DLX_QUEUE,true);
}
/**
* 死信交換機
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(DLX_EXCHANGE,true,false);
}
/**
* 死信佇列和交換機繫結
* @return
*/
@Bean
public Binding bindingDLX() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
}
}
// 下面的是延遲佇列
/**
* 訂單延遲佇列
*/
public static final String ORDER_QUEUE = "queue.order";
/**
* 訂單交換機
*/
public static final String ORDER_EXCHANGE = "exchange.order";
/**
* 訂單routing-key
*/
public static final String ORDER_ROUTING_KEY = "routingkey.order";
/**
* 訂單延遲佇列
* @return
*/
@Bean
public Queue orderQueue() {
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DLX_EXCHANGE);
params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
return new Queue(ORDER_QUEUE, true, false, false, params);
}
/**
* 訂單交換機
* @return
*/
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(ORDER_EXCHANGE,true,false);
}
/**
* 訂單佇列和交換機繫結
* @return
*/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
}
繫結死信交換通過新增
x-dead-letter-exchange
、x-dead-letter-routing-key
引數指定對應的交換機和路由。
設定五秒超時時間
@RestController
public class SendController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/dlx")
public String dlx() {
String date = DateUtil.dateFormat(new Date());
String delayTime = "5000";
System.out.println("【傳送訊息】延遲 5 秒 傳送時間 " + date);
rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
message, message1 -> {
message1.getMessageProperties().setExpiration(delayTime);
return message1;
});
return "ok";
}
class DateUtil{
public static String dateFormat(Date date) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
return sdf.format(date);
}
}
}
@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)
public void delayPrecss(String msg,Channel channel,Message message){
System.out.println("【接收訊息】" + msg + " 接收時間" + DateUtil.dateFormat(new Date()));
}
控制檯輸出:
【傳送訊息】延遲5 秒 傳送時間 21:32:15
【接收訊息】延遲5 秒 傳送時間 21:32:15 接收時間21:32:20
傳送訊息,5秒之後消費者後會收到訊息。說明延遲成功。
佇列都有先進先出
的特點,如果佇列前面的訊息延遲比後的訊息延遲更長,會出現什麼情況。
傳送三條訊息,延遲分別是10s
、2s
、5s
:
@RestController
public class SendController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/dlx")
public String dlx() {
dlxSend("延遲10秒","10000");
dlxSend("延遲2 秒","2000");
dlxSend("延遲5 秒","5000");
return "ok";
}
private void dlxSend(String message,String delayTime) {
System.out.println("【傳送訊息】" + message + "當前時間" + DateUtil.dateFormat(new Date()));
rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
message, message1 -> {
message1.getMessageProperties().setExpiration(delayTime);
return message1;
});
}
控制檯輸出:
【傳送訊息】延遲10秒當前時間21:54:36
【傳送訊息】延遲2 秒當前時間21:54:36
【傳送訊息】延遲5 秒當前時間21:54:36
【接收訊息】延遲10秒 當前時間21:54:46
【接收訊息】延遲2 秒 當前時間21:54:46
【接收訊息】延遲5 秒 當前時間21:54:46
所有的訊息都要等10s
的訊息消費完才能消費,當10s
訊息未被消費,其他訊息也會阻塞,即使訊息設定了更短的延遲。因為佇列有先進先出
的特徵,當佇列有多條訊息,延遲時間就沒用作用了,前面的訊息消費後,後的訊息才能被消費,不然會被阻塞到佇列中。
針對上面訊息的時序問題,RabbitMQ
開發一個延遲訊息的外掛delayed_message_exchange
,延遲訊息交換機。使用該外掛可以解決上面時序的問題。
在Github官網找到對應的版本,我選擇的是3.8.17
:
將檔案下載下來放到伺服器的/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.9/plugins
目錄下,執行以下命令,啟動外掛:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
啟動外掛,交換機會有新的型別x-delayed-message
:
x-delayed-message
型別的交換機,支援延遲投遞訊息。傳送訊息給x-delayed-message
型別的交換流程圖:
x-delayed-message
型別的交換機接收訊息投遞後,並未將直接路由到佇列中,而是儲存到mnesia
(一個分散式資料系統),該系統會檢測訊息延遲時間。@Configuration
public class XDelayedMessageConfig {
/**
* 佇列
*/
public static final String DIRECT_QUEUE = "queue.delayed";
/**
* 延遲交換機
*/
public static final String DELAYED_EXCHANGE = "exchange.delayed";
/**
* 繫結的routing key
*/
public static final String ROUTING_KEY = "routingKey.bind";
@Bean
public Queue directQueue() {
return new Queue(DIRECT_QUEUE,true);
}
/**
* 定義延遲交換機
* 交換機的型別為 x-delayed-message
* @return
*/
@Bean
public CustomExchange delayedExchange() {
Map<String,Object> map = new HashMap<>();
map.put("x-delayed-type","direct");
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);
}
@Bean
public Binding delayOrderBinding() {
return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
}
}
傳送訊息:
@GetMapping("/delay")
public String delay() {
delaySend("延遲佇列10 秒",10000);
delaySend("延遲佇列5 秒",5000);
delaySend("延遲佇列2 秒",2000);
return "ok";
}
private void delaySend(String message,Integer delayTime) {
message = message + " " + DateUtil.dateFormat(new Date());
System.out.println("【傳送訊息】" + message);
rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.ROUTING_KEY,
message, message1 -> {
message1.getMessageProperties().setDelay(delayTime);
//message1.getMessageProperties().setHeader("x-delay",delayTime);
return message1;
});
}
消費訊息:
@RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)
public void delayProcess(String msg,Channel channel, Message message) {
System.out.println("【接收訊息】" + msg + " 當前時間" + DateUtil.dateFormat(new Date()));
}
控制檯輸出:
【傳送訊息】延遲佇列10 秒 22:00:01
【傳送訊息】延遲佇列5 秒 22:00:01
【傳送訊息】延遲佇列2 秒 22:00:01
【接收訊息】延遲佇列2 秒 22:00:01 當前時間22:00:03
【接收訊息】延遲佇列5 秒 22:00:01 當前時間22:00:05
【接收訊息】延遲佇列10 秒 22:00:01 當前時間22:00:10
解決了訊息的時序問題。
Java
自帶的延遲訊息,系統重啟或者掛了之後,訊息就無法傳送,不適於用在生產環境上。RabbitMQ
本身不支援延遲佇列,可以使用存活時間ttl
+ 死信佇列dlx
實現訊息延遲。
ttl
,所在的佇列不設定消費者。先進先出
的特性。RabbitMQ
的x-delay-message
外掛可以解決訊息時序問題。
ttl
的訊息傳送x-delayed-message
型別的交換機,訊息不會直接路由到佇列中。而且儲存到分散式資料系統中,該系統會檢測訊息延遲時間。