elastic-job source code (1): job auto-configuration

2023-04-26 21:00:45
Version: 3.1.0-SNAPSHOT
Git repository: https://github.com/apache/shardingsphere-elasticjob
 
Maven coordinates
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>${latest.version}</version>
</dependency>
 
spring.factories configuration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration

Once elasticjob-lite-spring-boot-starter is on the classpath, Spring Boot loads ElasticJobLiteAutoConfiguration automatically at startup. Let's look at what ElasticJobLiteAutoConfiguration does.
 
ElasticJobLiteAutoConfiguration.java
/**
 * ElasticJob-Lite auto configuration.
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)


/**
 * ElasticJob switch.
 * elasticjob.enabled is treated as true when the property is not set.
 */
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)


/**
 * Imports:
 * ElasticJobRegistryCenterConfiguration.class: registry center configuration
 * ElasticJobTracingConfiguration.class: job event tracing configuration
 * ElasticJobSnapshotServiceConfiguration.class: snapshot service configuration
 */
@Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})


/**
 * Job-related configuration properties.
 */
@EnableConfigurationProperties(ElasticJobProperties.class)
public class ElasticJobLiteAutoConfiguration {
    
    @Configuration(proxyBeanMethods = false)
    /**
     * ElasticJobBootstrapConfiguration.class: creates the job bootstrap beans and registers them in the Spring container
     * ScheduleJobBootstrapStartupRunner.class: starts every job whose bootstrap type is ScheduleJobBootstrap
     */
    @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
    protected static class ElasticJobConfiguration {
    }
}
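
The elasticjob.enabled switch can be illustrated with a small stand-alone sketch. This is not Spring's implementation; the class and method names (EnabledSwitchSketch, autoConfigurationApplies) are hypothetical, and the property source is simplified to a plain Map. It only mirrors the rule expressed by havingValue = "true" and matchIfMissing = true.

```java
import java.util.Map;

// Hypothetical stand-in for the property check; not Spring's actual code.
final class EnabledSwitchSketch {

    // Mirrors @ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true).
    static boolean autoConfigurationApplies(final Map<String, String> properties) {
        String value = properties.get("elasticjob.enabled");
        if (null == value) {
            // matchIfMissing = true: an absent property still enables the configuration
            return true;
        }
        // havingValue = "true": any other value disables it
        return "true".equalsIgnoreCase(value);
    }

    public static void main(final String[] args) {
        System.out.println(autoConfigurationApplies(Map.of()));
        System.out.println(autoConfigurationApplies(Map.of("elasticjob.enabled", "false")));
    }
}
```

In other words, ElasticJob is switched off only when elasticjob.enabled is explicitly set to something other than true.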

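ElasticJob builds its distributed job features on ZooKeeper, so auto-configuration requires a ZooKeeper registry center to be configured.
Auto-configuration does four main things:
1. Configures the ZooKeeper client and connects to ZooKeeper.
2. Configures the event tracing database, which stores job execution records.
3. Parses every job configuration and registers each job's bootstrap bean as a Spring singleton.
4. Identifies the job type, writes the job's node data to ZooKeeper, and runs the scheduled jobs.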
 
First: configure the ZooKeeper client and connect to ZooKeeper.
ZookeeperRegistryCenter.class
public void init() {
    log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            // ZooKeeper server addresses
            .connectString(zkConfig.getServerLists())
            // retry policy
            .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
            // namespace, used as the root ZooKeeper node
            .namespace(zkConfig.getNamespace());
    // session timeout, applied only when configured as non-zero
    if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
        builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
    }
    // connection timeout, applied only when configured as non-zero
    if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
        builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
    }
    if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
        builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
                .aclProvider(new ACLProvider() {
                
                    @Override
                    public List<ACL> getDefaultAcl() {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                
                    @Override
                    public List<ACL> getAclForPath(final String path) {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                });
    }
    client = builder.build();
    // start the ZooKeeper client
    client.start();
    try {
        // block until connected, for at most maxSleepTimeMilliseconds * maxRetries
        if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
            client.close();
            throw new KeeperException.OperationTimeoutException();
        }
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        RegExceptionHandler.handleException(ex);
    }
}
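
Two details of init() are easy to miss: a timeout of 0 means "leave the client default in place", and the connection wait is bounded by maxSleepTimeMilliseconds * maxRetries. The sketch below restates both rules in isolation. The class name ZkTimeoutSketch, its method names, and the numeric values in main are illustrative assumptions, not values taken from the source.

```java
// Hypothetical helper restating two rules from init(); it does not touch ZooKeeper or Curator.
final class ZkTimeoutSketch {

    // A configured value of 0 is treated as "not set", so the client default stays in effect.
    static int effectiveTimeoutMillis(final int configuredMillis, final int clientDefaultMillis) {
        return 0 != configuredMillis ? configuredMillis : clientDefaultMillis;
    }

    // Upper bound passed to blockUntilConnected: maxSleepTimeMilliseconds * maxRetries.
    static long maxBlockMillis(final int maxSleepTimeMillis, final int maxRetries) {
        return (long) maxSleepTimeMillis * maxRetries;
    }

    public static void main(final String[] args) {
        // Illustrative values only.
        System.out.println(effectiveTimeoutMillis(0, 60000));
        System.out.println(effectiveTimeoutMillis(5000, 60000));
        System.out.println(maxBlockMillis(3000, 3));
    }
}
```

With a maximum sleep time of 3000 ms and 3 retries, for example, init() waits at most 9000 ms before closing the client and reporting a timeout.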

 

Second: configure the event tracing database, which stores job execution records.

ElasticJobTracingConfiguration.java

 

/**
 * Create a bean of tracing DataSource.
 *
 * @param tracingProperties tracing Properties
 * @return tracing DataSource
 */
@Bean("tracingDataSource")
// Registers the job tracing data source in Spring under the bean name tracingDataSource
public DataSource tracingDataSource(final TracingProperties tracingProperties) {
    // read the ElasticJob tracing data source settings
    DataSourceProperties dataSource = tracingProperties.getDataSource();
    if (dataSource == null) {
        return null;
    }
    HikariDataSource tracingDataSource = new HikariDataSource();
    tracingDataSource.setJdbcUrl(dataSource.getUrl());
    BeanUtils.copyProperties(dataSource, tracingDataSource);
    return tracingDataSource;
}


/**
 * Create a bean of tracing configuration.
 *
 * @param dataSource required by constructor
 * @param tracingDataSource tracing DataSource
 * @return a bean of tracing configuration
 */
@Bean
@ConditionalOnBean(DataSource.class)
@ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "RDB")
public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) {
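    /**
     * dataSource is the business database.
     * tracingDataSource is the job tracing database.
     * When elasticjob.tracing.type = RDB and a dedicated tracing database is configured, job execution traces are written to it;
     * otherwise the business database is used.
     * Note that with both data sources registered as beans, mybatis-plus combined with @Table may fail to pick the correct data source.
     */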
    DataSource ds = tracingDataSource;
    if (ds == null) {
        ds = dataSource;
    }
    return new TracingConfiguration<>("RDB", ds);
}

 

Setting elasticjob.tracing.type=RDB turns on event tracing. The tracing data source can be configured separately from the business data source.
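
The fallback between the two data sources can be reduced to a single rule: prefer the dedicated tracing source and use the business source only when none is configured. The sketch below is a minimal illustration with a generic type parameter in place of javax.sql.DataSource; the class name TracingSourceSketch and the string placeholders are hypothetical.

```java
// Hypothetical illustration of the fallback in tracingConfiguration(); T stands in for DataSource.
final class TracingSourceSketch {

    // Prefer the dedicated tracing source; fall back to the business source when it is absent.
    static <T> T chooseTracingSource(final T businessSource, final T tracingSource) {
        return null != tracingSource ? tracingSource : businessSource;
    }

    public static void main(final String[] args) {
        // Strings are used here only as placeholders for real data sources.
        System.out.println(chooseTracingSource("businessDb", "tracingDb"));
        System.out.println(chooseTracingSource("businessDb", null));
    }
}
```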

 

Third: parse every job configuration.

ElasticJobBootstrapConfiguration.class

 

public void createJobBootstrapBeans() {
    // job properties
    ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
    // singleton bean registry
    SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
    // registry center (ZooKeeper client) bean
    CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
    // job event tracing configuration
    TracingConfiguration<?> tracingConfig = getTracingConfiguration();
    // build the job bootstraps
    constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
}

The key method is constructJobBootstraps. Here it is:

private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
                                    final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) {
    // iterate over every configured job
    for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
        ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
        // fail if neither the job class nor the job type is set
        Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
                        || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
        // fail if both are set: the two are mutually exclusive
        Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
                        || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "[elasticJobClass] and [elasticJobType] are mutually exclusive.");


        if (null != jobConfigurationProperties.getElasticJobClass()) {
            // register the job by class
            registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
            // register the job by type
            registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        }
    }
}

Jobs can be registered in two ways: by class (elasticJobClass, the fully qualified class name of the job) or by type (elasticJobType).
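
The two precondition checks in constructJobBootstraps amount to "exactly one of the two must be set". A minimal sketch of that rule follows. It simplifies elasticJobClass to a String (the real property is a Class), and the names JobRegistrationSketch, Mode and resolveMode, as well as the sample class name com.example.MyJob, are hypothetical.

```java
// Hypothetical restatement of the class/type validation; both inputs are simplified to Strings.
final class JobRegistrationSketch {

    enum Mode { CLASS, TYPE }

    static Mode resolveMode(final String elasticJobClass, final String elasticJobType) {
        boolean hasClass = null != elasticJobClass && !elasticJobClass.isEmpty();
        boolean hasType = null != elasticJobType && !elasticJobType.isEmpty();
        if (!hasClass && !hasType) {
            // neither is set
            throw new IllegalArgumentException("Specify either [elasticJobClass] or [elasticJobType].");
        }
        if (hasClass && hasType) {
            // both are set
            throw new IllegalArgumentException("[elasticJobClass] and [elasticJobType] are mutually exclusive.");
        }
        return hasClass ? Mode.CLASS : Mode.TYPE;
    }

    public static void main(final String[] args) {
        System.out.println(resolveMode("com.example.MyJob", null));
        System.out.println(resolveMode(null, "SCRIPT"));
    }
}
```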
 
Now look at what registerClassedJob does.
private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
                                final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
    // build the job configuration
    JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
    // apply extra configuration such as event tracing
    jobExtraConfigurations(jobConfig, tracingConfig);
    // look up the job bean in the Spring context by its class
    ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
    // no cron expression: create a OneOffJobBootstrap (one-off job)
    if (Strings.isNullOrEmpty(jobConfig.getCron())) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
        singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
    } else {
        // cron expression present: create a ScheduleJobBootstrap (scheduled job)
        // choose the bean name
        String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
        // register the ScheduleJobBootstrap as a singleton
        singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
    }
}

A job registered by class becomes one of two bootstrap types:
1. ScheduleJobBootstrap: a scheduled job.
2. OneOffJobBootstrap: a one-off job.
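
How registerClassedJob picks the bootstrap type and the bean name can be sketched without Spring. The class BootstrapNamingSketch and its methods are hypothetical; only the "ScheduleJobBootstrap" suffix and the one-off naming requirement come from the code above.

```java
// Hypothetical restatement of the naming rules in registerClassedJob.
final class BootstrapNamingSketch {

    // A job without a cron expression is a one-off job.
    static boolean isOneOff(final String cron) {
        return null == cron || cron.isEmpty();
    }

    static String beanName(final String jobName, final String cron, final String jobBootstrapBeanName) {
        boolean hasExplicitName = null != jobBootstrapBeanName && !jobBootstrapBeanName.isEmpty();
        if (isOneOff(cron)) {
            // one-off jobs must name their bootstrap bean explicitly
            if (!hasExplicitName) {
                throw new IllegalArgumentException("The property [jobBootstrapBeanName] is required for One-off job.");
            }
            return jobBootstrapBeanName;
        }
        // scheduled jobs fall back to <jobName>ScheduleJobBootstrap
        return hasExplicitName ? jobBootstrapBeanName : jobName + "ScheduleJobBootstrap";
    }

    public static void main(final String[] args) {
        System.out.println(beanName("simpleJob", "0/5 * * * * ?", null));
        System.out.println(beanName("manualJob", null, "manualJobBean"));
    }
}
```

The explicit name matters for one-off jobs because application code has to look up the OneOffJobBootstrap bean by that name in order to trigger it.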
 
new ScheduleJobBootstrap(...) builds a JobScheduler, whose constructor is shown here:
public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
    Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
    this.regCenter = regCenter;
    // collect the job listeners
    Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
    // facade over the services that operate on ZooKeeper nodes, plus the job listeners
    setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
    // resolve the job class name
    String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
    // persist the job configuration to the ZooKeeper node /{namespace}/{jobName}/config
    this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
    // facade over the services that operate on ZooKeeper nodes
    schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
    jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
    // validate the job properties
    validateJobProperties();
    // create the job executor
    jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
    // inject GuaranteeService into the listeners
    setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
    // create the schedule controller and register startup info
    jobScheduleController = createJobScheduleController();
}

 

Here is createJobScheduleController:

private JobScheduleController createJobScheduleController() {
    JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
    // register the job in the local JobRegistry
    JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
    // register startup info in the registry center
    registerStartUpInfo();
    return result;
}

Here is registerStartUpInfo:

public void registerStartUpInfo(final boolean enabled) {
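    // start all listeners
    listenerManager.startAllListeners();
    // elect a leader; the winner is stored under /{namespace}/{jobName}/leader/election/instance
    leaderService.electLeader();
    // persist the enabled/disabled state under /{namespace}/{jobName}/servers/{ip}
    serverService.persistOnline(enabled);
    // ephemeral node under /{namespace}/{jobName}/instances holding the running job instance
    instanceService.persistOnline();
    // start the reconcile service asynchronously if it is not already running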
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}

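This method does four things:
1. Starts all listeners.
2. Elects a leader.
3. Persists the server and instance node data.
4. Starts the asynchronous reconcile service.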
 
Fourth: identify the job type, write the job's node data to ZooKeeper, and run the scheduled jobs.
ScheduleJobBootstrapStartupRunner.java
 
@Override
public void run(final String... args) {
    log.info("Starting ElasticJob Bootstrap.");
    applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
    log.info("ElasticJob Bootstrap started.");
}

The runner collects every scheduled job (beans of type ScheduleJobBootstrap) and calls schedule() on each. Under the hood, the Quartz framework runs the scheduled tasks.
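
The selection step in run() can be illustrated without Spring or Quartz. In the sketch below a plain Map stands in for the application context, and the types ScheduledBootstrap and OneOffBootstrap are hypothetical stand-ins for ScheduleJobBootstrap and OneOffJobBootstrap. It shows only that scheduled bootstraps are started while one-off bootstraps are left for the application to trigger.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for the bootstrap beans; no Spring or Quartz involved.
final class StartupRunnerSketch {

    static final class ScheduledBootstrap {
        private boolean scheduled;

        void schedule() {
            scheduled = true;
        }

        boolean isScheduled() {
            return scheduled;
        }
    }

    static final class OneOffBootstrap {
    }

    // Starts every bean of the scheduled type and returns how many were started.
    static int scheduleAll(final Map<String, Object> beans) {
        int started = 0;
        for (Object bean : beans.values()) {
            if (bean instanceof ScheduledBootstrap) {
                ((ScheduledBootstrap) bean).schedule();
                started++;
            }
        }
        return started;
    }

    public static void main(final String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("simpleJobScheduleJobBootstrap", new ScheduledBootstrap());
        beans.put("manualJobBean", new OneOffBootstrap());
        System.out.println("started: " + scheduleAll(beans));
    }
}
```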