一種非同步延遲佇列的實現方式

2023-03-22 12:03:58

作者:京東零售 張路瑤

1.應用場景

目前系統中有很多需要用到延時處理的功能:支付超時取消、排隊超時、簡訊、微信等提醒延遲傳送、token重新整理、會員卡過期等等。通過延時處理,極大的節省系統的資源,不必輪詢資料庫處理任務。

目前大部分功能通過定時任務完成,定時任務還分使用quartz及xxljob兩種型別輪詢時間短,每秒執行一次,對資料庫造成一定的壓力,並且會有1秒的誤差。輪詢時間久,如30分鐘一次,03:01插入一條資料,正常3:31執行過期,但是3:30執行輪詢時,掃描3:00-3:30的資料,是掃描不到3:31的資料的,需要4:00的時候才能掃描到,相當於多延遲了29分鐘!

2.延時處理方式調研

1.DelayQueue

1.實現方式:

jvm提供的延遲阻塞佇列,通過優先順序佇列對不同延遲時間任務進行排序,通過condition進行阻塞、睡眠dealy時間 獲取延遲任務。

當有新任務加入時,會判斷新任務是否是第一個待執行的任務,若是,會解除佇列睡眠,防止新加入的元素時需要執行的元素而不能正常被執行執行緒獲取到。

2.存在的問題:

1.單機執行,系統宕機後,無法進行有效的重試

2.沒有執行記錄和備份

3.沒有重試機制

4.系統重啟時,會將任務清空!

5.不能分片消費

3.優勢:實現簡單,無任務時阻塞,節省資源,執行時間準確

2.延遲佇列mq

實現方式:依賴mq,通過設定延遲消費時間,達到延遲消費功能。像rabbitMq、jmq都可以設定延遲消費時間。RabbitMq通過將訊息設定過期時間,放入死信佇列進行消費實現。

存在的問題:

1.時間設定不靈活,每個queue是固定的到期時間,每次新建立延時佇列,需要建立新的訊息佇列

優點:依靠jmq,可以有效的監控、消費記錄、重試,具備多機同時消費能力,不懼怕宕機

3.定時任務

通過定時任務輪詢符合條件的資料

缺點:

1.必須要讀業務資料庫,對資料庫造成一定的壓力,

2.存在延時

3.一次掃描資料量過大時,佔用過多的系統資源。

4. 無法分片消費

優點:

1.消費失敗後,下次還能繼續消費,具備重試能力,

2.消費能力穩定

4.redis

任務儲存在redis中,使用redis的 zset佇列根據score進行排序,程式通過執行緒不斷獲取佇列資料消費,實現延時佇列

優點:

1、查詢redis相比較資料庫快,set佇列長度過大,會根據跳錶結構進行查詢,效率高

2、redis可根據時間戳進行排序,只需要查詢當前時間戳內的分數的任務即可

3、無懼機器重啟

4、分散式消費

缺點:

1.受限於redis效能,並行10W

2.多個命令無法保證原子性,使用lua指令碼會要求所有資料都在一個redis分片上。

5. 時間輪

通過時間輪實現的延遲任務執行,也是基於jvm單機執行,如kafka、netty都有實現時間輪,redisson的看門狗也是通過netty的時間輪實現的。

缺點:不適合分散式服務的使用,宕機後,會丟失任務。

3.實現目標

相容目前在使用的非同步事件元件,並提供更可靠,可重試、有記錄、可監控報警、高效能的延遲元件。

•訊息傳輸可靠性:訊息進入到延遲佇列後,保證至少被消費一次。

•Client支援豐富:支援多重語言。

•高可用性:支援多範例部署。掛掉一個範例後,還有後備範例繼續提供服務。

•實時性:允許存在一定的時間誤差。

•支援訊息刪除:業務使用方,可以隨時刪除指定訊息。

•支援消費查詢

•支援手動重試

•對當前非同步事件的執行增加監控

4.架構設計

5.延遲元件實現方式

1.實現原理

目前選擇使用jimdb通過zset實現延時功能,將任務id和對應的執行時間作為score存在在zset佇列中,預設會按照score排序,每次取0-當前時間內的score的任務id,

傳送延遲任務時,會根據時間戳+機器ip+queueName+sequence 生成唯一的id,構造訊息體,加密後放入zset佇列中。

通過搬運執行緒,將達到執行時間的任務移動到釋出佇列中,等待消費者獲取。

監控方通過整合ump

消費記錄通過redis備份+資料庫持久化完成。

通過快取實現的方式,只是實現的一種,可以通過引數控制使用哪一種實現方式,並可通過spi自由擴充套件。

2.訊息結構

每個Job必須包含一下幾個屬性:

•Topic:Job型別,即QueueName

•Id:Job的唯一標識。用來檢索和刪除指定的Job資訊。

•Delay:Job需要延遲的時間。單位:秒。(伺服器端會將其轉換為絕對時間)

•Body:Job的內容,供消費者做具體的業務處理,以json格式儲存。

•traceId:傳送執行緒的traceId,待後續pfinder支援設定traceId後,可與傳送執行緒公用同一個traceiD,便於紀錄檔追蹤

具體結構如下圖表示:

TTR的設計目的是為了保證訊息傳輸的可靠性。

3.資料流轉及流程圖

基於redis-disruptor方式進行釋出、消費,可以作為訊息來進行使用,消費者採用原有非同步事件的disruptor無鎖佇列消費,不同應用、不同queue之間無鎖

1.支援應用只發布,不消費,達到訊息佇列的功能。

2:支援分桶,針對大key問題,若事件多,可以設定延遲佇列和任務佇列桶的數量,減小因大key造成的redis阻塞問題。

3: 通過ducc設定,進行效能的擴充套件,目前只支援開啟消費和關閉消費。

4: 支援設定超時時間設定,防止消費執行緒執行過久

瓶頸: 消費速度慢,生產速度過快,會導致ringbuffer佇列佔滿,當前應用既是生產者也是消費者時,生產者會休眠,效能取決於消費速度,可通過水平擴充套件機器,直接提升效能。監控redis佇列的長度,若不斷增長,可考慮增加消費者,直接提高效能。

可能出現的情況: 因一個應用公用一個disruptor,擁有64個消費者執行緒,如果某一個事件消費過慢,導致64個執行緒都在消費這個事件,會導致其他事件無消費執行緒消費,生產者執行緒也被阻塞,導致所有事件的消費都被阻塞。

後期觀察是否有這個效能瓶頸,可給每一個queue一個消費者執行緒池。

6.demo範例

增加組態檔

判斷是否開啟jd.event.enable:true

<dependency> <groupId>com.jd.car</groupId>
 <artifactId>senna-event</artifactId>
 <version>1.0-SNAPSHOT</version> </dependency>

設定

jd:
senna:
event:
enable: true
queue:
retryEventQueue:
bucketNum: 1
handleBean: retryHandle

消費程式碼:

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler {

@Override
protected void onHandle(String key, String eventType) {
log.info("Handler開始消費:{}", key);
}

@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler開始消費:{}", key);
}
}

註解形式:

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)
public class TestQueueEvent extends EventHandler {

@Override
protected void onHandle(String key, String eventType) {
log.info("Handler開始消費:{}", key);
}

@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler開始消費:{}", key);
}
}

傳送程式碼


package com.jd.car.senna.admin.controller;

import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;


/**
* @author zly
*/
@RestController
@Slf4j
public class DemoController {

@Lazy
@Resource(name = "testQueue")
private IEventQueue eventQueue;

@ResponseBody
@GetMapping("/api/v1/demo")
public String demo() {
log.info("傳送無延遲訊息");
eventQueue.push("no delay 5000 millseconds message 3");
return "ok";
}

@ResponseBody
@GetMapping("/api/v1/demo1")
public String demo1() {
log.info("傳送延遲5秒訊息");
eventQueue.push(" delay 5000 millseconds message,name",1000*5L);
return "ok";
}

@ResponseBody
@GetMapping("/api/v1/demo2")
public String demo2() {
log.info("傳送延遲到2022-04-02 00:00:00執行的訊息");
eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));
return "ok";
} 

}

參考有贊設計:https://tech.youzan.com/queuing_delay/

7.目前應用:

1.雲修到店排隊24小時後自動取消

2..美團請求token定時重新整理。

3.質保卡延期24小時生成

5. 結算單延期生成

6.簡訊延遲傳送