基於Redis的簡易延時佇列

2023-12-09 18:00:18

基於Redis的簡易延時佇列

一、背景

在實際的業務場景中,經常會遇到需要延時處理的業務,比如訂單超時未支付,需要取消訂單,或者是使用者註冊後,需要在一段時間內啟用賬號,否則賬號失效等等。這些業務場景都可以通過延時佇列來實現。
最近在實際業務當中就遇到了這樣的一個場景,需要實現一個延時佇列,用來處理訂單超時未支付的業務。在網上找了一些資料,發現大部分都是使用了mq來實現,比如rabbitmq,rocketmq等等,但是這些mq都是需要安裝的,而且還需要設定,對於此專案來說不想增加額外的依賴,所以就想到了使用redis來實現一個簡易的延時佇列。

二、實現思路

1. 業務場景

訂單超時未支付,需要取消訂單,這個業務場景可以分為兩個步驟來實現:

  1. 使用者下單後,將訂單資訊存入資料庫,並將訂單資訊存入延時佇列中,設定延時時間為30分鐘。
  2. 30分鐘後,從延時佇列中取出訂單資訊,判斷訂單是否已支付,如果未支付,則取消訂單。
  3. 如果使用者在30分鐘內支付了訂單,則將訂單從延時佇列中刪除。

2. 實現思路

  1. 使用redis的zset來實現延時佇列,zset的score用來儲存訂單的超時時間,value用來儲存訂單資訊。
  2. 使用redis的set來儲存已支付的訂單,set中的value為訂單id。

三、實現程式碼

1. 使用了兩個註解類分別標記生產者類、生產者方法,消費者方法

/**
 * @program: 
 * @description: redis延時佇列生產者類註解,標記生產者類,用來掃描生產者類中的生產者方法,將生產者方法註冊到redis延時佇列中
 * @author: jiangchengxuan
 * @created: 2023/12/09 10:32
 */
@Component
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisMessageQueue {}
/**
 * @program: 
 * @description: 
 * 帶有此註解的方法,方法的入參首先會被轉換為json字串,然後存入redis的zset中,score為當前時間+延時時間,value為json字串
 * 當延時時間到達後,會從redis的zset中取出value,然後將value轉換為入參型別,呼叫此方法,執行業務邏輯
 * 此註解只能標記在方法上,且方法必須為public,且只能有一個引數
 * 此註解標記的方法,必須在redis延時佇列生產者類中,否則不會生效
 * @author: jiangchengxuan
 * @created:  2023/12/09 10:37
 */
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisMessageQueueMethod {
    String threadName() default "redis訊息佇列預設執行緒";
    String queueKey();   // 佇列key值
    int threadNum() default 1;      //預設執行緒數量
    int threadSleepTime() default 500;  //預設執行緒休眠時間預設500ms
}

2. 生產者類具體實現

/**
 * @program: 
 * @description:  生產者類具體實現
 * @author: jiangchengxuan
 * @created: 2023/12/09 10:44
 */
@Slf4j
@Component
public class DelayQueueWorkerConfig implements InitializingBean {
    private volatile boolean monitorStarted = false;

    private volatile boolean monitorShutDowned = false;

    private ExecutorService executorService;

    // 需要監控的延時佇列
    @Autowired
    protected IDelayQueue<String> monitorQueue;

    @Autowired
    private ApplicationContext applicationContext;


    @Override
    public void afterPropertiesSet(){
        //spring工具類,可以獲取指定註解的類
        Map<String, Object> allNeedClass = applicationContext.getBeansWithAnnotation(RedisMessageQueue.class);
        for (Map.Entry<String, Object> entry : allNeedClass.entrySet()) {
            Object bean = entry.getValue();
            Method[] methods = bean.getClass().getMethods();
            for (Method method : methods) {
                Annotation[] annotations = method.getDeclaredAnnotations();
                for (Annotation annotation : annotations) {
                    if (annotation instanceof RedisMessageQueueMethod) {
                        RedisMessageQueueMethod queueMethod = (RedisMessageQueueMethod) annotation;
                        //找的需要使用訊息佇列的方法後,
                        initExecuteQueue(queueMethod, method, bean);
                    }
                    }
                }
            }
        }


    /**
     * 初始化執行造作
     * @param queueAnnotations 註解
     * @param method 方法
     * @param bean 物件
     */
    void initExecuteQueue(RedisMessageQueueMethod queueAnnotations ,Method method,Object bean) {
        String threadName = queueAnnotations.threadName();
        int threadNum = queueAnnotations.threadNum();
        int threadSheepTime = queueAnnotations.threadSleepTime();
        String queueKey = queueAnnotations.queueKey();
        //獲取所有訊息佇列名稱
        executorService = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                Thread.currentThread().setName(threadName + "[" + num + "]");
                //如果沒有設定佇列queuekey或者已經暫停則不執行
                while (!monitorShutDowned) {
                    String value = null;
                    try {
                        value = monitorQueue.get(queueKey);
                        // 獲取資料時進行刪除操作,刪除成功,則進行處理,業務邏輯處理失敗則繼續新增回佇列但是時間設定最大以達到儲存現場的目的,防止並行獲取重複資料
                        if (StringUtils.isNotEmpty(value)) {
                            if (log.isDebugEnabled()) {
                                log.debug("Monitor Thread[" + Thread.currentThread().getName() + "], get from queue,value = {}", value);
                            }
                            boolean success = (Boolean) method.invoke(bean, value);
                            // 失敗重試
                            if (!success) {
                                success =  (Boolean) method.invoke(bean, value);;
                                if (!success) {
                                    log.warn("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = {}", value);
                                    monitorQueue.add(TimeUnit.DAYS,365, value, queueKey);
                                }
                            } else {
                                if (log.isDebugEnabled()) {
                                    log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:execute successfully!values = {}", value);
                                }
                            }
                        } else {
                            if (log.isDebugEnabled()) {
                                log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:monitorThreadRunning = {}", monitorStarted);
                            }
                            Thread.sleep(threadSheepTime);
                        }
                    } catch (Exception e) {
                        log.error("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = " + value, e);
                    }
                }
                log.info("Monitor Thread[" + Thread.currentThread().getName() + "] Completed...");
            });
        }
        log.info("thread pool is started...");
    }

}
/**
 * @program: 
 * @description: 
 * 延時佇列介面實現類,
 * 使用redis的zset實現延時佇列,
 * @author: jiangchengxuan
 * @created:  2023/12/09 23:34
 */
public interface IDelayQueue <E> {
    /**
     * 向延時佇列中新增資料
     *
     * @param score 分數
     * @param data  資料
     * @return true 成功 false 失敗
     */
    boolean add(long score, E data,String queueKey);


    /**
     * 向延時佇列中新增資料
     *
     * @param timeUnit 時間單位
     * @param time     延後時間
     * @param data     資料
     * @param queueKey
     * @return true 成功 false 失敗
     */
    boolean add(TimeUnit timeUnit, long time, E data, String queueKey);

    /**
     * 從延時佇列中獲取資料
     * @param queueKey 佇列key
     * @return 資料
     */
    String get(String queueKey);

    /**
     * 刪除資料
     *
     * @param key
     * @param data 資料
     * @return
     */
    public<T> boolean rem(String key, T data) ;
}
/**
 * @program: 
 * @description:  redis操作類,封裝了redis的操作方法,使用時直接注入即可使用,不需要關心redis的操作細節,使用時只需要關心業務邏輯即可
 * @author: jiangchengxuan
 * @created: 2023/12/09 23:35
 */
@Service
public class RedisDelayQueue implements IDelayQueue<String> {

    @Autowired
    private RedisService redisService;


    @Override
    public boolean add(long score, String data,String queueKey) {
        return redisService.opsForZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, data, score);
    }

    @Override
    public boolean add(TimeUnit timeUnit, long time, String data, String queueKey) {
        switch (timeUnit) {
            case SECONDS:
                return add(LocalDateTime.now().plusSeconds(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data, queueKey);
            case MINUTES:
                return add(LocalDateTime.now().plusMinutes(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            case HOURS:
                return add(LocalDateTime.now().plusHours(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            case DAYS:
                return add(LocalDateTime.now().plusDays(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            default:
                return false;
        }
    }


    @Override
    public String get(String queueKey) {
        long now = System.currentTimeMillis();
        long min = Long.MIN_VALUE;
        Set<String> res = redisService.rangeByScoreZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, min, now, 0, 10);
        if (!CollectionUtils.isEmpty(res)) {
            for (String data : res){
                // 刪除成功,則進行處理,防止並行獲取重複資料
                if (rem(queueKey, data)){
                    return data;
                }
            }
        }
        return null;
    }


    @Override
    public<T> boolean rem(String key, T data) {
        return redisService.remZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+key, data);
    }
}
  1. 使用
@RedisMessageQueue
public class SomethingClass
{
    @Autowired
    private IDelayQueue<String> messageQueue;

    /**
     * 生產者,向佇列中新增資料,30秒後消費者進行消費
     */
    public void test(){
        messageQueue.add(TimeUnit.SECONDS,30L,"這是引數資料","new_queue");
    }
    
    /**
     * 消費者,如果按此設定的話,會啟動一個執行緒,執行緒名稱為:測試執行緒名稱,執行緒數量為1,執行緒休眠時間為10毫秒
     * 注意:queueKey需要與生產者中的queueKey保持一致才能進行消費
     * @param data 
     */
    @Override
    @RedisMessageQueueMethod(threadName = "測試執行緒名稱",queueKey = "new_queue",threadNum = 1,threadSleepTime = 10)
    public void testMethod(String data) {
        //do something
    }

}