如何實現延遲任務,這11種方式才算優雅!

2023-02-28 15:00:46

大家好,我是三友~~

延遲任務在我們日常生活中比較常見,比如訂單支付超時取消訂單功能,又比如自動確定收貨的功能等等。

所以本篇文章就來從實現到原理來盤點延遲任務的11種實現方式,這些方式並沒有絕對的好壞之分,只是適用場景的不大相同。

微信公眾號:三友的java日記

DelayQueue

DelayQueue是JDK提供的api,是一個延遲佇列

DelayQueue泛型引數得實現Delayed介面,Delayed繼承了Comparable介面。

getDelay方法返回這個任務還剩多久時間可以執行,小於0的時候說明可以這個延遲任務到了執行的時間了。

compareTo這個是對任務排序的,保證最先到延遲時間的任務排到佇列的頭。

來個demo

@Getter
public class SanYouTask implements Delayed {

    private final String taskContent;

    private final Long triggerTime;

    public SanYouTask(String taskContent, Long delayTime) {
        this.taskContent = taskContent;
        this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return this.triggerTime.compareTo(((SanYouTask) o).triggerTime);
    }

}

SanYouTask實現了Delayed介面,構造引數

  • taskContent:延遲任務的具體的內容
  • delayTime:延遲時間,秒為單位

測試

@Slf4j
public class DelayQueueDemo {

    public static void main(String[] args) {
        DelayQueue<SanYouTask> sanYouTaskDelayQueue = new DelayQueue<>();

        new Thread(() -> {
            while (true) {
                try {
                    SanYouTask sanYouTask = sanYouTaskDelayQueue.take();
                    log.info("獲取到延遲任務:{}", sanYouTask.getTaskContent());
                } catch (Exception e) {
                }
            }
        }).start();

        log.info("提交延遲任務");
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記5s"5L));
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記3s"3L));
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記8s"8L));
    }
}

開啟一個執行緒從DelayQueue中獲取任務,然後提交了三個任務,延遲時間分為別5s,3s,8s。

測試結果:

成功實現了延遲任務。

實現原理

offer方法在提交任務的時候,會通過根據compareTo的實現對任務進行排序,將最先需要被執行的任務放到佇列頭。

take方法獲取任務的時候,會拿到佇列頭部的元素,也就是佇列中最早需要被執行的任務,通過getDelay返回值判斷任務是否需要被立刻執行,如果需要的話,就返回任務,如果不需要就會等待這個任務到延遲時間的剩餘時間,當時間到了就會將任務返回。

Timer

Timer也是JDK提供的api

先來demo

@Slf4j
public class TimerDemo {

    public static void main(String[] args) {
        Timer timer = new Timer();
        
        log.info("提交延遲任務");
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                log.info("執行延遲任務");
            }
        }, 5000);
    }

}

通過schedule提交一個延遲時間為5s的延遲任務

實現原理

提交的任務是一個TimerTask

public abstract class TimerTask implements Runnable {
    //忽略其它屬性
    
    long nextExecutionTime;
}

TimerTask內部有一個nextExecutionTime屬性,代表下一次任務執行的時間,在提交任務的時候會計算出nextExecutionTime值。

Timer內部有一個TaskQueue物件,用來儲存TimerTask任務的,會根據nextExecutionTime來排序,保證能夠快速獲取到最早需要被執行的延遲任務。

在Timer內部還有一個執行任務的執行緒TimerThread,這個執行緒就跟DelayQueue demo中開啟的執行緒作用是一樣的,用來執行到了延遲時間的任務。

所以總的來看,Timer有點像整體封裝了DelayQueue demo中的所有東西,讓用起來簡單點。

雖然Timer用起來比較簡單,但是在阿里規範中是不推薦使用的,主要是有以下幾點原因:

  • Timer使用單執行緒來處理任務,長時間執行的任務會導致其他任務的延時處理
  • Timer沒有對執行時異常進行處理,一旦某個任務觸發執行時異常,會導致整個Timer崩潰,不安全

ScheduledThreadPoolExecutor

由於Timer在使用上有一定的問題,所以在JDK1.5版本的時候提供了ScheduledThreadPoolExecutor,這個跟Timer的作用差不多,並且他們的方法的命名都是差不多的,但是ScheduledThreadPoolExecutor解決了單執行緒和異常崩潰等問題。

來個demo

@Slf4j
public class ScheduledThreadPoolExecutorDemo {

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2new ThreadPoolExecutor.CallerRunsPolicy());

        log.info("提交延遲任務");
        executor.schedule(() -> log.info("執行延遲任務"), 5, TimeUnit.SECONDS);
    }

}

結果

實現原理

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,也就是繼承了執行緒池,所以可以有很多個執行緒來執行任務。

ScheduledThreadPoolExecutor在構造的時候會傳入一個DelayedWorkQueue阻塞佇列,所以執行緒池內部的阻塞佇列是DelayedWorkQueue。

在提交延遲任務的時候,任務會被封裝一個任務會被封裝成ScheduledFutureTask物件,然後放到DelayedWorkQueue阻塞佇列中。

ScheduledFutureTask
ScheduledFutureTask

ScheduledFutureTask實現了前面提到的Delayed介面,所以其實可以猜到DelayedWorkQueue會根據ScheduledFutureTask對於Delayed介面的實現來排序,所以執行緒能夠獲取到最早到延遲時間的任務。

當執行緒從DelayedWorkQueue中獲取到需要執行的任務之後就會執行任務。

RocketMQ

RocketMQ是阿里開源的一款訊息中介軟體,實現了延遲訊息的功能,如果有對RocketMQ不熟悉的小夥伴可以看一下我之前寫的RocketMQ保姆級教學RocketMQ訊息短暫而又精彩的一生 這兩篇文章。

RocketMQ延遲訊息的延遲時間預設有18個等級。

當傳送訊息的時候只需要指定延遲等級即可。如果這18個等級的延遲時間不符和你的要求,可以修改RocketMQ伺服器端的組態檔。

來個demo

依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
  
<!--web依賴-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

組態檔

rocketmq:
  name-server: 192.168.200.144:9876 #伺服器ip:nameServer埠
  producer:
    group: sanyouProducer

controller類,通過DefaultMQProducer傳送延遲訊息到sanyouDelayTaskTopic這個topic,延遲等級為2,也就是延遲時間為5s的意思。

@RestController
@Slf4j
public class RocketMQDelayTaskController {

    @Resource
    private DefaultMQProducer producer;

    @GetMapping("/rocketmq/add")
    public void addTask(@RequestParam("task") String task) throws Exception {
        Message msg = new Message("sanyouDelayTaskTopic""TagA", task.getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg.setDelayTimeLevel(2);
        // 傳送訊息並得到訊息的傳送結果,然後列印
        log.info("提交延遲任務");
        producer.send(msg);
    }

}

建立一個消費者,監聽sanyouDelayTaskTopic的訊息。

@Component
@RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouDelayTaskTopic")
@Slf4j
public class SanYouDelayTaskTopicListener implements RocketMQListener<String{

    @Override
    public void onMessage(String msg) {
        log.info("獲取到延遲任務:{}", msg);
    }

}

啟動應用,瀏覽器輸入以下連結新增任務

http://localhost:8080/rocketmq/add?task=sanyou

測試結果:

實現原理

生產者傳送延遲訊息之後,RocketMQ伺服器端在接收到訊息之後,會去根據延遲級別是否大於0來判斷是否是延遲訊息

  • 如果不大於0,說明不是延遲訊息,那就會將訊息儲存到指定的topic中
  • 如果大於0,說明是延遲訊息,此時RocketMQ會進行一波偷樑換柱的操作,將訊息的topic改成SCHEDULE_TOPIC_XXXX中,XXXX不是預留位置,然後儲存。

在BocketMQ內部有一個延遲任務,相當於是一個定時任務,這個任務就會獲取SCHEDULE_TOPIC_XXXX中的訊息,判斷訊息是否到了延遲時間,如果到了,那麼就會將訊息的topic儲存到原來真正的topic(拿我們的例子來說就是sanyouDelayTaskTopic)中,之後消費者就可以從真正的topic中獲取到訊息了。

定時任務
定時任務

RocketMQ這種實現方式相比於前面提到的三種更加可靠,因為前面提到的三種任務內容都是存在記憶體的,伺服器重啟任務就丟了,如果要實現任務不丟還得自己實現邏輯,但是RocketMQ訊息有持久化機制,能夠保證任務不丟失。

RabbitMQ

RabbitMQ也是一款訊息中介軟體,通過RabbitMQ的死信佇列也可以是先延遲任務的功能。

demo

引入RabbitMQ的依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

組態檔

spring:
  rabbitmq:
    host: 192.168.200.144 #伺服器ip
    port: 5672
    virtual-host: /

RabbitMQ死信佇列的設定類,後面說原理的時候會介紹幹啥的

@Configuration
public class RabbitMQConfiguration {
    
    @Bean
    public DirectExchange sanyouDirectExchangee() {
        return new DirectExchange("sanyouDirectExchangee");
    }

    @Bean
    public Queue sanyouQueue() {
        return QueueBuilder
                //指定佇列名稱,並持久化
                .durable("sanyouQueue")
                //設定佇列的超時時間為5秒,也就是延遲任務的時間
                .ttl(5000)
                //指定死信交換機
                .deadLetterExchange("sanyouDelayTaskExchangee")
                .build();
    }

    @Bean
    public Binding sanyouQueueBinding() {
        return BindingBuilder.bind(sanyouQueue()).to(sanyouDirectExchangee()).with("");
    }

    @Bean
    public DirectExchange sanyouDelayTaskExchange() {
        return new DirectExchange("sanyouDelayTaskExchangee");
    }

    @Bean
    public Queue sanyouDelayTaskQueue() {
        return QueueBuilder
                //指定佇列名稱,並持久化
                .durable("sanyouDelayTaskQueue")
                .build();
    }

    @Bean
    public Binding sanyouDelayTaskQueueBinding() {
        return BindingBuilder.bind(sanyouDelayTaskQueue()).to(sanyouDelayTaskExchange()).with("");
    }

}

RabbitMQDelayTaskController用來傳送訊息,這裡沒指定延遲時間,是因為在宣告佇列的時候指定了延遲時間為5s

@RestController
@Slf4j
public class RabbitMQDelayTaskController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/rabbitmq/add")
    public void addTask(@RequestParam("task") String task) throws Exception {
        // 訊息ID,需要封裝到CorrelationData中
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("提交延遲任務");
        // 傳送訊息
        rabbitTemplate.convertAndSend("sanyouDirectExchangee""", task, correlationData);
    }

}

啟動應用,瀏覽器輸入以下連結新增任務

http://localhost:8080/rabbitmq/add?task=sanyou

測試結果,成功實現5s的延遲任務

實現原理

整個工作流程如下:

  • 訊息傳送的時候會將訊息傳送到sanyouDirectExchange這個交換機上
  • 由於sanyouDirectExchange繫結了sanyouQueue,所以訊息會被路由到sanyouQueue這個佇列上
  • 由於sanyouQueue沒有消費者消費訊息,並且又設定了5s的過期時間,所以當訊息過期之後,訊息就被放到繫結的sanyouDelayTaskExchange死信交換機中
  • 訊息到達sanyouDelayTaskExchange交換機後,由於跟sanyouDelayTaskQueue進行了繫結,所以訊息就被路由到sanyouDelayTaskQueue中,消費者就能從sanyouDelayTaskQueue中拿到訊息了

上面說的佇列與交換機的繫結關係,就是上面的設定類所幹的事。

其實從這個單從訊息流轉的角度可以看出,RabbitMQ跟RocketMQ實現有相似之處。

訊息最開始都並沒有放到最終消費者消費的佇列中,而都是放到一箇中間佇列中,等訊息到了過期時間或者說是延遲時間,訊息就會被放到最終的佇列供消費者訊息。

只不過RabbitMQ需要你顯示的手動指定訊息所在的中間佇列,而RocketMQ是在內部已經做好了這塊邏輯。

除了基於RabbitMQ的死信佇列來做,RabbitMQ官方還提供了延時外掛,也可以實現延遲訊息的功能,這個外掛的大致原理也跟上面說的一樣,延時訊息會被先儲存在一箇中間的地方,叫做Mnesia,然後有一個定時任務去查詢最近需要被投遞的訊息,將其投遞到目標佇列中。

監聽Redis過期key

在Redis中,有個釋出訂閱的機制

生產者在訊息傳送時需要到指定傳送到哪個channel上,消費者訂閱這個channel就能獲取到訊息。圖中channel理解成MQ中的topic。

並且在Redis中,有很多預設的channel,只不過向這些channel傳送訊息的生產者不是我們寫的程式碼,而是Redis本身。這裡面就有這麼一個channel叫做__keyevent@<db>__:expired,db是指Redis資料庫的序號。

當某個Redis的key過期之後,Redis內部會發佈一個事件到__keyevent@<db>__:expired這個channel上,只要監聽這個事件,那麼就可以獲取到過期的key。

所以基於監聽Redis過期key實現延遲任務的原理如下:

  • 將延遲任務作為key,過期時間設定為延遲時間
  • 監聽__keyevent@<db>__:expired這個channel,那麼一旦延遲任務到了過期時間(延遲時間),那麼就可以獲取到這個任務

來個demo

Spring已經實現了監聽__keyevent@*__:expired這個channel這個功能,__keyevent@*__:expired中的*代表萬用字元的意思,監聽所有的資料庫。

所以demo寫起來就很簡單了,只需4步即可

依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

組態檔

spring:
  redis:
    host: 192.168.200.144
    port: 6379

設定類

@Configuration
public class RedisConfiguration {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(connectionFactory);
        return redisMessageListenerContainer;
    }

    @Bean
    public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
        return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
    }

}

KeyExpirationEventMessageListener實現了對__keyevent@*__:expiredchannel的監聽

當KeyExpirationEventMessageListener收到Redis釋出的過期Key的訊息的時候,會發布RedisKeyExpiredEvent事件

所以我們只需要監聽RedisKeyExpiredEvent事件就可以拿到過期訊息的Key,也就是延遲訊息。

對RedisKeyExpiredEvent事件的監聽實現MyRedisKeyExpiredEventListener

@Component
public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent{

    @Override
    public void onApplicationEvent(RedisKeyExpiredEvent event) {
        byte[] body = event.getSource();
        System.out.println("獲取到延遲訊息:" + new String(body));
    }

}

程式碼寫好,啟動應用

之後我直接通過Redis命令設定訊息,就沒通過程式碼傳送訊息了,訊息的key為sanyou,值為task,值不重要,過期時間為5s

set sanyou task 

expire sanyou 5

成功獲取到延遲任務

雖然這種方式可以實現延遲任務,但是這種方式比較多

任務存在延遲

Redis過期事件的釋出不是指key到了過期時間就釋出,而是key到了過期時間被清除之後才會釋出事件。

而Redis過期key的兩種清除策略,就是面試八股文常背的兩種:

  • 惰性清除。當這個key過期之後,存取時,這個Key才會被清除
  • 定時清除。後臺會定期檢查一部分key,如果有key過期了,就會被清除

所以即使key到了過期時間,Redis也不一定會傳送key過期事件,這就到導致雖然延遲任務到了延遲時間也可能獲取不到延遲任務。

丟訊息太頻繁

Redis實現的釋出訂閱模式,訊息是沒有持久化機制,當訊息釋出到某個channel之後,如果沒有使用者端訂閱這個channel,那麼這個訊息就丟了,並不會像MQ一樣進行持久化,等有消費者訂閱的時候再給消費者消費。

所以說,假設服務重啟期間,某個生產者或者是Redis本身釋出了一條訊息到某個channel,由於服務重啟,沒有監聽這個channel,那麼這個訊息自然就丟了。

訊息消費只有廣播模式

Redis的釋出訂閱模式訊息消費只有廣播模式一種。

所謂的廣播模式就是多個消費者訂閱同一個channel,那麼每個消費者都能消費到釋出到這個channel的所有訊息。

如圖,生產者釋出了一條訊息,內容為sanyou,那麼兩個消費者都可以同時收到sanyou這條訊息。

所以,如果通過監聽channel來獲取延遲任務,那麼一旦服務範例有多個的話,還得保證訊息不能重複處理,額外地增加了程式碼開發量。

接收到所有key的某個事件

這個不屬於Redis釋出訂閱模式的問題,而是Redis本身事件通知的問題。

當監聽了__keyevent@<db>__:expired的channel,那麼所有的Redis的key只要發生了過期事件都會被通知給消費者,不管這個key是不是消費者想接收到的。

所以如果你只想消費某一類訊息的key,那麼還得自行加一些標記,比如訊息的key加個字首,消費的時候判斷一下帶字首的key就是需要消費的任務。

Redisson的RDelayedQueue

Redisson他是Redis的兒子(Redis son),基於Redis實現了非常多的功能,其中最常使用的就是Redis分散式鎖的實現,但是除了實現Redis分散式鎖之外,它還實現了延遲佇列的功能。

先來個demo

引入pom

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.1</version>
</dependency>

封裝了一個RedissonDelayQueue類

@Component
@Slf4j
public class RedissonDelayQueue {

    private RedissonClient redissonClient;

    private RDelayedQueue<String> delayQueue;
    private RBlockingQueue<String> blockingQueue;

    @PostConstruct
    public void init() {
        initDelayQueue();
        startDelayQueueConsumer();
    }

    private void initDelayQueue() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress("redis://localhost:6379");
        redissonClient = Redisson.create(config);

        blockingQueue = redissonClient.getBlockingQueue("SANYOU");
        delayQueue = redissonClient.getDelayedQueue(blockingQueue);
    }

    private void startDelayQueueConsumer() {
        new Thread(() -> {
            while (true) {
                try {
                    String task = blockingQueue.take();
                    log.info("接收到延遲任務:{}", task);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "SANYOU-Consumer").start();
    }

    public void offerTask(String task, long seconds) {
        log.info("新增延遲任務:{} 延遲時間:{}s", task, seconds);
        delayQueue.offer(task, seconds, TimeUnit.SECONDS);
    }

}

這個類在建立的時候會去初始化延遲佇列,建立一個RedissonClient物件,之後通過RedissonClient物件獲取到RDelayedQueue和RBlockingQueue物件,傳入的佇列名字叫SANYOU,這個名字無所謂。

當延遲佇列建立之後,會開啟一個延遲任務的消費執行緒,這個執行緒會一直從RBlockingQueue中通過take方法阻塞獲取延遲任務。

新增任務的時候是通過RDelayedQueue的offer方法新增的。

controller類,通過介面新增任務,延遲時間為5s

@RestController
public class RedissonDelayQueueController {

    @Resource
    private RedissonDelayQueue redissonDelayQueue;

    @GetMapping("/add")
    public void addTask(@RequestParam("task") String task) {
        redissonDelayQueue.offerTask(task, 5);
    }

}

啟動專案,在瀏覽器輸入如下連線,新增任務

http://localhost:8080/add?task=sanyou

靜靜等待5s,成功獲取到任務。

實現原理

如下是Redisson延遲佇列的實現原理

SANYOU前面的字首都是固定的,Redisson建立的時候會拼上字首。

  • redisson_delay_queue_timeout:SANYOU,sorted set資料型別,存放所有延遲任務,按照延遲任務的到期時間戳(提交任務時的時間戳 + 延遲時間)來排序的,所以列表的最前面的第一個元素就是整個延遲佇列中最早要被執行的任務,這個概念很重要
  • redisson_delay_queue:SANYOU,list資料型別,也是存放所有的任務,但是研究下來發現好像沒什麼用。。
  • SANYOU,list資料型別,被稱為目標佇列,這個裡面存放的任務都是已經到了延遲時間的,可以被消費者獲取的任務,所以上面demo中的RBlockingQueue的take方法是從這個目標佇列中獲取到任務的
  • redisson_delay_queue_channel:SANYOU,是一個channel,用來通知使用者端開啟一個延遲任務

任務提交的時候,Redisson會將任務放到redisson_delay_queue_timeout:SANYOU中,分數就是提交任務的時間戳+延遲時間,就是延遲任務的到期時間戳

Redisson使用者端內部通過監聽redisson_delay_queue_channel:SANYOU這個channel來提交一個延遲任務,這個延遲任務能夠保證將redisson_delay_queue_timeout:SANYOU中到了延遲時間的任務從redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU這個目標佇列中。

於是消費者就可以從SANYOU這個目標佇列獲取到延遲任務了。

所以從這可以看出,Redisson的延遲任務的實現跟前面說的MQ的實現都是殊途同歸,最開始任務放到中間的一個地方,叫做redisson_delay_queue_timeout:SANYOU,然後會開啟一個類似於定時任務的一個東西,去判斷這個中間地方的訊息是否到了延遲時間,到了再放到最終的目標的佇列供消費者消費。

Redisson的這種實現方式比監聽Redis過期key的實現方式更加可靠,因為訊息都存在list和sorted set資料型別中,所以訊息很少丟。

上述說的兩種Redis的方案更詳細的介紹,可以檢視我之前寫的用Redis實現延遲佇列,我研究了兩種方案,發現並不簡單這篇文章。

Netty的HashedWheelTimer

先來個demo

@Slf4j
public class NettyHashedWheelTimerDemo {

    public static void main(String[] args) {
        HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 8);
        timer.start();

        log.info("提交延遲任務");
        timer.newTimeout(timeout -> log.info("執行延遲任務"), 5, TimeUnit.SECONDS);
    }

}

測試結果

實現原理

如圖,時間輪會被分成很多格子(上述demo中的8就代表了8個格子),一個格子代表一段時間(上述demo中的100就代表一個格子是100ms),所以上述demo中,每800ms會走一圈。

當任務提交的之後,會根據任務的到期時間進行hash取模,計算出這個任務的執行時間所在具體的格子,然後新增到這個格子中,通過如果這個格子有多個任務,會用連結串列來儲存。所以這個任務的新增有點像HashMap儲存元素的原理。

HashedWheelTimer內部會開啟一個執行緒,輪詢每個格子,找到到了延遲時間的任務,然後執行。

由於HashedWheelTimer也是單執行緒來處理任務,所以跟Timer一樣,長時間執行的任務會導致其他任務的延時處理。

前面Redisson中提到的使用者端延遲任務就是基於Netty的HashedWheelTimer實現的。

Hutool的SystemTimer

Hutool工具類也提供了延遲任務的實現SystemTimer

demo

@Slf4j
public class SystemTimerDemo {

    public static void main(String[] args) {
        SystemTimer systemTimer = new SystemTimer();
        systemTimer.start();

        log.info("提交延遲任務");
        systemTimer.addTask(new TimerTask(() -> log.info("執行延遲任務"), 5000));
    }

}

執行結果

Hutool底層其實也用到了時間輪。

Qurtaz

Qurtaz是一款開源作業排程框架,基於Qurtaz提供的api也可以實現延遲任務的功能。

demo

依賴

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
</dependency>

SanYouJob實現Job介面,當任務到達執行時間的時候會呼叫execute的實現,從context可以獲取到任務的內容

@Slf4j
public class SanYouJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDetail jobDetail = context.getJobDetail();
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        log.info("獲取到延遲任務:{}", jobDataMap.get("delayTask"));
    }
}

測試類

public class QuartzDemo {

    public static void main(String[] args) throws SchedulerException, InterruptedException {
        // 1.建立Scheduler的工廠
        SchedulerFactory sf = new StdSchedulerFactory();
        // 2.從工廠中獲取排程器範例
        Scheduler scheduler = sf.getScheduler();

        // 6.啟動 排程器
        scheduler.start();

        // 3.建立JobDetail,Job型別就是上面說的SanYouJob
        JobDetail jb = JobBuilder.newJob(SanYouJob.class)
                .usingJobData("delayTask", "這是一個延遲任務")
                .build()
;

        // 4.建立Trigger
        Trigger t = TriggerBuilder.newTrigger()
                //任務的觸發時間就是延遲任務到的延遲時間
                .startAt(DateUtil.offsetSecond(new Date(), 5))
                .build();

        // 5.註冊任務和定時器
        log.info("提交延遲任務");
        scheduler.scheduleJob(jb, t);
    }
}

執行結果:

實現原理

核心元件

  • Job:表示一個任務,execute方法的實現是對任務的執行邏輯
  • JobDetail:任務的詳情,可以設定任務需要的引數等資訊
  • Trigger:觸發器,是用來觸發業務的執行,比如說指定5s後觸發任務,那麼任務就會在5s後觸發
  • Scheduler:排程器,內部可以註冊多個任務和對應任務的觸發器,之後會排程任務的執行

啟動的時候會開啟一個QuartzSchedulerThread排程執行緒,這個執行緒會去判斷任務是否到了執行時間,到的話就將任務交給任務執行緒池去執行。

無限輪詢延遲任務

無限輪詢的意思就是開啟一個執行緒不停的去輪詢任務,當這些任務到達了延遲時間,那麼就執行任務。

demo

@Slf4j
public class PollingTaskDemo {

    private static final List<DelayTask> DELAY_TASK_LIST = new CopyOnWriteArrayList<>();

    public static void main(String[] args) {
        new Thread(() -> {
            while (true) {
                try {
                    for (DelayTask delayTask : DELAY_TASK_LIST) {
                        if (delayTask.triggerTime <= System.currentTimeMillis()) {
                            log.info("處理延遲任務:{}", delayTask.taskContent);
                            DELAY_TASK_LIST.remove(delayTask);
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (Exception e) {
                }
            }
        }).start();

        log.info("提交延遲任務");
        DELAY_TASK_LIST.add(new DelayTask("三友的java日記"5L));
    }

    @Getter
    @Setter
    public static class DelayTask {

        private final String taskContent;

        private final Long triggerTime;

        public DelayTask(String taskContent, Long delayTime) {
            this.taskContent = taskContent;
            this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
        }
    }

}

任務可以存在資料庫又或者是記憶體,看具體的需求,這裡我為了簡單就放在記憶體裡了。

執行結果:

這種操作簡單,但是就是效率低下,每次都得遍歷所有的任務。

最後

最後,本文所有範例程式碼地址:

https://github.com/sanyou3/delay-task-demo.git

往期熱門文章推薦

如何去閱讀原始碼,我總結了18條心法

如何寫出漂亮程式碼,我總結了45個小技巧

三萬字盤點Spring/Boot的那些常用擴充套件點

兩萬字盤點那些被玩爛了的設計模式

扒一扒Bean注入到Spring的那些姿勢

RocketMQ訊息短暫而又精彩的一生

掃碼或者搜尋關注公眾號 三友的java日記 ,及時乾貨不錯過,公眾號致力於通過畫圖加上通俗易懂的語言講解技術,讓技術更加容易學習,回覆 面試 即可獲得一套面試真題。