工作中經常要和第三方做對接,比如支付、電子合同等系統。操作成功之後,第三方會傳送非同步的通知,返回最終的處理結果,使用非同步而不是使用同步通知,是為了加快系統響應速度,防止執行緒阻塞。任務處理完成後通過非同步的通知,傳送給對應的伺服器端。之前對接微信支付,完成支付後,微信傳送一個非同步通知給伺服器端,伺服器端根據支付通知修改狀態,通知規則看到以下的一段話。
其中有段話:
重新傳送通知,直到成功為止(在通知一直不成功的情況下,微信總共會發起多次通知,通知頻率為15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h - 總計 24h4m)
微信結果通知本質就是傳送一個網路請求到不同的伺服器上,既然是一個網路請求,就可能因為各種原因導致請求超時或者失敗,比如:
以上原因都會導致支付結果通知接收失敗,也就無法通知給使用者。為了解決上述的問題,就需要引入重試機制,當請求無法應答時,就需要重試幾次,保證請求能確認傳送。
從微信支付通知可以引申到所有的非同步通知,或者和第三方對接時。如果要確保通知能被成功的接收,就需要考慮請求失敗的情況,大部分都是需要使用重試機制。而重試機制是隔段時間不是固定的,是越來越大的,這是考慮到重試時,由於網路故障或者伺服器故障重啟裝置需要花一段時間,而間隔時間越來越長就可以更大的保證請求可以被成功接收。
重複請求,介面需要考慮重複請求的情況,要設計成一個冪等性介面,多次請求和請求一次的效果是一致的。
重試機制就是一個定時器,隔一段時間執行一次,沒有預期的效果就再重複執行一次。
實現的難點就在於,間隔的時間是不一致的,如果時間的間隔是固定的話,就可以使用定時任務。
使用定時器,每隔一段時間執行一次任務。在 SpringBoot 啟動類新增 @EnableScheduling 註解,然後在執行的方法新增 @Scheduled 註解。
@Scheduled(fixedDelay = 1000*2)
public void test2() {
Date date = new Date();
System.out.println("tesk2 " + date);
}
以上表示每隔 2 秒執行一次。間隔時間都是固定的,這個不符合預期,因為要求的時間間隔是依次增加的。
如果是間隔時間是固定的,那定時任務就符合條件嗎?
如果是隻有一條任務在執行,執行不成功,存放在 Redis 中,然後定時執行任務,如果任務執行成功,就去掉任務。但是定時器還是會定時執行。
如果執行的任務很多的話,前面的任務要等待後續的任務執行,那延遲就很嚴重了,就需要使用到多執行緒,開啟多個執行緒,在《阿里Java開發手冊》有一條:
執行緒資源必須通過執行緒池提供,不允許在應用中自行顯式建立執行緒。
定時任務有以下幾個缺點不滿足:
既然使用單執行緒會產生延遲,就使用執行緒池來降低延遲,因為發起請求屬於 IO 密集型,所以執行緒數設定成 CPU 個數的兩倍,在 SpringBoot 自定義一個執行緒池:
@Configuration
public class ThreadPoolConfig {
// 執行緒存活時間
private static int keepAliveTime = 10;
// 呼叫執行緒執行多餘任務
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
@Bean("customerTaskExecutor")
public TaskExecutor taskExecutor() {
// 核心執行緒數
int cores = Runtime.getRuntime().availableProcessors()*2;
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(cores);
executor.setMaxPoolSize(cores);
executor.setKeepAliveSeconds(keepAliveTime);
executor.setRejectedExecutionHandler(handler);
executor.setThreadNamePrefix("Custom-"); // 執行緒名字首
executor.initialize();
return executor;
}
}
其中核心執行緒數和最大執行緒數設定成一致,拒絕策略使用呼叫執行緒執行多餘的任務,確保每個任務都能執行。然後新增一個非同步方法.
public interface AsyncService {
void executeAsync();
}
@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {
@Override
@Async("customerTaskExecutor")
public void executeAsync() {
log.info("【開始執行任務】");
// 延遲幾秒
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("【結束執行任務】");
}
}
使用 sleep 方法延遲,模擬請求,使用壓測工具,發起 100 次請求,控制檯輸出如下:
2023-10-31 18:00:32.792 INFO 53009 --- [ Custom-1] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.811 INFO 53009 --- [ Custom-2] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.813 INFO 53009 --- [ Custom-3] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.814 INFO 53009 --- [ Custom-4] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.816 INFO 53009 --- [ Custom-5] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.817 INFO 53009 --- [ Custom-6] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.819 INFO 53009 --- [ Custom-7] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.820 INFO 53009 --- [ Custom-8] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.821 INFO 53009 --- [ Custom-9] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.823 INFO 53009 --- [ Custom-10] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.824 INFO 53009 --- [ Custom-11] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:32.825 INFO 53009 --- [ Custom-12] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
2023-10-31 18:00:33.296 INFO 53009 --- [ Custom-1] com.jeremy.threadpool.AsyncServiceImpl : 【結束執行任務】
2023-10-31 18:00:33.296 INFO 53009 --- [ Custom-1] com.jeremy.threadpool.AsyncServiceImpl : 【開始執行任務】
採用執行緒池執行的任務,多個執行緒同時執行任務,能有效的降低了任務的延遲性。定時任務間隔固定時間從資料庫 Mysql 或者 Redis 獲取需要請求的資料,同時執行請求。
這樣就有幾個問題:
除了定時器,還有什麼元件可以解決上面問題,那就是使用訊息中介軟體了。
使用執行緒池的方式開啟多個執行緒執行。那針對固定時間間隔和只能同時執行的問題使用訊息中介軟體就能很好的解決問題,訊息中介軟體採用生產+消費模型實現訊息的生產和消費,
本文使用訊息中介軟體 RabbitMQ實現延遲佇列,具體實現可以看我的另外一篇文章延遲佇列實現訂單超時自動取消,具體實現流程圖試下如下。
請求傳送失敗之後,呼叫生產者傳送訊息,經過設定的時間間隔之後,傳送給消費者,消費端再次發起請求,如果請求失敗,再呼叫生產者傳送訊息,並設定好下一次的時間間隔,其中消費端發起任務使用執行緒池發起請求。
下載 RabbitMQ 延遲訊息的外掛 delayed_message_exchange,
在Github官網找到對應的版本,我選擇的是 3.8.17:
設定延遲佇列:
@Configuration
public class XDelayedMessageConfig {
/**
* 延遲交換機
*/
public static final String DELAYED_EXCHANGE = "exchange.delayed";
/**
* 重試佇列
*/
public static final String RETRY_QUEUE = "queue.retry";
/**
* 重試routing key
*/
public static final String RETRY_ROUTING_KEY = "routingKey.bind.retry";
@Bean
public Queue retryQueue() {
return new Queue(RETRY_QUEUE,true);
}
/**
* 定義延遲交換機
* 交換機的型別為 x-delayed-message
* @return
*/
@Bean
public CustomExchange delayedExchange() {
Map<String,Object> map = new HashMap<>();
map.put("x-delayed-type","direct");
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);
}
@Bean
public Binding retryQueueBinding() {
return BindingBuilder.bind(retryQueue()).to(delayedExchange()).with(RETRY_ROUTING_KEY).noargs();
}
}
在傳送端模擬重試機制,設定時間間隔 5、10、30 秒。
@Autowired
private RabbitTemplate rabbitTemplate;
private final int[] INTERVAL_ARRAY= {5,10,30};
@GetMapping("/retry")
public String retry(int index) {
if (index >= 0 && index <= 2) {
send(index +",延遲" + INTERVAL_ARRAY[index] + "s",INTERVAL_ARRAY[index]);
}
return "ok";
}
private void send(String message,Integer delayTime) {
message = message + " " + DateUtil.dateFormat(new Date());
System.out.println("【傳送訊息】" + message);
rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.RETRY_ROUTING_KEY,
message, message1 -> {
message1.getMessageProperties().setDelay(delayTime*1000);
return message1;
});
}
接收端:
@RabbitListener(queues = XDelayedMessageConfig.RETRY_QUEUE)
public void delayProcess(String msg, Channel channel, Message message) {
System.out.println("【接收訊息】" + msg + " 當前時間" + DateUtil.dateFormat(new Date()));
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
int index = Integer.parseInt(msg.split(",")[0]);
retry(++index);
}
控制檯輸出:
【傳送訊息】0,延遲5s 10:59:29
【接收訊息】0,延遲5s 10:59:29 當前時間10:59:33
【傳送訊息】1,延遲10s 10:59:33
【接收訊息】1,延遲10s 10:59:33 當前時間10:59:43
【傳送訊息】2,延遲30s 10:59:43
【接收訊息】2,延遲30s 10:59:43 當前時間11:00:10
其中 0、1、2表示重試的次數。通過延遲訊息的方式,重試傳送資訊。每個任務作為一個訊息進行消費。和定時服務相比,有以下幾個優點:
在傳送一些非同步通知時候,需要考慮到通知可能接收失敗的情況,比如:
此時無法正確的及時推播通知,無法保證通知的可靠性。這個時候就需要重試多次,而且間隔要依次增加,因為服務啟動或者網路的卡頓在經過一段時間就恢復了。後續重試成功的概率就更高了。