在前面的部落格《Java定時器演進過程和生產級分散式任務排程ElasticJob程式碼實戰》中,我們已經熟悉ElasticJob分散式任務的應用,其核心實現為elasticjob-lite-spring-boot-starter,少量設定開箱即用;還有前面也有部落格檔案談談走進Spring Boot原始碼學習之路和淺談入門,瞭解Spring Boot的原理,沒看過夥伴可以先翻看下前面的文章。SpringBoot官網已經提供非常多的starter使用,然而今天我們就來模擬封裝一個簡易的分散式任務排程實現定時任務選主執行和故障自動轉移的starter,本篇主要重心在於基於SpringBoot官網標準start封裝的模板和步驟。
應用程式上下文
在Spring應用程式中,應用程式上下文是組成應用程式的物件(或「bean」)的網路。它包含我們的Web控制器,服務,儲存庫以及我們的應用程式可能需要的任何(通常是無狀態的)物件。
設定
使用註釋@Configuration標註的類,扮演新增到應用程式上下文的bean工廠。它可能包含帶註釋的工廠方法,@Bean其返回值由Spring自動新增到應用程式上下文中。
簡而言之,Spring設定為應用程式上下文提供bean。
自動設定
自動設定是Spring自動發現的@Configuration類。只要該類位於在類路徑classpath上,即可自動設定,並將設定的結果新增到應用程式上下文中。自動設定可以是有條件的,使得其啟用取決於外部因素,例如具有特定值的特定設定引數。
自動設定模組
自動設定模組是包含自動設定類的Maven或Gradle模組。這樣,我們就可以構建自動為應用程式上下文做貢獻的模組,新增某個功能或提供對某個外部庫的存取。我們在Spring Boot應用程式中使用它所要做的就是在我們的pom.xml或者包含它的依賴項build.gradle。
Spring Boot團隊大量使用此方法將Spring Boot與外部庫整合。
Spring Boot Starter
Spring Boot Starter是一個Maven或Gradle模組,其唯一目的是提供「使用某個功能」「開始」所需的所有依賴項。這通常意味著它是一個單獨的pom.xml或build.gradle檔案,包含一個或多個自動設定模組的依賴項以及可能需要的任何其他依賴項。在Spring Boot應用程式中,我們只需要包含此啟動器Starter即可使用該功能。
Spring Boot 在啟動的時候會做這幾件事情
其實也就是 Spring Boot 在啟動的時候,按照約定去讀取 Spring Boot Starter 的設定資訊,再根據設定資訊對資源進行初始化,並注入到 Spring 容器中。這樣 Spring Boot 啟動完畢後,就已經準備好了一切資源,使用過程中直接注入對應 Bean 資源即可。
參考GitHub基於分散式任務實現的一些程式碼,這裡核心主要是構建一個light-job自動裝配組態檔讀取類和一個light-job自動裝配設定類。
light-job-spring-boot-starter-autoconfigure模組新增Pom依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.itxs</groupId>
<artifactId>light-job-spring-boot-starter-autoconfigure</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>light-job-spring-boot-starter-autoconfigure</name>
<description>Demo project for Spring Boot</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<jdk.version>1.8</jdk.version>
<spring-boot.version>2.6.4</spring-boot.version>
<zookeeper.version>3.4.6</zookeeper.version>
<commons-lang3.version>3.4</commons-lang3.version>
<quartz.version>2.3.2</quartz.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>${quartz.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
建立LightJobProperties讀取組態檔
package com.itxs.lightjob.config;
import com.itxs.lightjob.zk.ZKManager.KEYS;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ConfigurationProperties(prefix = "light.job",ignoreInvalidFields = true)
public class LightJobProperties {
private String enabled;
private String zkConnect;
private String rootPath = "/light/job";
private int zkSessionTimeout = 60000;
private String zkUsername;
private String zkPassword;
private List<String> ipBlackList;
private List<String> targetBean;
private List<String> targetMethod;
private List<String> cronExpression;
private List<String> startTime;
private List<String> period;
private List<String> delay;
private List<String> params;
private List<String> type;
private List<String> extKeySuffix;
private List<String> beforeMethod;
private List<String> afterMethod;
private List<String> threadNum;
public Map<String, String> getConfig(){
Map<String, String> properties = new HashMap<String, String>();
properties.put(KEYS.zkConnectString.key, zkConnect);
if(StringUtils.isNotBlank(rootPath)){
properties.put(KEYS.rootPath.key, rootPath);
}
if(zkSessionTimeout > 0){
properties.put(KEYS.zkSessionTimeout.key, zkSessionTimeout+"");
}
if(StringUtils.isNotBlank(zkUsername)){
properties.put(KEYS.userName.key, zkUsername);
}
if(StringUtils.isNotBlank(zkPassword)){
properties.put(KEYS.password.key, zkPassword);
}
StringBuilder sb = new StringBuilder();
if(ipBlackList != null && ipBlackList.size() > 0){
for(String ip:ipBlackList){
sb.append(ip).append(",");
}
sb.substring(0,sb.lastIndexOf(","));
}
properties.put(KEYS.ipBlacklist.key, sb.toString());
return properties;
}
public String getEnabled() {
return enabled;
}
public void setEnabled(String enabled) {
this.enabled = enabled;
}
public String getZkConnect() {
return zkConnect;
}
public void setZkConnect(String zkConnect) {
this.zkConnect = zkConnect;
}
public String getRootPath() {
return rootPath;
}
public void setRootPath(String rootPath) {
this.rootPath = rootPath;
}
public int getZkSessionTimeout() {
return zkSessionTimeout;
}
public void setZkSessionTimeout(int zkSessionTimeout) {
this.zkSessionTimeout = zkSessionTimeout;
}
public String getZkUsername() {
return zkUsername;
}
public void setZkUsername(String zkUsername) {
this.zkUsername = zkUsername;
}
public String getZkPassword() {
return zkPassword;
}
public void setZkPassword(String zkPassword) {
this.zkPassword = zkPassword;
}
public List<String> getIpBlackList() {
return ipBlackList;
}
public void setIpBlackList(List<String> ipBlackList) {
this.ipBlackList = ipBlackList;
}
public List<String> getTargetBean() {
return targetBean;
}
public void setTargetBean(List<String> targetBean) {
this.targetBean = targetBean;
}
public List<String> getTargetMethod() {
return targetMethod;
}
public void setTargetMethod(List<String> targetMethod) {
this.targetMethod = targetMethod;
}
public List<String> getCronExpression() {
return cronExpression;
}
public void setCronExpression(List<String> cronExpression) {
this.cronExpression = cronExpression;
}
public List<String> getStartTime() {
return startTime;
}
public void setStartTime(List<String> startTime) {
this.startTime = startTime;
}
public List<String> getPeriod() {
return period;
}
public void setPeriod(List<String> period) {
this.period = period;
}
public List<String> getDelay() {
return delay;
}
public void setDelay(List<String> delay) {
this.delay = delay;
}
public List<String> getParams() {
return params;
}
public void setParams(List<String> params) {
this.params = params;
}
public List<String> getType() {
return type;
}
public void setType(List<String> type) {
this.type = type;
}
public List<String> getExtKeySuffix() {
return extKeySuffix;
}
public void setExtKeySuffix(List<String> extKeySuffix) {
this.extKeySuffix = extKeySuffix;
}
public List<String> getBeforeMethod() {
return beforeMethod;
}
public void setBeforeMethod(List<String> beforeMethod) {
this.beforeMethod = beforeMethod;
}
public List<String> getAfterMethod() {
return afterMethod;
}
public void setAfterMethod(List<String> afterMethod) {
this.afterMethod = afterMethod;
}
public List<String> getThreadNum() {
return threadNum;
}
public void setThreadNum(List<String> threadNum) {
this.threadNum = threadNum;
}
}
建立自動裝配類LightJobAutoConfiguration.java
package com.itxs.lightjob.config;
import com.itxs.lightjob.ZKScheduleManager;
import com.itxs.lightjob.core.TaskDefine;
import com.itxs.lightjob.util.ScheduleUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Configuration
@EnableConfigurationProperties({LightJobProperties.class})
@ConditionalOnProperty(value = "light.job.enabled", havingValue = "true")
@ComponentScan()
public class LightJobAutoConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(LightJobAutoConfiguration.class);
@Autowired
private LightJobProperties uncodeScheduleConfig;
@Bean(name = "zkScheduleManager", initMethod="init")
public ZKScheduleManager commonMapper(){
ZKScheduleManager zkScheduleManager = new ZKScheduleManager();
zkScheduleManager.setZkConfig(uncodeScheduleConfig.getConfig());
List<TaskDefine> list = initAllTask();
zkScheduleManager.setInitTaskDefines(list);
LOGGER.info("=====>ZKScheduleManager inited..");
return zkScheduleManager;
}
private List<TaskDefine> initAllTask(){
List<TaskDefine> list = new ArrayList<TaskDefine>();
int total = 0;
if(uncodeScheduleConfig.getTargetBean() != null){
total = uncodeScheduleConfig.getTargetBean().size();
}
for(int i = 0; i < total; i++){
TaskDefine taskDefine = new TaskDefine();
if(uncodeScheduleConfig.getTargetBean() != null){
String value = uncodeScheduleConfig.getTargetBean().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setTargetBean(value);
}
}
if(uncodeScheduleConfig.getTargetMethod() != null){
String value = uncodeScheduleConfig.getTargetMethod().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setTargetMethod(value);
}
}
if(uncodeScheduleConfig.getCronExpression() != null){
String value = uncodeScheduleConfig.getCronExpression().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setCronExpression(value);
}
}
if(uncodeScheduleConfig.getStartTime() != null){
String value = uncodeScheduleConfig.getStartTime().get(i);
if(StringUtils.isNotBlank(value)){
Date time = null;
try {
time = ScheduleUtil.transferStringToDate(value);
} catch (ParseException e) {
e.printStackTrace();
}
if(time != null){
taskDefine.setStartTime(time);
}
}
}
if(uncodeScheduleConfig.getPeriod() != null){
String value = uncodeScheduleConfig.getPeriod().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setPeriod(Long.valueOf(value));
}
}
if(uncodeScheduleConfig.getDelay() != null){
String value = uncodeScheduleConfig.getDelay().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setDelay(Long.valueOf(value));
}
}
if(uncodeScheduleConfig.getParams() != null){
String value = uncodeScheduleConfig.getParams().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setParams(value);
}
}
if(uncodeScheduleConfig.getType() != null){
String value = uncodeScheduleConfig.getType().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setType(value);
}
}
if(uncodeScheduleConfig.getExtKeySuffix() != null){
String value = uncodeScheduleConfig.getExtKeySuffix().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setExtKeySuffix(value);
}
}
if(uncodeScheduleConfig.getBeforeMethod() != null){
String value = uncodeScheduleConfig.getBeforeMethod().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setBeforeMethod(value);
}
}
if(uncodeScheduleConfig.getAfterMethod() != null){
String value = uncodeScheduleConfig.getAfterMethod().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setAfterMethod(value);
}
}
if(uncodeScheduleConfig.getThreadNum() != null){
String value = uncodeScheduleConfig.getThreadNum().get(i);
if(StringUtils.isNotBlank(value)){
taskDefine.setThreadNum(Integer.valueOf(value));
}
}
list.add(taskDefine);
}
return list;
}
}
然後在resources目錄下的META-INF目錄下建立spring.factories檔案,跟SpringBoot其他starter一樣,輸出自動裝配類的全類名;springboot專案預設只會掃描本專案下的帶@Configuration註解的類,如果自定義starter,不在本工程中,是無法載入的,所以要設定META-INF/spring.factories組態檔。設定了META-INF/spring.factories組態檔是springboot實現starter的關鍵點,springboot的這種設定載入方式是一種類SPI(Service Provider Interface)的方式,SPI可以在META-INF/services設定介面擴充套件的實現類,springboot中原理類似,只是名稱換成了spring.factories而已。
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.itxs.lightjob.config.LightJobAutoConfiguration
其他還有自動裝配類的具體實現程式碼檔案,如下面目錄,主要利用zookeeper做分散式協調如分散式選主,執行maven install打包和安裝到本地maven倉庫。
light-job-spring-boot-starter是不做實現,主要管理依賴,Pom檔案內容如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itxs</groupId>
<artifactId>light-job-spring-boot-starter</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.itxs</groupId>
<artifactId>light-job-spring-boot-starter-autoconfigure</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
</project>
最後我們執行maven install打包和安裝到本地maven倉庫。
範例工程中加入light-job-spring-boot-starter依賴,這裡選擇前面文章範例的庫存微服務模組中新增
<dependency>
<groupId>com.itxs</groupId>
<artifactId>light-job-spring-boot-starter</artifactId>
<version>1.0</version>
</dependency>
建立演示任務並放到Spring容器裡管理
package cn.itxs.ecom.storage.job;
import org.springframework.stereotype.Component;
@Component
public class DemoTask {
public void execute() {
System.out.println("===========execute start!=========");
System.out.println("===========do job!=========");
System.out.println("===========execute end !=========");
}
}
組態檔增加
light:
job:
enabled: true
zk-connect: 192.168.4.27:2181,192.168.4.28:2181,192.168.4.29:2181
root-path: /ecom/storage
zk-session-timeout: 60000
target-bean:
- demoTask
target-method:
- execute
period:
- 1000
cron-expression:
- 0/10 * * * * ?
啟動三個庫存微服務模組,在第1個庫存微服務模組看到demoTask任務已經根據設定每十秒在執行
關閉第1個庫存微服務模組程式後,通過zookeeper重新選舉一個節點定時執行,從下面看選擇第3個庫存微服務模組每十秒實行
zookeeper地址設定可以放到設定中心如Nacos,如果目前我們設定資料是放在Redis中,可以通過System.setProperty設定系統變數的方式來實現,先註釋zk-connect的設定,這是啟動程式就會報錯
RedisConfig設定類中增加實現BeanPostProcessor介面實現其postProcessAfterInitialization方法,在bean初始化後讀取redis值設定環境變數值。
package cn.itxs.ecom.storage.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@Slf4j
public class RedisConfig implements BeanPostProcessor{
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance , ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key採用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也採用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式採用jackson
//template.setValueSerializer(jackson2JsonRedisSerializer);
template.setValueSerializer(stringRedisSerializer);
// hash的value序列化方式採用jackson
//template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(stringRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException
{
//在redisTemplate Bean初始化之後設定light.job.zk-connect為公共叢集的zk地址
if (beanName.equals("redisTemplate")){
log.info("postProcessAfterInitialization match beanName {}",beanName);
try {
RedisTemplate redisObj = (RedisTemplate) bean;
String zkConnect = (String)redisObj.opsForHash().get("clusterinfo", "zookeeper-server");
if (StringUtils.isNotBlank(zkConnect)) {
log.info("postProcessAfterInitialization get zkConnect ={}", zkConnect);
System.setProperty("light.job.zk-connect", zkConnect);
log.info("System.setProperty light.job.zk-connect={}", zkConnect);
}
} catch (Exception e) {
log.error("postProcessAfterInitialization operate redisTemplate {} failed", e);
}
}
return null;
}
}
啟動後可以看到正常每十秒執行定時任務
**本人部落格網站 **IT小神 www.itxiaoshen.com