我們知道如果程式中並行的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束時,會因為頻繁建立執行緒而大大降低系統的效率,因此出現了執行緒池的使用方式,它可以提前建立好執行緒來執行任務。本文主要通過java的ThreadPoolExecutor來檢視執行緒池的內部處理過程。
java.uitl.concurrent.ThreadPoolExecutor類是執行緒池中最核心的一個類,下面我們來看一下ThreadPoolExecutor類的部分實現原始碼。
ThreadPoolExecutor類提供瞭如下4個構造方法
// 設定執行緒池時指定核心執行緒數、最大執行緒數、執行緒存活時間及等待佇列。
// 執行緒建立工廠和拒絕策略使用預設的(AbortPolicy)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 設定執行緒池時指定核心執行緒數、最大執行緒數、執行緒存活時間、等待佇列及執行緒建立工廠
// 拒絕策略使用預設的(AbortPolicy)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
// 設定執行緒池時指定核心執行緒數、最大執行緒數、執行緒存活時間、等待佇列及拒絕策略
// 執行緒建立工廠使用預設的
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
// 設定執行緒池時指定核心執行緒數、最大執行緒數、執行緒存活時間、等待佇列、執行緒建立工廠及拒絕策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
通過觀察上述每個構造器的原始碼實現,我們可以發現前面三個構造器都是呼叫的第四個構造器進行的初始化工作。
下面解釋一下構造器中各個引數的含義:
在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法裡面最終呼叫的還是execute()方法。
public void execute(Runnable command) {
// 判斷提交的任務command是否為null,若是null,則丟擲空指標異常;
if (command == null)
throw new NullPointerException();
// 獲取執行緒池中當前執行緒數
int c = ctl.get();
// 如果執行緒池中當前執行緒數小於核心池大小,進入if語句塊
if (workerCountOf(c) < corePoolSize) {
// 如果以給定的命令啟動一個核心執行緒執行任務成功,直接返回
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果當前執行緒池處於RUNNING狀態,則將任務放入任務快取佇列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果執行緒池不處於執行狀態並且移除剛加入的任務成功則執行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果當前執行緒數為0,則線上程池裡增加一個執行緒,保證佇列裡的任務不會沒有執行緒執行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 嘗試啟動核心執行緒之外的執行緒,如果不滿足,則執行對應的拒絕策略
else if (!addWorker(command, false))
reject(command);
}
主要方法addWorker。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果執行緒池狀態大於SHUTDOWN或者執行緒池狀態等於SHUTDOWN,firstTask不等於null
// 或者執行緒池狀態等於SHUTDOWN,任務佇列等於空時,直接返回false結束。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果執行緒數量大於等於最大數量或者大於等於上限
//(入參core傳true,取核心執行緒數,否則取最大執行緒數),直接返回false結束。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false
// CAS操作給工作執行緒數加1,成功則跳到retry處,不再進入迴圈。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果執行緒池狀態與剛進入時不一致,則跳到retry處,再次進入迴圈
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建一個執行緒
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 如果執行緒池狀態在SHUTDOWN之前或者
// 執行緒池狀態等於SHUTDOWN並且firstTask等於null時,進入處理。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果要執行的執行緒正在執行,則拋異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動執行緒
t.start();
workerStarted = true;
}
}
} finally {
// 如果執行緒新增失敗,則將新增的對應資訊刪除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
在上述addWorker中,當呼叫執行緒的start方法啟動執行緒後,會執行其中的run方法。
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果任務不為空或者新獲取到的任務不為空
while (task != null || (task = getTask()) != null) {
w.lock();
// 當執行緒池狀態,大於等於 STOP 時,保證工作執行緒都有中斷標誌。
// 當執行緒池狀態,小於STOP時,保證工作執行緒都沒有中斷標誌。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
通過上述原始碼分析,我們可以得出執行緒池處理任務的過程如下:
本文從原始碼層面主要分析了執行緒池的建立、執行過程,通過上述的分析,可以看出當執行緒池中的執行緒數量超過核心執行緒數後,會先將任務放入等待佇列,佇列放滿後當最大執行緒數大於核心執行緒數時,才會建立新的執行緒執行。
作者:京東物流 管碧強
來源:京東雲開發者社群 自猿其說Tech 轉載請註明來源