Spring Boot Scheduled Tasks

2022-08-05 09:00:28

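The previous articles laid out the scheduled-task landscape: Timer and ScheduledExecutorService, the scheduling options built into the JDK; HashedWheelTimer, which Netty implements internally on a time wheel; and then Quartz and distributed schedulers (ElasticJob, xxl-job, and so on). For a simple Spring Boot application you can also use Spring's built-in task support. This article walks through an example of Spring's built-in Task and how it is implemented. @pdai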

Implementation example

Spring Task is well encapsulated and very simple to use.

@EnableScheduling+@Scheduled

  • Enable scheduling with @EnableScheduling and define tasks with @Scheduled
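import java.time.LocalDateTime;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@EnableScheduling
@Configuration
public class ScheduleDemo {

    // Assumption: an SLF4J logger. The original snippet used an undeclared `log`
    // (for example one generated by Lombok's @Slf4j).
    private static final Logger log = LoggerFactory.getLogger(ScheduleDemo.class);

    /**
     * Runs once every minute.
     */
    @Scheduled(fixedRate = 1000 * 60 * 1)
    public void runScheduleFixedRate() {
        log.info("runScheduleFixedRate: current DateTime, {}", LocalDateTime.now());
    }

    /**
     * Runs once every hour, on the hour.
     */
    @Scheduled(cron = "0 0 */1 * * ?")
    public void runScheduleCron() {
        log.info("runScheduleCron: current DateTime, {}", LocalDateTime.now());
    }

}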
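  • Attributes supported by @Scheduled
  1. cron: a cron expression that runs the task at the specified times;
  2. fixedDelay: how long to wait after the previous run finishes before running again; type long, in ms by default;
  3. fixedDelayString: same meaning as fixedDelay, but the type is String;
  4. fixedRate: runs the task at a fixed frequency; type long, in ms by default;
  5. fixedRateString: same meaning as fixedRate, but the type is String;
  6. initialDelay: how long to wait before the first run; type long, in ms by default;
  7. initialDelayString: same meaning as initialDelay, but the type is String;
  8. zone: the time zone used to resolve the cron expression; defaults to the server's local time zone and is rarely needed.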
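
The difference between fixedRate and fixedDelay is easiest to see with numbers. The sketch below is not Spring code: it is a small simulation, under the assumption of a single scheduler thread and an initial delay of 0, of when each run would start. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates start times (in ms) of a periodic task on a single scheduler thread. */
final class RateVsDelaySketch {

    /** fixedRate: runs are planned every `period` ms, but a run never starts before the previous one ends. */
    static List<Long> fixedRateStarts(long period, long duration, int runs) {
        List<Long> starts = new ArrayList<>();
        long previousEnd = 0;
        for (int i = 0; i < runs; i++) {
            long planned = i * period;
            long start = Math.max(planned, previousEnd);
            starts.add(start);
            previousEnd = start + duration;
        }
        return starts;
    }

    /** fixedDelay: each run starts `delay` ms after the previous run has finished. */
    static List<Long> fixedDelayStarts(long delay, long duration, int runs) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (int i = 0; i < runs; i++) {
            starts.add(start);
            start = start + duration + delay;
        }
        return starts;
    }

    public static void main(String[] args) {
        // A 300 ms task with a 1000 ms interval
        System.out.println(fixedRateStarts(1000, 300, 4));  // [0, 1000, 2000, 3000]
        System.out.println(fixedDelayStarts(1000, 300, 4)); // [0, 1300, 2600, 3900]
    }
}
```

With fixedRate the task duration only matters once it exceeds the period; with fixedDelay it always pushes the next run back.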

Going further

Let's work through a few questions to understand in more depth how Spring Task is implemented.

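What should you watch out for when using Spring Task?

  • Exception handling

It is advisable to catch and handle exceptions yourself, inside the task method.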
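
To see why this matters, here is a minimal sketch using the plain JDK ScheduledExecutorService rather than Spring. With the bare JDK executor, a periodic task that lets an exception escape is never run again. Spring wraps @Scheduled methods with an ErrorHandler (visible in ConcurrentTaskScheduler.schedule later in this article), so its behaviour may differ, but catching inside the task keeps the outcome under your control either way. All class and method names here are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class GuardedTaskSketch {

    /** Schedules a periodic task that always throws; returns how many times it actually ran. */
    static int runsOfUnguardedTask() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                throw new IllegalStateException("simulated failure");
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                future.get(5, TimeUnit.SECONDS); // fails as soon as the task throws
            } catch (ExecutionException expected) {
                // the JDK executor has suppressed all further runs of this task
            } catch (InterruptedException | TimeoutException e) {
                throw new IllegalStateException(e);
            }
            return runs.get();
        } finally {
            executor.shutdownNow();
        }
    }

    /** Same failing work, but caught inside the task; returns true once it has run three times. */
    static boolean guardedTaskKeepsRunning() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        try {
            executor.scheduleAtFixedRate(() -> {
                try {
                    throw new IllegalStateException("simulated failure");
                } catch (RuntimeException ex) {
                    // handle or log here so the failure never escapes the task
                } finally {
                    threeRuns.countDown();
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            return threeRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + runsOfUnguardedTask());
        System.out.println("guarded keeps running: " + guardedTaskKeepsRunning());
    }
}
```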

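  • Timeout handling

This problem comes up often in real projects: a scheduled task runs for a while and then stops firing. A typical cause is calling a third-party API without setting a call timeout; when the remote side misbehaves, the current task blocks. Because the default executor shown later in this article (Executors.newSingleThreadScheduledExecutor()) has a single thread, one blocked task also holds up every task queued behind it.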
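
One way to bound a call that might hang is to run it on a worker thread and wait for the result with a time limit. The sketch below uses only the JDK; callWithTimeout is a hypothetical helper, and the slow lambda stands in for a third-party call. Where the client library offers its own connect and read timeouts, setting those is usually the first thing to do.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutGuardSketch {

    /** Runs `call` on a worker thread and gives up after `timeoutMillis`, returning `fallback`. */
    static <T> T callWithTimeout(Callable<T> call, long timeoutMillis, T fallback) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<T> future = worker.submit(call);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            future.cancel(true); // interrupt the stuck call so the worker thread can exit
            return fallback;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException ex) {
            return fallback; // the call itself failed
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a remote call that hangs
        String slow = callWithTimeout(() -> { Thread.sleep(5_000); return "done"; }, 100, "timed out");
        String fast = callWithTimeout(() -> "done", 1_000, "timed out");
        System.out.println(slow + " / " + fast);
    }
}
```

Creating an executor per call keeps the sketch short; a real task would more likely share one worker pool.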

How does Spring Task work?

The Spring Task source code lives in the spring-context module, under the org.springframework.scheduling package.

The @EnableScheduling annotation

Adding the @EnableScheduling annotation automatically imports SchedulingConfiguration

/**
 * @author Chris Beams
 * @author Juergen Hoeller
 * @since 3.1
 * @see Scheduled
 * @see SchedulingConfiguration
 * @see SchedulingConfigurer
 * @see ScheduledTaskRegistrar
 * @see Trigger
 * @see ScheduledAnnotationBeanPostProcessor
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

SchedulingConfiguration initializes ScheduledAnnotationBeanPostProcessor

The SchedulingConfiguration class automatically creates the ScheduledAnnotationBeanPostProcessor bean

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

	@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
		return new ScheduledAnnotationBeanPostProcessor();
	}

}

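What is a BeanPostProcessor? We covered it in detail in an earlier article; see "Spring Core: Inversion of Control (IOC) Source Code Analysis".

Bean lifecycle flow in the Spring container

ScheduledTaskRegistrar registers tasks

The ScheduledAnnotationBeanPostProcessor constructor initializes a ScheduledTaskRegistrar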

/**
    * Create a default {@code ScheduledAnnotationBeanPostProcessor}.
    */
public ScheduledAnnotationBeanPostProcessor() {
    this.registrar = new ScheduledTaskRegistrar();
}

The main job of ScheduledTaskRegistrar is to register the various types of task (this approach has been deprecated in newer versions)

protected void scheduleTasks() {
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    if (this.triggerTasks != null) {
        for (TriggerTask task : this.triggerTasks) {
            addScheduledTask(scheduleTriggerTask(task));
        }
    }
    if (this.cronTasks != null) {
        for (CronTask task : this.cronTasks) {
            addScheduledTask(scheduleCronTask(task));
        }
    }
    if (this.fixedRateTasks != null) {
        for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
        }
    }
    if (this.fixedDelayTasks != null) {
        for (IntervalTask task : this.fixedDelayTasks) {
            addScheduledTask(scheduleFixedDelayTask(task));
        }
    }
}

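Which tasks are registered, and how are the classes designed? The code above iterates over four collections: triggerTasks (TriggerTask), cronTasks (CronTask), and fixedRateTasks and fixedDelayTasks (both IntervalTask).

ScheduledAnnotationBeanPostProcessor loads the @Scheduled annotations

The @Scheduled annotations are loaded in the postProcessAfterInitialization phase of the BeanPostProcessor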

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass) &&
            AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
                });
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
            }
        }
        else {
            // Non-empty set of methods
            annotatedMethods.forEach((method, scheduledAnnotations) ->
                    scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}
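
The core idea of the method above, finding annotated methods on a bean by reflection, can be sketched without Spring. In the example below, @Every is a hypothetical stand-in for @Scheduled and ReportJobs is a made-up bean; the real post-processor additionally handles proxies, repeatable annotations, and a cache of non-annotated classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for @Scheduled; not a Spring annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Every {
    long millis();
}

/** Example bean with two annotated methods and one plain method. */
class ReportJobs {
    @Every(millis = 1000) public void heartbeat() { }
    @Every(millis = 60000) public void report() { }
    public void helper() { }
}

final class AnnotationScanSketch {

    /** Returns method name -> period for every method carrying @Every. */
    static Map<String, Long> scan(Object bean) {
        Map<String, Long> found = new TreeMap<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                found.put(method.getName(), every.millis());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(new ReportJobs())); // {heartbeat=1000, report=60000}
    }
}
```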

The @Scheduled annotation is applied at method level, as follows

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {

	/**
	 * A special cron expression value that indicates a disabled trigger: {@value}.
	 * <p>This is primarily meant for use with <code>${...}</code> placeholders,
	 * allowing for external disabling of corresponding scheduled methods.
	 * @since 5.1
	 * @see ScheduledTaskRegistrar#CRON_DISABLED
	 */
	String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;


	/**
	 * A cron-like expression, extending the usual UN*X definition to include triggers
	 * on the second, minute, hour, day of month, month, and day of week.
	 * <p>For example, {@code "0 * * * * MON-FRI"} means once per minute on weekdays
	 * (at the top of the minute - the 0th second).
	 * <p>The fields read from left to right are interpreted as follows.
	 * <ul>
	 * <li>second</li>
	 * <li>minute</li>
	 * <li>hour</li>
	 * <li>day of month</li>
	 * <li>month</li>
	 * <li>day of week</li>
	 * </ul>
	 * <p>The special value {@link #CRON_DISABLED "-"} indicates a disabled cron
	 * trigger, primarily meant for externally specified values resolved by a
	 * <code>${...}</code> placeholder.
	 * @return an expression that can be parsed to a cron schedule
	 * @see org.springframework.scheduling.support.CronExpression#parse(String)
	 */
	String cron() default "";

	/**
	 * A time zone for which the cron expression will be resolved. By default, this
	 * attribute is the empty String (i.e. the server's local time zone will be used).
	 * @return a zone id accepted by {@link java.util.TimeZone#getTimeZone(String)},
	 * or an empty String to indicate the server's default time zone
	 * @since 4.0
	 * @see org.springframework.scheduling.support.CronTrigger#CronTrigger(String, java.util.TimeZone)
	 * @see java.util.TimeZone
	 */
	String zone() default "";

	/**
	 * Execute the annotated method with a fixed period between the end of the
	 * last invocation and the start of the next.
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * {@link #timeUnit}.
	 * @return the delay
	 */
	long fixedDelay() default -1;

	/**
	 * Execute the annotated method with a fixed period between the end of the
	 * last invocation and the start of the next.
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * {@link #timeUnit}.
	 * @return the delay as a String value &mdash; for example, a placeholder
	 * or a {@link java.time.Duration#parse java.time.Duration} compliant value
	 * @since 3.2.2
	 */
	String fixedDelayString() default "";

	/**
	 * Execute the annotated method with a fixed period between invocations.
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * {@link #timeUnit}.
	 * @return the period
	 */
	long fixedRate() default -1;

	/**
	 * Execute the annotated method with a fixed period between invocations.
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * {@link #timeUnit}.
	 * @return the period as a String value &mdash; for example, a placeholder
	 * or a {@link java.time.Duration#parse java.time.Duration} compliant value
	 * @since 3.2.2
	 */
	String fixedRateString() default "";

	/**
	 * Number of units of time to delay before the first execution of a
	 * {@link #fixedRate} or {@link #fixedDelay} task.
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * {@link #timeUnit}.
	 * @return the initial
	 * @since 3.2
	 */
	long initialDelay() default -1;

	/**
	 * Number of units of time to delay before the first execution of a
	 * {@link #fixedRate} or {@link #fixedDelay} task.
	 * <p>The time unit is milliseconds by default but can be overridden via
	 * {@link #timeUnit}.
	 * @return the initial delay as a String value &mdash; for example, a placeholder
	 * or a {@link java.time.Duration#parse java.time.Duration} compliant value
	 * @since 3.2.2
	 */
	String initialDelayString() default "";

	/**
	 * The {@link TimeUnit} to use for {@link #fixedDelay}, {@link #fixedDelayString},
	 * {@link #fixedRate}, {@link #fixedRateString}, {@link #initialDelay}, and
	 * {@link #initialDelayString}.
	 * <p>Defaults to {@link TimeUnit#MILLISECONDS}.
	 * <p>This attribute is ignored for {@linkplain #cron() cron expressions}
	 * and for {@link java.time.Duration} values supplied via {@link #fixedDelayString},
	 * {@link #fixedRateString}, or {@link #initialDelayString}.
	 * @return the {@code TimeUnit} to use
	 * @since 5.3.10
	 */
	TimeUnit timeUnit() default TimeUnit.MILLISECONDS;

}
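
The Javadoc above says the String attributes accept either a plain number (interpreted in timeUnit) or a java.time.Duration compliant value (for which timeUnit is ignored). A minimal sketch of that rule follows; toMillis is a hypothetical helper, not Spring's own conversion code, and it deliberately ignores placeholders and signed durations.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

final class DelayStringSketch {

    /** Treats "PT1M"-style values as ISO-8601 durations, anything else as a count of `unit`. */
    static long toMillis(String value, TimeUnit unit) {
        String trimmed = value.trim();
        if (trimmed.startsWith("P") || trimmed.startsWith("p")) {
            return Duration.parse(trimmed).toMillis(); // the unit is ignored for Duration values
        }
        return unit.toMillis(Long.parseLong(trimmed));
    }

    public static void main(String[] args) {
        System.out.println(toMillis("PT1M", TimeUnit.SECONDS)); // 60000
        System.out.println(toMillis("5", TimeUnit.SECONDS));    // 5000
    }
}
```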

Once the @Scheduled annotation on a method (the task definition) has been obtained, processScheduled handles the specific type of task

/**
    * Process the given {@code @Scheduled} method declaration on the given bean.
    * @param scheduled the {@code @Scheduled} annotation
    * @param method the method that the annotation has been declared on
    * @param bean the target bean instance
    * @see #createRunnable(Object, Method)
    */
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
        Runnable runnable = createRunnable(bean, method);
        boolean processedSchedule = false;
        String errorMessage =
                "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

        Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

        // Determine initial delay
        long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
        String initialDelayString = scheduled.initialDelayString();
        if (StringUtils.hasText(initialDelayString)) {
            Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
            if (this.embeddedValueResolver != null) {
                initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
            }
            if (StringUtils.hasLength(initialDelayString)) {
                try {
                    initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                }
            }
        }

        // Check cron expression
        String cron = scheduled.cron();
        if (StringUtils.hasText(cron)) {
            String zone = scheduled.zone();
            if (this.embeddedValueResolver != null) {
                cron = this.embeddedValueResolver.resolveStringValue(cron);
                zone = this.embeddedValueResolver.resolveStringValue(zone);
            }
            if (StringUtils.hasLength(cron)) {
                Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                processedSchedule = true;
                if (!Scheduled.CRON_DISABLED.equals(cron)) {
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    }
                    else {
                        timeZone = TimeZone.getDefault();
                    }
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                }
            }
        }

        // At this point we don't need to differentiate between initial delay set or not anymore
        if (initialDelay < 0) {
            initialDelay = 0;
        }

        // Check fixed delay
        long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
        if (fixedDelay >= 0) {
            Assert.isTrue(!processedSchedule, errorMessage);
            processedSchedule = true;
            tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
        }

        String fixedDelayString = scheduled.fixedDelayString();
        if (StringUtils.hasText(fixedDelayString)) {
            if (this.embeddedValueResolver != null) {
                fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
            }
            if (StringUtils.hasLength(fixedDelayString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                try {
                    fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                }
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
        }

        // Check fixed rate
        long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
        if (fixedRate >= 0) {
            Assert.isTrue(!processedSchedule, errorMessage);
            processedSchedule = true;
            tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
        }
        String fixedRateString = scheduled.fixedRateString();
        if (StringUtils.hasText(fixedRateString)) {
            if (this.embeddedValueResolver != null) {
                fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
            }
            if (StringUtils.hasLength(fixedRateString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                try {
                    fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                }
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
        }

        // Check whether we had any attribute set
        Assert.isTrue(processedSchedule, errorMessage);

        // Finally register the scheduled tasks
        synchronized (this.scheduledTasks) {
            Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
            regTasks.addAll(tasks);
        }
    }
    catch (IllegalArgumentException ex) {
        throw new IllegalStateException(
                "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
    }
}
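
Stripped of placeholder resolution and parsing, the method above enforces one rule: exactly one of cron, fixedDelay, or fixedRate must be set. A minimal sketch of that check, with a hypothetical chooseTrigger helper and -1 meaning "not set" as in the annotation defaults:

```java
final class TriggerChoiceSketch {

    /** Returns "cron", "fixedDelay" or "fixedRate"; rejects zero or multiple triggers. */
    static String chooseTrigger(String cron, long fixedDelay, long fixedRate) {
        String chosen = null;
        int configured = 0;
        if (cron != null && !cron.trim().isEmpty()) { chosen = "cron"; configured++; }
        if (fixedDelay >= 0) { chosen = "fixedDelay"; configured++; }
        if (fixedRate >= 0) { chosen = "fixedRate"; configured++; }
        if (configured != 1) {
            throw new IllegalArgumentException(
                    "Exactly one of 'cron', 'fixedDelay' or 'fixedRate' is required, got " + configured);
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(chooseTrigger("0 0 * * * ?", -1, -1)); // cron
        System.out.println(chooseTrigger("", 1000, -1));          // fixedDelay
    }
}
```

Unlike this sketch, the real method fails at the second attribute it encounters rather than counting them all first; the observable rule is the same.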

ScheduledTaskRegistrar resolves tasks

Taking CronTask as an example: if a taskScheduler has been set, the task is scheduled on it right away; otherwise it is stored in unresolvedTasks.

/**
    * Schedule the specified cron task, either right away if possible
    * or on initialization of the scheduler.
    * @return a handle to the scheduled task, allowing to cancel it
    * (or {@code null} if processing a previously registered task)
    * @since 4.3
    */
@Nullable
public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    if (this.taskScheduler != null) {
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    }
    else {
        addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
}
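
The "schedule now if possible, otherwise park the task" pattern can be shown in isolation. The sketch below is an assumption-laden simplification: DeferredRegistrarSketch is a made-up class, it uses a plain java.util.concurrent.Executor in place of TaskScheduler, and it runs tasks once instead of on a trigger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

final class DeferredRegistrarSketch {
    private final List<Runnable> unresolved = new ArrayList<>();
    private Executor scheduler; // null until one is supplied

    /** Hands the task to the scheduler if present, otherwise parks it. Returns true if parked. */
    boolean register(Runnable task) {
        if (scheduler != null) {
            scheduler.execute(task);
            return false;
        }
        unresolved.add(task);
        return true;
    }

    /** Supplies the scheduler and flushes every parked task to it. */
    void setScheduler(Executor scheduler) {
        this.scheduler = scheduler;
        for (Runnable task : unresolved) {
            scheduler.execute(task);
        }
        unresolved.clear();
    }

    int unresolvedCount() {
        return unresolved.size();
    }

    public static void main(String[] args) {
        DeferredRegistrarSketch registrar = new DeferredRegistrarSketch();
        AtomicInteger ran = new AtomicInteger();
        registrar.register(ran::incrementAndGet);   // parked: no scheduler yet
        registrar.setScheduler(Runnable::run);      // direct executor flushes the parked task
        registrar.register(ran::incrementAndGet);   // runs immediately
        System.out.println(ran.get());              // 2
    }
}
```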

How TaskScheduler handles a task

The default is ConcurrentTaskScheduler, which handles it as follows

@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
    try {
        if (this.enterpriseConcurrentScheduler) {
            return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
        }
        else {
            ErrorHandler errorHandler =
                    (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
            return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();
        }
    }
    catch (RejectedExecutionException ex) {
        throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
    }
}

EnterpriseConcurrentTriggerScheduler adapts a Spring Trigger to the JSR-236 Trigger standard; it works as follows

/**
    * Delegate that adapts a Spring Trigger to a JSR-236 Trigger.
    * Separated into an inner class in order to avoid a hard dependency on the JSR-236 API.
    */
private class EnterpriseConcurrentTriggerScheduler {

    public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {
        ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;
        return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {
            @Override
            @Nullable
            public Date getNextRunTime(@Nullable LastExecution le, Date taskScheduledTime) {
                return (trigger.nextExecutionTime(le != null ?
                        new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :
                        new SimpleTriggerContext()));
            }
            @Override
            public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {
                return false;
            }
        });
    }
}

If EnterpriseConcurrentTriggerScheduler is not used, ReschedulingRunnable is used instead; in essence the work is handled by a ScheduledExecutorService

public ReschedulingRunnable(Runnable delegate, Trigger trigger, Clock clock,
			ScheduledExecutorService executor, ErrorHandler errorHandler) {

    super(delegate, errorHandler);
    this.trigger = trigger;
    this.triggerContext = new SimpleTriggerContext(clock);
    this.executor = executor;
}


@Nullable
public ScheduledFuture<?> schedule() {
    synchronized (this.triggerContextMonitor) {
        this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
        if (this.scheduledExecutionTime == null) {
            return null;
        }
        long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
        this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
        return this;
    }
}
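
The rescheduling idea is: run the task once, ask the trigger for the next time, and submit a new one-shot schedule, until the trigger returns nothing. Below is a minimal JDK-only sketch of that loop. SelfReschedulingSketch and its trigger function (completed run count to next delay, or null to stop) are assumptions for illustration; Spring's ReschedulingRunnable works with a Trigger and a TriggerContext instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/** Runs a task, then asks a trigger function for the next delay and schedules itself again. */
final class SelfReschedulingSketch implements Runnable {
    private final Runnable delegate;
    private final IntFunction<Long> nextDelayMillis; // completed runs -> delay in ms, or null to stop
    private final ScheduledExecutorService executor;
    private final CountDownLatch finished = new CountDownLatch(1);
    private int completedRuns;

    SelfReschedulingSketch(Runnable delegate, IntFunction<Long> nextDelayMillis,
            ScheduledExecutorService executor) {
        this.delegate = delegate;
        this.nextDelayMillis = nextDelayMillis;
        this.executor = executor;
    }

    /** Submits one more one-shot execution, or signals completion if the trigger is exhausted. */
    void schedule() {
        Long delay = nextDelayMillis.apply(completedRuns);
        if (delay == null) {
            finished.countDown();
            return;
        }
        executor.schedule(this, delay, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        delegate.run();
        completedRuns++;
        schedule();
    }

    /** Demo helper: runs a counter task until the trigger stops; returns the number of runs. */
    static int runUntilTriggerStops(int maxRuns) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger();
        SelfReschedulingSketch task = new SelfReschedulingSketch(
                counter::incrementAndGet, runs -> runs < maxRuns ? 5L : null, executor);
        try {
            task.schedule();
            task.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runUntilTriggerStops(3)); // 3
    }
}
```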

Sample source code

https://github.com/realpdai/tech-pdai-spring-demos
