spring schedule 設定多工動態 cron 【增刪啟停】

2021-03-15 12:05:44

一、背景

之前公司經常會遇到設定定時任務,簡單的任務可以直接依賴spring即可。
簡單任務直接使用 @scheduled 註解配合@EnableScheduling。
但是如何實現簡單的動態cron呢?

開發原則是:
在滿足專案需求的情況下,儘量少的依賴其它框架,避免專案過於臃腫和複雜。主要研究spring 自帶的schedule。

常見的任務排程方式

  • 單機部署模式
    Timer: jdk中自帶的一個定時排程類,可以簡單的實現按某一頻度進行任務執行。提供的功能比較單一,無法實現複雜的排程任務。
    ScheduledExecutorService: 也是jdk自帶的一個基於執行緒池設計的定時任務類。其每個排程任務都會分配到執行緒池中的一個執行緒執行,所以其任務是並行執行的,互不影響。
    Spring Task:Spring提供的一個任務排程工具,支援註解和組態檔形式,支援Cron表示式,使用簡單但功能強大。
    Quartz:一款功能強大的任務排程器,可以實現較為複雜的排程功能,如每月一號執行、每天凌晨執行、每週五執行等等,還支援分散式排程,就是設定稍顯複雜。
  • 分散式叢集模式(不多介紹,簡單提一下)
    Quartz:可以去看看這篇文章Quartz分散式
    elastic-job:當開發的彈性分散式任務排程系統,採用zookeeper實現分散式協調,實現任務高可用以及分片。
    xxl-job:是大眾點評員工徐雪裡於2015年釋出的分散式任務排程平臺,是一個輕量級分散式任務排程框架。
    saturn: 是唯品會提供一個分散式、容錯和高可用的作業排程服務架構。

二、本篇說明

springBoot 基礎模組 spring-boot-starter-web 已經內建 schedule ,無需引入額外依賴。
先思考幾個問題:
1、動態 cron 實現的原理
任務的 【 停止】是基於 future介面 的cancel() 方法。
任務的 【增加、刪除、啟動】是基於 註冊到 類ScheduledTaskRegistrar 的 ScheduledFuture的數量。
涉及核心類:

  • ScheduledFuture
  • SchedulingConfigurer
  • ScheduledTaskRegistrar

2、多工並行執行設定
spring預設機制對schedule是單執行緒。

3、如何設定多個任務
好多博文,都是設定一個cron,這讓初學者很難受。

4、如何設定任務分組
根據自己業務背景,可根據步驟三,進行改造。

5、如何設定服務啟動自啟任務。
想要程式啟動時首次去加我們設定的task,只需實現 CommandLineRunner 即可。
6、如何從資料庫讀取設定
這個其實很簡單,在實現 ScheduledTaskRegistrar 時,先直接查詢我們需要的資料即可。
7、如何優雅的實現我們的程式碼
這裡為了我們多個task實現時,去除臃腫的if else ,使用策略模式去實現我們的task,這裡程式碼裡面會具體介紹。

8、如何去觸發我們的schedule 【增刪啟停】
設定好 task任務類,注入到 controller ,通過介面直接呼叫即可。

三、程式碼實現

先貼出我的github 程式碼,下面程式碼描述不全。

1. 普通多工動態cron 實現

1.1 對應資料庫的實體類 TaskEntity
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TaskEntity {
    /**
     * 任務id
     */
    private int taskId;
    /**
     * 任務說明
     */
    private String desc;
    /**
     * cron 表示式
     */
    private String expression;
}
1.2 設定每個任務實現

設定任務介面 TaskService

public interface TaskService {

    void HandlerJob();

    Integer jobId();

}

設定任務介面實現 TaskServiceJob1Impl、TaskServiceJob2Impl …

@Service
public class TaskServiceJob1Impl implements TaskService {
    @Override
    public void HandlerJob() {
        System.out.println("------job1 開始執行---------:"+new Date());

        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "    " + Thread.currentThread().getName() + "    任務一啟動");
        try {
            Thread.sleep(10000);//任務耗時10秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "    " + Thread.currentThread().getName() + "    結束");

    }

    @Override
    public Integer jobId() {
        return 1;
    }
}
1.3 設定任務解析器 TaskSolverChooser

注:
這裡引入策略模式
為啥要設定 任務解析器選擇器:
因為我們實現多個任務時,一個任務對應一個 CronTask,需要在 MyScheduledTask 裡面去實現我們每一個方法。
譬如,我們有100個任務就要自定義100個任務實現方法,程式碼會很臃腫,明顯不符合,【開閉原則】,於是這裡採用策略模式,解耦我們多個任務業務實現邏輯。

@Slf4j
@Component
public class TaskSolverChooser implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    private Map<Integer, TaskService> chooseMap = new HashMap<>(16);

    /**
     * 拿到spring context 上下文
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @PostConstruct
    private void registerToTaskSolver(){
        Map<String, TaskService> taskServiceMap = applicationContext.getBeansOfType(TaskService.class);
        for (TaskService value : taskServiceMap.values()) {
            chooseMap.put(value.jobId(), value);
            log.info("task {} 處理器: {} 註冊成功",new Object[]{value.jobId(),value});
        }
    }

    /**
     * 獲取需要的job
     */
    public TaskService getTask(Integer jobId){
        return chooseMap.get(jobId);
    }
}
1.4 設定MyScheduledTask (動態cron核心設定)

說明:
1、設定多執行緒執行任務
2、設定 重新整理 task
3、設定 停止 task
4、設定 執行task 業務邏輯

@Component
public class MyScheduledTask implements SchedulingConfigurer {

    private volatile ScheduledTaskRegistrar registrar;

    private final ConcurrentHashMap<Integer, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Integer, CronTask> cronTasks = new ConcurrentHashMap<>();

    @Autowired
    private TaskSolverChooser taskSolverChooser;

    @Override
    public void configureTasks(ScheduledTaskRegistrar registrar) {

        //設定20個執行緒,預設單執行緒,如果不設定的話,不能同時並行執行任務
        registrar.setScheduler(Executors.newScheduledThreadPool(10));
        this.registrar = registrar;
    }

    /**
     * 修改 cron 需要 呼叫該方法
     */
    public void refresh(List<TaskEntity> tasks){
        //取消已經刪除的策略任務
        Set<Integer> sids = scheduledFutures.keySet();
        for (Integer sid : sids) {
            if(!exists(tasks, sid)){
                scheduledFutures.get(sid).cancel(false);
            }
        }
        for (TaskEntity TaskEntity : tasks) {
            String expression = TaskEntity.getExpression();
            //計劃任務表示式為空則跳過
            if(!StringUtils.hasLength(expression)){
                continue;
            }
            //計劃任務已存在並且表示式未發生變化則跳過
            if (scheduledFutures.containsKey(TaskEntity.getTaskId())
                    && cronTasks.get(TaskEntity.getTaskId()).getExpression().equals(expression)) {
                continue;
            }
            //如果策略執行時間發生了變化,則取消當前策略的任務
            if(scheduledFutures.containsKey(TaskEntity.getTaskId())){
                scheduledFutures.get(TaskEntity.getTaskId()).cancel(false);
                scheduledFutures.remove(TaskEntity.getTaskId());
                cronTasks.remove(TaskEntity.getTaskId());
            }
            //業務邏輯處理
            CronTask task = cronTask(TaskEntity, expression);


            //執行業務
            ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
            cronTasks.put(TaskEntity.getTaskId(), task);
            scheduledFutures.put(TaskEntity.getTaskId(), future);
        }
    }

    /**
     * 停止 cron 執行
     */
    public void stop(List<TaskEntity> tasks){
        tasks.forEach(item->{
            if (scheduledFutures.containsKey(item.getTaskId())) {
                // mayInterruptIfRunning設成false話,不允許線上程執行時中斷,設成true的話就允許。
                scheduledFutures.get(item.getTaskId()).cancel(false);
                scheduledFutures.remove(item.getTaskId());
            }
        });
    }

    /**
     * 業務邏輯處理
     */
    public CronTask cronTask(TaskEntity TaskEntity, String expression)  {
        return new CronTask(() -> {
                    //每個計劃任務實際需要執行的具體業務邏輯
                    //採用策略,模式 ,執行我們的job
                   taskSolverChooser.getTask(TaskEntity.getTaskId()).HandlerJob();
                }, expression);
    }

    private boolean exists(List<TaskEntity> tasks, Integer tid){
        for(TaskEntity TaskEntity:tasks){
            if(TaskEntity.getTaskId() == tid){
                return true;
            }
        }
        return false;
    }

    @PreDestroy
    public void destroy() {
        this.registrar.destroy();
    }

}
1.5 設定程式啟動時首次去加我們設定的task
@Component
public class StartInitTask implements CommandLineRunner {

    @Autowired
    private MyScheduledTask myScheduledTask;

    @Override
    public void run(String... args) throws Exception {
        List<TaskEntity> list = Arrays.asList(
                new TaskEntity(1, "測試1", "0/1 * *  * * ?"),
                new TaskEntity(2, "測試2", "0/1 * *  * * ?")
        );
        myScheduledTask.refresh(list);
    }
}
1.6 設定web介面去觸發,增刪啟停
@RestController
public class StartController {

    @Autowired
    private MyScheduledTask scheduledTask;

    @PostMapping(value = "/startOrChangeCron")
    public String changeCron(@RequestBody List<TaskEntity> list){
        if (CollectionUtils.isEmpty(list)) {
            // 這裡模擬存在資料庫的資料
            list = Arrays.asList(
                    new TaskEntity(1, "測試1","0/1 * *  * * ?") ,
                    new TaskEntity(2, "測試2","0/1 * *  * * ?")
            );
        }
        scheduledTask.refresh(list);
        return "task任務:" + list.toString() + "已經開始執行";
    }

    @PostMapping(value = "/stopCron")
    public String stopCron(@RequestBody List<TaskEntity> list){
        if (CollectionUtils.isEmpty(list)) {
            // 這裡模擬將要停止的cron可通過前端傳來
            list = Arrays.asList(
                    new TaskEntity(1, "測試1","0/1 * *  * * ?") ,
                    new TaskEntity(2, "測試2","0/1 * *  * * ?")
            );
        }
        scheduledTask.stop(list);
        List<Integer> collect = list.stream().map(TaskEntity::getTaskId).collect(Collectors.toList());
        return "task任務:" + collect.toString() + "已經停止啟動";
    }

}

2. 普通多工動態cron 實現

注:
基於反射實現,根據方法全類名,去動態執行方法。
多工分組設定,根據任務型別進行分組。
譬如:定時任務人員的相關操作,有檢測人員離職狀態,人員業績達標,人員考勤…等,對人員定時任務做一個分類,在同一個類裡面去實現不同的task。
多工分組設定,其實跟上面步驟類似,這裡不再敘述。
可參考: 分組多工動態cron

3 測試記錄

測試1 專案啟動自啟
TaskServiceJob1Impl和TaskServiceJob1Impl … 設定 阻塞10s
觀察紀錄檔時間可發現,已經同時並行執行倆個任務。
在這裡插入圖片描述
測試2 觸發 重新整理【增、刪、啟】我們的task,。
其實這裡沒這麼智慧,如果需要觸發重新整理介面,實際上是重新載入我們的task,就是對應觸發我們,增加任務任務,刪除任務,啟動任務。
使用idea外掛測試介面
在這裡插入圖片描述
觀察紀錄檔
在這裡插入圖片描述
測試3 觸發 停止介面,停止一個介面。
這裡測試略過…

四、總結

其實實現簡單的動態設定,以上程式碼可用,比較簡單。