背景介紹:
你剛從學校畢業後,到新公司實習,試用期又被畢業,然後你又不得不出來面試,好在面試的時候碰到個美女面試官!
面試官: 小夥子,我看你簡歷上寫的專案中用到了執行緒池,你知道執行緒池是怎樣實現複用執行緒的?
這面試官是不是想坑我?是不是擺明了不讓我通過?
難道你不應該問執行緒池有哪些核心引數?每個引數具體作用是什麼?
往執行緒池中不斷提交任務,執行緒池的處理流程是什麼?
這些才是你應該問的,這些八股文我已經背熟了,你不問,瞎問什麼複用執行緒?
幸虧我看了一燈的八股文,聽我給你背一遍!
我: 執行緒池複用執行緒的邏輯很簡單,就是線上程啟動後,通過while死迴圈,不斷從阻塞佇列中拉取任務,從而達到了複用執行緒的目的。
具體原始碼如下:
// 執行緒執行入口
public void run() {
runWorker(this);
}
// 執行緒執行核心方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 1. 使用while死迴圈,不斷從阻塞佇列中拉取任務
while (task != null || (task = getTask()) != null) {
// 加鎖,保證thread不被其他執行緒中斷(除非執行緒池被中斷)
w.lock();
// 2. 校驗執行緒池狀態,是否需要中斷當前執行緒
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 3. 執行run方法
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法邏輯很簡單,就是不斷從阻塞佇列中拉取任務並執行。
面試官: 小夥子,有點東西。我們都知道執行緒池會回收超過空閒時間的執行緒,那麼執行緒池是怎麼統計執行緒的空閒時間的?
美女面試官的問題真刁鑽,讓人頭疼啊!這問的也太深了吧!
沒看過原始碼的話,真不好回答。
我: 嗯...,可能是有個監控執行緒在後臺不停的統計每個執行緒的空閒時間,看到執行緒的空閒時間超過閾值的時候,就回收掉。
面試官: 小夥子,你的想法挺不錯,邏輯很嚴謹,你確定執行緒池內部是這麼實現的嗎?
問得我有點不自信了,沒看過原始碼不能瞎蒙。
我還是去瞅一眼一燈寫的八股文吧。
我: 這個我知道,執行緒池統計執行緒的空閒時間的實現邏輯很簡單。
阻塞佇列(BlockingQueue)提供了一個poll(time, unit)
方法用來拉取資料,
作用就是: 當佇列為空時,會阻塞指定時間,然後返回null。
執行緒池就是就是利用阻塞佇列的這個方法,如果在指定時間內拉取不到任務,就表示該執行緒的存活時間已經超過閾值了,就要被回收了。
具體原始碼如下:
// 從阻塞佇列中拉取任務
private Runnable getTask() {
boolean timedOut = false;
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 如果執行緒池已經停了,或者阻塞佇列是空,就回收當前執行緒
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 2. 再次判斷是否需要回收執行緒
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 3. 在指定時間內,從阻塞佇列中拉取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 4. 如果沒有拉取到任務,就標識該執行緒已超時,然後就被回收
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
面試官: 小夥子,可以啊,你是懂執行緒池原始碼的。再問你個問題,如果執行緒池拋異常了,也沒有try/catch,會發生什麼?
美女面試官你這是準備打破砂鍋問到底,鐵了心不讓我過,是吧?
我的程式碼風格是很嚴謹的,誰寫的業務程式碼不try/catch,也沒遇到過這種情況。
讓我再看一下一燈總結的八股文吧。
我: 有了,執行緒池中的程式碼如果拋異常了,也沒有try/catch,會從執行緒池中刪除這個異常執行緒,並建立一個新執行緒。
不信的話,我們可以測試驗證一下:
/**
* @author 一燈架構
* @apiNote 執行緒池範例
**/
public class ThreadPoolDemo {
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
// 1. 建立一個單個執行緒的執行緒池
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 2. 往執行緒池中提交3個任務
for (int i = 0; i < 3; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 關注公眾號:一燈架構");
throw new RuntimeException("拋異常了!");
});
}
// 3. 關閉執行緒池
executorService.shutdown();
}
}
輸出結果:
pool-1-thread-1 關注公眾號:一燈架構
pool-1-thread-2 關注公眾號:一燈架構
pool-1-thread-3 關注公眾號:一燈架構
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 拋異常了!
at com.yideng.SynchronousQueueDemo.lambda$main$0(ThreadPoolDemo.java:21)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-1-thread-2" java.lang.RuntimeException: 拋異常了!
at com.yideng.SynchronousQueueDemo.lambda$main$0(ThreadPoolDemo.java:21)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-1-thread-3" java.lang.RuntimeException: 拋異常了!
at com.yideng.SynchronousQueueDemo.lambda$main$0(ThreadPoolDemo.java:21)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
從輸出結果中可以看出,執行緒名稱並不是同一個,而是累加的,說明原執行緒已經被回收,新建了個執行緒。
我們再看一下原始碼,驗證一下:
// 執行緒拋異常後,退出邏輯
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 1. 從工作執行緒中刪除當前執行緒
workers.remove(w);
} finally {
mainLock.unlock();
}
// 2. 中斷當前執行緒
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 3. 新建一個執行緒
addWorker(null, false);
}
}
如果想統一處理異常,可以自定義執行緒建立工廠,在工廠裡面設定例外處理邏輯。
/**
* @author 一燈架構
* @apiNote 執行緒池範例
**/
public class ThreadPoolDemo {
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
// 1. 建立一個單個執行緒的執行緒池
ExecutorService executorService = Executors.newSingleThreadExecutor(runnable -> {
// 2. 自定義執行緒建立工廠,並設定例外處理邏輯
Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler((t, e) -> {
System.out.println("捕獲到異常:" + e.getMessage());
});
return thread;
});
// 3. 往執行緒池中提交3個任務
for (int i = 0; i < 3; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 關注公眾號:一燈架構");
throw new RuntimeException("拋異常了!");
});
}
// 4. 關閉執行緒池
executorService.shutdown();
}
}
輸出結果:
Thread-0 關注公眾號:一燈架構
捕獲到異常:拋異常了!
Thread-1 關注公眾號:一燈架構
捕獲到異常:拋異常了!
Thread-2 關注公眾號:一燈架構
捕獲到異常:拋異常了!
面試官: 小夥子,論原始碼,還是得看你,還是你背的熟。現在我就給你發offer,薪資直接漲10%,明天9點就來上班吧,咱們公司實行996工作制。
我是「一燈架構」,如果本文對你有幫助,歡迎各位小夥伴點贊、評論和關注,感謝各位老鐵,我們下期見