SpringBoot自定義cron表示式註冊定時任務

2023-04-21 18:01:08

springBoot自定義cron表示式註冊定時任務

一、原理

  • 1、使用Spring自帶的TaskScheduler註冊任務
  • 2、註冊後返回:ScheduledFuture,用於取消定時任務
  • 3、註冊任務後不會馬上取消任務,所以將任務快取。在需要取消任務的時候呼叫取消介面取消
  • 4、cron表示式可以由前端或者後端生成。實現中會校驗cron表示式
public class TestScheduled {

    /**
     * 1、使用Spring自帶的TaskScheduler註冊任務
     * 2、註冊後返回:ScheduledFuture,用於取消定時任務
     */
    @Resource
    private TaskScheduler taskScheduler;

    public void registrarTask() {
        //具體的任務Runnable(一般使用類實現Runnable介面)
        Runnable taskRunnable = new Runnable() {
            @Override
            public void run() {

            }
        };
        //cron表示式觸發器
        CronTrigger trigger = new CronTrigger("0/5 * * * * ?");
        //開啟定時任務的真正方法
        ScheduledFuture<?> future = this.taskScheduler.schedule(taskRunnable, trigger);
        //取消定時任務
        future.cancel(true);
    }
}

二、具體實現

1、設定任務排程器

  • 作用:設定:核心執行緒數:可同時執行任務數;設定執行緒名稱字首
  • 可以不設定。不設定就預設使用spring自帶的
package com.cc.ssd.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/** TaskScheduler任務排程器設定類
 * @since 2023/4/21 0021
 * @author CC
 **/
@Configuration
public class CronTaskConfig {

    /**
     * 任務排程器自定義設定
     */
    @Bean(name = "taskScheduler")
    public TaskScheduler taskScheduler() {
        // 任務排程執行緒池
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 定時任務執行執行緒池核心執行緒數:可同時執行4個任務
        taskScheduler.setPoolSize(4);
        taskScheduler.setRemoveOnCancelPolicy(true);
        // 執行緒名稱字首
        taskScheduler.setThreadNamePrefix("Cs-ThreadPool-");
        return taskScheduler;
    }

}

2、定時任務註冊類

  • 作用:快取、註冊定時任務;還可以查詢、刪除定時任務
package com.cc.ssd.registrar;

import com.cc.ssd.task.CronTaskFuture;
import com.cc.ssd.task.CronTaskRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** 註冊定時任務:快取定時任務、註冊定時任務到排程中心
 * @author CC
 **/
@Component
public class CronTaskRegistrar implements DisposableBean {

    private static final Logger log = LoggerFactory.getLogger(CronTaskRegistrar.class);

    /**
     * 快取任務
     * key:具體的任務
     * value:註冊定時任務後返回的ScheduledFuture
     */
    private final Map<Runnable, CronTaskFuture> scheduledTasks = new ConcurrentHashMap<>(16);

    /**
     * 使用自定義的任務排程設定
     */
    @Resource(name = "taskScheduler")
    private TaskScheduler taskScheduler;

    /** 獲取任務排程設定
     * @return 任務排程設定
     */
    public TaskScheduler getTaskScheduler() {
        return this.taskScheduler;
    }

    /** 新增定時任務1
     *  存在任務:刪除此任務,重新新增這個任務
     * @param taskRunnable 執行的具體任務定義:taskRunnable 實現Runnable
     * @param cronExpression cron表示式
     */
    public void addCronTask(Runnable taskRunnable, String cronExpression) {
        //驗證cron表示式是否正確
        boolean validExpression = CronExpression.isValidExpression(cronExpression);
        if (!validExpression) {
            throw new RuntimeException("cron表示式驗證失敗!");
        }
        //獲取下次執行時間
        CronExpression parse = CronExpression.parse(cronExpression);
        LocalDateTime next = parse.next(LocalDateTime.now());
        if (Objects.nonNull(next)) {
            //定時任務下次執行的時間
            String format = next.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            log.info("定時任務下次執行的時間:{}", format);
        }

        //封裝成 CronTask(cron任務)
        CronTask cronTask = new CronTask(taskRunnable, cronExpression);
        this.addCronTask(cronTask);
    }

    /** 新增定時任務2
     * @param cronTask :<p>CronTask用於在指定時間間隔內執行定時任務。</p>
     *                   <p>它是通過CronTrigger來實現的,CronTrigger是一個基於cron表示式的觸發器,</p>
     *                   <p>可以在指定的時間間隔內觸發任務執行。</p>
     * @since 2023/4/21 0021
     * @author CC
     **/
    private void addCronTask(CronTask cronTask) {
        if (Objects.nonNull(cronTask)) {
            //1有這個任務,先刪除這個任務。再新增
            Runnable task = cronTask.getRunnable();
            String taskId = null;
            if (task instanceof CronTaskRunnable) {
                taskId = ((CronTaskRunnable) task).getTaskId();
            }
            //通過任務id獲取快取的任務,如果包含則刪除,然後新增任務
            Runnable taskCache = this.getTaskByTaskId(taskId);
            if (Objects.nonNull(taskCache) && this.scheduledTasks.containsKey(taskCache)) {
                this.removeCronTaskByTaskId(taskId);
            }
            //2註冊定時任務到排程中心
            CronTaskFuture scheduledFutureTask = this.scheduleCronTask(cronTask);

            //3快取定時任務
            this.scheduledTasks.put(task, scheduledFutureTask);

            //todo cc 4可以將任務儲存到資料庫中……重新啟動程式然後載入資料庫中的任務到快取中……

        }
    }

    /** 註冊 ScheduledTask 定時任務
     * @param cronTask cronTask
     * @return 註冊定時任務後返回的 ScheduledFutureTask
     */
    private CronTaskFuture scheduleCronTask(CronTask cronTask) {
        //註冊定時任務後記錄的Future
        CronTaskFuture scheduledTask = new CronTaskFuture();
        //開啟定時任務的真正方法
        scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
//        scheduledTask.setThreadLocal(this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger()));
        return scheduledTask;
    }

    /** 獲取任務列表
     * @return
     */
    public List<CronTaskRunnable> getScheduledTasks() {
        List<CronTaskRunnable> tasks = new ArrayList<>();

        Set<Runnable> keySet = scheduledTasks.keySet();
        keySet.forEach(key -> {
            CronTaskRunnable task = new CronTaskRunnable();
            if (key instanceof CronTaskRunnable) {
                CronTaskRunnable taskParent = (CronTaskRunnable) key;
                BeanUtils.copyProperties(taskParent, task);
            }
            tasks.add(task);
        });

        return tasks.stream()
                .sorted(Comparator.comparing(CronTaskRunnable::getTaskId))
                .collect(Collectors.toList());
    }

    /** 根據任務id刪除單個定時任務
     * @param taskId 任務id
     */
    public void removeCronTaskByTaskId(String taskId) {
        //通過任務id獲取任務
        Runnable task = this.getTaskByTaskId(taskId);
        //需要通過任務id獲取任務,然後再移除
        CronTaskFuture cronTaskFuture = this.scheduledTasks.remove(task);
        if (Objects.nonNull(cronTaskFuture)) {
            cronTaskFuture.cancel();
        }
    }

    /** 通過任務id獲取任務。未查詢到返回null
     * @param taskId 任務id
     * @return java.lang.Runnable
     * @since 2023/4/21 0021
     * @author CC
     **/
    private Runnable getTaskByTaskId(String taskId) {
        Assert.notNull(taskId, "任務id不能為空!");
        Set<Map.Entry<Runnable, CronTaskFuture>> entries = scheduledTasks.entrySet();
        //根據任務id獲取該任務快取
        Map.Entry<Runnable, CronTaskFuture> rcf = entries.stream().filter(rf -> {
            Runnable key = rf.getKey();
            String taskId1 = null;
            if (key instanceof CronTaskRunnable) {
                taskId1 = ((CronTaskRunnable) key).getTaskId();
            }
            return taskId.equals(taskId1);
        }).findAny().orElse(null);

        if (Objects.nonNull(rcf)) {
            return rcf.getKey();
        }
        return null;
    }

    /** 刪除所有的定時任務
     *     DisposableBean是Spring框架中的一個介面,它定義了一個destroy()方法,
     *     用於在Bean銷燬時執行清理工作。
     *     當一個Bean實現了DisposableBean介面時,
     *     Spring容器會在該Bean銷燬時自動呼叫destroy()方法,
     *     以便進行一些清理工作,例如釋放資源等。
     *     如果您的Bean需要在銷燬時執行一些清理工作,
     *     那麼實現DisposableBean介面是一個很好的選擇。
     */
    @Override
    public void destroy() {
        //關閉所有定時任務
        for (CronTaskFuture task : this.scheduledTasks.values()) {
            task.cancel();
        }
        //清空快取
        this.scheduledTasks.clear();

        log.info("取消所有定時任務!");
        //todo cc 修改或刪除資料庫的任務
    }

}

3、定時任務的執行結果ScheduledFuture

  • 作用:CronTaskFuture類中使用的是ScheduledFuture物件來表示定時任務的執行結果。
package com.cc.ssd.task;

import java.util.Objects;
import java.util.concurrent.ScheduledFuture;

/** CronTaskFuture類中使用的是ScheduledFuture物件來表示定時任務的執行結果。
 *  ——最後ps:也可以不要這個記錄類,直接快取ScheduledFuture物件。
 *  用來記錄單獨的Future、定時任務註冊任務後產生的
 * @author CC
 **/
public final class CronTaskFuture {

    /** 每個執行緒一個副本
     * 經過測試這裡不能使用ThreadLocal
     */
//    private static final ThreadLocal<ScheduledFuture<?>> THREAD_LOCAL = new ThreadLocal<>();

    /** 最後ps:由於ScheduledFuture是執行緒安全的。這裡不用 volatile 或者 ThreadLocal
     *      註冊任務後返回的:ScheduledFuture 用於記錄並取消任務
     *      這兩個都可以不使用。直接給future賦值
     *          volatile:執行緒之間可見:volatile用於實現多執行緒之間的可見性和一致性,保證資料的正確性。
     *          ThreadLocal:用於實現執行緒封閉,保證執行緒安全
     * 使用建議:
     *      CronTaskFuture類中使用的是ScheduledFuture物件來表示定時任務的執行結果。
     *      ScheduledFuture物件是執行緒安全的,因此不需要使用volatile關鍵字來保證多執行緒同步。
     *      如果需要在多執行緒中使用執行緒本地變數,可以使用ThreadLocal。
     *      因此,建議在CronTaskFuture類中使用ScheduledFuture物件,而不是使用volatile或ThreadLocal。
     *      另外,如果需要在Spring容器銷燬時執行一些清理操作,可以實現DisposableBean介面,並在destroy()方法中進行清理操作。
     */
    public ScheduledFuture<?> future;
//    public volatile ScheduledFuture<?> future;
//    public void setThreadLocal(ScheduledFuture<?> future){
//        THREAD_LOCAL.set(future);
//    }

    /**
     * 取消當前定時任務
     */
    public void cancel() {
        try {
//            ScheduledFuture<?> future = THREAD_LOCAL.get();
            ScheduledFuture<?> future = this.future;
            if (Objects.nonNull(future)) {
                future.cancel(true);
            }
        } catch (Exception e) {
            throw new RuntimeException("銷燬定時任務失敗!");
        } finally {
//            THREAD_LOCAL.remove();
        }

    }

}

4、具體的任務。

  • 實現Runable介面
  • 任務處理的方式按照自己的需求去實現即可
package com.cc.ssd.task;

import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/** 具體任務實現
 * @author CC
 * @since 2023/4/21 0021
 */
@Data
public class CronTaskRunnable implements Runnable {

    private static final Logger log = LoggerFactory.getLogger(CronTaskRunnable.class);

    /**
     * 任務id(必須唯一)
     */
    private String taskId;
    /**
     * 任務型別:自定義
     */
    private Integer taskType;
    /**
     * 任務名字
     */
    private String taskName;
    /**
     * 任務引數
     */
    private Object[] params;

    public CronTaskRunnable() {
    }

    public CronTaskRunnable(String taskId, Integer taskType, String taskName, Object... params) {
        this.taskId = taskId;
        this.taskType = taskType;
        this.taskName = taskName;
        this.params = params;
    }

    /** 執行任務
     * @since 2023/4/21 0021
     * @author CC
     **/
    @Override
    public void run() {
        long start = System.currentTimeMillis();

        //具體的任務。
        log.info("\n\t {}號.定時任務開始執行 - taskId:{},taskName:{},taskType:{},params:{}",
                taskType, taskId, taskName, taskType, params);

        //任務處理的方式:
        //todo cc 1就在這裡執行:模擬任務
        //todo cc 2開啟策略模式,根據任務型別 排程不同的任務
        //todo cc 3使用反射:傳來bean名字,方法名字,呼叫不同的任務
        //todo cc 4開啟佇列,把要執行的任務放到佇列中,然後執行 —— 使用場景:每個任務執行很耗時的情況下使用
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        log.info("\n\t {}號.任務執行完成 - 耗時:{},taskId:{},taskType:{}",
                taskType, System.currentTimeMillis() - start, taskId, taskType);

    }


}

5、測試Controller

package com.cc.ssd.web.controller;

import com.cc.ssd.registrar.CronTaskRegistrar;
import com.cc.ssd.task.CronTaskRunnable;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;

/**
 * @author CC
 * @since 2023/4/21 0021
 */
@RestController
@RequestMapping("/scheduled")
public class TestScheduledController {

    @Resource
    private CronTaskRegistrar cronTaskRegistrar;

    /** 獲取任務列表
     * @return java.util.List<com.cc.ssd.task.SchedulingRunnableTask>
     * @since 2023/4/21 0021
     * @author CC
     **/
    @GetMapping
    public List<CronTaskRunnable> getScheduledTasks() {
        return cronTaskRegistrar.getScheduledTasks();
    }

    /** 新增任務
     * @param param param
     * @return java.lang.String
     * @since 2023/4/21 0021
     * @author CC
     **/
    @PostMapping
    public String addCronTask(@RequestBody Map<String, Object> param) {
        //自己拿任務引數的邏輯:可以把每個任務儲存到資料庫,重新啟動任務的同時,載入這些任務到任務排程中心
        String taskId = (String) param.get("taskId");
        Integer taskType = (Integer) param.get("taskType");
        String taskName = (String) param.get("taskName");
        Object params = param.get("params");
        //新增任務引數
        CronTaskRunnable task = new CronTaskRunnable(taskId, taskType, taskName, params);
        //註冊任務:cron表示式,可以從傳入不一樣的
        cronTaskRegistrar.addCronTask(task, "0/5 * * * * ?");
        return "ok";
    }

    /** 根據任務id刪除定時任務
     * @param taskId 任務id
     * @return java.lang.String
     * @since 2023/4/21 0021
     * @author CC
     **/
    @DeleteMapping
    public String removeCronTaskByTaskId(@RequestParam String taskId) {
        cronTaskRegistrar.removeCronTaskByTaskId(taskId);
        return "ok";
    }

    /** 刪除全部任務
     * @return java.lang.String
     * @since 2023/4/21 0021
     * @author CC
     **/
    @DeleteMapping("/removeAll")
    public String removeCronTask() {
        cronTaskRegistrar.destroy();
        return "ok";
    }

}

6、最後效果

  • 自己用controller去測試一波吧