在日常開發中經常會遇到需要使用其它執行緒將大量任務非同步處理的場景(非同步化以及提升系統的吞吐量),而在使用執行緒的過程中卻存在著兩個痛點。
而執行緒池正是為解決上述痛點而生的,其通過兩個手段來解決上述痛點。
池化執行緒資源,顧名思義就是維護一個存活執行緒的集合(池子)。提交任務的使用者程式不直接控制執行緒的建立和銷燬,不用每次執行任務時都申請建立一個新執行緒,而是通過執行緒池間接的獲得執行緒去處理非同步任務。
執行緒池中的執行緒在執行完任務後通常也不會被系統回收掉,而是繼續待在池子中用於執行其它的任務(執行堆積的待執行任務或是等待新任務)。
執行緒池通過池化執行緒資源,避免了系統反覆建立/銷燬執行緒的開銷,大幅提高了處理大規模非同步任務時的效能。
如果程式都統一使用執行緒池來處理非同步任務,則執行緒池內部便可以對系統資源的使用施加一定限制。
例如使用者可以指定一個執行緒池最大可維護的執行緒數量,以避免耗盡系統資源。
當用戶提交任務的速率過大,導致執行緒池中的執行緒數到達指定的最大值時依然無法滿足需求時,執行緒池可以通過丟棄部分任務或限制提交任務的流量的方式來處理這一問題。
執行緒池通過對執行緒資源的使用進行統一收口,使用者可以通過設定執行緒池的引數來控制系統資源的使用,從而避免系統資源耗盡。
前面介紹了執行緒池的概念,而要深入理解執行緒池的工作原理最好的辦法便是找到一個優秀的執行緒池實現來加以研究。
而自jdk1.5中引入的通用執行緒池框架ThreadPoolExecutor便是一個很好的學習物件。其內部實現不算複雜,卻在高效實現核心功能的同時還提供了較豐富的拓展能力。
下面從整體上介紹一下jdk通用執行緒池ThreadPoolExecutor的工作原理(基於jdk8)。
首先ThreadPoolExecutor允許使用者從兩個不同維度來控制執行緒資源的使用,即最大核心執行緒數(corePoolSize)和最大執行緒數(maximumPoolSize)。
最大核心執行緒數:核心執行緒指的是通常常駐執行緒池的執行緒。常駐執行緒線上程池沒有任務空閒時也不會被銷燬,而是處於idle狀態,這樣在新任務到來時就能很快的進行響應。
最大執行緒數:和第一節中提到的一樣,即執行緒池中所能允許的活躍執行緒的最大數量。
在向ThreadPoolExecutor提交任務時(execute方法),會執行一系列的判斷來決定任務應該如何被執行(原始碼在下一節中具體分析)。
執行緒池的優雅停止一般要能做到以下幾點:
執行緒池自啟動後便會有大量的工作執行緒在內部持續不斷並行的執行提交的各種任務,而要想做到優雅停止並不是一件容易的事情。
因此ThreadPoolExecutor中最複雜、細節最多的部分並不在於上文中的正常工作流程,而在於分散在各個地方但又緊密共同作業的,控制優雅停止的邏輯。
除了正常的工作流程以及優雅停止的功能外,ThreadPoolExecutor還提供了一些比較好用的功能
如費曼所說:What I can not create I do not understand(我不能理解我創造不了的東西)。
通過模仿jdk的ThreadPoolExecutor實現,從零開始實現一個執行緒池,可以迫使自己去仔細的捋清楚jdk執行緒池中設計的各種細節,加深理解而達到更好的學習效果。
前面提到ThreadPoolExecutor的核心邏輯主要分為兩部分,一是正常執行時處理提交的任務的邏輯,二是實現優雅停止的邏輯。
因此我們實現的執行緒池MyThreadPoolExecutor(以My開頭用於區分)也會分為兩個版本,v1版本只實現前一部分即正常執行時執行任務的邏輯,將有關執行緒池優雅停止的邏輯全部去除。
相比直接啃jdk最終實現的原始碼,v1版本的實現會更簡單更易理解,讓正常執行任務時的邏輯更加清晰而不會耦合太多關於優雅停止的邏輯。
ThreadPoolExecutor中有許多的成員變數,大致可以分為三類。
其中前6個設定引數都可以在ThreadPoolExecutor的建構函式中指定,而allowCoreThreadTimeOut則可以通過暴露的public方法allowCoreThreadTimeOut來動態的設定。
其中大部分屬性都是volatile修飾的,目的是讓執行過程中可以用過提供的public方法動態修改這些值後,執行緒池中的工作執行緒或提交任務的使用者執行緒能及時的感知到變化(執行緒間的可見性),並進行響應(比如令核心執行緒自動的idle退出)
這些設定屬性具體如何控制執行緒池行為的原理都會在下面的原始碼解析中展開介紹。理解這些引數的工作原理後才能在實際的業務中使用執行緒池時為其設定合適的值。
這裡重點介紹一下ctl屬性。ctl雖然是一個32位元的整型欄位(AtomicInteger),但實際上卻用於標識兩個業務屬性,即當前執行緒池的執行狀態和worker執行緒的總數量。
線上程池初始化時狀態位RUNNING,worker執行緒數量位0(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));)。
ctl的32位元中的高3位用於標識執行緒池當前的狀態,剩餘的29位用於標識執行緒池中worker執行緒的數量(因此理論上ThreadPoolExecutor最大可容納的執行緒數並不是231-1(32位元中符號要佔一位),而是229-1)
由於聚合之後單獨的讀寫某一個屬性不是很方便,所以ThreadPoolExecutor中提供了很多基於位運算的輔助函數來簡化這些邏輯。
ctl這樣聚合的設計比起拆分成兩個獨立的欄位有什麼好處?
在ThreadPoolExecutor中關於優雅停止的邏輯中有很多地方是需要同時判斷當前工作執行緒數量與執行緒池狀態後,再對執行緒池狀態工作執行緒數量進行更新的(具體邏輯在下一篇v2版本的部落格中展開)。
且為了執行效率,不使用互斥鎖而是通過cas重試的方法來解決並行更新的問題。而對一個AtomicInteger屬性做cas重試的更新,要比同時控制兩個屬性進行cas的更新要簡單很多,執行效率也高很多。
ThreadPoolExecutor共有五種狀態,但有四種都和優雅停止有關(除了RUNNING)。
但由於v1版本的MyThreadPoolExecutorV1不支援優雅停止,所以不在本篇部落格中講解這些狀態具體的含義以及其是如何變化的(下一篇v2版本的部落格中展開)
public class MyThreadPoolExecutorV1 implements MyThreadPoolExecutor{
/**
* 指定的最大核心執行緒數量
* */
private volatile int corePoolSize;
/**
* 指定的最大執行緒數量
* */
private volatile int maximumPoolSize;
/**
* 執行緒保活時間(單位:納秒 nanos)
* */
private volatile long keepAliveTime;
/**
* 存放任務的工作佇列(阻塞佇列)
* */
private final BlockingQueue<Runnable> workQueue;
/**
* 執行緒工廠
* */
private volatile ThreadFactory threadFactory;
/**
* 拒絕策略
* */
private volatile MyRejectedExecutionHandler handler;
/**
* 是否允許核心執行緒在idle一定時間後被銷燬(和非核心執行緒一樣)
* */
private volatile boolean allowCoreThreadTimeOut;
/**
* 主控鎖
* */
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 當前執行緒池已完成的任務數量
* */
private long completedTaskCount;
/**
* 維護當前存活的worker執行緒集合
* */
private final HashSet<MyWorker> workers = new HashSet<>();
/**
* 當前執行緒池中存在的worker執行緒數量 + 狀態的一個聚合(通過一個原子int進行cas,來避免對兩個業務屬性欄位加鎖來保證一致性)
* v1版本只關心前者,即worker執行緒數量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 32位元的有符號整數,有3位是用來存放執行緒池狀態的,所以用來維護當前工作執行緒個數的部分就只能用29位了
* 被佔去的3位中,有1位原來的符號位,2位是原來的數值位。
* */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 執行緒池狀態poolStatus常數(狀態值只會由小到大,單調遞增)
* 執行緒池狀態遷移圖:
* ↗ SHUTDOWN ↘
* RUNNING ↓ TIDYING → TERMINATED
* ↘ STOP ↗
* 1 RUNNING狀態,代表著執行緒池處於正常執行的狀態。能正常的接收並處理提交的任務
* 執行緒池物件初始化時,狀態為RUNNING
* 對應邏輯:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
*
* 2 SHUTDOWN狀態,代表執行緒池處於停止對外服務的狀態。不再接收新提交的任務,但依然會將workQueue工作佇列中積壓的任務處理完
* 呼叫了shutdown方法時,狀態由RUNNING -> SHUTDOWN
* 對應邏輯:shutdown方法中的advanceRunState(SHUTDOWN);
*
* 3 STOP狀態,代表執行緒池處於停止狀態。不再接受新提交的任務,同時也不再處理workQueue工作佇列中積壓的任務,當前還在處理任務的工作執行緒將收到interrupt中斷通知
* 之前未呼叫shutdown方法,直接呼叫了shutdownNow方法,狀態由RUNNING -> STOP
* 之前先呼叫了shutdown方法,後呼叫了shutdownNow方法,狀態由SHUTDOWN -> STOP
* 對應邏輯:shutdownNow方法中的advanceRunState(STOP);
*
* 4 TIDYING狀態,代表著執行緒池即將完全終止,正在做最後的收尾工作
* 當前執行緒池狀態為SHUTDOWN,任務被消費完工作佇列workQueue為空,且工作執行緒全部退出完成工作執行緒集合workers為空時,tryTerminate方法中將狀態由SHUTDOWN->TIDYING
* 當前執行緒池狀態為STOP,工作執行緒全部退出完成工作執行緒集合workers為空時,tryTerminate方法中將狀態由STOP->TIDYING
* 對應邏輯:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
*
* 5 TERMINATED狀態,代表著執行緒池完全的關閉。之前執行緒池已經處於TIDYING狀態,且呼叫的勾點函數terminated已返回
* 當前執行緒池狀態為TIDYING,呼叫的勾點函數terminated已返回
* 對應邏輯:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
* */
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/**
* 跟蹤執行緒池曾經有過的最大執行緒數量(只能在mainLock的並行保護下更新)
*/
private int largestPoolSize;
private boolean compareAndIncrementWorkerCount(int expect) {
return this.ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {
// cas更新,workerCount自減1
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
public MyThreadPoolExecutorV1(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
MyRejectedExecutionHandler handler) {
// 基本的引數校驗
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {
throw new IllegalArgumentException();
}
if (unit == null || workQueue == null || threadFactory == null || handler == null) {
throw new NullPointerException();
}
// 設定成員變數
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public ThreadFactory getThreadFactory() {
return threadFactory;
}
}
ThreadPoolExecutor中的工作執行緒並不是裸的Thread,而是被封裝在了一個Worker的內部類中。
Worker實現了Runnable所以可以作為一個普通的執行緒來啟動,在run方法中只是簡單的呼叫了一下runWorker(runWorker後面再展開)。
Worker類有三個成員屬性:
Worker內封裝的實際的工作執行緒物件thread,其在建構函式中由執行緒池的執行緒工廠threadFactory生成,傳入this,所以thread在start後,便會呼叫run方法進而執行runWorker。
執行緒工廠可以由使用者在建立執行緒池時通過引數指定,因此使用者在自由控制所生成的工作執行緒的同時,也需要保證newThread能正確的返回一個可用的執行緒物件。
除此之外,Worker物件還繼承了AbstractQueuedSynchronizer(AQS)類,簡單的實現了一個不可重入的互斥鎖。
對AQS互斥模式不太瞭解的讀者可以參考一下我之前關於AQS互斥模式的部落格:AQS互斥模式與ReentrantLock可重入鎖原理解析
AQS中維護了一個volatile修飾的int型別的成員變數state,其具體的含義可以由使用者自己定義。
在Worker中,state的值有三種狀態:
具體這三種情況分別在什麼時候出現會在下面解析提交任務原始碼的那部分裡詳細介紹。
/**
* jdk的實現中令Worker繼承AbstractQueuedSynchronizer並實現了一個不可重入的鎖
* AQS中的state屬性含義
* -1:標識工作執行緒還未啟動
* 0:標識工作執行緒已經啟動,但沒有開始處理任務(可能是在等待任務,idle狀態)
* 1:標識worker執行緒正在執行任務(runWorker中,成功獲得任務後,通過lock方法將state設定為1)
* */
private final class MyWorker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
public MyWorker(Runnable firstTask) {
this.firstTask = firstTask;
// newThread可能是null
this.thread = getThreadFactory().newThread(this);
}
@Override
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock(){
acquire(1);
}
public boolean tryLock(){
return tryAcquire(1);
}
public void unlock(){
release(1);
}
public boolean isLocked(){
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
// 三個條件同時滿足,才去中斷Worker對應的thread
// getState() >= 0,用於過濾還未執行runWorker的,剛入隊初始化的Worker
// thread != null,用於過濾掉構造方法中ThreadFactory.newThread返回null的Worker
// !t.isInterrupted(),用於過濾掉那些已經被其它方式中斷的Worker執行緒(比如使用者自己去觸發中斷,提前終止執行緒池中的任務)
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
下面介紹本篇部落格的重點,即執行緒池是如何執行使用者所提交的任務的。
使用者提交任務的入口是public的execute方法,Runnable型別的引數command就是提交的要執行的任務。
/**
* 提交任務,並執行
* */
public void execute(Runnable command) {
if (command == null){
throw new NullPointerException("command引數不能為空");
}
int currentCtl = this.ctl.get();
if (workerCountOf(currentCtl) < this.corePoolSize) {
// 如果當前存在的worker執行緒數量低於指定的核心執行緒數量,則建立新的核心執行緒
boolean addCoreWorkerSuccess = addWorker(command,true);
if(addCoreWorkerSuccess){
// addWorker新增成功,直接返回即可
return;
}
}
// 走到這裡有兩種情況
// 1 因為核心執行緒超過限制(workerCountOf(currentCtl) < corePoolSize == false),需要嘗試嘗試將任務放入阻塞佇列
// 2 addWorker返回false,建立核心工作執行緒失敗
if(this.workQueue.offer(command)){
// workQueue.offer入隊成功
if(workerCountOf(currentCtl) == 0){
// 在corePoolSize為0的情況下,當前不存在存活的核心執行緒
// 一個任務在入隊之後,如果當前執行緒池中一個執行緒都沒有,則需要兜底的建立一個非核心執行緒來處理入隊的任務
// 因此firstTask為null,目的是先讓任務先入隊後建立執行緒去拉取任務並執行
addWorker(null,false);
}else{
// 加入佇列成功,且當前存在worker執行緒,成功返回
return;
}
}else{
// 阻塞佇列已滿,嘗試建立一個新的非核心執行緒處理
boolean addNonCoreWorkerSuccess = addWorker(command,false);
if(!addNonCoreWorkerSuccess){
// 建立非核心執行緒失敗,執行拒絕策略(失敗的原因和前面建立核心執行緒addWorker的原因類似)
reject(command);
}else{
// 建立非核心執行緒成功,成功返回
return;
}
}
}
/**
* 根據指定的拒絕處理器,執行拒絕策略
* */
private void reject(Runnable command) {
this.handler.rejectedExecution(command, this);
}
可以看到,execute方法原始碼中對於任務處理的邏輯很清晰,也能與ThreadPoolExecutor執行時工作流程中所介紹的流程所匹配。
在execute方法中當需要建立核心執行緒或普通執行緒時,便需要通過addWorker方法嘗試建立一個新的工作執行緒。
/**
* 向執行緒池中加入worker
* */
private boolean addWorker(Runnable firstTask, boolean core) {
// retry標識外層迴圈
retry:
for (;;) {
int currentCtl = ctl.get();
// 用於cas更新workerCount的內層迴圈(注意這裡面與jdk的寫法不同,改寫成了邏輯一致但更可讀的形式)
for (;;) {
// 判斷當前worker數量是否超過了限制
int workerCount = workerCountOf(currentCtl);
if (workerCount >= CAPACITY) {
// 當前worker數量超過了設計上允許的最大限制
return false;
}
if (core) {
// 建立的是核心執行緒,判斷當前執行緒數是否已經超過了指定的核心執行緒數
if (workerCount >= this.corePoolSize) {
// 超過了核心執行緒數,建立核心worker執行緒失敗
return false;
}
} else {
// 建立的是非核心執行緒,判斷當前執行緒數是否已經超過了指定的最大執行緒數
if (workerCount >= this.maximumPoolSize) {
// 超過了最大執行緒數,建立非核心worker執行緒失敗
return false;
}
}
// cas更新workerCount的值
boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
if (casSuccess) {
// cas成功,跳出外層迴圈
break retry;
}
// compareAndIncrementWorkerCount方法cas爭搶失敗,重新執行內層迴圈
}
}
boolean workerStarted = false;
MyWorker newWorker = null;
try {
// 建立一個新的worker
newWorker = new MyWorker(firstTask);
final Thread myWorkerThread = newWorker.thread;
if (myWorkerThread != null) {
// MyWorker初始化時內部執行緒建立成功
// 加鎖,防止並行更新
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (myWorkerThread.isAlive()) {
// 預檢查執行緒的狀態,剛初始化的worker執行緒必須是未喚醒的狀態
throw new IllegalThreadStateException();
}
// 加入worker集合
this.workers.add(newWorker);
int workerSize = workers.size();
if (workerSize > largestPoolSize) {
// 如果當前worker個數超過了之前記錄的最大存活執行緒數,將其更新
largestPoolSize = workerSize;
}
// 建立成功
} finally {
// 無論是否發生異常,都先將主控鎖解鎖
mainLock.unlock();
}
// 加入成功,啟動worker執行緒
myWorkerThread.start();
// 標識為worker執行緒啟動成功,並作為返回值返回
workerStarted = true;
}
}finally {
if (!workerStarted) {
addWorkerFailed(newWorker);
}
}
return workerStarted;
}
addWorker可以分為兩部分:判斷當前是否滿足建立新工作執行緒的條件、建立並啟動新的Worker工作執行緒。
入口處開始的retry標識的for迴圈部分,便是用於判斷是否滿足建立新工作執行緒的條件。
需要注意的是:這裡面有兩個for迴圈的原因在於v1版本省略了優雅停止的邏輯(所以實際上v1版本能去掉內層迴圈的)。如果執行緒池處於停止狀態則不能再建立新工作執行緒了,因此也需要判斷執行緒池當前的狀態,
不滿足條件則也需要返回false,不建立工作執行緒。
而且compareAndIncrementWorkerCount中cas更新ctl時,如果並行的執行緒池被停止而導致執行緒池狀態發生了變化,也會導致cas失敗重新檢查。
這也是jdk的實現中為什麼把執行緒池狀態和工作執行緒數量繫結在一起的原因之一,這樣在cas更新時可以原子性的同時檢查兩個欄位的並行爭搶。(更具體的細節會在下一篇部落格的v2版本中介紹)
在通過retry那部分的層層條件檢查後,緊接著便是實際建立新工作執行緒的邏輯。
雖然在前面執行緒池工作流程的分析中提到了核心執行緒與非核心執行緒的概念,但Worker類中實際上並沒有核心/非核心的標識。
經過了工作執行緒啟動前的條件判斷後,新建立的工作執行緒實際上並沒有真正的核心與非核心的差別。
addWorker中工作執行緒可能會啟動失敗,所以要對addWorker中對workers集合以及workerCount等資料的操作進行回滾。
/**
* 當建立worker出現異常失敗時,對之前的操作進行回滾
* 1 如果新建立的worker加入了workers集合,將其移除
* 2 減少記錄存活的worker個數(cas更新)
* 3 檢查執行緒池是否滿足中止的狀態,防止這個存活的worker執行緒阻止執行緒池的中止(v1版本不考慮,省略了tryTerminate)
*/
private void addWorkerFailed(MyWorker myWorker) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (myWorker != null) {
// 如果新建立的worker加入了workers集合,將其移除
workers.remove(myWorker);
}
// 減少存活的worker個數
decrementWorkerCount();
// 嘗試著將當前worker執行緒終止(addWorkerFailed由工作執行緒自己呼叫)
// tryTerminate();
} finally {
mainLock.unlock();
}
}
前面介紹了使用者如何向執行緒池提交任務,以及如何建立新工作執行緒Worker,下面介紹工作執行緒線上程池中是如何執行的。
/**
* worker工作執行緒主迴圈執行邏輯
* */
private void runWorker(MyWorker myWorker) {
// 時worker執行緒的run方法呼叫的,此時的current執行緒的是worker執行緒
Thread workerThread = Thread.currentThread();
Runnable task = myWorker.firstTask;
// 已經暫存了firstTask,將其清空(有地方根據firstTask是否存在來判斷工作執行緒中負責的任務是否是新提交的)
myWorker.firstTask = null;
// 將state由初始化時的-1設定為0
// 標識著此時當前工作執行緒開始工作了,這樣可以被interruptIfStarted選中
myWorker.unlock();
// 預設執行緒是由於中斷退出的
boolean completedAbruptly = true;
try {
// worker執行緒處理主迴圈,核心邏輯
while (task != null || (task = getTask()) != null) {
// 將state由0標識為1,代表著其由idle狀態變成了正在工作的狀態
// 這樣interruptIdleWorkers中的tryLock會失敗,這樣工作狀態的執行緒就不會被該方法中斷任務的正常執行
myWorker.lock();
// v1版本此處省略優雅停止相關的核心邏輯
try {
// 任務執行前的勾點函數
beforeExecute(workerThread, task);
Throwable thrown = null;
try {
// 拿到的任務開始執行
task.run();
} catch (RuntimeException | Error x) {
// 使用thrown收集丟擲的異常,傳遞給afterExecute
thrown = x;
// 同時丟擲錯誤,從而中止主迴圈
throw x;
} catch (Throwable x) {
// 使用thrown收集丟擲的異常,傳遞給afterExecute
thrown = x;
// 同時丟擲錯誤,從而中止主迴圈
throw new Error(x);
} finally {
// 任務執行後的勾點函數,如果任務執行時丟擲了錯誤/異常,thrown不為null
afterExecute(task, thrown);
}
} finally {
// 將task設定為null,令下一次while迴圈通過getTask獲得新任務
task = null;
// 無論執行時是否存在異常,已完成的任務數加1
myWorker.completedTasks++;
// 無論如何將myWorker解鎖,標識為idle狀態
myWorker.unlock();
}
}
// getTask返回了null,說明沒有可執行的任務或者因為idle超時、執行緒數超過設定等原因需要回收當前執行緒。
// 執行緒正常的退出,completedAbruptly為false
completedAbruptly = false;
}finally {
// getTask返回null,執行緒正常的退出,completedAbruptly值為false
// task.run()執行時丟擲了異常/錯誤,直接跳出了主迴圈,此時completedAbruptly為初始化時的預設值true
processWorkerExit(myWorker, completedAbruptly);
// processWorkerExit執行完成後,worker執行緒對應的run方法(run->runWorker)也會執行完畢
// 此時執行緒物件會進入終止態,等待作業系統回收
// 而且processWorkerExit方法內將傳入的Worker從workers集合中移除,jvm中的物件也會因為不再被參照而被GC回收
// 此時,當前工作執行緒所佔用的所有資源都已釋放完畢
}
}
runWorker中是通過getTask獲取任務的,getTask中包含著工作執行緒是如何從工作佇列中獲取任務的關鍵邏輯。
/**
* 嘗試著從阻塞佇列裡獲得待執行的任務
* @return 返回null代表工作佇列為空,沒有需要執行的任務; 或者當前worker執行緒滿足了需要退出的一些條件
* 返回對應的任務
* */
private Runnable getTask() {
boolean timedOut = false;
for(;;) {
int currentCtl = ctl.get();
// 獲得當前工作執行緒個數
int workCount = workerCountOf(currentCtl);
// 有兩種情況需要指定超時時間的方式從阻塞佇列workQueue中獲取任務(即timed為true)
// 1.執行緒池設定引數allowCoreThreadTimeOut為true,即允許核心執行緒在idle一定時間後被銷燬
// 所以allowCoreThreadTimeOut為true時,需要令timed為true,這樣可以讓核心執行緒也在一定時間內獲取不到任務(idle狀態)而被銷燬
// 2.執行緒池設定引數allowCoreThreadTimeOut為false,但當前執行緒池中的執行緒數量workCount大於了指定的核心執行緒數量corePoolSize
// 說明當前有一些非核心的執行緒正在工作,而非核心的執行緒在idle狀態一段時間後需要被銷燬
// 所以此時也令timed為true,讓這些執行緒在keepAliveTime時間內由於佇列為空拉取不到任務而返回null,將其銷燬
boolean timed = allowCoreThreadTimeOut || workCount > corePoolSize;
// 有共四種情況不需要往下執行,代表
// 1 (workCount > maximumPoolSize && workCount > 1)
// 當前工作執行緒個數大於了指定的maximumPoolSize(可能是由於啟動後通過setMaximumPoolSize調小了maximumPoolSize的值)
// 已經不符合執行緒池的設定引數約束了,要將多餘的工作執行緒回收掉
// 且當前workCount > 1說明存在不止一個工作執行緒,意味著即使將當前工作執行緒回收後也還有其它工作執行緒能繼續處理工作佇列裡的任務,直接返回null表示自己需要被回收
// 2 (workCount > maximumPoolSize && workCount <= 1 && workQueue.isEmpty())
// 當前工作執行緒個數大於了指定的maximumPoolSize(maximumPoolSize被設定為0了)
// 已經不符合執行緒池的設定引數約束了,要將多餘的工作執行緒回收掉
// 但此時workCount<=1,說明將自己這個工作執行緒回收掉後就沒有其它工作執行緒能處理工作佇列裡剩餘的任務了
// 所以即使maximumPoolSize設定為0,也需要等待任務被處理完,工作佇列為空之後才能回收當前執行緒,否則還會繼續拉取剩餘任務
// 3 (workCount <= maximumPoolSize && (timed && timedOut) && workCount > 1)
// workCount <= maximumPoolSize符合要求
// 但是timed && timedOut,說明timed判定命中,需要以poll的方式指定超時時間,並且最近一次拉取任務超時了timedOut=true
// 進入新的一次迴圈後timed && timedOut成立,說明當前worker執行緒處於idle狀態等待任務超過了規定的keepAliveTime時間,需要回收當前執行緒
// 且當前workCount > 1說明存在不止一個工作執行緒,意味著即使將當前工作執行緒回收後也還有其它工作執行緒能繼續處理工作佇列裡的任務,直接返回null表示自己需要被回收
// 4 (workCount <= maximumPoolSize && (timed && timedOut) && workQueue.isEmpty())
// workCount <= maximumPoolSize符合要求
// 但是timed && timedOut,說明timed判定命中,需要以poll的方式指定超時時間,並且最近一次拉取任務超時了timedOut=true
// 進入新的一次迴圈後timed && timedOut成立,說明當前worker執行緒處於idle狀態等待任務超過了規定的keepAliveTime時間,需要回收當前執行緒
// 但此時workCount<=1,說明將自己這個工作執行緒回收掉後就沒有其它工作執行緒能處理工作佇列裡剩餘的任務了
// 所以即使timed && timedOut超時邏輯匹配,也需要等待任務被處理完,工作佇列為空之後才能回收當前執行緒,否則還會繼續拉取剩餘任務
if ((workCount > maximumPoolSize || (timed && timedOut))
&& (workCount > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(currentCtl)) {
// 滿足上述條件,說明當前執行緒需要被銷燬了,返回null
return null;
}
// compareAndDecrementWorkerCount方法由於並行的原因cas執行失敗,continue迴圈重試
continue;
}
try {
// 根據上面的邏輯的timed標識,決定以什麼方式從阻塞佇列中獲取任務
Runnable r = timed ?
// timed為true,通過poll方法指定獲取任務的超時時間(如果指定時間內沒有佇列依然為空,則返回)
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// timed為false,通過take方法無限期的等待阻塞佇列中加入新的任務
workQueue.take();
if (r != null) {
// 獲得了新的任務,getWork正常返回對應的任務物件
return r;
}else{
// 否則說明timed=true,且poll拉取任務時超時了
timedOut = true;
}
} catch (InterruptedException retry) {
// poll or take任務等待時worker執行緒被中斷了,捕獲中斷異常
// timeout = false,標識拉取任務時沒有超時
timedOut = false;
}
}
}
在runWorker中,如果getTask方法沒有拿到任務返回了null或者任務在執行時丟擲了異常就會在最終的finally塊中呼叫processWorkerExit方法,令當前工作執行緒銷燬退出。
/**
* 處理worker執行緒退出
* @param myWorker 需要退出的工作執行緒物件
* @param completedAbruptly 是否是因為中斷異常的原因,而需要回收
* */
private void processWorkerExit(MyWorker myWorker, boolean completedAbruptly) {
if (completedAbruptly) {
// 如果completedAbruptly=true,說明是任務在run方法執行時出錯導致的執行緒退出
// 而正常退出時completedAbruptly=false,在getTask中已經將workerCount的值減少了
decrementWorkerCount();
}
ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 執行緒池全域性總完成任務數累加上要退出的工作執行緒已完成的任務數
this.completedTaskCount += myWorker.completedTasks;
// workers集合中將當前工作執行緒剔除
workers.remove(myWorker);
// completedTaskCount是long型別的,workers是HashSet,
// 都是非執行緒安全的,所以在mainLock的保護進行修改
} finally {
mainLock.unlock();
}
int currentCtl = this.ctl.get();
if (!completedAbruptly) {
// completedAbruptly=false,說明不是因為中斷異常而退出的
// min標識當前執行緒池允許的最小執行緒數量
// 1 如果allowCoreThreadTimeOut為true,則核心執行緒也可以被銷燬,min=0
// 2 如果allowCoreThreadTimeOut為false,則min應該為所允許的核心執行緒個數,min=corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) {
// 如果min為0了,但工作佇列不為空,則修正min=1,因為至少需要一個工作執行緒來將工作佇列中的任務消費、處理掉
min = 1;
}
if (workerCountOf(currentCtl) >= min) {
// 如果當前工作執行緒數大於了min,當前執行緒數量是足夠的,直接返回(否則要執行下面的addWorker恢復)
return;
}
}
// 兩種場景會走到這裡進行addWorker操作
// 1 completedAbruptly=true,說明執行緒是因為中斷異常而退出的,需要重新建立一個新的工作執行緒
// 2 completedAbruptly=false,且上面的workerCount<min,則說明當前工作執行緒數不夠,需要建立一個
// 為什麼引數core傳的是false呢?
// 因為completedAbruptly=true而中斷退出的執行緒,無論當前工作執行緒數是否大於核心執行緒,都需要建立一個新的執行緒來代替原有的被退出的執行緒
addWorker(null, false);
}
ThreadPoolExecutor除了支援啟動前通過建構函式設定設定引數外,也允許線上程池執行的過程中動態的更改設定。而要實現動態的修改設定,麻煩程度要比啟動前靜態的指定大得多。
舉個例子,線上程池的執行過程中如果當前corePoolSize=20,且已經建立了20個核心執行緒時(workerCount=20),現在將corePoolSize減少為10或者增大為30時應該如何實時的生效呢?
下面通過內嵌於程式碼中的註釋,詳細的說明了allowCoreThreadTimeOut、corePoolSize、maximumPoolSize這三個關鍵設定引數實現動態修改的原理。
/**
* 設定是否允許核心執行緒idle超時後退出
* */
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0) {
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
}
// 判斷一下新舊值是否相等,避免無意義的volatile變數更新,導致不必要的cpu cache同步
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value) {
// 引數值value為true,說明之前不允許核心執行緒由於idle超時而退出
// 而此時更新為true說明現在允許了,則通過interruptIdleWorkers喚醒所有的idle執行緒
// 令其走一遍runWorker中的邏輯,嘗試著讓idle超時的核心執行緒及時銷燬
interruptIdleWorkers();
}
}
}
/**
* 動態更新核心執行緒最大值corePoolSize
* */
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0) {
throw new IllegalArgumentException();
}
// 計算差異
int delta = corePoolSize - this.corePoolSize;
// 賦值
this.corePoolSize = corePoolSize;
if (workerCountOf(this.ctl.get()) > corePoolSize) {
// 更新完畢後,發現當前工作執行緒數超過了指定的值
// 喚醒所有idle執行緒,讓目前空閒的idle超時的執行緒在workerCount大於maximumPoolSize時及時銷燬
interruptIdleWorkers();
} else if (delta > 0) {
// 差異大於0,代表著新值大於舊值
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
// 我們無法確切的知道有多少新的執行緒是所需要的。
// 啟發式的預先啟動足夠的新工作執行緒用於處理工作佇列中的任務
// 但當執行此操作時工作佇列為空了,則立即停止此操作(佇列為空了說明當前負載較低,再建立更多的工作執行緒是浪費資源)
// 取差異和當前工作佇列中的最小值為k
int k = Math.min(delta, workQueue.size());
// 嘗試著一直增加新的工作執行緒,直到和k相同
// 這樣設計的目的在於控制增加的核心執行緒數量,不要一下子建立過多核心執行緒
// 舉個例子:原來的corePoolSize是10,且工作執行緒數也是10,現在新值設定為了30,新值比舊值大20,理論上應該直接建立20個核心工作執行緒
// 而工作佇列中的任務數只有10,那麼這個時候直接建立20個新工作執行緒是沒必要的,只需要一個一個建立,在建立的過程中新的執行緒會盡量的消費工作佇列中的任務
// 這樣就可以以一種啟發性的方式建立合適的新工作執行緒,一定程度上節約資源。後面再有新的任務提交時,再從runWorker方法中去單獨建立核心執行緒(類似惰性建立)
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty()) {
// 其它工作執行緒在迴圈的過程中也在消費工作執行緒,且使用者也可能不斷地提交任務
// 這是一個動態的過程,但一旦發現當前工作佇列為空則立即結束
break;
}
}
}
}
/**
* 動態更新最大執行緒數maximumPoolSize
* */
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
throw new IllegalArgumentException();
}
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(this.ctl.get()) > maximumPoolSize) {
// 更新完畢後,發現當前工作執行緒數超過了指定的值
// 喚醒所有idle執行緒,讓目前空閒的idle超時的執行緒在workerCount大於maximumPoolSize時及時銷燬
interruptIdleWorkers();
}
}
目前為止,通過v1版本的MyThreadPoolExecutor原始碼,已經將jdk執行緒池ThreadPoolExecutor在RUNNING狀態下提交任務,啟動工作執行緒執行任務相關的核心邏輯講解完畢了(不考慮優雅停止)。
jdk執行緒池支援使用者傳入自定義的拒絕策略處理器,只需要傳入實現了RejectedExecutionHandler介面的物件就行。
而jdk在ThreadPoolExecutor中提供了預設的四種拒絕策略方便使用者使用。
上面介紹的四種jdk預設拒絕策略分別適應不同的業務場景,需要使用者仔細考慮最適合的拒絕策略。同時靈活的、基於介面的設計也開放的支援使用者去自己實現更貼合自己業務的拒絕策略處理器。
/**
* 預設的拒絕策略:AbortPolicy
* */
private static final MyRejectedExecutionHandler defaultHandler = new MyAbortPolicy();
/**
* 丟擲RejectedExecutionException的拒絕策略
* 評價:能讓提交任務的一方感知到異常的策略,比較通用,也是jdk預設的拒絕策略
* */
public static class MyAbortPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
// 直接丟擲異常
throw new RejectedExecutionException("Task " + command.toString() +
" rejected from " + executor.toString());
}
}
/**
* 令呼叫者執行緒自己執行command任務的拒絕策略
* 評價:線上程池壓力過大時,讓提交任務的執行緒自己執行該任務(非同步變同步),
* 能夠有效地降低執行緒池的壓力,也不會丟失任務,但可能導致整體業務吞吐量大幅降低
* */
public static class MyCallerRunsPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
// 如果當前執行緒池不是shutdown狀態,則令呼叫者執行緒自己執行command任務
command.run();
}else{
// 如果已經是shutdown狀態了,就什麼也不做直接丟棄任務
}
}
}
/**
* 直接丟棄任務的拒絕策略
* 評價:簡單的直接丟棄任務,適用於對任務執行成功率要求不高的場合
* */
public static class MyDiscardPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
// 什麼也不做的,直接返回
// 效果就是command任務被無聲無息的丟棄了,沒有異常
}
}
/**
* 丟棄當前工作佇列中最早入隊的任務,然後將當前任務重新提交
* 評價:適用於後出現的任務能夠完全代替之前任務的場合(追求最終一致性)
* */
public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
// 如果當前執行緒池不是shutdown狀態,則丟棄當前工作佇列中最早入隊的任務,然後將當前任務重新提交
executor.getQueue().poll();
executor.execute(command);
}else{
// 如果已經是shutdown狀態了,就什麼也不做直接丟棄任務
}
}
}
jdk中除了提供了預設的拒絕策略,還在Executors類中提供了四種基於ThreadPoolExecutor的、比較常用的執行緒池,以簡化使用者對執行緒池的使用。
這四種執行緒池可以通過Executors提供的public方法來分別建立:
newFixedThreadPool方法建立一個工作執行緒數量固定的執行緒池,其建立ThreadPoolExecutor時傳入的核心執行緒數corePoolSize和最大執行緒數maximumPoolSize是相等的。
因此其工作佇列傳入是一個無界的LinkedBlockingQueue,無界的工作佇列意味著永遠都不會建立新的非核心執行緒。
在預設allowCoreThreadTimeOut為false的情況下,執行緒池中的所有執行緒都是不會因為idle超時而銷燬的核心執行緒。
適用場景:由於工作執行緒數量固定,「fixedThreadPool」適用於任務流量較為穩定的場景
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool方法建立一個工作執行緒數量有巨大彈性的執行緒池,其核心執行緒數corePoolSize=0而最大執行緒數maximumPoolSize為Integer.MAX_VALUE,60s的保活時間。
同時其工作佇列是SynchronousQueue,是一種佇列容量為0、無法快取任何任務的阻塞佇列(任何時候插入資料(offer)時必須有消費者執行緒消費,否則生產者執行緒將會被阻塞)。
這也意味著「cachedThreadPool」中沒有核心執行緒,所有工作執行緒在任務負載較低時都會在60s的idle後被銷燬;同時當負載較高,新任務到來時由於所有的工作執行緒都在執行其它任務,將會立即建立一個新的非核心執行緒來處理任務。
適用場景:由於可以無限制的建立新執行緒來做到及時響應任務,「cachedThreadPool」適用於任務流量較大且不穩定,對任務延遲容忍度較低的場景
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor方法建立一個單執行緒的執行緒池,其核心執行緒數corePoolSize=1且最大執行緒數maximumPoolSize也為1,其工作佇列是無界佇列。
這意味著「singleThreadExecutor」中任何提交的任務都將嚴格按照先入先出的順序被執行。
適用場景:「singleThreadExecutor」適用於任務量較小、對任務延遲容忍度較高、並要求任務順序執行的場景。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool方法建立一個支援定時任務、延遲任務執行的執行緒池(關於jdk定時任務執行緒池ScheduledThreadPoolExecutor的工作原理會在未來的部落格中展開)
適用場景:「scheduledThreadPool」適用於需要任務定時或者延遲執行的場景。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}