之前公司經常會遇到設定定時任務,簡單的任務可以直接依賴spring即可。
簡單任務直接使用 @scheduled 註解配合@EnableScheduling。
但是如何實現簡單的動態cron呢?
開發原則是:
在滿足專案需求的情況下,儘量少的依賴其它框架,避免專案過於臃腫和複雜。主要研究spring 自帶的schedule。
常見的任務排程方式
springBoot 基礎模組 spring-boot-starter-web 已經內建 schedule ,無需引入額外依賴。
先思考幾個問題:
1、動態 cron 實現的原理
任務的 【 停止】是基於 future介面 的cancel() 方法。
任務的 【增加、刪除、啟動】是基於 註冊到 類ScheduledTaskRegistrar 的 ScheduledFuture的數量。
涉及核心類:
2、多工並行執行設定
spring預設機制對schedule是單執行緒。
3、如何設定多個任務
好多博文,都是設定一個cron,這讓初學者很難受。
4、如何設定任務分組
根據自己業務背景,可根據步驟三,進行改造。
5、如何設定服務啟動自啟任務。
想要程式啟動時首次去加我們設定的task,只需實現 CommandLineRunner 即可。
6、如何從資料庫讀取設定
這個其實很簡單,在實現 ScheduledTaskRegistrar 時,先直接查詢我們需要的資料即可。
7、如何優雅的實現我們的程式碼
這裡為了我們多個task實現時,去除臃腫的if else ,使用策略模式去實現我們的task,這裡程式碼裡面會具體介紹。
8、如何去觸發我們的schedule 【增刪啟停】
設定好 task任務類,注入到 controller ,通過介面直接呼叫即可。
先貼出我的github 程式碼,下面程式碼描述不全。
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TaskEntity {
/**
* 任務id
*/
private int taskId;
/**
* 任務說明
*/
private String desc;
/**
* cron 表示式
*/
private String expression;
}
設定任務介面 TaskService
public interface TaskService {
void HandlerJob();
Integer jobId();
}
設定任務介面實現 TaskServiceJob1Impl、TaskServiceJob2Impl …
@Service
public class TaskServiceJob1Impl implements TaskService {
@Override
public void HandlerJob() {
System.out.println("------job1 開始執行---------:"+new Date());
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " " + Thread.currentThread().getName() + " 任務一啟動");
try {
Thread.sleep(10000);//任務耗時10秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " " + Thread.currentThread().getName() + " 結束");
}
@Override
public Integer jobId() {
return 1;
}
}
注:
這裡引入策略模式
為啥要設定 任務解析器選擇器:
因為我們實現多個任務時,一個任務對應一個 CronTask,需要在 MyScheduledTask 裡面去實現我們每一個方法。
譬如,我們有100個任務就要自定義100個任務實現方法,程式碼會很臃腫,明顯不符合,【開閉原則】,於是這裡採用策略模式,解耦我們多個任務業務實現邏輯。
@Slf4j
@Component
public class TaskSolverChooser implements ApplicationContextAware {
private ApplicationContext applicationContext;
private Map<Integer, TaskService> chooseMap = new HashMap<>(16);
/**
* 拿到spring context 上下文
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@PostConstruct
private void registerToTaskSolver(){
Map<String, TaskService> taskServiceMap = applicationContext.getBeansOfType(TaskService.class);
for (TaskService value : taskServiceMap.values()) {
chooseMap.put(value.jobId(), value);
log.info("task {} 處理器: {} 註冊成功",new Object[]{value.jobId(),value});
}
}
/**
* 獲取需要的job
*/
public TaskService getTask(Integer jobId){
return chooseMap.get(jobId);
}
}
說明:
1、設定多執行緒執行任務
2、設定 重新整理 task
3、設定 停止 task
4、設定 執行task 業務邏輯
@Component
public class MyScheduledTask implements SchedulingConfigurer {
private volatile ScheduledTaskRegistrar registrar;
private final ConcurrentHashMap<Integer, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, CronTask> cronTasks = new ConcurrentHashMap<>();
@Autowired
private TaskSolverChooser taskSolverChooser;
@Override
public void configureTasks(ScheduledTaskRegistrar registrar) {
//設定20個執行緒,預設單執行緒,如果不設定的話,不能同時並行執行任務
registrar.setScheduler(Executors.newScheduledThreadPool(10));
this.registrar = registrar;
}
/**
* 修改 cron 需要 呼叫該方法
*/
public void refresh(List<TaskEntity> tasks){
//取消已經刪除的策略任務
Set<Integer> sids = scheduledFutures.keySet();
for (Integer sid : sids) {
if(!exists(tasks, sid)){
scheduledFutures.get(sid).cancel(false);
}
}
for (TaskEntity TaskEntity : tasks) {
String expression = TaskEntity.getExpression();
//計劃任務表示式為空則跳過
if(!StringUtils.hasLength(expression)){
continue;
}
//計劃任務已存在並且表示式未發生變化則跳過
if (scheduledFutures.containsKey(TaskEntity.getTaskId())
&& cronTasks.get(TaskEntity.getTaskId()).getExpression().equals(expression)) {
continue;
}
//如果策略執行時間發生了變化,則取消當前策略的任務
if(scheduledFutures.containsKey(TaskEntity.getTaskId())){
scheduledFutures.get(TaskEntity.getTaskId()).cancel(false);
scheduledFutures.remove(TaskEntity.getTaskId());
cronTasks.remove(TaskEntity.getTaskId());
}
//業務邏輯處理
CronTask task = cronTask(TaskEntity, expression);
//執行業務
ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(TaskEntity.getTaskId(), task);
scheduledFutures.put(TaskEntity.getTaskId(), future);
}
}
/**
* 停止 cron 執行
*/
public void stop(List<TaskEntity> tasks){
tasks.forEach(item->{
if (scheduledFutures.containsKey(item.getTaskId())) {
// mayInterruptIfRunning設成false話,不允許線上程執行時中斷,設成true的話就允許。
scheduledFutures.get(item.getTaskId()).cancel(false);
scheduledFutures.remove(item.getTaskId());
}
});
}
/**
* 業務邏輯處理
*/
public CronTask cronTask(TaskEntity TaskEntity, String expression) {
return new CronTask(() -> {
//每個計劃任務實際需要執行的具體業務邏輯
//採用策略,模式 ,執行我們的job
taskSolverChooser.getTask(TaskEntity.getTaskId()).HandlerJob();
}, expression);
}
private boolean exists(List<TaskEntity> tasks, Integer tid){
for(TaskEntity TaskEntity:tasks){
if(TaskEntity.getTaskId() == tid){
return true;
}
}
return false;
}
@PreDestroy
public void destroy() {
this.registrar.destroy();
}
}
@Component
public class StartInitTask implements CommandLineRunner {
@Autowired
private MyScheduledTask myScheduledTask;
@Override
public void run(String... args) throws Exception {
List<TaskEntity> list = Arrays.asList(
new TaskEntity(1, "測試1", "0/1 * * * * ?"),
new TaskEntity(2, "測試2", "0/1 * * * * ?")
);
myScheduledTask.refresh(list);
}
}
@RestController
public class StartController {
@Autowired
private MyScheduledTask scheduledTask;
@PostMapping(value = "/startOrChangeCron")
public String changeCron(@RequestBody List<TaskEntity> list){
if (CollectionUtils.isEmpty(list)) {
// 這裡模擬存在資料庫的資料
list = Arrays.asList(
new TaskEntity(1, "測試1","0/1 * * * * ?") ,
new TaskEntity(2, "測試2","0/1 * * * * ?")
);
}
scheduledTask.refresh(list);
return "task任務:" + list.toString() + "已經開始執行";
}
@PostMapping(value = "/stopCron")
public String stopCron(@RequestBody List<TaskEntity> list){
if (CollectionUtils.isEmpty(list)) {
// 這裡模擬將要停止的cron可通過前端傳來
list = Arrays.asList(
new TaskEntity(1, "測試1","0/1 * * * * ?") ,
new TaskEntity(2, "測試2","0/1 * * * * ?")
);
}
scheduledTask.stop(list);
List<Integer> collect = list.stream().map(TaskEntity::getTaskId).collect(Collectors.toList());
return "task任務:" + collect.toString() + "已經停止啟動";
}
}
注:
基於反射實現,根據方法全類名,去動態執行方法。
多工分組設定,根據任務型別進行分組。
譬如:定時任務人員的相關操作,有檢測人員離職狀態,人員業績達標,人員考勤…等,對人員定時任務做一個分類,在同一個類裡面去實現不同的task。
多工分組設定,其實跟上面步驟類似,這裡不再敘述。
可參考: 分組多工動態cron
測試1 專案啟動自啟
TaskServiceJob1Impl和TaskServiceJob1Impl … 設定 阻塞10s
觀察紀錄檔時間可發現,已經同時並行執行倆個任務。
測試2 觸發 重新整理【增、刪、啟】我們的task,。
其實這裡沒這麼智慧,如果需要觸發重新整理介面,實際上是重新載入我們的task,就是對應觸發我們,增加任務任務,刪除任務,啟動任務。
使用idea外掛測試介面
觀察紀錄檔
測試3 觸發 停止介面,停止一個介面。
這裡測試略過…
其實實現簡單的動態設定,以上程式碼可用,比較簡單。