Auto-Job任務排程框架

2023-01-04 12:01:09

Auto-Job 任務排程框架

## 一、背景

生活中,業務上我們會碰到很多有關作業排程的場景,如每週五十二點發放優惠券、或者每天凌晨進行快取預熱、亦或每月定期從第三方系統抽數等等,Spring和java目前也有原生的定時任務支援,但是其都存在一些弊病,如下:

  • 不支援叢集,未避免任務重複執行的問題
  • 不支援生命週期的統一管理
  • 不支援分片任務:處理有序資料時,多機器分片執行任務處理不同資料
  • 不支援失敗重試:出現異常任務終結,不能根據執行狀態控制任務重新執行
  • 不能很好的和企業系統整合,如不能很好的和企業系統前端整合以及不能很好的嵌入到後端服務
  • 不支援動態調整:不重啟服務情況下不能修改任務引數
  • 無報警機制:任務失敗之後沒有報警通知(郵箱、簡訊)
  • 無良好的執行紀錄檔和排程紀錄檔跟蹤

基於原生定時任務的這些弊病,AutoJob就由此誕生,AutoJob為解決分散式作業排程提供了新的思路和解決方案。

二、特性

簡單: 簡單包括整合簡單、開發簡單和使用簡單。

整合簡單:框架能非常簡單的整合到Spring專案和非Spring專案,得益於AutoJob不依賴於Spring容器環境和MyBatis環境,你無需為了使用該框架還得搭建一套Spring應用。

開發簡單:AutoJob開發初衷就希望具有低程式碼侵入性和快速開發的特點,如下在任意一個類中,你只需要在某個需要排程的任務上加上註解,該任務就會被框架進行動態排程:

	@AutoJob(attributes = "{'我愛你,心連心',12.5,12,true}", cronExpression = "5/7 * * * * ?")
    public void formatAttributes(String string, Double decimal, Integer num, Boolean flag) {
        //引數注入
        AutoJobLogHelper logger = new AutoJobLogHelper();//使用框架內建的紀錄檔類
        logger.setSlf4jProxy(log);//對Slf4j的log進行代理,紀錄檔輸出將會使用Slf4j輸出
        logger.info("string={}", string);
        logger.warn("decimal={}", decimal);
        logger.debug("num={}", num);
        logger.error("flag={}", flag);
        //使用mapper
        mapper.selectById(21312L);
        //...
    }

使用簡單:使用該框架你無需關注太多的設定,整個框架的啟動只需要一行程式碼,如下:

//設定任務掃描包路徑
@AutoJobScan({"com.yourpackage"})
//處理器自動掃描
@AutoJobProcessorScan({"com.yourpackage"})
public class AutoJobMainApplication {
    public static void main(String[] args) {
    //框架啟動
    	new AutoJobBootstrap(AutoJobMainApplication.class)
                .build()
                .run();
        System.out.println("==================================>系統建立完成");
 	}

}

得益於良好的系統架構和編碼設計,你的應用啟動無需過多設定,只需要一行程式碼

拓展: 框架原始碼採用多種合理設計模式設計,具有良好的可拓展性和可維護性。

動態: 框架提供API,支援任務的動態CURD操作,即時生效。

多資料庫支援: 提供多型別資料庫支援,目前支援MySQL和PostgreSQL。

任務依賴: 支援設定子任務,當父任務執行結束且執行成功後將會主動觸發一次子任務的執行。

一致性: 框架使用DB樂觀鎖實現任務的一致性,在叢集模式下,排程器在排程任務前都會嘗試獲取鎖,獲取鎖成功後才會進行該任務的排程。

HA(開發中): 該框架支援去中心化的叢集部署,叢集節點通過RPC加密通訊。叢集節點之間會自動進行故障轉移和負載均衡,

彈性增縮容(開發中): 支援節點的動態上下線,同時節點支援開啟保護模式,防止惡劣的網路環境下節點脫離叢集。

任務失敗重試: 支援任務失敗重試,並且可設定重試間隔。

完整的生命週期: 框架提供任務完整的生命週期事件,業務可捕捉並做對應的處理。

動態排程執行緒池: 框架使用自研的動態執行緒池,可靈活根據任務流量動態調整執行緒池核心執行緒和最大執行緒引數,節省系統執行緒資源,並且提供了預設的拒絕處理器,防止任務被missFire。

非同步非阻塞的紀錄檔處理: 紀錄檔採用生產者消費者模型,基於自研的記憶體訊息佇列,任務方法作為紀錄檔的生產者,生產紀錄檔放入訊息佇列,框架啟動對應的紀錄檔消費執行緒進行紀錄檔處理。

實時紀錄檔: 紀錄檔將會實時的進行儲存,便於跟蹤。

任務白名單: 提供任務白名單功能,只有在白名單中的任務才允許被註冊和排程,保證系統安全。

可拓展的紀錄檔儲存策略: 紀錄檔支援多種策略儲存,如記憶體Cache、資料庫等,可根據專案需要靈活增加儲存策略,如Redis、檔案等。

豐富的排程機制: 支援Cron like表示式,repeat-cycle排程、子任務觸發、延遲觸發等,得益於良好的編碼設計,使用者可非常簡單的新增自定義排程器,如下:

/**
 * 你的自定義排程器
 * @Author Huang Yongxiang
 * @Date 2022/08/18 14:56
 */
public class YourScheduler extends AbstractScheduler{
    public YourScheduler(AutoJobTaskExecutorPool executorPool, IAutoJobRegister register, AutoJobConfigHolder configHolder) {
        super(executorPool, register, configHolder);
    }
    
    //...排程邏輯
}

@AutoJobScan("com.example.autojob.job")
@AutoJobProcessorScan("com.example.autojob")
public class AutoJobMainApplication {
    public static void main(String[] args) {
        new AutoJobLauncherBuilder(AutoJobMainApplication.class)
                .withAutoScanProcessor()
            	//設定你的排程器
                .addScheduler(YourScheduler.class)
                .build()
                .run();
        System.out.println("==================================>系統建立完成");
    }
}

任務報警: 框架支援郵件報警,目前原生支援QQ郵箱、163郵箱、GMail等,同時也支援自定義的郵箱smtp伺服器。

目前系統提供:任務失敗報警、任務被拒報警、節點開啟保護模式報警、節點關閉保護模式報警,當然使用者也可非常簡單的進行郵件報警的拓展。

豐富的任務入參: 框架支援基礎的資料型別和物件型別的任務入參,如Boolean,String,Long,Integer,Double等型別,對於物件入參,框架預設使用JSON進行序列化入參。

良好的前端整合性: 框架提供相關API,使用者可以靈活開發Restful介面接入到企業專案,無需額外佔用一個程序或機器來單獨執行排程中心。

記憶體任務: 框架提供DB任務和記憶體任務兩種型別,DB任務持久化到資料庫,宣告週期在資料庫內記錄,記憶體任務除了紀錄檔,整個生命週期都在記憶體中完成,相比DB任務具有無鎖、排程快速的特點。

指令碼任務: 提供指令碼任務的執行,如Python、Shell,SQL等。

動態分片(開發中): 叢集模式下框架支援任務分片,多機執行。

全非同步: 任務排程流程採用全非同步實現,如非同步排程、非同步執行、非同步紀錄檔等,有效對密集排程進行流量削峰,理論上支援任意時長任務的執行。

三、快速使用

1、專案匯入

該框架不依賴於Spring容器環境和MyBatis等持久層框架,你可以將其作為一個Maven模組匯入到你的專案中,你可以去碼雲上下載:https://gitee.com/hyxl-520/auto-job.git

專案分為兩個模組:auto-job-framework和auto-job-spring,前者是框架的核心部分,後者是與Spring整合的使用,後續可能會基於Spring web開發相關控制檯。

2、專案設定

專案設定主要為框架設定和資料來源設定。框架設定預設讀取類路徑下的auto-job.ymlauto-job.properties檔案,具體設定項內容見「所有設定」;資料來源設定,框架預設使用Druid作為連線池,你只需要在druid.properties檔案中設定資料來源就行了,當然你可以自定義資料來源,具體方法在AutoJobBootstrap裡。相關建表指令碼可以在db目錄下找到。框架預設使用MySQL資料庫,理論上支援SQL標準的其他資料庫

3、任務開發

3.1、基於註解

開發一個基於註解的任務非常簡單,除了紀錄檔輸出使用框架內建的紀錄檔輔助類AutoJobLogHelper輸出外,其他你就只需要關心你的業務。當然,AutoJobLogHelper使用起來和slf4j幾乎沒有區別,它提供四種級別的紀錄檔輸出:debug、info、warn、error,而且你可以使用AutoJobLogHelper對你的slf4j進行代理,這樣這些任務執行中輸出的紀錄檔將會直接使用slf4j進行輸出。如下,是一個簡單演示:

 @AutoJob(attributes = "{'我愛你,心連心',12.5,12,true}", cronExpression = "5/7 * * * * ?", id = 2, alias = "引數測試任務")
    public void formatAttributes(String string, Double decimal, Integer num, Boolean flag) {
        AutoJobLogHelper logger=new AutoJobLogHelper();
        //log是org.slf4j.Logger物件,這裡對其進行代理
        logger.setSlf4jProxy(log);
        logger.info("string={}", string);
        logger.warn("decimal={}", decimal);
        logger.debug("num={}", num);
        logger.error("flag={}", flag);
    }

在你開發的任務上加上@AutoJob註解,設定一些東西,這個任務就開發完成了。@AutoJob是用來標識一個方法是一個AutoJob任務,當然還有其他註解,這裡暫不做闡述。細心的同學會發現這個任務是有引數的,沒錯,AutoJob框架支援引數,更多引數的設定後文會詳細講解。

3.2、基於構建

手動建立任務相比註解來說更為靈活,框架提供了建立任務的構建者物件,如AutoJobMethodTaskBuilderAutoJobScriptTaskBuilder物件,前者用於構建方法型任務,後者用於構建指令碼型任務。

MethodTask task = new AutoJobMethodTaskBuilder(Jobs.class, "hello") //方法型任務需要指定方法所在的類以及方法名
          .setTaskId(IdGenerator.getNextIdAsLong())
          .setTaskAlias("測試任務") //任務別名
    	  .setParams("{'我愛你,心連心',12.5,12,true}") //任務引數,支援simple引數
          .setTaskType(AutoJobTask.TaskType.MEMORY_TASk)
          .setMethodObjectFactory(new DefaultMethodObjectFactory()) //方法執行物件工廠,用於建立方法執行的物件上下文
          .addACronExpressionTrigger("* 5 7 * * * ?", -1) //新增一個cron-like觸發器
          .build();

AutoJobApplication
         .getInstance()
         .getMemoryTaskAPI() //獲取全域性的記憶體任務的API
         .registerTask(new AutoJobMethodTaskAttributes(task)); //註冊任務

4、框架啟動

得益於良好的設計,該框架你可以在任何一個main方法啟動,如下是示列的一個啟動

import com.example.autojob.skeleton.annotation.AutoJobProcessorScan;
import com.example.autojob.skeleton.annotation.AutoJobScan;
import com.example.autojob.skeleton.framework.boot.AutoJobLauncherBuilder;

@AutoJobScan("com.example.autojob.job")
@AutoJobProcessorScan("com.example.autojob")
public class AutoJobMainApplication {
    public static void main(String[] args) {
        new AutoJobBootstrap(AutoJobSpringApplication.class)
                .withAutoScanProcessor()
                .build()
                .run();
        System.out.println("==================================>AutoJob應用已啟動完成");
    }
}

第5行是用於設定任務掃描的類路徑,支援子包掃描,不設定時會掃描整個專案,用時較長。

第6行是處理器掃描,處理器主要是在框架啟動前和框架啟動後進行一些處理,預設是掃描整個專案,注意該註解只有設定了withAutoScanProcessor才能生效,如程式碼第10行,框架自己的處理器為自動載入,無需設定。

第9-12行是框架的啟動程式碼,AutoJobBootstrap是應用引導構建程式,通過它你能增加很多自定義的設定。在第11行後,AutoJob應用即建立完成,第12行呼叫run方法啟動整個應用。

5、動態修改

框架本身不是一個Web應用,沒有提供對應修改的Rest介面,但是框架提供了很多操作任務的API,你可以在AutoJobAPIAutoJobLogAPI裡找到。你可以你可以參考auto-job-spring模組裡提供的範例開發對應Rest介面,隨著版本更替,autojob將會在未來支援控制檯。

四、任務型別

按照功能分類

任務按照功能可以分為方法型任務和指令碼型任務。

方法型任務對應Java中的一個方法,該方法可以有返回值,允許有引數,引數的注入可以見「任務引數」。方法內部的紀錄檔輸出必須使用AutoJobLogHelper來輸出,否則紀錄檔可能無法儲存。

指令碼型任務對應一個磁碟上的指令碼檔案或一段cmd命令。具體使用可見章節:「高階用法-指令碼任務」。

按照排程方式分類

任務按照排程方式可以分為記憶體型任務和DB型任務。

記憶體型任務的生命週期都在記憶體中完成,具有排程迅速、無鎖、隨調隨動的特點,適合短週期、有限次、臨時性的任務。

DB型任務將會儲存到資料庫,每一次排程都會更新資料庫相關狀態。DB型任務採用樂觀鎖,每次執行前都需要獲得鎖才能執行,具有長期性、易維護、易修改等特點,適合於定期資料同步、定時快取預熱等在長期內都會用到的任務。

五、任務引數

方法型任務

方法型任務支援兩種引數格式,一種是FULL型引數,一種是SIMPLE引數,具體區別可見如下示列:

void exampleMethod1(String str, Integer num, Double decimal, Boolean flag);

void exampleMethod2(String str, Integer num, Double decimal, Boolean flag, Long count, Param param);

class param{
    private int id;
    private String num;
    //...
}

如上方法:exampleMethod1,使用SIMPLE型引數:

MethodTask task = new AutoJobMethodTaskBuilder(Jobs.class, "hello") 
          .setTaskId(IdGenerator.getNextIdAsLong())
          .setTaskAlias("測試任務")
    	  .setParams("{'我是字串引數',12,12.5,true}")
          .setTaskType(AutoJobTask.TaskType.MEMORY_TASk)
          .setMethodObjectFactory(new DefaultMethodObjectFactory()) 
    	  .build();
//{'我是字串引數',12,12.5,true}

使用FULL型引數

MethodTask task = new AutoJobMethodTaskBuilder(Jobs.class, "hello")
                .setTaskId(IdGenerator.getNextIdAsLong())
                .setTaskAlias("測試任務")
                .setParams("[{\"values\":{\"value\":\"字串引數\"},\"type\":\"string\"},{\"values\":{\"value\":12},\"type\":\"integer\"},{\"values\":{\"value\":12.5},\"type\":\"decimal\"},{\"values\":{\"value\":false},\"type\":\"boolean\"}]")
                .setTaskType(AutoJobTask.TaskType.MEMORY_TASk)
                .setMethodObjectFactory(new DefaultMethodObjectFactory())
                .build();

/*
[
  {
    "values": {
      "value": "字串引數"
    },
    "type": "string"
  },
  {
    "values": {
      "value": 12
    },
    "type": "integer"
  },
  {
    "values": {
      "value": 12.5
    },
    "type": "decimal"
  },
  {
    "values": {
      "value": false
    },
    "type": "boolean"
  }
]
*/

我們可以發現SIMPLE引數十分簡單,"{a1,a2,a3,...}",參數列達式本身是一個字串,大引號包裹,引數順序按照從左到右依次匹配。SIMPLE引數支援四類引數

'字串引數',單引號包裹,對應型別String

12:整數型引數,對應型別:Integer包裝型別,如果數值超過整形範圍,則會自動匹配Long型別。

12.5:小數型引數,對應型別:Double包裝型別。

true|false:布林型引數,對應型別:Boolean包裝型別。

FULL型引數相比就要複雜的多了,本身是一個JSON陣列字串,每一個JSON物件代表一個引數,每個物件有type和values兩個屬性,字面意思,型別和值,FULL型別除了支援SIMPLE型的四種型別引數外還支援物件型,物件型的引數使用JSON來進行序列化和反序列化。由於FULL型引數過於複雜,因此框架提供了AttributesBuilder物件,可以非常簡單的生成FULL型引數,以exampleMethod2為例:

Param param = new Param();
        param.setId(1);
        param.setNum("12");
System.out.println(new AttributesBuilder()
        .addParams(AttributesBuilder.AttributesType.STRING, "字串引數")
        .addParams(AttributesBuilder.AttributesType.INTEGER, 12)
        .addParams(AttributesBuilder.AttributesType.DECIMAL, 12.5)
        .addParams(AttributesBuilder.AttributesType.BOOLEAN, false)
        .addParams(Param.class, param)
        .getAttributesString());
/*
[
  {
    "values": {
      "value": "字串引數"
    },
    "type": "string"
  },
  {
    "values": {
      "value": 12
    },
    "type": "integer"
  },
  {
    "values": {
      "value": 12.5
    },
    "type": "decimal"
  },
  {
    "values": {
      "value": false
    },
    "type": "boolean"
  },
  {
    "values": {
      "id": 1,
      "num": "12"
    },
    "type": "com.example.autojob.job.Param"
  }
]
*/

一般來說,基於註解的任務開發我們更傾向於推薦使用SIMPLE型引數,簡單、明瞭;基於構建的任務開發我們更鐘意於FULL型引數,型別豐富。

指令碼型任務

指令碼型任務的引數是通過啟動命令給出的,如python /script.test.py -a 12 -b,其中-a 12-b就是兩個引數,因此指令碼型任務只支援字串型引數。

七、任務執行物件工廠

任務執行物件工廠是方法型任務才有的屬性,因為方法型任務對應的是Java某個類中的方法,因此方法的執行可能依賴於物件範例的上下文,特別是當該框架與Spring整合時很可能會使用Spring容器中的Bean,因此可以指定建立方法依賴的物件的工廠:IMethodObjectFactory,框架預設使用類的無參構造方法建立物件範例,當然你可以建立自定義的工廠:

public class SpringMethodObjectFactory implements IMethodObjectFactory {
    public Object createMethodObject(Class<?> methodClass) {
        // SpringUtil持有Spring的容器,獲取Spring容器中的Bean
        return SpringUtil.getBean(JobBean.class);
    }
}

那麼怎麼讓我們的任務執行物件工廠生效呢,見如下示列:

// 基於註解的任務開發只需要指定methodObjectFactory屬性即可,框架將會呼叫指定工廠的無參構造方法建立一個工廠範例
@AutoJob
            (
                    id = 1
                    , attributes = "{'hello autoJob'}"
                    , defaultStartTime = StartTime.NOW
                    , repeatTimes = -1, cycle = 5
                    , methodObjectFactory = SpringMethodObjectFactory.class
            )
public void hello(String str) {
    logHelper.info(str);
}

//基於構建的任務開發時將工廠範例設定進去即可
public static void main(String[] args) {
    MethodTask methodTask = new AutoJobMethodTaskBuilder(Jobs.class, "hello")
            .setMethodObjectFactory(new SpringMethodObjectFactory())
            .build();
    AutoJobApplication
         .getInstance()
         .getMemoryTaskAPI() //獲取全域性的記憶體任務的API
         .registerTask(new AutoJobMethodTaskAttributes(task)); //註冊任務
}

八、任務紀錄檔

作為一款任務排程框架,詳細的紀錄檔一定是必不可少的。框架提供三種型別紀錄檔記錄:排程紀錄檔、執行紀錄檔、執行紀錄檔

排程紀錄檔

任務的每一次啟動到完成被任務是一次排程,排程紀錄檔詳細記錄了排程任務的基礎資訊、排程時間、執行狀態、執行時長、以及任務結果(任務結果對應方法型任務是返回值,由JSON序列化,指令碼型任務是指令碼返回值)。排程紀錄檔對應資料庫表aj_scheduling_record,其ID關聯到本次排程中產生的執行紀錄檔和執行紀錄檔。

執行紀錄檔

執行紀錄檔為任務在執行期間內部輸出的紀錄檔,方法型任務為使用AutoJobLogHelper輸出的紀錄檔,指令碼型任務為指令碼或cmd命令在控制檯的輸出。執行紀錄檔對應資料庫表aj_job_logs

執行紀錄檔

執行紀錄檔記錄了某次排程任務的執行情況,如何時啟動、何時完成、是否執行成功、任務結果、任務異常等。執行紀錄檔對應庫表aj_run_logs

任務紀錄檔都是實時更新的,如果你使用的是框架的預設紀錄檔儲存策略(資料庫儲存),你可以通過AutoJobLogDBAPI獲取到紀錄檔。執行紀錄檔和執行紀錄檔都繫結了排程ID,通過排程ID即可找到本次排程所產生的執行紀錄檔和執行紀錄檔。

九、框架架構


框架架構圖的左部分的元件是框架的核心元件。

任務容器模組

任務容器模組包含DB任務容器和記憶體任務容器,分別用於存放DB型的任務和記憶體型的任務。

排程模組

排程模組由排程器、任務排程佇列、註冊器、時間輪排程器以及時間輪構成。記憶體任務排程器AutoJobMemoryTaskScheduler和DB任務排程器AutoJobDBScheduler負責從任務容器排程出即將執行的任務(<=5秒)放到任務排程佇列快取AutoJobTaskQueue。時間輪排程器AutoJobTimeWheelScheduler通過註冊器AutoJobRegister排程任務排程佇列中的任務進入時間輪,準備執行。時間輪按秒捲動,將執行的任務提交進任務執行器池進行執行。執行成功排程器AutoJobRunSuccessScheduler執行執行成功後的相關操作,比如更新狀態、更新下次觸發時間等等,執行失敗排程器AutoJobRunErrorScheduler執行執行失敗後的相關操作,比如更新狀態、根據設定的重試策略更新觸發時間、故障轉移等等。

任務執行器池模組

任務執行器池包含兩個動態執行緒池,分別為快池(fast-pool)和慢池(slow-pool),任務預設第一次執行提交進快池,第二次執行會根據上次執行時長決定是否降級處理。動態執行緒池是具有根據流量動態調節的執行緒池,具體的設定可以見「十、所有設定:執行器池設定」。

紀錄檔模組

紀錄檔模組和核心排程模組是完全解耦的,執行紀錄檔由任務執行時產生並且釋出到記憶體訊息佇列,紀錄檔模組監聽訊息釋出事件並且取出訊息放入訊息buffer,單獨由紀錄檔處理執行緒定期、定量儲存紀錄檔。執行紀錄檔通過監聽任務事件來進行儲存。紀錄檔模組的設計都是非同步化的,盡最大可能減小紀錄檔IO對排程的影響。

除了以上的核心元件外,框架還有部分功能拓展元件。

生命週期處理器

生命週期處理器也可以理解成生命週期勾點,具體來說是一個任務的生命週期勾點,具體看下面的生命週期事件圖

要使用一個生命週期勾點也十分簡單,下面來看一個示列:

//方式一(子事件處理器)
public class TaskBeforeRunHandle implements ITaskEventHandler<TaskBeforeRunEvent> {
    @Override
    public void doHandle(TaskBeforeRunEvent event) {
        System.out.println("任務:" + event
                .getTask()
                .getAlias() + "即將開始執行");
    }

    @Override
    public int getHandlerLevel() {
        return 0;
    }
}

以上示列表示一個在任務執行前在控制檯輸出:「任務:{任務別名}即將開始執行」,要實現一個事件處理器只需要實現ITaskEventHandler介面即可,泛型代表你需要處理的事件。當然還可以通過如下方式來實現同上面示列一樣的功能

//方式二(父事件處理器)
public class TaskBeforeRunHandle implements ITaskEventHandler<TaskEvent> {
    @Override
    public void doHandle(TaskEvent event) {
        if (event instanceof TaskBeforeRunEvent) {
            System.out.println("任務:" + event
                    .getTask()
                    .getAlias() + "即將開始執行");
        }
    }

    @Override
    public int getHandlerLevel() {
        //數位越大,級別越高
        return 0;
    }
}

TaskEvent是所有任務事件的父類別,實現其父類別事件的處理器時所有的任務相關事件都會執行該處理器,可以判斷事件型別來完成相關操作,當一個處理器需要處理多種事件型別時可以如上使用。每個事件處理器可以通過重寫getHandlerLevel方法指定級別,數位越大,級別越高,執行越會被優先執行。父事件處理器高階別>父事件處理器低階別>子事件處理器高階別>子事件處理器低階別。當然,只宣告處理器不將其新增到應用也不會生效的,下面介紹如何使得事件處理器生效。

public class TaskEventHandlerLoader implements IAutoJobLoader {
    @Override
    public void load() {
        //方式一(子事件處理器)
        TaskEventHandlerDelegate
                .getInstance()
                .addHandler(TaskBeforeRunEvent.class, new TaskBeforeRunHandle());
        
		//方式二(父事件處理器)
        TaskEventHandlerDelegate
                .getInstance()
                .addHandler(TaskEvent.class, new TaskBeforeRunHandle());
    }
}
//將啟動處理器新增進應用上下文
public static void main(String[] args) {
        new AutoJobBootstrap(AutoJobMainApplication.class)
                .addProcessor(new TaskEventHandlerLoader()) //新增到上下文
                .build()
                .run();
}

上面的程式碼演示瞭如何新增處理器到上下文。在AutoJob中,在框架啟動前和框架關閉前執行某些操作的處理器成為Processor,框架啟動前執行的處理器為IAutoJobLoader,框架關閉前執行的處理器為IAutoJobEnd,上面程式碼中,通過啟動處理器將事件處理器新增到「事件委派者」:TaskEventHandlerDelegate,再在應用構建時手動將啟動處理器新增到應用上下文中。當然如果你的Processor非常多,可以通過註解@AutoJobProcessorScan來自動掃描Processor,可以指定掃描的包,支援子包掃描,不指定時預設全專案掃描。掃描後通過呼叫Processor的無參構造方法建立範例後自動注入上下文。如下示列:

@AutoJobProcessorScan("com.example.autojob")
public class AutoJobMainApplication {
    public static void main(String[] args) {
        new AutoJobBootstrap(AutoJobMainApplication.class) //指定入口類
                .withAutoScanProcessor() //手動開啟處理器自動掃描,預設是關閉的,以防全專案掃描耗時較長
                .build()
                .run();
        System.out.println("==================================>系統建立完成");
}

十、所有設定

框架提供了豐富的設定,這些設定預設是從auto-job.yml或者auto-job.properties檔案中載入,當然你可以從資料庫動態載入實現動態設定,全部設定如下:

# 動態任務排程框架設定
autoJob:
  context:
    schedulingQueue: 
      length: 100 # 排程佇列長度,排程佇列用於存放即將執行的任務
    memoryContainer: # 記憶體型任務容器,存放記憶體型任務
      length: 200 # 容器容量
      cleanStrategy: CLEAN_FINISHED # 清理策略,CLEAN_FINISHED-定期清理已經執行完成的任務 KEEP_FINISHED-保留執行完成的任務,會將其移入一個記憶體Cache,不會佔用容器容量
  annotation:
    enable: true # 是否啟用註解掃描,掃描被@AutoJob @FactoryJob的方法並將其包裝成可執行任務物件
    defaultDelayTime: 30 # 在未給註解的任務設定排程資訊的情況下,預設的任務延遲執行時間:min
  database:
    type: mysql # 資料庫型別,目前支援,MySQL和PostgreSQL
  executor: # 執行器池,分為快池和慢池
    fastPool: # 快池相關設定,慢池相同
      update: # 執行器池支援根據流量動態調整執行緒數目
        enable: true # 是否開啟
        trafficUpdateCycle: 5 # 流量監控週期:秒
        adjustedThreshold: 0.05 # 如果流量變化相比最大執行緒數超過此比例(0-1),則進行調整
      coreThread: # 核心執行緒數
        initial: 5 # 初始值
        min: 5 # 允許變化到的最小值
        max: 50 # 允許變化到的最大值
        keepAliveTime: 60 # 當執行緒數大於當前核心執行緒數時,執行緒保持生存的時間:秒
      maxThread: # 最大執行緒數
        initial: 10
        min: 10
        max: 50
    slowPool: # 慢池
      update:
        enable: false
        trafficUpdateCycel: 5
        adjustedThreshold: 0.05
      coreThread:
        initial: 10
        min: 5
        max: 50
        keepAliveTime: 60
      maxThread:
        initial: 20
        min: 10
        max: 50
      relegation:
          threshold: 3 # 降級閾值,當任務的上次執行時長超過該閾值(分鐘)時,下次將會降級到slow pool執行
  register:
    filter: # 註冊過濾器用於防止某些不安全的任務被執行
      enable: true
      classPath: "**.job.**" # 只有在這些類路徑下的任務才允許被註冊和執行
  scheduler:
    finished:
      error:
        retry: # 失敗重試相關設定,該設定是全域性的
          enable: true
          retryCount: 3
          interval: 1 # 兩次重試的間隔:min
  emailAlert: # 全域性郵件報警相關設定
    enable: true
    auth:
      sender: "[email protected]" # 傳送方,唯一
      receiver: "[email protected]" # 接收方,多個逗號分割
      token: "LXZYE214123CEWASU" # smtp密碼
      type: 163Mail # 郵件型別,目前支援:QQMail、163Mail、gMail(google)、outLookMail、customize(自定義)
      customize: # 自定義下的smtp伺服器的相關設定
        smtpAddress:
        smtpPort:
    config: # 提供部分事件報警(開關)
      taskRunError: true # 任務執行出錯(優先使用任務私有郵件使用者端,不存在使用全域性使用者端)
      taskRefuseHandle: true # 任務被拒絕執行(優先使用任務私有郵件使用者端,不存在使用全域性使用者端)
      clusterOpenProtectedMode: true # 叢集節點開啟保護模式(叢集模式下有效)
      clusterCloseProtectedMode: true # 叢集節點關閉保護模式(叢集模式下有效)
  logging: # 紀錄檔的相關設定
    taskLog: # 任務內部通過呼叫logger輸出的紀錄檔
      memory: # 紀錄檔預設是資料庫儲存,框架額外提供了記憶體Cache儲存,記憶體Cache一般僅做測試,該設定一般情況下無需更改
        enable: false
        length: 100
        defaultExpireTime: 3 # 分鐘
    runLog: # 任務的排程紀錄檔
      memory:
        enable: false
        length: 100
        defaultExpireTime: 3
  cluster: # 叢集相關設定,目前版本暫無需考慮
    enable: false # 叢集開關,目前版本開啟後會啟動PRC伺服器
    port: 8080 # TCP埠
    auth: # RPC通訊身份驗證
      enable: true # 是否啟動身份驗證
      publicKey: "autoJob!@#=123.?" # 通訊加密公鑰,16位元字串
      token: "hello" # token,相同token的兩個AutoJob應用才能通訊
    client:
      nodeUrl: "localhost:8086"
      pool: # RPC對談池相關設定
        size: 10
        getTimeout: 3
        getDataTimeout: 10
        connectTimeout: 10
        keepAliveTimeout: 10
      allowMaxJetLag: 3
      nodeSync:
        cycle: 5
        offLineThreshold: 3
    config:
      annotations:
        enable: false
      protectedMode:
        enable: true
        threshold: 0.2

當然上面設定並不是都需要你設定,框架基本所有設定都設定了預設值,能保證常規場景下的排程。

十一、高階用法

1、指令碼任務

框架支援指令碼任務,原生支援:Python、Shell、PHP、NodeJs以及PowerShell,提供其他指令碼型別拓展。指令碼任務對應的物件為ScriptTask。指令碼作為一個伺服器上的指令碼檔案儲存在磁碟上,要構建一個指令碼任務非常簡單,框架提供AutoJobScriptTaskBuilder來輔助構建一個完整的指令碼任務,下面看幾個示列:

		ScriptTask task = new AutoJobScriptTaskBuilder()
                .setTaskId(IdGenerator.getNextIdAsLong()) //設定任務ID,任務ID作為區分任務的鍵,不指定時將會隨機分配
                .setTaskAlias("測試指令碼任務1") //任務別名
                .setTaskType(AutoJobTask.TaskType.MEMORY_TASk) //任務型別,有記憶體型任務和DB型任務,記憶體型任務的所有生命週期都在記憶體完成,除了紀錄檔外不會保留到資料庫
                .setBelongTo(1L) //保留拓展欄位,用於說明該任務所屬
                .addACronExpressionTrigger("* 15 7 * * * ?", -1) //新增一個cron-like觸發器,兩個引數分別是:cron-like表示式、重複次數。不指定觸發器時將會在預設延遲後執行一次,-1表示該任務為永久執行,如果只需執行n次,重複次數為n-1
                .createNewWithContent(ScriptType.PYTHON, "print('hello auto-job')"); // 使用指令碼型別和指令碼內容構建一個指令碼任務物件

        ScriptTask task1 = new AutoJobScriptTaskBuilder()
                .setTaskId(IdGenerator.getNextIdAsLong())
                .setTaskAlias("測試指令碼任務2")
                .setTaskType(AutoJobTask.TaskType.MEMORY_TASk)
                .setBelongTo(1L)
                .addASimpleTrigger(SystemClock.now(), 3, 10, TimeUnit.SECONDS) //新增一個簡單觸發器,四個引數分別是:啟動時間、重複次數、週期、週期時間單位,該觸發器表示立即執行,並且重複執行三次,總共執行四次,週期為10秒
                .createNew("python", "/script", "test", "py"); // 使用給定路徑的指令碼檔案建立一個指令碼任務,四個引數分別是:啟動命令、指令碼路徑、指令碼檔名、指令碼字尾,該方法能夠建立除框架原生指令碼型別以外的指令碼任務

        ScriptTask task2 = new AutoJobScriptTaskBuilder()
                .setTaskId(IdGenerator.getNextIdAsLong())
                .setTaskAlias("測試指令碼任務3")
                .setTaskType(AutoJobTask.TaskType.MEMORY_TASk)
                .setBelongTo(1L)
                .addAChildTaskTrigger()  // 新增一個子任務觸發器,該任務不會自動觸發,只有當有任務主動關聯該任務作為其子任務且父任務完成一次排程時才會觸發該任務
                .createNewWithCmd("ping www.baidu.com"); // 建立一個cmd指令碼任務

        ScriptTask task3 = new AutoJobScriptTaskBuilder()
                .setTaskId(IdGenerator.getNextIdAsLong())
                .setTaskAlias("測試指令碼任務4")
                .setTaskType(AutoJobTask.TaskType.MEMORY_TASk)
                .setBelongTo(1L)
                .addADelayTrigger(3, TimeUnit.MINUTES) // 新增一個延遲觸發器,任務將在給定延遲後自動觸發一次,預設使用該型別觸發器,延遲時間可以在框架設定中設定
                .createNewWithExistScriptFile(ScriptType.PYTHON, "/script", "test"); // 使用已存在的指令碼建立一個指令碼任務,三個引數分別是:指令碼型別、指令碼路徑、指令碼檔名

以上示列除了演示瞭如何建立一個指令碼任務,也介紹了觸發器。框架提供了四種觸發器,分別是cron-like觸發器、simple觸發器、父-子任務觸發器、延遲觸發器,具體觸發器的介紹上面程式碼註釋基本講解了這裡就不作冗述。

2、自定義排程器

排程器的概念在第九節:框架架構裡已經說明,那麼怎麼來自定義一個自己的排程器呢,下面做一個簡單示列:

/**
 * 你的自定義排程器
 *
 * @Author Huang Yongxiang
 * @Date 2022/08/18 14:56
 */
public class YourScheduler extends AbstractScheduler{
    //排程器預設構造方法
    public YourScheduler(AutoJobTaskExecutorPool executorPool, IAutoJobRegister register, AutoJobConfigHolder configHolder) {
        super(executorPool, register, configHolder);
    }
    
    //...排程邏輯
}

@AutoJobScan("com.example.autojob.job")
@AutoJobProcessorScan("com.example.autojob")
public class AutoJobMainApplication {
    public static void main(String[] args) {
        new AutoJobLauncherBuilder(AutoJobMainApplication.class)
                .withAutoScanProcessor()
            	//設定你的排程器,如果你的排程器支援預設構造方法可以只指定型別
                .addScheduler(YourScheduler.class)
            	//.addScheduler(new YourScheduler()) 如果不支援預設構造方法就需要新增一個範例
                .build()
                .run();
        System.out.println("==================================>系統建立完成");
    }
}

可能你希望框架只通過你的排程器來進行排程,而不再需要記憶體任務排程器或DB任務排程器,你可以在應用啟動時選擇性關閉:

@AutoJobScan("com.example.autojob.job")
@AutoJobProcessorScan("com.example.autojob")
public class AutoJobMainApplication {
    public static void main(String[] args) {
        new AutoJobBootstrap(AutoJobMainApplication.class)
                .withAutoScanProcessor()
                .closeDBTaskScheduler() // 關閉DB任務排程器
                .closeMemoryTaskScheduler() // 關閉記憶體任務排程器
                .build()
                .run();
        System.out.println("==================================>系統建立完成");
    }
}

注意!!!如果你沒有指定自己的排程器而關閉了框架原生的記憶體任務排程器或DB任務排程器,則框架會喪失該型別任務的排程功能,如果都關閉了則框架不再具有任何任務的排程功能。

3、自定義郵件報警

AutoJob中的郵件報警也是事件驅動的,框架釋出相關報警事件->對應處理器建立郵件物件->傳送,因此要實現自定義的郵件報警,只需要實現:自定義的報警事件、何時釋出事件、報警事件處理器(模板的建立)。

所有的報警事件都繼承於AlertEvent,下面我們看一下框架的任務執行錯誤報警的實現方式:

//定義報警事件
@Getter
@Setter
public class TaskRunErrorAlertEvent extends AlertEvent {
    public TaskRunErrorAlertEvent(String title, String content, AutoJobTask errorTask) {
        super(title, AlertEventLevel.WARN, content);
        this.errorTask = errorTask;
    }
    private AutoJobTask errorTask;
    private String stackTrace;
}

//報警事件的郵件模板建立
public static AlertMail newRunErrorAlertMail(TaskRunErrorAlertEvent event) {
        AlertMailBuilder builder = AlertMailBuilder.newInstance();
        AutoJobTask errorTask = event.getErrorTask();
        return builder
            	.setMailClient(errorTask.getMailClient())
                .setTitle(event.getTitle())
                .setLevel(AlertEventLevel.WARN)
                .addContentTitle(String.format("任務:\"%d:%s\"執行失敗", errorTask.getId(), errorTask.getAlias()), 1)
                .addBr()
                .addBold("報警時間:" + DateUtils.formatDateTime(event.getPublishTime()))
                .addBr()
                .addBold(String.format("報警機器:%s:%s", event
                        .getNode()
                        .getHost(), event
                        .getNode()
                        .getPort()))
                .addBr()
                .addBold("任務路徑:" + errorTask.getReference())
                .addBr()
                .addParagraph("堆疊資訊如下:")
                .addParagraph(event
                        .getStackTrace()
                        .replace("\n", "</br>"))
                .addError("請及時處理")
                .getAlertMail();
}

//事件處理器
@Slf4j
public class TaskRunErrorAlertEventHandler implements IAlertEventHandler<TaskRunErrorAlertEvent> {
    @Override
    public void doHandle(TaskRunErrorAlertEvent event) {
        AutoJobConfig config = AutoJobApplication.getInstance().getConfigHolder().getAutoJobConfig();
        if (!config.getTaskRunErrorAlert()) {
            return;
        }
        AlertMail alertMail = AlertMailFactory.newRunErrorAlertMail(event);
        if (alertMail != null) {
            if (alertMail.send()) {
                log.info("傳送報警郵件成功");
            } else {
                log.error("傳送報警郵件失敗");
            }
        }
    }
}

//事件處理器新增進上下文
public class AlertEventHandlerLoader implements IAutoJobLoader {
    @Override
    public void load() {
        TaskEventHandlerDelegate
                .getInstance()
                .addHandler(TaskRunErrorEvent.class, new TaskRunErrorEventHandler());
    }
}

//事件釋出
public class TaskRunErrorEventHandler implements ITaskEventHandler<TaskRunErrorEvent> {
    @Override
    public void doHandle(TaskRunErrorEvent event) {
        AlertEventHandlerDelegate
                .getInstance()
                .doHandle(AlertEventFactory.newTaskRunErrorAlertEvent(event.getTask(), event.getErrorStack()));
    }
}

上面的程式碼大家需要關注幾個地方:AlertMailBuilder是一個郵件模板構建類,可以構建一個郵件物件;報警事件處理器和任務事件處理器一樣需要通過Processor新增進上下文。

4、自定義紀錄檔儲存

框架預設紀錄檔的儲存位置是資料庫,你可以自己定義相關的儲存策略和儲存策略委派者,來實現紀錄檔在其他地方的儲存。下面來簡單演示:

//定義紀錄檔儲存策略

/**
 * 執行紀錄檔檔案儲存策略
 *
 * @Date 2022/11/21 9:15
 */
public class AutoJobLogFileStrategy implements IAutoJobLogSaveStrategy<AutoJobLog> {
    @Override
    public void doHandle(String taskPath, List<AutoJobLog> logList) {
        try {
            FileWriter fileWriter = new FileWriter(new File(taskPath));
            //...
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
/**
 * 執行紀錄檔檔案儲存策略
 *
 * @Date 2022/11/21 9:15
 */
public class AutoJobRunLogFileStrategy implements IAutoJobLogSaveStrategy<AutoJobRunLog> {
    @Override
    public void doHandle(String taskPath, List<AutoJobRunLog> logList) {
        try {
            FileWriter fileWriter = new FileWriter(new File(taskPath));
            //...
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


//設定策略委派
public class AutoJobLogFileDelegate implements ILogSaveStrategyDelegate<AutoJobLog> {
    @Override
    public IAutoJobLogSaveStrategy<AutoJobLog> doDelegate(AutoJobConfigHolder configHolder, Class<AutoJobLog> type) {
        //預設使用File儲存策略
        return new AutoJobLogFileStrategy();
    }
}

public class AutoJobRunLogFileDelegate implements ILogSaveStrategyDelegate<AutoJobRunLog> {
    @Override
    public IAutoJobLogSaveStrategy<AutoJobRunLog> doDelegate(AutoJobConfigHolder configHolder, Class<AutoJobRunLog> type) {
        //預設使用File儲存策略
        return new AutoJobRunLogFileStrategy();
    }
}

以上程式碼定義好了檔案的儲存策略,那麼如何使得我們的策略生效呢,這就需要我們再建立任務時把我們的策略委派給新增進上下文

public static void main(String[] args) {
        new AutoJobBootstrap(AutoJobMainApplication.class)
                .setLogSaveStrategyDelegate(new AutoJobLogFileDelegate()) //設定執行紀錄檔儲存策略委派者
                .setRunLogSaveStrategyDelegate(new AutoJobRunLogFileDelegate()) //設定執行紀錄檔儲存策略委派者
                .build()
                .run();
}

將我們的紀錄檔儲存策略委派者設定進去後,原有的儲存策略就會被覆蓋,當然如果你的委派者邏輯裡面返回了AutoJobLogDBStrategy等原生的儲存策略除外。

5、自定義任務過濾器

任務過濾器是一個過濾器鏈,在任務註冊進任務排程佇列前執行,主要功能是用於過濾某些不安全的任務的執行,框架提供了基於類路徑過濾的任務過濾器ClassPathFilter,但是白名單隻能在組態檔設定,因此很可能你希望實現一個動態的白名單設定,比如從資料庫比對等等,這時你就需要繼承AbstractRegisterFilter,如下示列:

//package com.example.spring.job
public class TestFilter extends AbstractRegisterFilter {
    @Override
    public void doHandle(AutoJobTask task) {
       if(/*...*/){
            //當該任務不允許註冊時直接設定成不允許註冊
            task.setIsAllowRegister(false);
        }
    }
}

@SpringBootApplication
@AutoJobScan("com.example.spring.job")
@AutoJobRegisterPreProcessorScan("com.example.spring") //指定掃描包
public class AutoJobSpringApplication {
    public static void main(String[] args) {
        SpringApplication.run(AutoJobSpringApplication.class, args);
        System.out.println("==================================>Spring應用已啟動完成");
        new AutoJobBootstrap(AutoJobSpringApplication.class)
                .withAutoScanProcessor()
                .build()
                .run();
        System.out.println("==================================>AutoJob應用已啟動完成");
    }

}

在建立應用時還需要在入口類上設定@AutoJobRegisterPreProcessorScan,指定註冊前置處理器的掃描包路徑,否則該過濾器不會被掃描到。

注意:子任務不會被該類過濾器處理。

6、註解任務開發的高階應用

在第三章節-第三小節-基於註解中,簡單演示了註解@AutoJob的用法,AutoJob框架還提供了其他註解,如@FactoryAutoJob@Conditional等,下面一一講解。

@AutoJob註解

@Autojob註解是框架中使用最多的一個註解,將其標註在一個方法上,設定好排程資訊,該方法就會在應用啟動時將其包裝成一個方法型任務放到對應的任務容器,可以參考下下面的示列。

@Slf4j
public class Jobs {
    private static final AutoJobLogHelper logHelper = new AutoJobLogHelper();

    static {
        logHelper.setSlf4jProxy(log);
    }

    //立即啟動,重複無限次,週期為5秒,使用自定義方法執行物件工廠,引數為"hello autoJob"
    @AutoJob(id = 1, attributes = "{'hello autoJob'}", defaultStartTime = StartTime.NOW, repeatTimes = -1, cycle = 5, methodObjectFactory = SpringMethodObjectFactory.class)
    public void hello(String str) {
        logHelper.info(str);
    }

    //2022-11-21 12:00:00啟動,重複3次,總共執行4次,週期為10秒,作為DB任務排程,最長允許執行時長5秒
   @AutoJob(id = 2, startTime = "2022-11-21 12:00:00", repeatTimes = 3, cycle = 10, asType = AutoJobTask.TaskType.DB_TASK, maximumExecutionTime = 5000)
    public void longTask() {
        logHelper.info("long task start");
        SyncHelper.sleepQuietly(10, TimeUnit.SECONDS);
        logHelper.info("long task end");
    }

    //作為子任務排程
    @AutoJob(id = 3, schedulingStrategy = SchedulingStrategy.AS_CHILD_TASK)
    public void childTask() {
        logHelper.info("child task start");
        SyncHelper.sleepQuietly(3, TimeUnit.SECONDS);
        logHelper.info("child task end");
    }

    //按照cron like表示式排程,重複無限次,子任務為3
    @AutoJob(id = 4, alias = "獲取隨機字串", cronExpression = "* * 0/5 17 * * ?", repeatTimes = -1, childTasksId = "3")
    public String getRandomString() {
        return StringUtils.getRandomStr(16);
    }
    
    //僅儲存到資料庫
    @AutoJob(id = 4, schedulingStrategy = SchedulingStrategy.ONLY_SAVE)
    public void error() {
        String str = null;
        str.length();
    }	
}

@FactoryAutoJob註解

由於@AutoJob的設定都是固定的,可能你希望能夠動態設定任務的某些屬性,因此@FactoryAutoJob就為了解決此類場景而出現的,當然你也可以使用基於構建的方式開發任務來實現動態,下面來看一個示列:

@FactoryAutoJob(RandomStringMethodFactory.class)
public String getRandomString() {
    return StringUtils.getRandomStr(16);
}

public class RandomStringMethodFactory implements IMethodTaskFactory {
    @Override
    public MethodTask newTask(AutoJobConfigHolder configHolder, Method method) {
        return new AutoJobMethodTaskBuilder(method.getDeclaringClass(), method.getName())
                .setTaskId(IdGenerator.getNextIdAsLong())
            	//...
                .build();
    }
}

如上示列,getRandomString的包裝將由RandomStringMethodFactory來進行。

@Conditional註解

相信經常使用Spring的小可耐們對此註解應該熟悉,在Spring中,該註解用於實現條件注入,即符合條件時該Bean才會注入到容器。在AutoJob中,功能類似,只有符合該註解指定條件的方法才能被包裝成一個任務。

7、使用內建RPC框架

AutoJob的目標是一款分散式的任務排程框架,因此內部開發了通訊框架:RPC, 這裡只做簡單介紹,後期會基於該RPC開發分散式的AutoJob。每一個AutoJob都有伺服器端和使用者端,伺服器端的開啟可以通過在組態檔裡cluster.enable=true開啟,要使用RPC框架首先需要開發服務提供類,如框架自帶的API:

@AutoJobRPCService("MemoryTaskAPI") //通過該註解宣告該類是一個RPC服務提供方
@Slf4j
public class MemoryTaskAPI implements AutoJobAPI {
    //...細節省略
    @Override
    @RPCMethod("count") //宣告該方法對外提供的方法名
    //服務方法返回值和引數都得是包裝型別
    public Integer count() {
        //...
    }
}

其他AutoJob節點如何呼叫該服務呢,也非常簡單,如下示列:

@AutoJobRPCClient("MemoryTaskAPI") //宣告該介面是一個RPC使用者端
public class MemoryTaskAPIClient{
    //方法名同服務對外提供方法名相同
    Integer count();
}

RPCClientProxy<MemoryTaskAPIClient> proxy = new RPCClientProxy<>("localhost", 7777, MemoryTaskAPIClient.class); //建立介面代理
MemoryTaskAPIClient client = proxy.clientProxy(); //獲取代理範例
System.out.println(client.count()); //像本地方法一樣使用

內嵌RPC框架基於netty開發,使用JSON進行序列化和反序列化。基礎資料型別僅支援包裝型別,即如int需要使用Integer。集合支援Map和List,支援泛型。目前RPC僅供學習使用。

8、使用基於時間動態調整的執行緒池封裝

框架的執行池AutoJobTaskExecutorPool是任務執行的地方,其包含一個快池和一個慢池,分別用於執行執行時間短和執行時間長的任務。框架任務執行原生使用的是兩個基於流量動態更新的執行緒池FlowThreadPoolExecutorHelper,為了更加適應業務需求,提供基於時間動態調整的執行緒池TimerThreadPoolExecutorPool

TimerThreadPoolExecutorHelper.TimerEntry entry = new TimerThreadPoolExecutorHelper.TimerEntry("0 0 7 * * ?", 10, 20, 60, TimeUnit.SECONDS);//設定調整項,<0的項不作調整
		//新增一個觸發監聽器
        entry.setTriggerListener((cronExpression, threadPoolExecutor) -> {
            System.out.println("日間執行緒池調整");
        });
        TimerThreadPoolExecutorHelper fastPool = TimerThreadPoolExecutorHelper
                .builder()
                .setInitialCoreTreadCount(3)
                .setInitialMaximizeTreadCount(5)
                .setTaskQueueCapacity(100)
                .addTimerEntry("0 0 22 * * ?", 0, 1, -1, null)
                .addTimerEntry(entry)
                .build();
        new AutoJobBootstrap(AutoJobSpringApplication.class)
                .withAutoScanProcessor()
            	//自定義執行池
                .setExecutorPool(new AutoJobTaskExecutorPool(null, fastPool, FlowThreadPoolExecutorHelper
                        .builder()
                        .build()))
                .build()
                .run();
        System.out.println("==================================>AutoJob應用已啟動完成");

lExecutorPool`。

TimerThreadPoolExecutorHelper.TimerEntry entry = new TimerThreadPoolExecutorHelper.TimerEntry("0 0 7 * * ?", 10, 20, 60, TimeUnit.SECONDS);//設定調整項,<0的項不作調整
		//新增一個觸發監聽器
        entry.setTriggerListener((cronExpression, threadPoolExecutor) -> {
            System.out.println("日間執行緒池調整");
        });
        TimerThreadPoolExecutorHelper fastPool = TimerThreadPoolExecutorHelper
                .builder()
                .setInitialCoreTreadCount(3)
                .setInitialMaximizeTreadCount(5)
                .setTaskQueueCapacity(100)
                .addTimerEntry("0 0 22 * * ?", 0, 1, -1, null)
                .addTimerEntry(entry)
                .build();
        new AutoJobBootstrap(AutoJobSpringApplication.class)
                .withAutoScanProcessor()
            	//自定義執行池
                .setExecutorPool(new AutoJobTaskExecutorPool(null, fastPool, FlowThreadPoolExecutorHelper
                        .builder()
                        .build()))
                .build()
                .run();
        System.out.println("==================================>AutoJob應用已啟動完成");

如上示列,快池使用基於時間動態調整的執行緒池封裝,其會在每天早上七點將執行緒池擴容到核心10執行緒,最大20執行緒,核心空閒時長更新為60秒,在每晚十點將執行緒池縮容到核心0執行緒,最大1執行緒並且新增了一個觸發監聽器;慢池使用基於流量調整執行緒池封裝。

如果對你有幫助,謝謝點贊、收藏、Star ₍˄·͈༝·͈˄*₎◞ ̑̑