如何實現非同步通知的重試機制

2023-11-02 18:00:48

工作中經常要和第三方做對接,比如支付、電子合同等系統。操作成功之後,第三方會傳送非同步的通知,返回最終的處理結果,使用非同步而不是使用同步通知,是為了加快系統響應速度,防止執行緒阻塞。任務處理完成後通過非同步的通知,傳送給對應的伺服器端。之前對接微信支付,完成支付後,微信傳送一個非同步通知給伺服器端,伺服器端根據支付通知修改狀態,通知規則看到以下的一段話。

其中有段話:

重新傳送通知,直到成功為止(在通知一直不成功的情況下,微信總共會發起多次通知,通知頻率為15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h - 總計 24h4m)

微信為何要這麼設計

微信結果通知本質就是傳送一個網路請求到不同的伺服器上,既然是一個網路請求,就可能因為各種原因導致請求超時或者失敗,比如:

  • 請求的伺服器掛了
  • 網路發生了波動
  • 伺服器響應異常

以上原因都會導致支付結果通知接收失敗,也就無法通知給使用者。為了解決上述的問題,就需要引入重試機制,當請求無法應答時,就需要重試幾次,保證請求能確認傳送。

非同步通知的重試機制

從微信支付通知可以引申到所有的非同步通知,或者和第三方對接時。如果要確保通知能被成功的接收,就需要考慮請求失敗的情況,大部分都是需要使用重試機制。而重試機制是隔段時間不是固定的,是越來越大的,這是考慮到重試時,由於網路故障或者伺服器故障重啟裝置需要花一段時間,而間隔時間越來越長就可以更大的保證請求可以被成功接收

重複請求,介面需要考慮重複請求的情況,要設計成一個冪等性介面,多次請求和請求一次的效果是一致的。

重試機制的實現

重試機制就是一個定時器,隔一段時間執行一次,沒有預期的效果就再重複執行一次。

實現的難點就在於,間隔的時間是不一致的,如果時間的間隔是固定的話,就可以使用定時任務。

方案一:定時任務(不可行)

使用定時器,每隔一段時間執行一次任務。在 SpringBoot 啟動類新增 @EnableScheduling 註解,然後在執行的方法新增 @Scheduled 註解。

@Scheduled(fixedDelay = 1000*2)
public void test2() {
    Date date = new Date();
    System.out.println("tesk2 " + date);
}

以上表示每隔 2 秒執行一次。間隔時間都是固定的,這個不符合預期,因為要求的時間間隔是依次增加的。

如果是間隔時間是固定的,那定時任務就符合條件嗎?

如果是隻有一條任務在執行,執行不成功,存放在 Redis 中,然後定時執行任務,如果任務執行成功,就去掉任務。但是定時器還是會定時執行。

如果執行的任務很多的話,前面的任務要等待後續的任務執行,那延遲就很嚴重了,就需要使用到多執行緒,開啟多個執行緒,在《阿里Java開發手冊》有一條:

執行緒資源必須通過執行緒池提供,不允許在應用中自行顯式建立執行緒。

定時任務有以下幾個缺點不滿足:

  • 時間間隔固定。
  • 只能單執行緒處理任務,任務越多,延遲性越久。

方案二:執行緒池 + 定時任務 (不可行)

既然使用單執行緒會產生延遲,就使用執行緒池來降低延遲,因為發起請求屬於 IO 密集型,所以執行緒數設定成 CPU 個數的兩倍,在 SpringBoot 自定義一個執行緒池:

@Configuration
public class ThreadPoolConfig {

    // 執行緒存活時間
    private static int keepAliveTime = 10;

    // 呼叫執行緒執行多餘任務
    RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();

    @Bean("customerTaskExecutor")
    public TaskExecutor taskExecutor() {
        // 核心執行緒數
        int cores = Runtime.getRuntime().availableProcessors()*2;
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(cores);
        executor.setMaxPoolSize(cores);
        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setRejectedExecutionHandler(handler);
        executor.setThreadNamePrefix("Custom-");  // 執行緒名字首
        executor.initialize();
        return executor;
    }
}

其中核心執行緒數和最大執行緒數設定成一致,拒絕策略使用呼叫執行緒執行多餘的任務,確保每個任務都能執行。然後新增一個非同步方法.

public interface AsyncService {

    void executeAsync();
}

@Service
@Slf4j
public class AsyncServiceImpl implements AsyncService {

    @Override
    @Async("customerTaskExecutor")
    public void executeAsync() {
        log.info("【開始執行任務】");
        // 延遲幾秒
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("【結束執行任務】");

    }
}

使用 sleep 方法延遲,模擬請求,使用壓測工具,發起 100 次請求,控制檯輸出如下:

2023-10-31 18:00:32.792  INFO 53009 --- [       Custom-1] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.811  INFO 53009 --- [       Custom-2] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.813  INFO 53009 --- [       Custom-3] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.814  INFO 53009 --- [       Custom-4] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.816  INFO 53009 --- [       Custom-5] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.817  INFO 53009 --- [       Custom-6] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.819  INFO 53009 --- [       Custom-7] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.820  INFO 53009 --- [       Custom-8] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.821  INFO 53009 --- [       Custom-9] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.823  INFO 53009 --- [      Custom-10] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.824  INFO 53009 --- [      Custom-11] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:32.825  INFO 53009 --- [      Custom-12] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】
2023-10-31 18:00:33.296  INFO 53009 --- [       Custom-1] com.jeremy.threadpool.AsyncServiceImpl   : 【結束執行任務】
2023-10-31 18:00:33.296  INFO 53009 --- [       Custom-1] com.jeremy.threadpool.AsyncServiceImpl   : 【開始執行任務】

採用執行緒池執行的任務,多個執行緒同時執行任務,能有效的降低了任務的延遲性。定時任務間隔固定時間從資料庫 Mysql 或者 Redis 獲取需要請求的資料,同時執行請求。

這樣就有幾個問題:

  • 間隔是固定的。
  • 空閒的時候沒有請求執行,到了執行時間,大量的請求在執行,導致閒的時候閒死,忙的時候忙死。資源得不到很好的利用。

除了定時器,還有什麼元件可以解決上面問題,那就是使用訊息中介軟體了。

方案三:訊息中介軟體 + 執行緒池(可行)

使用執行緒池的方式開啟多個執行緒執行。那針對固定時間間隔和只能同時執行的問題使用訊息中介軟體就能很好的解決問題,訊息中介軟體採用生產+消費模型實現訊息的生產和消費,

延遲佇列

本文使用訊息中介軟體 RabbitMQ實現延遲佇列,具體實現可以看我的另外一篇文章延遲佇列實現訂單超時自動取消,具體實現流程圖試下如下。

請求傳送失敗之後,呼叫生產者傳送訊息,經過設定的時間間隔之後,傳送給消費者,消費端再次發起請求,如果請求失敗,再呼叫生產者傳送訊息,並設定好下一次的時間間隔,其中消費端發起任務使用執行緒池發起請求。

下載 RabbitMQ 延遲訊息的外掛 delayed_message_exchange,

Github官網找到對應的版本,我選擇的是 3.8.17:

設定延遲佇列:

@Configuration
public class XDelayedMessageConfig {

	/**
	 * 延遲交換機
	 */
	public static final String DELAYED_EXCHANGE = "exchange.delayed";

	/**
	 * 重試佇列
	 */
	public static final String RETRY_QUEUE = "queue.retry";


	/**
	 * 重試routing key
	 */
	public static final String RETRY_ROUTING_KEY = "routingKey.bind.retry";

    @Bean
	public Queue retryQueue() {
		return new Queue(RETRY_QUEUE,true);
	}

	
	/**
	 * 定義延遲交換機
	 * 交換機的型別為 x-delayed-message
	 * @return
	 */
	@Bean
	public CustomExchange delayedExchange() {
		Map<String,Object> map = new HashMap<>();
		map.put("x-delayed-type","direct");
		return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);
	}



	@Bean
	public Binding retryQueueBinding() {
		return BindingBuilder.bind(retryQueue()).to(delayedExchange()).with(RETRY_ROUTING_KEY).noargs();
	}



}

在傳送端模擬重試機制,設定時間間隔 5、10、30 秒。

@Autowired
private RabbitTemplate rabbitTemplate;

private final int[] INTERVAL_ARRAY= {5,10,30};

@GetMapping("/retry")
public String retry(int index) {
    if (index >= 0 && index <= 2) {
        send(index +",延遲" + INTERVAL_ARRAY[index] + "s",INTERVAL_ARRAY[index]);
    }
    return "ok";
}


private void send(String message,Integer delayTime) {
    message = message + " " + DateUtil.dateFormat(new Date());
    System.out.println("【傳送訊息】" + message);
    rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.RETRY_ROUTING_KEY,
            message, message1 -> {
                message1.getMessageProperties().setDelay(delayTime*1000);
                return message1;
            });
}

接收端:

@RabbitListener(queues = XDelayedMessageConfig.RETRY_QUEUE)
public void delayProcess(String msg, Channel channel, Message message) {
    System.out.println("【接收訊息】" + msg + " 當前時間" + DateUtil.dateFormat(new Date()));
    try {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (IOException e) {
        e.printStackTrace();
    }
    int index = Integer.parseInt(msg.split(",")[0]);
    retry(++index);
}

控制檯輸出:

【傳送訊息】0,延遲5s 10:59:29
【接收訊息】0,延遲5s 10:59:29 當前時間10:59:33
【傳送訊息】1,延遲10s 10:59:33
【接收訊息】1,延遲10s 10:59:33 當前時間10:59:43
【傳送訊息】2,延遲30s 10:59:43
【接收訊息】2,延遲30s 10:59:43 當前時間11:00:10

其中 0、1、2表示重試的次數。通過延遲訊息的方式,重試傳送資訊。每個任務作為一個訊息進行消費。和定時服務相比,有以下幾個優點:

  • 支援動態間隔
  • 任務不是同時執行,降低伺服器的壓力。

總結

在傳送一些非同步通知時候,需要考慮到通知可能接收失敗的情況,比如:

  • 請求的伺服器掛了。
  • 網路發生了波動。
  • 伺服器響應異常,服務重啟。

此時無法正確的及時推播通知,無法保證通知的可靠性。這個時候就需要重試多次,而且間隔要依次增加,因為服務啟動或者網路的卡頓在經過一段時間就恢復了。後續重試成功的概率就更高了。

  • 定時重試
    • 定時重試首先不符合變化的間隔時間,間隔的時間是固定的,重試的任務都堆積在一起請求,這樣也會給伺服器造成很大的壓力。而空閒的時候,伺服器的利用率有比較低。
    • 同時請求,只能一個一個同步執行任務,同時執行的任務越多,延遲就越嚴重。
  • 定時任務 + 執行緒池
    • 為了解決同時處理任務,新增了自定義的執行緒池,因為請求屬於 IO 密集型,所以設定執行緒數為 CPU 核數的兩倍。
    • 多個任務執行,降低了延遲性。
    • 無法滿足動態間隔時間的問題,而且同時請求伺服器壓力大。
  • 延遲佇列 + 執行緒池
    • 延遲時間請求可以使用到延遲佇列,每個任務都作為一個訊息。每次處理不成功,就傳送訊息到延遲佇列中,到達時間間隔之後,再消費訊息。如果請求失敗再重複以上操作。
    • 消費者處理訊息,使用執行緒池處理,加快處理速度。也可以開啟多臺伺服器分發處理任務,加快處理速度,降低任務的延遲性。