大家好,這篇文章跟大家探討下日常使用執行緒池的各種姿勢,重點介紹怎麼在 Spring 環境中正確使用執行緒池。
首先問大家一個問題,你日常開發中是怎樣使用執行緒池的?
我想大致可以分為以下四種情況:
1.方法級,隨用隨建,用完關閉
2.類級共用,定義個 static final 修飾的 ThreadPoolExecutor,該類及子類(看修飾符)所有物件、方法共用
3.業務共用,按業務型別定義多個 ThreadPoolExecutor,相同業務型別共用同一執行緒池物件
4.全域性共用,服務所有地方共用同一全域性執行緒池
一般來說,優先使用方式3,其次方式2,不要使用方式1跟4,原因如下
1.執行緒池出現的目的就是為了統一管理執行緒資源,減少頻繁建立銷燬執行緒帶來的開銷,使用池化技術複用執行緒執行任務,提升系統效能,在高並行、非同步化的場景下,方法級使用根本達不到此目的,反而會使效能變低。
2.全域性共用一個執行緒池,任務執行參差不齊,相互影響,高耗時任務會佔滿執行緒池資源,導致低耗時任務沒機會執行;同時如果任務之間存在父子關係,可能會導致死鎖的發生,進而引發OOM。
3.按業務型別進行執行緒池隔離,各任務執行互不影響,粒度也比類級共用大點,不會建立大量執行緒池,降低系統排程壓力,像 Hystrix 執行緒池隔離就可以理解成這種模式。
綜上,建議大家都採用方式3,按業務功能分類定義執行緒池。
Spring 作為一個 Bean 容器,我們通常會將業務中用到的 ThreadPoolExecutor 註冊到 Spring 容器中,同時 Spring 在容器重新整理的時候會注入相應的 ThreadPoolExecutor 物件 到我們的業務 Bean 中,然後就可以直接使用了,比如定義如下(ThreadPoolBuilder是封裝的一個建造者模式實現):
@Configuration
public class ThreadPoolConfiguration {
@Bean
public ThreadPoolExecutor jobExecutor() {
return ThreadPoolBuilder.newBuilder()
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(LINKED_BLOCKING_QUEUE.getName(), 3000)
.build();
}
@Bean
public ThreadPoolExecutor remotingExecutor() {
return ThreadPoolBuilder.newBuilder()
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(SYNCHRONOUS_QUEUE.getName(), null)
.build();
}
@Bean
public ThreadPoolExecutor consumeExecutor() {
return ThreadPoolBuilder.newBuilder()
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(LINKED_BLOCKING_QUEUE.getName(), 5000)
.build();
}
}
以上按使用場景定義了三個執行緒池範例,一個用來執行耗時的定時任務、一個用來執行遠端RPC呼叫、一個用來執行 Mq 消費。
這樣使用 ThreadPoolExecutor 有個問題,Spring 容器關閉的時候可能任務佇列裡的任務還沒處理完,有丟失任務的風險。
我們知道 Spring 中的 Bean 是有生命週期的,如果 Bean 實現了 Spring 相應的生命週期介面(InitializingBean、DisposableBean介面),在 Bean 初始化、容器關閉的時候會呼叫相應的方法來做相應處理。
所以建議最好不要直接使用 ThreadPoolExecutor 在 Spring 環境中,可以使用 Spring 提供的 ThreadPoolTaskExecutor,或者 DynamicTp 框架提供的 DtpExecutor 執行緒池實現。
這裡分享一個原始碼閱讀技巧,就是開源專案和Spring整合時,很多同學不知從何入手閱讀原始碼。
我們知道Spring提供了很多的擴充套件點,第三方框架整合Spring其實大多也都是基於這些擴充套件介面來做的,所以我們可以從這些擴充套件介面入手,斷點偵錯,一步步深入框架核心。
這些擴充套件包括但不限於以下介面:
BeanFactoryPostProcessor:在Bean範例化之前對BeanDefinition進行修改
BeanPostProcessor:在Bean初始化前後對Bean進行一些修改包裝增強,比如返回代理物件
Aware:一個標記介面,實現該介面及子介面的類會收到Spring的通知回撥,賦予某種Spring框架的能力,比如ApplicationContextAware、EnvironmentAware等
ApplicationContextInitializer:在上下文準備階段,容器重新整理之前做一些初始化工作,比如我們常用的設定中心client基本都是繼承該初始化器,在容器重新整理前將設定從遠端拉到本地,然後封裝成PropertySource放到Environment中供使用
ApplicationListener:Spring事件機制,監聽特定的應用事件(ApplicationEvent),觀察者模式的一種實現
FactoryBean:用來自定義Bean的建立邏輯(Mybatis、Feign等等)
ImportBeanDefinitionRegistrar:定義@EnableXXX註解,在註解上Import了一個 ImportBeanDefinitionRegistrar,實現註冊BeanDefinition到容器中
ApplicationRunner/CommandLineRunner:容器啟動後回撥,執行一些初始化工作
上述列出了幾個比較常用的介面,但是Spring擴充套件遠不於此,還有很多擴充套件介面大家可以自己去了解。
DynamicTp 框架內部定義了 DtpExecutor 執行緒池類,其繼承關係如下:
EagerDtpExecutor:參考 Tomcat 執行緒池設計,調整了下執行緒池的執行流程,優先建立執行緒執行任務而不是放入佇列中,主要用於IO密集型場景,繼承 DtpExecutor
DtpExecutor:重寫了 ThreadPoolExecutor 的 execute 方法、beforeExecute 方法、afterExecute 方法,主要做任務包裝、執行超時、等待超時記錄等,繼承 DtpLifecycleSupport
DtpLifecycleSupport:實現了 Spring 中的 InitializingBean, DisposableBean 介面,在 Bean 初始化、Spring 容器銷燬時執行相應的邏輯,destroy 方法邏輯如下:
@Override
public void destroy() {
internalShutdown();
}
public void internalShutdown() {
if (log.isInfoEnabled()) {
log.info("Shutting down ExecutorService, poolName: {}", threadPoolName);
}
if (this.waitForTasksToCompleteOnShutdown) {
// 如果需要等待任務執行完畢,則呼叫shutdown()會執行先前已提交的任務,拒絕新任務提交,執行緒池狀態變成 SHUTDOWN
this.shutdown();
} else {
// 如果不需要等待任務執行完畢,則直接呼叫shutdownNow()方法,嘗試中斷正在執行的任務,返回所有未執行的任務,執行緒池狀態變成 STOP, 然後呼叫 Future 的 cancel 方法取消
for (Runnable remainingTask : this.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary();
}
protected void cancelRemainingTask(Runnable task) {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
}
private void awaitTerminationIfNecessary() {
if (this.awaitTerminationSeconds <= 0) {
return;
}
try {
// 配合 shutdown 使用,阻塞當前執行緒,等待已提交的任務執行完畢或者超時
if (!awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS) && log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor {} to terminate", threadPoolName);
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("Interrupted while waiting for executor {} to terminate", threadPoolName);
}
Thread.currentThread().interrupt();
}
}
DynamicTp 框架在整合 Spring 的時候,也是用到了上述說的擴充套件介面。
擴充套件1
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DtpBeanDefinitionRegistrar.class)
public @interface EnableDynamicTp {
}
使用過 DynamicTp 的小夥伴應該知道需要在啟動類加 @EnableDynamicTp 註解,該註解其實就用到了 ImportBeanDefinitionRegistrar 擴充套件,主要程式碼如下:
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
DtpProperties dtpProperties = new DtpProperties();
// 將環境變數中的執行緒池相關設定繫結到 DtpProperties 物件上
PropertiesBinder.bindDtpProperties(environment, dtpProperties);
val executors = dtpProperties.getExecutors();
if (CollUtil.isEmpty(executors)) {
log.warn("DynamicTp registrar, no executors are configured.");
return;
}
executors.forEach(x -> {
// 判斷執行緒池型別(common or eager)
Class<?> executorTypeClass = ExecutorType.getClass(x.getExecutorType());
String beanName = x.getThreadPoolName();
// 執行緒池物件屬性
Map<String, Object> properties = buildProperties(x);
// 構造器引數
Object[] args = buildArgs(executorTypeClass, x);
BeanUtil.registerIfAbsent(registry, beanName, executorTypeClass, properties, args);
});
}
程式碼解讀:
1.我們知道 ImportBeanDefinitionRegistrar 的實現是在 Spring 容器重新整理的時候執行的,在此之前在上下文準備階段已經從設定中心拉取到執行緒池設定放到環境變數裡了,所以第一步我們將環境變數裡的執行緒池相關設定繫結到 DtpProperties 物件上。
2.然後構造 BeanDefinitionBuilder 物件,設定建構函式引數、設定屬性值,註冊到 BeanDefinition 到 Spring 容器中
public static void doRegister(BeanDefinitionRegistry registry,
String beanName,
Class<?> clazz,
Map<String, Object> properties,
Object... constructorArgs) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);
// 設定構造器引數,老八股文了
for (Object constructorArg : constructorArgs) {
builder.addConstructorArgValue(constructorArg);
}
// 設定屬性及值的KV對,後續在Bean populateBean 的時候會通過反射set方法賦值
if (CollUtil.isNotEmpty(properties)) {
properties.forEach(builder::addPropertyValue);
}
registry.registerBeanDefinition(beanName, builder.getBeanDefinition());
}
3.Spring 容器重新整理時會根據註冊的 BeanDefinition 建立設定的執行緒池物件,初始化賦值,並注入到參照的 Bean 中。這樣就不用在手動用 @Bean 宣告執行緒池物件了,只需要在設定中心設定即可
擴充套件2
DtpPostProcessor 繼承 BeanPostProcessor,在 Bean 初始化前後對 ThreadPoolExecutor 及其子類進行一些處理,主要用來獲取執行緒池物件註冊到 DynamicTp 框架內部定義的容器中(就個 Map)
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (!(bean instanceof ThreadPoolExecutor)) {
return bean;
}
if (bean instanceof DtpExecutor) {
DtpExecutor dtpExecutor = (DtpExecutor) bean;
if (bean instanceof EagerDtpExecutor) {
((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);
}
registerDtp(dtpExecutor);
return dtpExecutor;
}
ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
DynamicTp dynamicTp;
try {
dynamicTp = applicationContext.findAnnotationOnBean(beanName, DynamicTp.class);
if (dynamicTp == null) {
return bean;
}
} catch (NoSuchBeanDefinitionException e) {
log.error("There is no bean with the given name {}", beanName, e);
return bean;
}
String poolName = StringUtils.isNotBlank(dynamicTp.value()) ? dynamicTp.value() : beanName;
registerCommon(poolName, (ThreadPoolExecutor) bean);
return bean;
}
擴充套件3
ApplicationListener 主要用來解耦邏輯,釋出監聽事件,core 模組跟 adapter 模組通訊主要就用該擴充套件,以及框架會監聽 Spring 容器啟動的各階段事件,做相應的邏輯處理
public abstract class AbstractDtpHandleListener implements GenericApplicationListener {
@Override
public boolean supportsEventType(ResolvableType resolvableType) {
Class<?> type = resolvableType.getRawClass();
if (type != null) {
return RefreshEvent.class.isAssignableFrom(type) ||
CollectEvent.class.isAssignableFrom(type) ||
AlarmCheckEvent.class.isAssignableFrom(type);
}
return false;
}
@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {
try {
if (event instanceof RefreshEvent) {
doRefresh(((RefreshEvent) event).getDtpProperties());
} else if (event instanceof CollectEvent) {
doCollect(((CollectEvent) event).getDtpProperties());
} else if (event instanceof AlarmCheckEvent) {
doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
}
} catch (Exception e) {
log.error("DynamicTp adapter, event handle failed.", e);
}
}
}
擴充套件4
ApplicationRunner,等 Spring 容器啟動後,會呼叫該勾點函數,執行一些初始化操作,DtpMonitor、DtpRegistry 等都用到了該擴充套件
所以 DynamicTp 的正確使用姿勢,執行緒池只需在設定中心宣告,然後服務啟動時框架會基於 Spring 的這些擴充套件自動建立執行緒池物件注入到所需的 Bean 中,程式碼中不需要顯示宣告
DynamicTp 是一個基於設定中心實現的輕量級動態執行緒池管理工具,主要功能可以總結為 動態調參、通知報警、執行監控、三方包執行緒池管理等幾大類。
經過幾個版本迭代,目前最新版本v1.0.7具有以下特性
特性 ✅
程式碼零侵入:所有設定都放在設定中心,對業務程式碼零侵入
輕量簡單:基於 springboot 實現,引入 starter,接入只需簡單4步就可完成,順利3分鐘搞定
高可延伸:框架核心功能都提供 SPI 介面供使用者自定義個性化實現(設定中心、組態檔解析、通知告警、監控資料採集、任務包裝等等)
線上大規模應用:參考美團執行緒池實踐,美團內部已經有該理論成熟的應用經驗
多平臺通知報警:提供多種報警維度(設定變更通知、活性報警、容量閾值報警、拒絕觸發報警、任務執行或等待超時報警),已支援企業微信、釘釘、飛書報警,同時提供 SPI 介面可自定義擴充套件實現
監控:定時採集執行緒池指標資料,支援通過 MicroMeter、JsonLog 紀錄檔輸出、Endpoint 三種方式,可通過 SPI 介面自定義擴充套件實現
任務增強:提供任務包裝功能,實現TaskWrapper介面即可,如 TtlTaskWrapper 可以支援執行緒池上下文資訊傳遞,以及給任務設定標識id,方便問題追蹤
相容性:JUC 普通執行緒池也可以被框架監控,@Bean 定義時加 @DynamicTp 註解即可
可靠性:框架提供的執行緒池實現 Spring 生命週期方法,可以在 Spring 容器關閉前儘可能多的處理佇列中的任務
多模式:參考Tomcat執行緒池提供了 IO 密集型場景使用的 EagerDtpExecutor 執行緒池
支援多設定中心:基於主流設定中心實現執行緒池引數動態調整,實時生效,已支援 Nacos、Apollo、Zookeeper、Consul,同時也提供 SPI 介面可自定義擴充套件實現
中介軟體執行緒池管理:整合管理常用第三方元件的執行緒池,已整合Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix等元件的執行緒池管理(調參、監控報警)
目前累計 1.5k star,感謝你的star,歡迎pr,業務之餘一起給開源貢獻一份力量
gitee地址:https://gitee.com/dromara/dynamic-tp
github地址:https://github.com/dromara/dynamic-tp