xxl-job定時排程任務Java程式碼分析

2022-12-21 06:01:34

簡介

用xxl-job做後臺任務管理, 主要是快速解決定時任務的HA問題, 專案程式碼量不大, 功能精簡, 沒有特殊依賴. 因為產品中用到了這個專案, 上午花了點時間研究了一下執行機制. 把看到的記一下.

環境

<!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>${最新穩定版本}</version>
</dependency>

執行需要 JDK1.8, MySQL5.7

資料庫結構

  • 庫編碼 utf8mb4_unicode_ci
  • Table: xxl_job_group
    任務分組, 組名, 只支援一級分組, address_list 欄位支援多個執行端地址, 逗號分隔
  • Table: xxl_job_info
    任務主表, 記錄了任務明細, 排程明細以及預警設定
  • Table: xxl_job_log
    任務每次執行的紀錄檔
  • Table: xxl_job_log_report
    按日對執行紀錄檔進行統計的結果
  • Table: xxl_job_logglue
  • Table: xxl_job_registry
    用於登記任務的執行者, 記錄group:分組, key:名稱, value:介面地址. 名稱是可以重複的, 介面地址會新增到任務分組表中的註冊欄位
  • Table: xxl_job_user
    簡單的登入控制, 與其它表沒有關聯
  • Table: xxl_job_lock
    單欄位表, 用於並行時加鎖避免衝突

程式碼結構

  • 專案用到的都是常見元件, MyBatis, FreeMarker, Bootstrap, 當前版本基於SpringBoot 2.6.7
  • 線上執行的是 xxl-job-admin 模組, 提供執行端註冊, 任務發起和紀錄檔記錄等服務
  • 專案中需要實現 xxl-job-executor, 專案中提供了例子

專案檔案結構如下

├───doc
│   ├───db                                               # 初始化的sql
│   └───images
├───xxl-job-admin                                        # 執行的伺服器端模組, 提供介面和排程
│   └───src
│       ├───main
│       │   ├───java
│       │   │   └───com
│       │   │       └───xxl
│       │   │           └───job
│       │   │               └───admin
│       │   │                   ├───controller
│       │   │                   │   ├───annotation
│       │   │                   │   ├───interceptor
│       │   │                   │   └───resolver
│       │   │                   ├───core
│       │   │                   ├───dao
│       │   │                   └───service
│       │   │                       └───impl
│       │   └───resources
│       │       ├───i18n                                 # 多國化, 簡繁英
│       │       ├───mybatis-mapper                       # xml形式的mapper
│       │       ├───static                               # 前端靜態檔案
│       │       └───templates                            # Freemarker模板
│       └───test
│           └───java
│
├───xxl-job-core                                         # 公用jar包, 模組內部依賴
│   └───src
│       └───main
│           └───java
│
└───xxl-job-executor-samples
    ├───xxl-job-executor-sample-frameless                # 任務執行層範例
    │   └───src
    │       ├───main
    │       │   ├───java
    │       │   └───resources
    │       └───test
    │           └───java
    └───xxl-job-executor-sample-springboot               # 使用SpringBoot的執行層範例
        └───src
            ├───main
            │   ├───java
            │   └───resources
            └───test

執行機制

執行端需要準備以下資訊

  • adminAddresses 伺服器端地址, 例如 http://127.0.0.1:8080/xxl-job-admin

  • accessToken 貌似是伺服器端的token, 在呼叫伺服器端 api/registry, api/registryRemove 等操作時需要驗證

  • appname 執行端名稱

  • address 執行端地址, 和 ip:port 二選一, 存在則覆蓋 ip:port

  • ip 執行端IP

  • port 執行端伺服器埠

  • 執行端啟動後將自己註冊到伺服器端, 等待回撥

  • 任務執行通過 XxlJobTrigger.processTrigger() 發起, 準備引數, 並在分組中選擇一個地址

  • 根據這個地址取得 ExecutorBiz, 呼叫 executorBiz.run() 執行任務

  • 伺服器端: 通過 ExecutorBizClient,

    • 呼叫XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    • 其中 accessToken 是伺服器端的accessToken
  • 執行端: 通過 ExecutorBizImpl.run()

    • 呼叫 XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());得到XxlJob方法
    • 通過 XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason) 執行

非 Spring 的場景

通過呼叫 FrameLessXxlJobConfig.getInstance().initXxlJobExecutor() 這個方法將 XxlJobSimpleExecutor 範例化, 並註冊到xxl_job伺服器端

Spring 場景

  • @Configuration 中, 將 XxlJobSpringExecutor 作為一個 @Bean 新增到 Spring context
  • XxlJobSpringExecutor 是 XxlJobExecutor 的子類並實現了 SmartInitializingSingleton 介面的 afterSingletonsInstantiated()方法
  • afterSingletonsInstantiated()方法中
    • 呼叫 initJobHandlerMethodRepository(), 在這個方法中, 找到所有@XxlJob註解的方法
    • 通過 registJobHandler(), 將@XxlJob方法新增到private static ConcurrentMap<String, IJobHandler> jobHandlerRepository
    • 呼叫 XxlJobExecutor.start(), 將自己註冊到 xxl_job 伺服器端

遠端呼叫服務

xxl_job 並未使用Spring的服務機制, 而是內部實現了一個偵聽指定IP+埠的服務. 這個實現對應的類是 EmbedServer, 服務基於 Netty, 核心程式碼是

// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline()
                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                        .addLast(new HttpServerCodec())
                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
            }
        })
        .childOption(ChannelOption.SO_KEEPALIVE, true);

這行程式碼註冊了內部的XxlJob方法

.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)

處理遠端請求時, 在下面的程式碼中, 通過executorBiz.run(triggerParam)呼叫XxlJob方法

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    //...
    // services mapping
    try {
        switch (uri) {
            case "/beat":
                return executorBiz.beat();
            case "/idleBeat":
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            case "/run":
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            case "/kill":
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            case "/log":
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
    } catch (Exception e) {
    //...
}

鎖機制

通過select ... for update實現的, 這個表並沒有放到 MyBatis, 在 JobScheduleHelper 中, 通過

preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();

得到鎖, 在方法末尾釋放

// close PreparedStatement
if (null != preparedStatement) {
    try {
        preparedStatement.close();
    } catch (SQLException e) {
        if (!scheduleThreadToStop) {
            logger.error(e.getMessage(), e);
        }
    }
}