本篇部落格是jdk執行緒池ThreadPoolExecutor工作原理解析系列部落格的第二篇,在第一篇部落格中從原始碼層面分析了ThreadPoolExecutor在RUNNING狀態下處理任務的核心邏輯,而在這篇部落格中將會詳細講解jdk執行緒池ThreadPoolExecutor優雅停止的實現原理。
ThreadPoolExecutor為了實現優雅停止功能,為執行緒池設定了一個狀態屬性,其共有5種情況。
在第一篇部落格中曾介紹過,AtomicInteger型別的變數ctl同時維護了兩個業務屬性當前活躍工作執行緒個數與執行緒池狀態,其中ctl的高3位用於存放執行緒池狀態。
執行緒池工作狀態是單調推進的,即從執行時->停止中->完全停止。共有以下五種情況
RUNNING狀態,代表著執行緒池處於正常執行(執行時)。RUNNING狀態的執行緒池能正常的接收並處理提交的任務
ThreadPoolExecutor初始化時對ctl賦予的預設屬性便是RUNNING(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));)
RUNNING狀態下執行緒池正常工作的原理已經在第一篇部落格中詳細的介紹過了,這裡不再贅述。
SHUTDOWN狀態,代表執行緒池處於停止對外服務的狀態(停止中)。不再接收新提交的任務,但依然會將workQueue工作佇列中積壓的任務逐步處理完。
使用者可以通過呼叫shutdown方法令執行緒池由RUNNING狀態進入SHUTDOWN狀態,shutdown方法會在下文詳細展開分析。
STOP狀態,代表執行緒池處於停止狀態。不再接受新提交的任務(停止中),同時也不再處理workQueue工作佇列中積壓的任務,當前還在處理任務的工作執行緒將收到interrupt中斷通知
使用者可以通過呼叫shutdownNow方法令執行緒池由RUNNING或者SHUTDOWN狀態進入STOP狀態,shutdownNow方法會在下文詳細展開分析。
TIDYING狀態,代表著執行緒池即將完全終止,正在做最後的收尾工作(停止中)。
線上程池中所有的工作執行緒都已經完全退出,且工作佇列中的任務已經被清空時會由SHUTDOWN或STOP狀態進入TIDYING狀態。
TERMINATED狀態,代表著執行緒池完全的關閉(完全停止)。
public class MyThreadPoolExecutorV2 implements MyThreadPoolExecutor {
/**
* 當前執行緒池中存在的worker執行緒數量 + 狀態的一個聚合(通過一個原子int進行cas,來避免對兩個業務屬性欄位加鎖來保證一致性)
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 32位元的有符號整數,有3位是用來存放執行緒池狀態的,所以用來維護當前工作執行緒個數的部分就只能用29位了
* 被佔去的3位中,有1位原來的符號位,2位是原來的數值位
* */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 執行緒池狀態poolStatus常數(狀態值只會由小到大,單調遞增)
* 執行緒池狀態遷移圖:
* ↗ SHUTDOWN ↘
* RUNNING ↓ TIDYING → TERMINATED
* ↘ STOP ↗
* 1 RUNNING狀態,代表著執行緒池處於正常執行的狀態。能正常的接收並處理提交的任務
* 執行緒池物件初始化時,狀態為RUNNING
* 對應邏輯:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
*
* 2 SHUTDOWN狀態,代表執行緒池處於停止對外服務的狀態。不再接收新提交的任務,但依然會將workQueue工作佇列中積壓的任務處理完
* 呼叫了shutdown方法時,狀態由RUNNING -> SHUTDOWN
* 對應邏輯:shutdown方法中的advanceRunState(SHUTDOWN);
*
* 3 STOP狀態,代表執行緒池處於停止狀態。不再接受新提交的任務,同時也不再處理workQueue工作佇列中積壓的任務,當前還在處理任務的工作執行緒將收到interrupt中斷通知
* 之前未呼叫shutdown方法,直接呼叫了shutdownNow方法,狀態由RUNNING -> STOP
* 之前先呼叫了shutdown方法,後呼叫了shutdownNow方法,狀態由SHUTDOWN -> STOP
* 對應邏輯:shutdownNow方法中的advanceRunState(STOP);
*
* 4 TIDYING狀態,代表著執行緒池即將完全終止,正在做最後的收尾工作
* 當前執行緒池狀態為SHUTDOWN,任務被消費完工作佇列workQueue為空,且工作執行緒全部退出完成工作執行緒集合workers為空時,tryTerminate方法中將狀態由SHUTDOWN->TIDYING
* 當前執行緒池狀態為STOP,工作執行緒全部退出完成工作執行緒集合workers為空時,tryTerminate方法中將狀態由STOP->TIDYING
* 對應邏輯:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
*
* 5 TERMINATED狀態,代表著執行緒池完全的關閉。之前執行緒池已經處於TIDYING狀態,且呼叫的勾點函數terminated已返回
* 當前執行緒池狀態為TIDYING,呼叫的勾點函數terminated已返回
* 對應邏輯:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
* */
// 11100000 00000000 00000000 00000000
private static final int RUNNING = -1 << COUNT_BITS;
// 00000000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 00100000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;
// 01000000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;
// 01100000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 推進執行緒池工作狀態
* */
private void advanceRunState(int targetState) {
for(;;){
// 獲得當前的執行緒池狀態
int currentCtl = this.ctl.get();
// 1 (runState >= targetState)如果當前執行緒池狀態不比傳入的targetState小
// 代表當前狀態已經比引數要制定的更加快(或者至少已經處於對應階段了),則無需更新poolStatus的狀態(或語句中第一個條件為false,直接break了)
// 2 (this.ctl.compareAndSet),cas的將runState更新為targetState
// 如果返回true則說明cas更新成功直接break結束(或語句中第一個條件為false,第二個條件為true)
// 如果返回false說明cas爭搶失敗,再次進入while迴圈重試(或語句中第一個和第二個條件都是false,不break而是繼續執行迴圈重試)
if (runStateAtLeast(currentCtl, targetState) ||
this.ctl.compareAndSet(
currentCtl,
ctlOf(targetState, workerCountOf(currentCtl)
))) {
break;
}
}
}
}
執行緒池的優雅停止一般要能做到以下幾點:
下面我們從原始碼層面解析ThreadPoolExecutor,看看其是如何實現上述這三點的.
ThreadPoolExecutor執行緒池提供了shutdown和shutdownNow這兩個public方法給使用者用於發出執行緒池的停止指令。
shutdown方法用於關閉執行緒池,並令執行緒池從RUNNING狀態轉變位SHUTDOWN狀態。位於SHUTDOWN狀態的執行緒池,不再接收新任務,但已提交的任務會全部被執行完。
/**
* 關閉執行緒池(不再接收新任務,但已提交的任務會全部被執行)
* 但不會等待任務徹底的執行完成(awaitTermination)
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// shutdown操作中涉及大量的資源存取和更新,直接通過互斥鎖防並行
mainLock.lock();
try {
// 用於shutdown/shutdownNow時的安全存取許可權
checkShutdownAccess();
// 將執行緒池狀態從RUNNING推進到SHUTDOWN
advanceRunState(SHUTDOWN);
// shutdown不會立即停止所有執行緒,而僅僅先中斷idle狀態的多餘執行緒進行回收,還在執行任務的執行緒就慢慢等其執行完
interruptIdleWorkers();
// 單獨為ScheduledThreadPoolExecutor開的一個勾點函數(hook for ScheduledThreadPoolExecutor)
onShutdown();
} finally {
mainLock.unlock();
}
// 嘗試終止執行緒池
tryTerminate();
}
/**
* 用於shutdown/shutdownNow時的安全存取許可權
* 檢查當前呼叫者是否有許可權去通過interrupt方法去中斷對應工作執行緒
* */
private void checkShutdownAccess() {
// 判斷jvm啟動時是否設定了安全管理器SecurityManager
SecurityManager security = System.getSecurityManager();
// 如果沒有設定,直接返回無事發生
if (security != null) {
// 設定了許可權管理器,驗證當前呼叫者是否有modifyThread的許可權
// 如果沒有,checkPermission會丟擲SecurityException異常
security.checkPermission(shutdownPerm);
// 通過上述校驗,檢查工作執行緒是否能夠被呼叫者存取
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (MyWorker w : workers) {
// 檢查每一個工作執行緒中的thread物件是否有許可權被呼叫者存取
security.checkAccess(w.thread);
}
} finally {
mainLock.unlock();
}
}
}
/**
* 中斷所有處於idle狀態的執行緒
* */
private void interruptIdleWorkers() {
// 預設打斷所有idle狀態的工作執行緒
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
/**
* 中斷處於idle狀態的執行緒
* @param onlyOne 如果為ture,至多隻中斷一個工作執行緒(可能一個都不中斷)
* 如果為false,中斷workers內註冊的所有工作執行緒
* */
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (MyWorker w : workers) {
Thread t = w.thread;
// 1. t.isInterrupted(),說明當前執行緒存在中斷訊號,之前已經被中斷了,無需再次中斷
// 2. w.tryLock(), runWorker方法中如果工作執行緒獲取到任務開始工作,會先進行Lock加鎖
// 則這裡的tryLock會加鎖失敗,返回false。 而返回true的話,就說明當前工作執行緒是一個idle執行緒,需要被中斷
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// tryLock成功時,會將內部state的值設定為1,通過unlock恢復到未加鎖的狀態
w.unlock();
}
}
if (onlyOne) {
// 引數onlyOne為true,至多隻中斷一個工作執行緒
// 即使上面的t.interrupt()沒有執行,也在這裡跳出迴圈
break;
}
}
} finally {
mainLock.unlock();
}
}
/**
* 單獨為jdk的ScheduledThreadPoolExecutor開的一個勾點函數
* 由ScheduledThreadPoolExecutor繼承ThreadExecutor時重寫(包級別存取許可權)
* */
void onShutdown() {}
shutdownNow方法同樣用於關閉執行緒池,但比shutdown方法更加激進。shutdownNow方法令執行緒池從RUNNING狀態轉變為STOP狀態,不再接收新任務,而工作佇列中未完成的任務會以列表的形式返回給shutdownNow的呼叫者。
/**
* 立即關閉執行緒池(不再接收新任務,工作佇列中未完成的任務會以列表的形式返回)
* @return 當前工作佇列中未完成的任務
* */
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// shutdown操作中涉及大量的資源存取和更新,直接通過互斥鎖防並行
mainLock.lock();
try {
// 用於shutdown/shutdownNow時的安全存取許可權
checkShutdownAccess();
// 將執行緒池狀態從RUNNING推進到STOP
advanceRunState(STOP);
interruptWorkers();
// 將工作佇列中未完成的任務提取出來(會清空執行緒池的workQueue)
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試終止執行緒池
tryTerminate();
return tasks;
}
/**
* shutdownNow方法內,立即終止執行緒池時該方法被呼叫
* 中斷通知所有已經啟動的工作執行緒(比如等待在工作佇列上的idle工作執行緒,或者run方法內部await、sleep等,令其丟擲中斷異常快速結束)
* */
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (MyWorker w : workers) {
// 遍歷所有的worker執行緒,已啟動的工作執行緒全部呼叫Thread.interrupt方法,發出中斷訊號
w.interruptIfStarted();
}
} finally {
mainLock.unlock();
}
}
/**
* 將工作佇列中的任務全部轉移出來
* 用於shutdownNow緊急關閉執行緒池時將未完成的任務返回給呼叫者,避免任務丟失
* */
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> queue = this.workQueue;
ArrayList<Runnable> taskList = new ArrayList<>();
queue.drainTo(taskList);
// 通常情況下,普通的阻塞佇列的drainTo方法可以一次性的把所有元素都轉移到taskList中
// 但jdk的DelayedQueue或者一些自定義的阻塞佇列,drainTo方法無法轉移所有的元素
// (比如DelayedQueue的drainTo方法只能轉移已經不需要延遲的元素,即getDelay()<=0)
if (!queue.isEmpty()) {
// 所以在這裡打一個修補程式邏輯:如果drainTo方法執行後工作佇列依然不為空,則通過更基礎的remove方法把佇列中剩餘元素一個一個的迴圈放到taskList中
for (Runnable r : queue.toArray(new Runnable[0])) {
if (queue.remove(r)) {
taskList.add(r);
}
}
}
return taskList;
}
在execute方法作為入口,提交任務的邏輯中,v2版本相比v1版本新增了一些基於執行緒池狀態的校驗(和jdk的實現保持一致了)。
execute提交任務時addWorker方法和shutdown/shutdownNow方法是可能並行執行的,但addWorker中有多處地方都對執行緒池的狀態進行了檢查,盡最大的可能避免執行緒池停止時繼續建立新的工作執行緒。
基於execute方法和addWorker方法中關於各項關於執行緒池停止狀態校驗,最大程度的避免了執行緒池在停止過程中新任務的提交和可能的新工作執行緒的建立。使得execute方法線上程池接收到停止指令後(>=SHUTDOWN),最終都會去執行reject拒絕策略邏輯。
/**
* 提交任務,並執行
* */
@Override
public void execute(Runnable command) {
if (command == null){
throw new NullPointerException("command引數不能為空");
}
int currentCtl = this.ctl.get();
if (workerCountOf(currentCtl) < this.corePoolSize) {
// 如果當前存在的worker執行緒數量低於指定的核心執行緒數量,則建立新的核心執行緒
boolean addCoreWorkerSuccess = addWorker(command,true);
if(addCoreWorkerSuccess){
// addWorker新增成功,直接返回即可
return;
}
// addWorker失敗了
// 失敗的原因主要有以下幾個:
// 1 執行緒池的狀態出現了變化,比如呼叫了shutdown/shutdownNow方法,不再是RUNNING狀態,停止接受新的任務
// 2 多個執行緒並行的execute提交任務,導致cas失敗,重試後發現當前執行緒的個數已經超過了限制
// 3 小概率是ThreadFactory執行緒工廠沒有正確的返回一個Thread
// 獲取最新的ctl狀態
currentCtl = this.ctl.get();
}
// 走到這裡有兩種情況
// 1 因為核心執行緒超過限制(workerCountOf(currentCtl) < corePoolSize == false),需要嘗試嘗試將任務放入阻塞佇列
// 2 addWorker返回false,建立核心工作執行緒失敗
// 判斷當前執行緒池狀態是否為running
// 如果是running狀態,則進一步執行任務入隊操作
if(isRunning(currentCtl) && this.workQueue.offer(command)){
// 執行緒池是running狀態,且workQueue.offer入隊成功
int recheck = this.ctl.get();
// 重新檢查狀態,避免在上面入隊的過程中執行緒池並行的關閉了
// 如果是isRunning=false,則進一步需要通過remove操作將剛才入隊的任務刪除,進行回滾
if (!isRunning(recheck) && remove(command)) {
// 執行緒池關閉了,執行reject操作
reject(command);
} else if(workerCountOf(currentCtl) == 0){
// 在corePoolSize為0的情況下,當前不存在存活的核心執行緒
// 一個任務在入隊之後,如果當前執行緒池中一個執行緒都沒有,則需要兜底的建立一個非核心執行緒來處理入隊的任務
// 因此firstTask為null,目的是先讓任務先入隊後建立執行緒去拉取任務並執行
addWorker(null,false);
}else{
// 加入佇列成功,且當前存在worker執行緒,成功返回
return;
}
}else{
// 阻塞佇列已滿,嘗試建立一個新的非核心執行緒處理
boolean addNonCoreWorkerSuccess = addWorker(command,false);
if(!addNonCoreWorkerSuccess){
// 建立非核心執行緒失敗,執行拒絕策略(失敗的原因和前面建立核心執行緒addWorker的原因類似)
reject(command);
}else{
// 建立非核心執行緒成功,成功返回
return;
}
}
}
/**
* 向執行緒池中加入worker
* */
private boolean addWorker(Runnable firstTask, boolean core) {
// retry標識外層迴圈
retry:
for (;;) {
int currentCtl = ctl.get();
int runState = runStateOf(currentCtl);
// Check if queue empty only if necessary.
// 執行緒池終止時需要返回false,避免新的worker被建立
// 1 先判斷runState >= SHUTDOWN
// 2 runState >= SHUTDOWN時,意味著不再允許建立新的工作執行緒,但有一種情況例外
// 即SHUTDOWN狀態下(runState == SHUTDOWN),工作佇列不為空(!workQueue.isEmpty()),還需要繼續執行
// 比如在當前存活的執行緒發生中斷異常時,會呼叫processWorkerExit方法,在銷燬原有工作執行緒後呼叫addWorker重新建立一個新的(firstTask == null)
if (runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
// 執行緒池已經是關閉狀態了,不再允許建立新的工作執行緒,返回false
return false;
}
// 用於cas更新workerCount的內層迴圈(注意這裡面與jdk的寫法不同,改寫成了邏輯一致但更可讀的形式)
for (;;) {
// 判斷當前worker數量是否超過了限制
int workerCount = workerCountOf(currentCtl);
if (workerCount >= CAPACITY) {
// 當前worker數量超過了設計上允許的最大限制
return false;
}
if (core) {
// 建立的是核心執行緒,判斷當前執行緒數是否已經超過了指定的核心執行緒數
if (workerCount >= this.corePoolSize) {
// 超過了核心執行緒數,建立核心worker執行緒失敗
return false;
}
} else {
// 建立的是非核心執行緒,判斷當前執行緒數是否已經超過了指定的最大執行緒數
if (workerCount >= this.maximumPoolSize) {
// 超過了最大執行緒數,建立非核心worker執行緒失敗
return false;
}
}
// cas更新workerCount的值
boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
if (casSuccess) {
// cas成功,跳出外層迴圈
break retry;
}
// 重新檢查一下當前執行緒池的狀態與之前是否一致
currentCtl = ctl.get(); // Re-read ctl
if (runStateOf(currentCtl) != runState) {
// 從外層迴圈開始continue(因為說明在這期間 執行緒池的工作狀態出現了變化,需要重新判斷)
continue retry;
}
// compareAndIncrementWorkerCount方法cas爭搶失敗,重新執行內層迴圈
}
}
boolean workerStarted = false;
boolean workerAdded = false;
MyWorker newWorker = null;
try {
// 建立一個新的worker
newWorker = new MyWorker(firstTask);
final Thread myWorkerThread = newWorker.thread;
if (myWorkerThread != null) {
// MyWorker初始化時內部執行緒建立成功
// 加鎖,防止並行更新
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int runState = runStateOf(ctl.get());
// 重新檢查執行緒池執行狀態,滿足以下兩個條件的任意一個才建立新Worker
// 1 runState < SHUTDOWN
// 說明執行緒池處於RUNNING狀態正常執行,可以建立新的工作執行緒
// 2 runState == SHUTDOWN && firstTask == null
// 說明執行緒池呼叫了shutdown,但工作佇列不為空,依然需要新的Worker。
// firstTask == null標識著其不是因為外部提交新任務而建立新Worker,而是在消費SHUTDOWN前已提交的任務
if (runState < SHUTDOWN ||
(runState == SHUTDOWN && firstTask == null)) {
if (myWorkerThread.isAlive()) {
// 預檢查執行緒的狀態,剛初始化的worker執行緒必須是未喚醒的狀態
throw new IllegalThreadStateException();
}
// 加入worker集合
this.workers.add(newWorker);
int workerSize = workers.size();
if (workerSize > largestPoolSize) {
// 如果當前worker個數超過了之前記錄的最大存活執行緒數,將其更新
largestPoolSize = workerSize;
}
// 建立成功
workerAdded = true;
}
} finally {
// 無論是否發生異常,都先將主控鎖解鎖
mainLock.unlock();
}
if (workerAdded) {
// 加入成功,啟動worker執行緒
myWorkerThread.start();
// 標識為worker執行緒啟動成功,並作為返回值返回
workerStarted = true;
}
}
}finally {
if (!workerStarted) {
addWorkerFailed(newWorker);
}
}
return workerStarted;
}
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
// 當一個任務從工作佇列中被成功移除,可能此時工作佇列為空。嘗試判斷是否滿足執行緒池中止條件
tryTerminate();
return removed;
}
中止前已提交的任務不會丟失;而中止後執行緒池也不會再接收新的任務(走拒絕策略)。這兩點共同保證了提交的任務不會丟失。
執行緒池在收到中止命令進入SHUTDOWN或者STOP狀態時,會一直等到工作佇列為空且所有工作執行緒都中止退出後才會推進到TIDYING階段。
上面描述的條件是一個複合的條件,其只有在「收到停止指令(進入SHUTDOWN或者STOP狀態)」、"工作佇列中任務被移除或消費(工作佇列為空)"或是「工作執行緒退出(所有工作執行緒都中止退出)」這三類事件發生時才有可能滿足。
而判斷是否滿足條件並推進到TIDYING狀態的關鍵就在tryTerminate方法中。tryTerminate顧名思義便是用於嘗試終止執行緒池的,當上述任意事件觸發時便判斷是否滿足終止條件,如果滿足則將執行緒池推進到TIDYING階段。
因此在ThreadPoolExecutor中tryTerminate一共在6個地方被呼叫,分別是shutdown、shutdownNow、remove、purge、addWorkerFailed和processWorkerExit方法。
/**
* 嘗試判斷是否滿足執行緒池中止條件,如果滿足條件,將其推進到最後的TERMINATED狀態
* 注意:必須在任何可能觸發執行緒池中止的場景下呼叫(例如工作執行緒退出,或者SHUTDOWN狀態下佇列工作佇列為空等)
* */
final void tryTerminate() {
for (;;) {
int currentCtl = this.ctl.get();
if (isRunning(currentCtl)
|| runStateAtLeast(currentCtl, TIDYING)
|| (runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty())) {
// 1 isRunning(currentCtl)為true,說明執行緒池還在執行中,不滿足中止條件
// 2 當前執行緒池狀態已經大於等於TIDYING了,說明之前別的執行緒可能已經執行過tryTerminate,且通過了這個if校驗,不用重複執行了
// 3 當前執行緒池是SHUTDOWN狀態,但工作佇列中還有任務沒處理完,也不滿足中止條件
// 以上三個條件任意一個滿足即直接提前return返回
return;
}
// 有兩種場景會走到這裡
// 1 執行了shutdown方法(runState狀態為SHUTDOWN),且當前工作執行緒已經空了
// 2 執行了shutdownNow方法(runState狀態為STOP)
// 這個時候需要令所有的工作執行緒都主動的退出來回收資源
if (workerCountOf(currentCtl) != 0) {
// 如果當前工作執行緒個數不為0,說明還有別的工作執行緒在工作中。
// 通過interruptIdleWorkers(true),打斷其中的一個idle執行緒,嘗試令其也執行runWorker中的processWorkerExit邏輯,並執行tryTerminate
// 被中斷的那個工作執行緒也會執行同樣的邏輯(getTask方法返回->processWorkerExit->tryTerminate)
// 這樣可以一個接著一個的不斷打斷每一個工作執行緒,令其逐步的退出(比起一次性的通知所有的idle工作執行緒,這樣相對平滑很多)
interruptIdleWorkers(ONLY_ONE);
return;
}
// 執行緒池狀態runState為SHUTDOWN或者STOP,且存活的工作執行緒個數已經為0了
// 雖然前面的interruptIdleWorkers是一個一箇中斷idle執行緒的,但實際上有的工作執行緒是因為別的原因退出的(恰好workerCountOf為0了)
// 所以這裡是可能存在並行的,因此通過mainLock加鎖防止並行,避免重複的terminated方法呼叫和termination.signalAll方法呼叫
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// cas的設定ctl的值為TIDYING+工作執行緒個數0(防止與別的地方ctl並行更新)
if (ctl.compareAndSet(currentCtl, ctlOf(TIDYING, 0))) {
try {
// cas成功,呼叫terminated勾點函數
terminated();
} finally {
// 無論terminated勾點函數是否出現異常
// cas的設定ctl的值為TERMINATED最終態+工作執行緒個數0(防止與別的地方ctl並行更新)
ctl.set(ctlOf(TERMINATED, 0));
// 通知使用awaitTermination方法等待執行緒池關閉的其它執行緒(通過termination.await等待)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// 如果上述對ctl變數的cas操作失敗了,則進行重試,再來一次迴圈
// else retry on failed CAS
}
}
從上面tryTerminate方法的實現中可以看到,執行緒池必須等到所有工作執行緒都全部退出(workerCount為0),工作執行緒佔用的全部資源都回收後才會推進到終止態。
那麼之前啟動的工作執行緒一定能通過processWorkerExit退出並銷燬嗎?答案是不一定,這主要取決於使用者是否正確的編寫了令工作執行緒安全退出的任務邏輯。
因為只有能退出任務執行邏輯(runWorker方法中的task.run())的工作執行緒才有機會執行processWorkerExit,無法從任務中跳出(正常退出or拋異常)的工作執行緒將永遠無法退出,導致執行緒池也永遠無法推進到終態。
下面分情況討論:
()->{
// 會正常結束的
System.out.println("hello world!");
};
()->{
// 無限迴圈
while(true){
System.out.println("hello world!");
}
};
()->{
// 無限迴圈時監聽一個變數
while(!isStop) {
System.out.println("hello world!");
}
};
()->{
try {
new ReentrantLock().newCondition().await();
} catch (InterruptedException e) {
// doSomething處理一些邏輯後。。。
// 向上丟擲異常
throw new XXXException(e);
}
}
()->{
try {
new ReentrantLock().newCondition().await();
} catch (InterruptedException e) {
}
// doSomething處理一些邏輯後。。。正常退出
}
雖然Thread.stop能夠保證執行緒一定會被停止,但由於停止的過程中存在很嚴重的並行安全問題而被廢棄而不推薦使用了。
具體原因可以參考官方檔案(Why is Thread.stop deprecated?):https://docs.oracle.com/javase/8/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html