我們應對並行場景時一般會採用下面方式去預估執行緒池的執行緒數量,比如QPS需求是1000,平均每個任務需要執行的時間是t秒,那麼我們需要的執行緒數是t * 1000。
但是在一些情況下,這個t是不好估算的,即便是估算出來了,在實際的執行緒環境上也需要進行驗證和微調。比如在本文所闡述分頁查詢的資料項組合場景中。
1、資料組合依賴不同的上游接介面, 它們的響應時間參差不齊,甚至差距還非常大。有些介面支援批次查詢而另一些則不支援批次查詢。有些介面因為效能問題還需要考慮降級和平滑方案。
2、為了提升使用者體驗,這裡的查詢設計了動態列,因此每一次存取所需要組合的資料項和數量也是不同的。
因此這裡如果需要估算出一個合理的t是不太現實的。
一種可動態調節的策略,根據監控的反饋對執行緒池進行微調。整體設計分為裝配邏輯和執行緒池封裝設計。
查詢結果,拆分分片(水平拆分),並行裝配(垂直拆分),獲得裝配項列表(動態列), 並行裝配每一項。
可調節的核心執行緒數、最大執行緒數、執行緒保持時間,佇列大小,提交任務重試等待時間,提交任務重試次數。 固定異常拒絕策略。
調節引數:
欄位 | 名稱 | 說明 |
---|---|---|
corePoolSize | 核心執行緒數 | 參考執行緒池定義 |
maximumPoolSize | 最大執行緒數 | 參考執行緒池定義 |
keepAliveTime | 執行緒存活時間 | 參考執行緒池定義 |
queueSize | 佇列長度 | 參考執行緒池定義 |
resubmitSleepMillis | 提交任務重試等待時間 | 新增任務被拒絕後重試時的等待時間 |
resubmitTimes | 提交任務重試次數 | 新增任務被拒絕後重試新增的最大次數 |
@Data
private static class PoolPolicy {
/** 核心執行緒數 */
private Integer corePoolSize;
/** 最大執行緒數 */
private Integer maximumPoolSize;
/** 執行緒存活時間 */
private Integer keepAliveTime;
/** 佇列容量 */
private Integer queueSize;
/** 重試等待時間 */
private Long resubmitSleepMillis;
/** 重試次數 */
private Integer resubmitTimes;
}
建立執行緒池:
執行緒池的建立考慮了動態的需求,滿足根據壓測結果進行微調的要求。首先快取舊的執行緒池後再建立新的執行緒,當新的執行緒池建立成功後再去關閉舊的執行緒池。保證在這個替換過程中不影響正在執行的業務。執行緒池使用了中斷策略,使用者可以及時感知到系統繁忙併保證了系統資源佔用的安全。
public void reloadThreadPool(PoolPolicy poolPolicy) {
if (poolPolicy == null) {
throw new RuntimeException("The thread pool policy cannot be empty.");
}
if (poolPolicy.getCorePoolSize() == null) {
poolPolicy.setCorePoolSize(0);
}
if (poolPolicy.getMaximumPoolSize() == null) {
poolPolicy.setMaximumPoolSize(Runtime.getRuntime().availableProcessors() + 1);
}
if (poolPolicy.getKeepAliveTime() == null) {
poolPolicy.setKeepAliveTime(60);
}
if (poolPolicy.getQueueSize() == null) {
poolPolicy.setQueueSize(Runtime.getRuntime().availableProcessors() + 1);
}
if (poolPolicy.getResubmitSleepMillis() == null) {
poolPolicy.setResubmitSleepMillis(200L);
}
if (poolPolicy.getResubmitTimes() == null) {
poolPolicy.setResubmitTimes(5);
}
// - 執行緒池策略沒有變化直接返回已有執行緒池。
ExecutorService original = this.executorService;
this.executorService = new ThreadPoolExecutor(
poolPolicy.getCorePoolSize(),
poolPolicy.getMaximumPoolSize(),
poolPolicy.getKeepAliveTime(), TimeUnit.SECONDS,
new ArrayBlockingQueue<>(poolPolicy.getQueueSize()),
new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build(),
new ThreadPoolExecutor.AbortPolicy());
this.poolPolicy = poolPolicy;
if (original != null) {
original.shutdownNow();
}
}
任務提交:
執行緒池封裝物件中使用的執行緒池拒絕策略是AbortPolicy,因此線上程數和阻塞佇列到達上限後會觸發異常。另外在這裡為了保證提交的成功率利用重試策略實現了一定程度的延遲處理,具體場景中可以結合業務特點進行適當的調節和設定。
public <T> Future<T> submit(Callable<T> task) {
RejectedExecutionException exception = null;
Future<T> future = null;
for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
try {
// - 新增任務
future = this.executorService.submit(task);
exception = null;
break;
} catch (RejectedExecutionException e) {
exception = e;
this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
}
}
if (exception != null) {
throw exception;
}
return future;
}
監控:
1、submit提交的監控
見程式碼中的「監控點①」,在submit方法中新增監控點,監控key的需要添執行緒池封裝物件的執行緒名稱字首,用於區分具體的執行緒池物件。
「監控點①」用於監控新增任務的動作是否正常,以便對執行緒池物件及策略引數進行微調。
public <T> Future<T> submit(Callable<T> task) {
// - 監控點①
CallerInfo callerInfo = Profiler.registerInfo(UmpConstant.THREAD_POOL_WAP + threadNamePrefix,
UmpConstant.APP_NAME,
UmpConstant.UMP_DISABLE_HEART,
UmpConstant.UMP_ENABLE_TP);
RejectedExecutionException exception = null;
Future<T> future = null;
for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
try {
// - 新增任務
future = this.executorService.submit(task);
exception = null;
break;
} catch (RejectedExecutionException e) {
exception = e;
this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
}
}
if (exception != null) {
// - 監控點①
Profiler.functionError(callerInfo);
throw exception;
}
// - 監控點①
Profiler.registerInfoEnd(callerInfo);
return future;
}
2、執行緒池並行任務
見程式碼的「監控點②」,分別在新增任務和任務完成後。
「監控點②」實時統計線上程中執行的總任務數量,用於評估執行緒池的任務的數量的滿載水平。
/** 任務並行數量統計 */
private AtomicInteger parallelTaskCount = new AtomicInteger(0);
public <T> Future<T> submit(Callable<T> task) {
RejectedExecutionException exception = null;
Future<T> future = null;
for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
try {
// - 新增任務
future = this.executorService.submit(()-> {
T rst = task.call();
// - 監控點②
log.info("{} - Parallel task count {}", this.threadNamePrefix, this.parallelTaskCount.decrementAndGet());
return rst;
});
// - 監控點②
log.info("{} + Parallel task count {}", this.threadNamePrefix, this.parallelTaskCount.incrementAndGet());
exception = null;
break;
} catch (RejectedExecutionException e) {
exception = e;
this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
}
}
if (exception != null) {
throw exception;
}
return future;
}
執行緒池封裝物件策略的調節時機
1)上線前基於流量預估的壓測階段;
2)上線後跟進監控資料和執行緒池中任務的滿載水平進行人工微調,也可以通過JOB在指定的時間自動調整;
3)大促前依據往期大促峰值來調高相關引數。
執行緒池封裝物件策略的調節經驗
1)存取時長要求較低時,我們可以考慮調小執行緒數和阻塞佇列,適當調大提交任務重試等待時間和次數,以便降低資源佔用。
2)存取時長要求較高時,就需要調大執行緒數並保證相對較小的阻塞佇列,調小提交任務的重試等待時間和次數甚至分別調成0和1(即關閉重試提交邏輯)。
作者:京東零售 王文明
來源:京東雲開發者社群 轉載請註明來源