SpringBoot 整合 Quartz + MySQL

2023-04-18 09:00:19

Quartz 簡單使用
Java SpringBoot 中,動態執行 bean 物件中的方法

原始碼地址 => https://gitee.com/VipSoft/VipBoot/tree/develop/vipsoft-quartz

工作原理解讀

只要設定好 DataSource,Quartz 就會自動對相關資料表進行資料操作。

新增 Quartz Job 任務

儲存 QRTZ_JOB_DETAILS、QRTZ_TRIGGERS => QRTZ_CRON_TRIGGERS

// Abridged excerpt of the service's addJob method; "...." marks omitted code.
public void addJob(QuartzJob job) throws SchedulerException {
  ....
  JobDetail jobDetail = JobBuilder.newJob(jobClass)
                    .withIdentity(jobKey)
                    .build();
  // Put the job definition into the JobDataMap so the executing method can read it back
  jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, job);
  // Once this line has run, the scheduled job is inserted into QRTZ_JOB_DETAILS and the related tables
  scheduler.scheduleJob(jobDetail, trigger);
  ....
}
// Source: org.quartz.impl.jdbcjobstore.JobStoreSupport
/**
 * Stores a job and its trigger from within a single callback passed to
 * {@code executeInLock}.
 *
 * @param newJob     the job detail to store
 * @param newTrigger the trigger to store for that job
 * @throws JobPersistenceException if storing fails
 */
public void storeJobAndTrigger(final JobDetail newJob, final OperableTrigger newTrigger) throws JobPersistenceException {
    // The lock name is only supplied when lock-on-insert is enabled; otherwise null is passed
    final String lockName = this.isLockOnInsert() ? "TRIGGER_ACCESS" : null;

    final JobStoreSupport.VoidTransactionCallback storeBoth = new JobStoreSupport.VoidTransactionCallback() {
        public void executeVoid(Connection conn) throws JobPersistenceException {
            // Job data goes to the QRTZ_JOB_DETAILS table
            JobStoreSupport.this.storeJob(conn, newJob, false);
            // Trigger data goes to the QRTZ_TRIGGERS table, stored in the "WAITING" state
            JobStoreSupport.this.storeTrigger(conn, newTrigger, newJob, false, "WAITING", false, false);
        }
    };

    this.executeInLock(lockName, storeBoth);
}

public int insertTrigger(...){
 INSERT_TRIGGER
 insertExtendedTriggerProperties => INSERT_CRON_TRIGGER OR INSERT_BLOB_TRIGGER

}

詳見:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
將 job.getJobDataMap() 物件序列化後,存入 QRTZ_JOB_DETAILS.JOB_DATA 欄位(內容可以是一個物件);執行定時任務時,會把該欄位反序列化,再根據先前設定的內容進行業務處理。

獲取 Quartz Job 任務

執行計劃任務時,獲取 Job Detail

QuartzSchedulerThread.run()
=> qsRsrcs.getJobStore().acquireNextTriggers()
=> txCallback.execute(conn)
=> JobStoreSupport.acquireNextTriggers()
=> JobStoreSupport.retrieveJob()
=> StdJDBCDelegate.selectJobDetail()

刪除 Quartz Job 任務

/**
 * <p>
 * Delete the base trigger data for a trigger.
 * </p>
 * <p>
 * {@code deleteTriggerExtension} is invoked first, before the
 * {@code DELETE_TRIGGER} statement is executed.
 * </p>
 *
 * @param conn
 *          the DB Connection
 * @param triggerKey
 *          supplies the name and group bound to the delete statement
 * @return the number of rows deleted
 * @throws SQLException
 *           if preparing or executing the statement fails
 */
public int deleteTrigger(Connection conn, TriggerKey triggerKey) throws SQLException {
    // Runs outside the try block, exactly as before: no statement is open yet
    deleteTriggerExtension(conn, triggerKey);

    PreparedStatement stmt = null;
    try {
        stmt = conn.prepareStatement(rtp(DELETE_TRIGGER));
        // Parameter 1 is the trigger name, parameter 2 the trigger group
        stmt.setString(1, triggerKey.getName());
        stmt.setString(2, triggerKey.getGroup());

        final int deletedRows = stmt.executeUpdate();
        return deletedRows;
    } finally {
        closeStatement(stmt);
    }
}

清除資料

/**
 * Clears the scheduling data by executing the delete statements below
 * one after another, in the listed order.
 *
 * @param conn the DB Connection
 * @throws SQLException if preparing, executing or closing a statement fails
 */
public void clearData(Connection conn)
    throws SQLException {

    // Execution order of the delete statements; do not reorder.
    final String[] deletesInOrder = {
        DELETE_ALL_SIMPLE_TRIGGERS,
        DELETE_ALL_SIMPROP_TRIGGERS,
        DELETE_ALL_CRON_TRIGGERS,
        DELETE_ALL_BLOB_TRIGGERS,
        DELETE_ALL_TRIGGERS,
        DELETE_ALL_JOB_DETAILS,
        DELETE_ALL_CALENDARS,
        DELETE_ALL_PAUSED_TRIGGER_GRPS
    };

    PreparedStatement current = null;

    try {
        for (String deleteSql : deletesInOrder) {
            // Close the previous statement explicitly before preparing the next one;
            // the final statement is left for the finally block.
            if (current != null) {
                current.close();
            }
            current = conn.prepareStatement(rtp(deleteSql));
            current.executeUpdate();
        }
    } finally {
        closeStatement(current);
    }
}

Demo 程式碼

MySQL 指令碼

https://github.com/quartz-scheduler/quartz/blob/v2.3.2/quartz-core/src/main/resources/org/quartz/impl/jdbcjobstore/tables_mysql.sql

清除資料

-- Remove every row from the Quartz tables.
-- The type-specific trigger tables are emptied before qrtz_triggers,
-- and qrtz_triggers before qrtz_job_details.
DELETE FROM qrtz_simple_triggers;
DELETE FROM qrtz_simprop_triggers;
DELETE FROM qrtz_cron_triggers;
DELETE FROM qrtz_blob_triggers;
DELETE FROM qrtz_triggers;
DELETE FROM qrtz_job_details;
DELETE FROM qrtz_calendars;
DELETE FROM qrtz_paused_trigger_grps;
DELETE FROM qrtz_scheduler_state;
DELETE FROM qrtz_locks;
DELETE FROM qrtz_fired_triggers;

Pom.xml
如果 SpringBoot 版本是 2.0.0 以後的,官方已經提供了整合 Quartz 的 starter,可以直接使用 spring-boot-starter-quartz 依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<!-- Quartz integration needs to interact with the database -->
<!-- NOTE(review): 5.0.8 is a very old Connector/J release; confirm it works with the target MySQL server and Spring Boot version -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.0.8</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.20</version>
</dependency>
<!-- NOTE(review): no version is declared here; presumably it is managed by a parent POM or BOM; verify -->
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<!-- hutool utility classes -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.3.7</version>
</dependency>


QuartzJob 參考上圖,建立實體

點選檢視程式碼
package com.vipsoft.web.entity;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.Size;
import java.io.Serializable;

/**
 * Definition of a scheduled task as exchanged with the scheduling service.
 *
 * <p>The class is {@link Serializable}; elsewhere in this article an instance is
 * stored in the Quartz JobDataMap.</p>
 */
public class QuartzJob implements Serializable {

    private static final long serialVersionUID = -6798153039624729495L;

    /** Task sequence number. */
    private int jobId;

    /** Task name; must not be blank and is limited to 10 characters. */
    @NotBlank(message = "任務名稱不能為空")
    @Size(max = 10, message = "任務名稱不能超過10個字元")
    private String jobName;

    /** Task group name; must not be blank and is limited to 10 characters. */
    @NotBlank(message = "任務組名不能為空")
    @Size(max = 10, message = "任務組名不能超過10個字元")
    private String jobGroup;

    /** String describing the target to invoke. */
    private String invokeTarget;

    /** Cron expression that drives execution. */
    private String cronExpression;

    /**
     * Cron misfire policy: 0 = default, 1 = fire immediately,
     * 2 = fire once, 3 = do not fire immediately. Defaults to "0".
     */
    private String misfirePolicy = "0";

    /** Concurrent execution: 0 = allowed, 1 = forbidden. */
    private String concurrent;

    /** Free-text description of the task. */
    private String description;

    /** Task status (0 = normal, 1 = paused). */
    private String status;

    // ----- jobId -----

    public int getJobId() {
        return jobId;
    }

    public void setJobId(final int jobId) {
        this.jobId = jobId;
    }

    // ----- jobName -----

    public String getJobName() {
        return jobName;
    }

    public void setJobName(final String jobName) {
        this.jobName = jobName;
    }

    // ----- jobGroup -----

    public String getJobGroup() {
        return jobGroup;
    }

    public void setJobGroup(final String jobGroup) {
        this.jobGroup = jobGroup;
    }

    // ----- invokeTarget -----

    public String getInvokeTarget() {
        return invokeTarget;
    }

    public void setInvokeTarget(final String invokeTarget) {
        this.invokeTarget = invokeTarget;
    }

    // ----- cronExpression -----

    public String getCronExpression() {
        return cronExpression;
    }

    public void setCronExpression(final String cronExpression) {
        this.cronExpression = cronExpression;
    }

    // ----- misfirePolicy -----

    public String getMisfirePolicy() {
        return misfirePolicy;
    }

    public void setMisfirePolicy(final String misfirePolicy) {
        this.misfirePolicy = misfirePolicy;
    }

    // ----- concurrent -----

    public String getConcurrent() {
        return concurrent;
    }

    public void setConcurrent(final String concurrent) {
        this.concurrent = concurrent;
    }

    // ----- description -----

    public String getDescription() {
        return description;
    }

    public void setDescription(final String description) {
        this.description = description;
    }

    // ----- status -----

    public String getStatus() {
        return status;
    }

    public void setStatus(final String status) {
        this.status = status;
    }
}

核心程式碼:QuartzJobServiceImpl

點選檢視程式碼
package com.vipsoft.web.service.impl;

import cn.hutool.core.util.StrUtil;
import com.vipsoft.web.config.ScheduleConstants;
import com.vipsoft.web.entity.QuartzJob;
import com.vipsoft.web.exception.CustomException;
import com.vipsoft.web.job.CommonJob;

import com.vipsoft.web.service.IQuartzJobService;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Quartz-backed implementation of {@link IQuartzJobService}.
 *
 * <p>Every job is registered with {@link CommonJob} as its job class, and the
 * originating {@link QuartzJob} is stored in the JobDataMap under
 * {@link ScheduleConstants#TASK_PROPERTIES}. A job and its cron trigger share
 * the same name/group pair.</p>
 */
@Service
public class QuartzJobServiceImpl implements IQuartzJobService {

    @Autowired
    Scheduler scheduler;

    /**
     * Clears all scheduling data by delegating to {@link Scheduler#clear()}.
     *
     * @param job not used by this implementation
     * @throws SchedulerException if the scheduler fails to clear its data
     */
    @Override
    public void clearAll(QuartzJob job) throws SchedulerException {
        scheduler.clear();
    }

    /**
     * Adds a cron-scheduled job, replacing any existing job with the same key.
     *
     * <p>The trigger is built and validated before the existing job is removed,
     * so an invalid cron expression or misfire policy leaves the old job in
     * place. When the job carries no status it is set to paused.</p>
     *
     * @param job scheduling information
     * @throws SchedulerException if the scheduler rejects the operation
     * @throws CustomException    if the misfire policy is missing or unknown
     */
    @Override
    public void addJob(QuartzJob job) throws SchedulerException {
        if (StrUtil.isEmpty(job.getStatus())) {
            // No status supplied: default to paused
            job.setStatus(ScheduleConstants.Status.PAUSE.getValue());
        }
        Class<? extends Job> jobClass = CommonJob.class;
        JobKey jobKey = toJobKey(job);
        // The trigger reuses the job's name and group as its identity
        TriggerKey triggerKey = TriggerKey.triggerKey(job.getJobName(), job.getJobGroup());

        JobDetail jobDetail = JobBuilder.newJob(jobClass)
                .withIdentity(jobKey)
                .build();
        // Stored so that the executing job (and listJob) can read the definition back
        jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, job);

        // Cron schedule builder, adjusted for the requested misfire policy
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
        cronScheduleBuilder = handleCronScheduleMisfirePolicy(job.getMisfirePolicy(), cronScheduleBuilder);

        CronTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(triggerKey)
                .withSchedule(cronScheduleBuilder)
                .build();

        // Remove a pre-existing job with the same key before scheduling the new one
        if (scheduler.checkExists(jobKey)) {
            scheduler.deleteJob(jobKey);
        }

        scheduler.scheduleJob(jobDetail, trigger);

        // Pause right after scheduling when the job is marked as paused
        if (job.getStatus().equals(ScheduleConstants.Status.PAUSE.getValue())) {
            scheduler.pauseJob(jobKey);
        }
    }

    /**
     * Applies the configured misfire policy to a cron schedule builder.
     *
     * @param misfirePolicy one of the {@code ScheduleConstants.MISFIRE_*} values
     * @param cb            the builder to configure
     * @return the builder with the misfire instruction applied
     * @throws CustomException (code 60001) if the policy is {@code null} or unknown
     */
    public static CronScheduleBuilder handleCronScheduleMisfirePolicy(String misfirePolicy, CronScheduleBuilder cb)  {
        if (misfirePolicy == null) {
            // switch on a null String would raise a bare NullPointerException;
            // report it the same way as any other invalid policy instead
            throw new CustomException(60001, "策略設定異常 " + misfirePolicy);
        }
        switch (misfirePolicy) {
            case ScheduleConstants.MISFIRE_DEFAULT:
                return cb;
            case ScheduleConstants.MISFIRE_IGNORE_MISFIRES:
                return cb.withMisfireHandlingInstructionIgnoreMisfires();
            case ScheduleConstants.MISFIRE_FIRE_AND_PROCEED:
                return cb.withMisfireHandlingInstructionFireAndProceed();
            case ScheduleConstants.MISFIRE_DO_NOTHING:
                return cb.withMisfireHandlingInstructionDoNothing();
            default:
                throw new CustomException(60001, "策略設定異常 " + misfirePolicy);
        }
    }

    /**
     * Updates a job by re-adding it.
     *
     * <p>{@link #addJob(QuartzJob)} already removes an existing job with the same
     * key, and does so only after the new trigger has been built. Deleting here
     * first would lose the existing job whenever the new definition is invalid.</p>
     *
     * @param job scheduling information
     * @throws SchedulerException if the scheduler rejects the operation
     */
    @Override
    public void updateJob(QuartzJob job) throws SchedulerException {
        addJob(job);
    }

    /**
     * Deletes the job if it exists; does nothing otherwise.
     *
     * @param job scheduling information identifying the job by name and group
     * @throws SchedulerException if the scheduler rejects the operation
     * @since 2.0.0
     */
    @Override
    public void deleteJob(QuartzJob job) throws SchedulerException {
        JobKey jobKey = toJobKey(job);
        if (this.scheduler.checkExists(jobKey)) {
            this.scheduler.deleteJob(jobKey);
        }
    }

    /**
     * Triggers the job immediately.
     *
     * @param job scheduling information identifying the job by name and group
     * @throws SchedulerException if the scheduler rejects the operation
     */
    @Override
    public void run(QuartzJob job) throws SchedulerException {
        scheduler.triggerJob(toJobKey(job));
    }

    /**
     * Pauses the job.
     *
     * @param job scheduling information identifying the job by name and group
     * @throws SchedulerException if the scheduler rejects the operation
     */
    @Override
    public void pauseJob(QuartzJob job) throws SchedulerException {
        this.scheduler.pauseJob(toJobKey(job));
    }

    /**
     * Resumes a paused job.
     *
     * @param job scheduling information identifying the job by name and group
     * @throws SchedulerException if the scheduler rejects the operation
     */
    @Override
    public void restartJob(QuartzJob job) throws SchedulerException {
        this.scheduler.resumeJob(toJobKey(job));
    }

    /**
     * Lists the jobs known to the scheduler, one entry per trigger.
     *
     * <p>Each entry is the {@link QuartzJob} stored in the job's data map, with
     * its status overwritten by the trigger state name, and its cron expression
     * and description overwritten from the trigger. Jobs that no longer exist or
     * that carry no {@link QuartzJob} in their data map are skipped.</p>
     *
     * @param job not used by this implementation
     * @return the scheduled jobs; never {@code null}
     * @throws SchedulerException if the scheduler cannot be queried
     */
    @Override
    public List<QuartzJob> listJob(QuartzJob job) throws SchedulerException {
        List<QuartzJob> scheduleJobVOList = new ArrayList<>();
        GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
        Set<JobKey> jobKeys = this.scheduler.getJobKeys(matcher);
        for (JobKey jobKey : jobKeys) {
            List<? extends Trigger> triggers = this.scheduler.getTriggersOfJob(jobKey);
            for (Trigger trigger : triggers) {
                // Looked up once per trigger, as before; presumably so each list entry
                // gets its own QuartzJob instance; verify against the job store in use
                JobDetail jobDetail = this.scheduler.getJobDetail(jobKey);
                if (jobDetail == null) {
                    // Job disappeared between getJobKeys and this lookup
                    continue;
                }
                Object stored = jobDetail.getJobDataMap().get(ScheduleConstants.TASK_PROPERTIES);
                if (!(stored instanceof QuartzJob)) {
                    // Job was not registered through addJob, so there is nothing to report
                    continue;
                }
                QuartzJob scheduleJobVO = (QuartzJob) stored;
                Trigger.TriggerState triggerState = this.scheduler.getTriggerState(trigger.getKey());
                scheduleJobVO.setStatus(triggerState.name());
                // Fill schedule text and description according to the trigger type
                if (trigger instanceof SimpleTrigger) {
                    SimpleTrigger simple = (SimpleTrigger) trigger;
                    scheduleJobVO.setCronExpression("重複次數:" + (simple.getRepeatCount() == -1 ? "無限" : simple.getRepeatCount()) + ",重複間隔:"
                            + (simple.getRepeatInterval() / 1000L));
                    scheduleJobVO.setDescription(simple.getDescription());
                }
                if (trigger instanceof CronTrigger) {
                    CronTrigger cron = (CronTrigger) trigger;
                    scheduleJobVO.setCronExpression(cron.getCronExpression());
                    scheduleJobVO.setDescription(cron.getDescription() == null ? ("觸發器:" + trigger.getKey()) : cron.getDescription());
                }
                scheduleJobVOList.add(scheduleJobVO);
            }
        }
        return scheduleJobVOList;
    }

    /** Builds the Quartz job key from the job's name and group. */
    private static JobKey toJobKey(QuartzJob job) {
        return JobKey.jobKey(job.getJobName(), job.getJobGroup());
    }
}

ScheduleConfig

點選檢視程式碼
package com.vipsoft.web.config;
 
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; 
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import javax.sql.DataSource;
import java.util.Properties;

/**
 * Spring configuration that exposes a {@link SchedulerFactoryBean} backed by
 * the application's {@link DataSource}.
 */
@Configuration
public class ScheduleConfig {

    /** Used both as the Quartz instance name and as the factory's scheduler name. */
    private static final String SCHEDULER_NAME = "VipSoftScheduler";

    /**
     * Builds the Quartz properties handed to the factory.
     *
     * <p>No {@code org.quartz.jobStore.class} is set here: the factory is given a
     * DataSource via {@code setDataSource}, and Spring's documentation for that
     * setter states that the Quartz settings should then not define a job store
     * class.</p>
     *
     * @return the Quartz properties
     */
    private Properties quartzProperties() {
        Properties prop = new Properties();
        // Scheduler identity
        prop.setProperty("org.quartz.scheduler.instanceName", SCHEDULER_NAME);
        prop.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        // Thread pool
        prop.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        prop.setProperty("org.quartz.threadPool.threadCount", "20");
        prop.setProperty("org.quartz.threadPool.threadPriority", "5");
        // Cluster settings
        prop.setProperty("org.quartz.jobStore.isClustered", "true");
        prop.setProperty("org.quartz.jobStore.clusterCheckinInterval", "15000");
        prop.setProperty("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
        prop.setProperty("org.quartz.jobStore.txIsolationLevelSerializable", "true");

        // Enable for SQL Server
        // prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?");
        prop.setProperty("org.quartz.jobStore.misfireThreshold", "12000");
        prop.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_");
        return prop;
    }


    /**
     * Creates the scheduler factory.
     *
     * @param dataSource the DataSource Quartz uses for its tables
     * @return the configured factory bean
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setDataSource(dataSource);

//        // Alternative: load the settings from a properties file on the classpath
//        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
//        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
//        // Initialize the object only after the properties in quartz.properties have been read and injected
//        propertiesFactoryBean.afterPropertiesSet();
//        factory.setQuartzProperties(propertiesFactoryBean.getObject());

        // Quartz settings, including the cluster configuration
        factory.setQuartzProperties(this.quartzProperties());

        factory.setSchedulerName(SCHEDULER_NAME);
        // Delayed start: the scheduler starts 10 seconds after application startup
        factory.setStartupDelay(10);
        // Optional: overwrite jobs that already exist at startup, so the matching
        // qrtz_job_details rows need not be deleted after every targetObject change
        factory.setOverwriteExistingJobs(true);
        // Start automatically (true is the default)
        factory.setAutoStartup(true);
        factory.setApplicationContextSchedulerContextKey("applicationContextKey");

        return factory;
    }
}

暫停後,qrtz_triggers 表的 TRIGGER_STATE = PAUSED

執行效果
http://localhost:8088/schedule/add
http://localhost:8088/schedule/pause
http://localhost:8088/schedule/restart

{"jobName":"測試","jobGroup":"DEFAULT","invokeTarget":"scheduletask.execute('VipSoft Quartz')","cronExpression":"0/10 * * * * ?","misfirePolicy":2,"concurrent":1,"status":"0"}

參考

實現方式參考:若依(RuoYi)。它新建了一張中間表,通過 init() 方法,利用該中間表進行定時任務的初始化

/**
 * Initializes the scheduler when the project starts.
 * Mainly guards against manual database edits that were never synchronized to the scheduler
 * (note: do not manually change the database ID or the job group name, otherwise dirty data results).
 */
@PostConstruct
public void init() throws SchedulerException, TaskException {
    // Clear the scheduler first, then re-create one scheduled job per row the mapper returns
    scheduler.clear();
    for (SysJob storedJob : jobMapper.selectJobAll()) {
        ScheduleUtils.createScheduleJob(scheduler, storedJob);
    }
}