SpringBoot整合XXLJob

2023-10-10 12:02:52

XXLJob簡介

XXLJob是一個分散式任務排程平臺,優點:開發迅速、學習簡單、輕量級、易擴充套件。是大眾點評員工xxl建立並維護,基於 GPL-3.0 開源,可放心商用,目前已經擁有龐大的使用群體。

簡單來說,就是一個定時任務中介軟體,類似的產品有當當網開源的Elastic-Job。

特性

  • 簡單:產品本身基於Java開發的,鑑於JVM的優秀,安裝部署整合簡單,和其他中介軟體、框架相比,避免了各種環境問題;同時程式設計師在專案中整合進來也很簡單,有手就行。
  • 觸發策略豐富:按照設定的Cron表示式觸發、固定間隔觸發、固定延時觸發、API(事件)觸發、人工觸發、父子任務觸發
  • 支援失敗重試
  • 包含簡單的告警和豐富的log,玩過定時任務的都知道,可觀測性在實際專案中是多麼的關鍵
  • 支援任務分片:例如將一個大任務,拆分為多個小任務,然後分給不同的執行器節點同時執行
  • 資料加密:排程中心和執行器之間的通訊進行資料加密,提升排程資訊保安性
  • 豐富的勾點:各種回撥可細粒度觀察任務的生命週期
  • 跨語言:排程中心與執行器提供語言無關的 RESTful API 服務,第三方任意語言可據此對接排程中心或者實現執行器。除此之外,還提供了 「多工模式」和「httpJobHandler」等其他跨語言方案;
  • 容器化:對雲原生支援友好
  • 有webUI管理系統
  • 高可用:支援叢集部署,可彈性擴容
  • 自動發現:執行器會週期性自動註冊任務, 排程中心將會自動發現註冊上來的執行器並將任務分配給執行器同時觸發執行

模組

  • 排程中心:管理執行器、任務;檢視任務紀錄檔、告警、報表;提供一個webUI管理系統,有簡單的使用者登入賬號管理功能;依賴資料庫
  • 執行器:執行器是獨立的RESTFul服務,一般整合在業務服務中,它會開闢一個9999(預設)埠和排程中心互動

由上可知,XXLJob為C/S架構,排程中心本身可以高可用部署,執行器整合在業務微服務中,當業務微服務多範例部署的時候,執行器也就可以達到分散式和高可用了。

安裝排程中心

初始化資料庫

排程中心依賴資料庫,安裝前需先初始化資料庫,初始化指令碼可從github中獲取 https://github.com/xuxueli/xxl-job/tree/master/doc/db

表說明:

  • xxl_job_lock:任務排程鎖表
  • xxl_job_group:執行器資訊表,維護任務執行器資訊
  • xxl_job_info:排程擴充套件資訊表: 用於儲存XXL-JOB排程任務的擴充套件資訊,如任務分組、任務名、機器地址、執行器、執行入參和報警郵件等等
  • xxl_job_log:排程紀錄檔表: 用於儲存XXL-JOB任務排程的歷史資訊,如排程結果、執行結果、排程入參、排程機器和執行器等等
  • xxl_job_log_report:排程紀錄檔報表:使用者儲存XXL-JOB任務排程紀錄檔的報表,排程中心報表功能頁面會用到
  • xxl_job_logglue:任務GLUE紀錄檔:用於儲存GLUE更新歷史,用於支援GLUE的版本回溯功能
  • xxl_job_registry:執行器登入檔,維護線上的執行器和排程中心機器地址資訊
  • xxl_job_user:系統使用者表;

設定

  • 執行埠
  • 資料庫連線
  • 報警郵箱
  • token:一個串,非必填,設定之後,執行器也需要設定此串,才能和排程中心互動,相當於一個簡單的認證
  • 執行緒池設定

參見官方檔案,本文重點放在SpringBoot整合XXLJob。排程中心的部署,尤其是高可用部署,後續單獨開篇。

啟動

排程中心就是一個SpringBoot程式,以xxljob2.4.0版本為例,其依賴的SpringBoot版本為 2.7.9,所以任何啟動SpringBoot 的方式都可以,webui的預設存取地址:http://ip:8080/xxl-job-admin

整合執行器

pom


<dependency>    
    <groupId>com.xuxueli</groupId>    
    <artifactId>xxl-job-core</artifactId>  
    <version>2.4.0</version>
</dependency>


yml



server:
  port: 9009
logging:
  level:
    com.ramble: debug
xxl:
  job:
    admin:
      #排程中心部署根地址 [選填]:如排程中心叢集部署存在多個地址則用逗號分隔。執行器將會使用該地址進行"執行器心跳註冊"和"任務結果回撥";為空則關閉自動註冊;
      addresses: http://127.0.0.1:8080/xxl-job-admin
      
    #執行器通訊TOKEN [選填]:非空時啟用;
    accessToken: 
    executor:
      #執行器AppName [選填]:執行器心跳註冊分組依據;為空則關閉自動註冊
      appname: xxljob-demo-service
      #${spring.application.name}
      #執行器註冊 [選填]:優先使用該設定作為註冊地址,為空時使用內嵌服務 」IP:PORT「 作為註冊地址。從而更靈活的支援容器型別執行器動態IP和動態對映埠問題。
      address: ""
      #執行器IP [選填]:預設為空表示自動獲取IP,多網路卡時可手動設定指定IP,該IP不會繫結Host僅作為通訊實用;地址資訊用於 "執行器註冊" 和 "排程中心請求並觸發任務";
      ip: ""
      #執行器埠號 [選填]:小於等於0則自動獲取;預設埠為9999,單機部署多個執行器時,注意要設定不同執行器埠;
      port: 0
      ###${server-port}
      #執行器執行紀錄檔檔案儲存磁碟路徑 [選填] :需要對該路徑擁有讀寫許可權;為空則使用預設路徑;
      logpath: ./logs/xxl-job/jobhandler
      #執行器紀錄檔檔案儲存天數 [選填] : 過期紀錄檔自動清理, 限制值大於等於3時生效; 否則, 如-1, 關閉自動清理功能;
      logretentiondays: 30


XxlJobConfig



@Slf4j
@Configuration
public class XxlJobConfig {
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.accessToken}")
    private String accessToken;
    @Value("${xxl.job.executor.appname}")
    private String appname;
    @Value("${xxl.job.executor.address}")
    private String address;
    @Value("${xxl.job.executor.ip}")
    private String ip;
    @Value("${xxl.job.executor.port}")
    private int port;
    @Value("${xxl.job.executor.logpath}")
    private String logPath;
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> start xxl-job config init");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}



啟動執行器

如果一切順利,將在控制檯看到如下輸出:


2023-10-09 11:36:23.162  INFO 15736 --- [       Thread-4] com.xxl.job.core.server.EmbedServer      : >>>>>>>>>>> xxl-job remoting server start success, nettype = class com.xxl.job.core.server.EmbedServer, port = 9999

看到這個,說明執行器設定已生效,執行器已經順利和排程中心聯絡上了。

實踐

可以簡單的將任務分兩個步驟,第一在執行器中定義一個任務具體需要幹什麼,第二在排程中心觸發定義的任務

簡單的定時任務

在執行器建立任務

在業務微服務中建立


@Slf4j
@Component
public class DemoJob {
    /**
     * 簡單的job,排程器
     */
    @XxlJob("job1")
    public void job1() {
        log.debug("do job1");
    }
}


在排程中心建立執行器

  • 對於排程中心來說,執行器可能是多範例的,通過AppName確定為同一個叢集
  • 執行器可以手動指定,也可以自動發現

建立成功之後可以在執行器列表看到。圖片為編輯頁面,所以可以看到已經有機器地址了。

在排程中心建立任務

  • JobHandler:需要和業務微服務中建立的任務名稱一致

新增完畢之後啟動,如果一切順利,將可以在業務微服務中看到如下log:

2023-10-09 11:50:33.050 DEBUG 34868 --- [6-1696823413294] com.ramble.xxljob.task.DemoJob           : do job1
2023-10-09 11:50:38.044 DEBUG 34868 --- [6-1696823413294] com.ramble.xxljob.task.DemoJob           : do job1
2023-10-09 11:50:44.100 DEBUG 34868 --- [6-1696823413294] com.ramble.xxljob.task.DemoJob           : do job1
2023-10-09 11:50:48.052 DEBUG 34868 --- [6-1696823413294] com.ramble.xxljob.task.DemoJob           : do job1
2023-10-09 11:50:53.043 DEBUG 34868 --- [6-1696823413294] com.ramble.xxljob.task.DemoJob           : do job1

每5s執行了一次任務

帶前置和後置處理的定時任務

XxlJob註解詳解

XxlJob註解有三個引數:
value:JobHandler的名稱,需要在執行器和排程中心保持一致
init:定時任務前置處理,僅在定時任務首次執行前執行一次
destory:定時任務後置處理,僅在定時任務銷燬的時候執行一次

這裡需要注意:

  • 一個定時任務可能反覆執行多次,例如我們設定了固定10s執行一次,那麼可能每10s就執行一次,但是前置處理和後置處理僅執行一次,前置處理和後置處理僅針對整個任務,而非任務的一次執行
  • 在前置處理中拋異常並不會阻止任務的建立和執行
  • 可以在前置處理和後置處理中存取資料庫或者做一些業務邏輯

建立帶前(後)置處理的任務



@Slf4j
@Component
public class DemoJob {
    /**
     * 建立帶前(後)置處理的任務
     * Job方法新增註解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷燬方法")",註解value值對應的是排程中心新建任務的JobHandler屬性的值。
     * <p>
     * 執行紀錄檔:需要通過 "XxlJobHelper.log" 列印執行紀錄檔;
     * <p>
     * 任務結果:預設任務結果為 "成功" 狀態,不需要主動設定;如有訴求,比如設定任務結果為失敗,可以通過 "XxlJobHelper.handleFail/handleSuccess" 自主設定任務結果;
     */
    @XxlJob(value = "job2", init = "job2Init", destroy = "job2Destroy")
    public void job2() throws InterruptedException {
        LocalDateTime now = LocalDateTime.now();
        XxlJobHelper.log("進入job2,time={}", now.toString());
        log.debug("job2 - doSomething ...");
        Thread.sleep(2000);
        XxlJobHelper.log("離開job2,time={}", now.toString());
    }
    public void job2Init() {
        log.debug("job2Init - doSomething ...");
    }
    public void job2Destroy() {
        log.debug("job2Destroy - doSomething ...");
    }
}


排程中心需要建立對應job2的任務並啟動。如果一切順利將在執行器控制檯看到如下log:



2023-10-09 13:31:44.104 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2Init - doSomething ...
2023-10-09 13:31:44.110 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2 - doSomething ...
2023-10-09 13:31:54.052 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2 - doSomething ...
2023-10-09 13:32:04.053 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2 - doSomething ...
2023-10-09 13:32:14.054 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2 - doSomething ...
Disconnected from the target VM, address: '127.0.0.1:50819', transport: 'socket'
2023-10-09 13:32:21.606  INFO 35848 --- [       Thread-4] com.xxl.job.core.server.EmbedServer      : >>>>>>>>>>> xxl-job remoting server stop.
2023-10-09 13:32:21.618  INFO 35848 --- [rRegistryThread] c.x.j.c.thread.ExecutorRegistryThread    : >>>>>>>>>>> xxl-job registry-remove success, registryParam:RegistryParam{registryGroup='EXECUTOR', registryKey='xxljob-demo-service', registryValue='http://192.168.3.191:9999/'}, registryResult:ReturnT [code=200, msg=null, content=null]
2023-10-09 13:32:21.618  INFO 35848 --- [rRegistryThread] c.x.j.c.thread.ExecutorRegistryThread    : >>>>>>>>>>> xxl-job, executor registry thread destroy.
2023-10-09 13:32:21.621  INFO 35848 --- [ionShutdownHook] com.xxl.job.core.server.EmbedServer      : >>>>>>>>>>> xxl-job remoting server destroy success.
2023-10-09 13:32:21.622 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2Destroy - doSomething ...
2023-10-09 13:32:21.622  INFO 35848 --- [7-1696829504104] com.xxl.job.core.thread.JobThread        : >>>>>>>>>>> xxl-job JobThread stoped, hashCode:Thread[xxl-job, JobThread-27-1696829504104,10,main]
2023-10-09 13:32:21.623  INFO 35848 --- [FileCleanThread] c.x.j.core.thread.JobLogFileCleanThread  : >>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.

通過log可以觀測到:

  • 執行器啟動的時候,在任務首次執行前列印了init方法中的log
  • 而後定時任務按照既定執行策略執行
  • 當執行器服務停止的時候,列印了destory方法中的log
  • 通過XxlJobHelper.log 列印的紀錄檔,可以在排程中心 排程紀錄檔--->操作--->執行紀錄檔 中看到

父子任務

當兩個任務需要關聯觸發的時候可以使用父子任務的功能,當然了子任務還可以有子任務。

這種情況只需要啟動父任務,不需要啟動子任務,當父任務執行成功了,會觸發子任務的啟動。當父任務執行失敗了,不會觸發子任務的啟動。

父子執行器



 /**
     * 父任務
     */
     @XxlJob("jobFather")
     public void jobFather() {
         // 建立一個新的亂數生成器
         Random random = new Random();
         // 生成一個0到100之間的隨機整數
         int randomNumber = random.nextInt(101);
         if (randomNumber % 2 == 0) {
             log.debug("do - jobFather - success");
             XxlJobHelper.handleSuccess();
         } else {
             log.debug("do - jobFather - fail");
             XxlJobHelper.handleFail("呼叫XxlJobHelper.handleFail,排程中心就任務此任務執行失敗");
         }
     }
 
     /**
      * 子任務
      */
     @XxlJob("jobChild")
     public void jobChild() {
         log.debug("do - jobChild");
     }


  • 父、子執行器並沒有什麼特殊的地方
  • 在排程中心手動關聯父、子任務後,父執行器執行成功後就會觸發子執行器執行
  • 上述父執行器中通過XxlJobHelper.handleSuccess() 告訴排程中心,此任務執行成功了
  • 上述父執行器中通過XxlJobHelper.handleFail() 告訴排程中心,此任務執行失敗了,排程中心將不會觸發子任務

關聯父子任務

  • 子任務、父任務分別建立
  • 然後編輯父任務,將子任務的id填寫到「子任務ID」中,此時就關聯上了
  • 啟動父任務,不需要啟動子任務
  • 子任務的啟動交由排程中心觸發,當父任務執行成功了,排程中心自然會啟動子任務
  • 如果將子任務啟動了,那麼子任務將擁有兩個觸發維度,第一是根據子任務自身的排程型別和排程速度觸發,第二是排程中心觸發

執行器側log



2023-10-10 09:13:00.276  INFO 19228 --- [       Thread-4] com.xxl.job.core.server.EmbedServer      : >>>>>>>>>>> xxl-job remoting server start success, nettype = class com.xxl.job.core.server.EmbedServer, port = 9999
2023-10-10 09:13:23.549  INFO 19228 --- [Pool-1699379094] c.xxl.job.core.executor.XxlJobExecutor   : >>>>>>>>>>> xxl-job regist JobThread success, jobId:29, handler:com.xxl.job.core.handler.impl.MethodJobHandler@2b43f314[class com.ramble.xxljob.task.DemoJob#jobFather]
2023-10-10 09:13:23.552 DEBUG 19228 --- [9-1696900403549] com.ramble.xxljob.task.DemoJob           : do - jobFather - fail
2023-10-10 09:13:28.495 DEBUG 19228 --- [9-1696900403549] com.ramble.xxljob.task.DemoJob           : do - jobFather - fail
2023-10-10 09:13:34.500 DEBUG 19228 --- [9-1696900403549] com.ramble.xxljob.task.DemoJob           : do - jobFather - success
2023-10-10 09:13:34.511  INFO 19228 --- [Pool-1699379094] c.xxl.job.core.executor.XxlJobExecutor   : >>>>>>>>>>> xxl-job regist JobThread success, jobId:30, handler:com.xxl.job.core.handler.impl.MethodJobHandler@7e3d2ebd[class com.ramble.xxljob.task.DemoJob#jobChild]
2023-10-10 09:13:34.512 DEBUG 19228 --- [0-1696900414511] com.ramble.xxljob.task.DemoJob           : do - jobChild
2023-10-10 09:13:38.494 DEBUG 19228 --- [9-1696900403549] com.ramble.xxljob.task.DemoJob           : do - jobFather - fail
2023-10-10 09:13:43.526 DEBUG 19228 --- [9-1696900403549] com.ramble.xxljob.task.DemoJob           : do - jobFather - success
2023-10-10 09:13:43.539 DEBUG 19228 --- [0-1696900414511] com.ramble.xxljob.task.DemoJob           : do - jobChild

通過紀錄檔可以觀察到:

  • 一開始僅將父任務的 JobThread註冊到了排程中心
  • 而後父任務按照排程速度執行
  • 當父任務執行失敗沒有觸發子任務執行
  • 當父任務執行成功首先將子任務的 JobThread 註冊到了排程中心,隨即執行了子任務

排程中心-任務管理詳解

執行器

任務需要繫結到執行器,任務觸發排程時將會自動發現註冊成功的執行器, 實現任務自動發現功能; 另一方面也可以方便的進行任務分組。每個任務必須繫結一個執行器, 可在 "執行器管理" 進行設定

路由策略

當執行器叢集部署時,提供豐富的路由策略,包括:

  • FIRST(第一個):固定選擇第一個機器;
  • LAST(最後一個):固定選擇最後一個機器;
  • ROUND(輪詢);
  • RANDOM(隨機):隨機選擇線上的機器;
  • CONSISTENT_HASH(一致性HASH):每個任務按照Hash演演算法固定選擇某一臺機器,且所有任務均勻雜湊在不同機器上。
  • LEAST_FREQUENTLY_USED(最不經常使用):使用頻率最低的機器優先被選舉;
  • LEAST_RECENTLY_USED(最近最久未使用):最久未使用的機器優先被選舉;
  • FAILOVER(故障轉移):按照順序依次進行心跳檢測,第一個心跳檢測成功的機器選定為目標執行器並行起排程;
  • BUSYOVER(忙碌轉移):按照順序依次進行空閒檢測,第一個空閒檢測成功的機器選定為目標執行器並行起排程;
  • SHARDING_BROADCAST(分片廣播):廣播觸發對應叢集中所有機器執行一次任務,同時系統自動傳遞分片引數;可根據分片引數開發分片任務;

排程過期策略

  • 忽略:排程過期後,忽略過期的任務,從當前時間開始重新計算下次觸發時間
  • 立即執行一次:排程過期後,立即執行一次,並從當前時間開始重新計算下次觸發時間

阻塞處理策略

排程過於密集執行器來不及處理時的處理策略

  • 單機序列(預設):排程請求進入單機執行器後,排程請求進入FIFO佇列並以序列方式執行
  • 丟棄後續排程:排程請求進入單機執行器後,發現執行器存在執行的排程任務,本次請求將會被丟棄並標記為失敗
  • 覆蓋之前排程:排程請求進入單機執行器後,發現執行器存在執行的排程任務,將會終止執行中的排程任務並清空佇列,然後執行本次排程任務

超時和重試

  • 任務超時時間:支援自定義任務超時時間,任務執行超時將會主動中斷任務
  • 失敗重試次數;支援自定義任務失敗重試次數,當任務失敗時將會按照預設的失敗重試次數主動進行重試

參照