用xxl-job做後臺任務管理, 主要是快速解決定時任務的HA問題, 專案程式碼量不大, 功能精簡, 沒有特殊依賴. 因為產品中用到了這個專案, 上午花了點時間研究了一下執行機制. 把看到的記一下.
<!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${最新穩定版本}</version>
</dependency>
執行需要 JDK1.8, MySQL5.7
專案檔案結構如下
├───doc
│ ├───db # 初始化的sql
│ └───images
├───xxl-job-admin # 執行的伺服器端模組, 提供介面和排程
│ └───src
│ ├───main
│ │ ├───java
│ │ │ └───com
│ │ │ └───xxl
│ │ │ └───job
│ │ │ └───admin
│ │ │ ├───controller
│ │ │ │ ├───annotation
│ │ │ │ ├───interceptor
│ │ │ │ └───resolver
│ │ │ ├───core
│ │ │ ├───dao
│ │ │ └───service
│ │ │ └───impl
│ │ └───resources
│ │ ├───i18n # 多國化, 簡繁英
│ │ ├───mybatis-mapper # xml形式的mapper
│ │ ├───static # 前端靜態檔案
│ │ └───templates # Freemarker模板
│ └───test
│ └───java
│
├───xxl-job-core # 公用jar包, 模組內部依賴
│ └───src
│ └───main
│ └───java
│
└───xxl-job-executor-samples
├───xxl-job-executor-sample-frameless # 任務執行層範例
│ └───src
│ ├───main
│ │ ├───java
│ │ └───resources
│ └───test
│ └───java
└───xxl-job-executor-sample-springboot # 使用SpringBoot的執行層範例
└───src
├───main
│ ├───java
│ └───resources
└───test
執行端需要準備以下資訊
adminAddresses 伺服器端地址, 例如 http://127.0.0.1:8080/xxl-job-admin
accessToken 貌似是伺服器端的token, 在呼叫伺服器端 api/registry, api/registryRemove 等操作時需要驗證
appname 執行端名稱
address 執行端地址, 和 ip:port 二選一, 存在則覆蓋 ip:port
ip 執行端IP
port 執行端伺服器埠
執行端啟動後將自己註冊到伺服器端, 等待回撥
任務執行通過 XxlJobTrigger.processTrigger() 發起, 準備引數, 並在分組中選擇一個地址
根據這個地址取得 ExecutorBiz, 呼叫 executorBiz.run() 執行任務
伺服器端: 通過 ExecutorBizClient,
XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
accessToken
是伺服器端的accessToken執行端: 通過 ExecutorBizImpl.run()
XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
得到XxlJob方法XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason)
執行通過呼叫 FrameLessXxlJobConfig.getInstance().initXxlJobExecutor() 這個方法將 XxlJobSimpleExecutor 範例化, 並註冊到xxl_job伺服器端
@Configuration
中, 將 XxlJobSpringExecutor 作為一個 @Bean
新增到 Spring contextSmartInitializingSingleton
介面的 afterSingletonsInstantiated()
方法afterSingletonsInstantiated()
方法中
@XxlJob
註解的方法registJobHandler()
, 將@XxlJob
方法新增到private static ConcurrentMap<String, IJobHandler> jobHandlerRepository
XxlJobExecutor.start()
, 將自己註冊到 xxl_job 伺服器端xxl_job 並未使用Spring的服務機制, 而是內部實現了一個偵聽指定IP+埠的服務. 這個實現對應的類是 EmbedServer, 服務基於 Netty, 核心程式碼是
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
這行程式碼註冊了內部的XxlJob方法
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)
處理遠端請求時, 在下面的程式碼中, 通過executorBiz.run(triggerParam)
呼叫XxlJob方法
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
//...
// services mapping
try {
switch (uri) {
case "/beat":
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception e) {
//...
}
通過select ... for update
實現的, 這個表並沒有放到 MyBatis, 在 JobScheduleHelper 中, 通過
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
得到鎖, 在方法末尾釋放
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}