Dromara【新晉開源專案】DynamicTp,美團動態執行緒池思路的實踐!

2022-04-15 19:00:30

快訊,開源輕量級動態執行緒池監控 - DynamicTp 加入Dromara社群!歡迎歡迎!!

下面是DynamicTp作者對DynamicTp的介紹。

大家好,今天我們來聊一個比較實用的話題,動態可監控的執行緒池實踐,全新開源專案(DynamicTp)地址在文章末尾,歡迎交流學習。


背景

使用 ThreadPoolExecutor 過程中你是否有以下痛點呢?

1.程式碼中建立了一個 ThreadPoolExecutor,但是不知道那幾個核心引數設定多少比較合適

2.憑經驗設定引數值,上線後發現需要調整,改程式碼重新啟動服務,非常麻煩

3.執行緒池相對開發人員來說是個黑盒,執行情況不能感知到,直到出現問題

如果你有以上痛點,動態可監控執行緒池(DynamicTp)或許能幫助到你。

如果看過 ThreadPoolExecutor 的原始碼,大概可以知道其實它有提供一些 set 方法,可以在執行時動態去修改相應的值,這些方法有:

public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);

現在大多數的網際網路專案其實都會微服務化部署,有一套自己的服務治理體系,微服務元件中的分散式設定中心扮演的就是動態修改設定, 實時生效的角色。那麼我們是否可以結合設定中心來做執行時執行緒池引數的動態調整呢?答案是肯定的,而且設定中心相對都是高可用的, 使用它也不用過於擔心設定推播出現問題這類事兒,而且也能減少研發動態執行緒池元件的難度和工作量。

綜上,我們總結出以下的背景

  • 廣泛性:在 Java 開發中,想要提高系統效能,執行緒池已經是一個 90%以上的人都會選擇使用的基礎工具

  • 不確定性:專案中可能會建立很多執行緒池,既有 IO 密集型的,也有 CPU 密集型的,但執行緒池的引數並不好確定;需要有套機制在執行過程中動態去調整引數

  • 無感知性,執行緒池執行過程中的各項指標一般感知不到;需要有套監控報警機制在事前、事中就能讓開發人員感知到執行緒池的執行狀況,及時處理

  • 高可用性,設定變更需要及時推播到使用者端;需要有高可用的設定管理推播服務,設定中心是現在大多數網際網路系統都會使用的元件,與之結合可以大幅度減少開發量及接入難度


簡介

我們基於設定中心對執行緒池 ThreadPoolExecutor 做一些擴充套件,實現對執行中執行緒池引數的動態修改,實時生效; 以及實時監控執行緒池的執行狀態,觸發設定的報警策略時報警,報警資訊會推播辦公平臺(釘釘、企微等)。 報警維度包括(佇列容量、執行緒池活性、拒絕觸發、任務超時等);同時也會定時採集執行緒池指標資料供監控平臺視覺化使用。 使我們能時刻感知到執行緒池的負載,根據情況及時調整,避免出現問題影響線上業務。

    |  __ \                            (_) |__   __|
    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __
    | |  | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \
    | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
    |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/
             __/ |                              | |
            |___/                               |_|
     :: Dynamic Thread Pool ::

特性

  • 參考,對執行緒池引數動態化管理,增加監控、報警功能

  • 基於 Spring 框架,現只支援 SpringBoot 專案使用,輕量級,引入 starter 即可使用

  • 基於設定中心實現執行緒池引數動態調整,實時生效;整合主流設定中心,已支援 Nacos、Apollo、Zookeeper、Consul, 同時也提供 SPI 介面可自定義擴充套件實現

  • 內建通知報警功能,提供多種報警維度(設定變更通知、活性報警、容量閾值報警、拒絕觸發報警、任務執行或等待超時報警), 預設支援企業微信、釘釘報警,同時提供 SPI 介面可自定義擴充套件實現

  • 內建執行緒池指標採集功能,支援通過 MicroMeter、JsonLog 紀錄檔輸出、Endpoint 三種方式,可通過 SPI 介面自定義擴充套件實現

  • 整合管理常用第三方元件的執行緒池,已整合 SpringBoot 內建 WebServer(Tomcat、Undertow、Jetty)的執行緒池管理

  • 提供任務包裝功能,實現TaskWrapper介面即可,如TtlTaskWrapper可以支援執行緒池上下文資訊傳遞

  • JUC普通執行緒池也可以被框架監控(@DynamicTp);參考Tomcat執行緒池提供了io密集型場景使用的EagerDtpExecutor


架構設計

主要分四大模組

  • 設定變更監聽模組:

    1.監聽特定設定中心的指定組態檔(已實現 Nacos、Apollo、Zookeeper、Consul),可通過內部提供的SPI介面擴充套件其他實現

    2.解析組態檔內容,內建實現 yml、properties 組態檔的解析,可通過內部提供的 SPI 介面擴充套件其他實現

    3.通知執行緒池管理模組實現重新整理

  • 執行緒池管理模組:

    1.服務啟動時從設定中心拉取設定,生成執行緒池範例註冊到內部執行緒池註冊中心以及Spring容器中

    2.監聽模組監聽到設定變更時,將變更資訊傳遞給管理模組,實現執行緒池引數的重新整理

    3.程式碼中通過依賴注入(推薦)或者getExecutor()方法根據執行緒池名稱來獲取執行緒池範例

  • 監控模組:

    實現監控指標採集以及輸出,預設提供以下三種方式,也可通過內部提供的 SPI 介面擴充套件其他實現

    1.預設實現JsonLog輸出到磁碟,可以自己採集解析紀錄檔,儲存展示

    2.MicroMeter採集,引入 MicroMeter 相關依賴,暴露相關端點

    3.暴雷自定義Endpoint端點,可通過 http 方式實時存取

  • 通知告警模組:

    對接辦公平臺,實現通知告警功能,預設實現釘釘、企微,可通過內部提供的 SPI 介面擴充套件其他實現,通知告警型別如下

    1.執行緒池主要引數變更通知

    2.阻塞佇列容量達到設定的告警閾值

    3.執行緒池活性達到設定的告警閾值

    4.觸發拒絕策略告警,格式:A/B,A:該報警項前後兩次報警區間累加數量,B:該報警項累計總數

    5.任務執行超時告警,格式:A/B,A:該報警項前後兩次報警區間累加數量,B:該報警項累計總數

    6.任務等待超時告警,格式:A/B,A:該報警項前後兩次報警區間累加數量,B:該報警項累計總數


使用

1.引入對應設定中心的依賴

2.設定中心設定執行緒池範例,設定參考下文(給出的是全設定項,設定項都有預設值)

3.啟動類加@EnableDynamicTp註解

4.使用@Resource或@Autowired註解注入,或通過DtpRegistry.getExecutor("name")獲取

5.普通JUC執行緒池想要被監控,可以@Bean定義時加@DynamicTp註解

6.tips:動態執行緒池範例服務啟動時會根據設定中心的設定動態註冊到Spring容器中,建議不要用@Bean程式設計式重複宣告同一執行緒池範例,直接設定在設定中心就行

7.詳細參考Example範例

  • 執行緒池設定(yml 型別)

    spring:
      dynamic:
        tp:
          enabled: true
          enabledBanner: true        # 是否開啟banner列印,預設true
          enabledCollect: false      # 是否開啟監控指標採集,預設false
          collectorType: logging     # 監控資料採集器型別(JsonLog | MicroMeter),預設logging
          logPath: /home/logs        # 監控紀錄檔資料路徑,預設 ${user.home}/logs
          monitorInterval: 5         # 監控時間間隔(報警判斷、指標採集),預設5s
          platforms:                 # 通知報警平臺設定
            - platform: wechat
              urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c  # 替換
              receivers: test1,test2                   # 接受人企微名稱
            - platform: ding
              urlKey: f80dad441fcd655438f4a08dcd6a     # 替換
              secret: SECb5441fa6f375d5b9d21           # 替換,非sign模式可以沒有此值
              receivers: 15810119805                   # 釘釘賬號手機號
          tomcatTp:                                    # tomcat web server執行緒池設定
              minSpare: 100
              max: 400
          jettyTp:                                     # jetty web server執行緒池設定
              min: 100
              max: 400
          undertowTp:                                  # undertow web server執行緒池設定
              coreWorkerThreads: 100                   # 核心執行緒數
              maxWorkerThreads: 400                    # 最大執行緒數
              workerKeepAlive: 40                     
          executors:                                   # 動態執行緒池設定,都有預設值,採用預設值的可以不設定該項,減少設定量
            - threadPoolName: dtpExecutor1
              executorType: common                     # 執行緒池型別common、eager:適用於io密集型
              corePoolSize: 6
              maximumPoolSize: 8
              queueCapacity: 200
              queueType: VariableLinkedBlockingQueue   # 任務佇列,檢視原始碼QueueTypeEnum列舉類
              rejectedHandlerType: CallerRunsPolicy    # 拒絕策略,檢視RejectedTypeEnum列舉類
              keepAliveTime: 50
              allowCoreThreadTimeOut: false
              threadNamePrefix: test                         # 執行緒名字首
              waitForTasksToCompleteOnShutdown: false        # 參考spring執行緒池設計
              awaitTerminationSeconds: 5                     # 單位(s)
              preStartAllCoreThreads: false                  # 是否預熱核心執行緒,預設false
              runTimeout: 200                                # 任務執行超時閾值,目前只做告警用,單位(ms)
              queueTimeout: 100                              # 任務在佇列等待超時閾值,目前只做告警用,單位(ms)
              taskWrapperNames: ["ttl"]                          # 任務包裝器名稱,整合TaskWrapper介面
              notifyItems:                     # 報警項,不設定自動會按預設值設定(變更通知、容量報警、活性報警、拒絕報警、任務超時報警)
                - type: capacity               # 報警項型別,檢視原始碼 NotifyTypeEnum列舉類
                  enabled: true
                  threshold: 80                # 報警閾值
                  platforms: [ding,wechat]     # 可選設定,不設定預設拿上層platforms設定的所以平臺
                  interval: 120                # 報警間隔(單位:s)
    
  • 程式碼呼叫,從DtpRegistry中根據執行緒池名稱獲取,或者通過依賴注入方式(推薦,更優雅)

    @Resource
    private ThreadPoolExecutor dtpExecutor1;
    
    public void exec() {
       dtpExecutor1.execute(() -> System.out.println("test"));
    }
    
    public static void main(String[] args) {
       DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dtpExecutor1");
       dtpExecutor.execute(() -> System.out.println("test"));
    }
    
  • 詳細使用範例參考example工程


通知報警

  • 觸發報警閾值會推播相應報警訊息(活性、容量、拒絕、超時),且會高亮顯示相應欄位

  • 設定變更會推播通知訊息,且會高亮變更的欄位


監控

監控資料

監控資料

通過 collectType 屬性設定監控指標採集型別,預設 logging

  • MicroMeter:通過引入相關 MicroMeter 依賴採集到相應的平臺 (如 Prometheus,InfluxDb...)

  • Logging:定時採集指標資料以 Json 紀錄檔格式輸出磁碟,地址 {appName}.monitor.log

  • 暴露 EndPoint 端點(dynamic-tp),可以通過 http 方式請求

    [
        {
            "dtp_name""remoting-call",
            "core_pool_size"6,
            "maximum_pool_size"12,
            "queue_type""SynchronousQueue",
            "queue_capacity"0,
            "queue_size"0,
            "fair"false,
            "queue_remaining_capacity"0,
            "active_count"0,
            "task_count"21760,
            "completed_task_count"21760,
            "largest_pool_size"12,
            "pool_size"6,
            "wait_task_count"0,
            "reject_count"124662,
            "reject_handler_name""CallerRunsPolicy"
        }
    ]
    

關注專案

對專案有什麼想法或者建議,可以建立,一起完善專案

倉庫地址,歡迎給專案點贊!

https://github.com/dromara/dynamic-tp

https://gitee.com/dromara/dynamic-tp

展開閱讀全文