在前面的文章自己動手寫乞丐版執行緒池中,我們寫了一個非常簡單的執行緒池實現,這個只是一個非常簡單的實現,在本篇文章當中我們將要實現一個和JDK內部實現的執行緒池非常相似的執行緒池。
我們首先看一個JDK給我們提供的執行緒池ThreadPoolExecutor
的建構函式的引數:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
引數解釋:
如果上面的引數你不能夠理解,可以先閱讀這篇文章自己動手寫乞丐版執行緒池。基於上面談到的引數,執行緒池當中提交任務的流程大致如下圖所示:
根據前面的引數分析我們自己實現的執行緒池需要實現一下功能:
private AtomicInteger ct = new AtomicInteger(0); // 當前在執行任務的執行緒個數
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime;
private TimeUnit unit;
private BlockingQueue<Runnable> taskQueue;
private RejectPolicy policy;
private ArrayList<Worker> workers = new ArrayList<>();
private volatile boolean isStopped;
private boolean useTimed;
引數解釋如下:
ct:表示當前執行緒池當中執行緒的個數。
corePoolSize:執行緒池當中核心執行緒的個數,意義和上面談到的JDK的執行緒池意義一致。
maximumPoolSize:執行緒池當中最大的執行緒個數,意義和上面談到的JDK的執行緒池意義一致。
keepAliveTime 和 unit:和JDK執行緒池的引數意義一致。
taskQueue:任務佇列,用不儲存提交的任務。
policy:拒絕策略,主要有一下四種策略:
public enum RejectPolicy {
ABORT,
CALLER_RUN,
DISCARD_OLDEST,
DISCARD
}
// 下面這個方法是向執行緒池提交任務
public void execute(Runnable runnable) throws InterruptedException {
checkPoolState();
if (addWorker(runnable, false) // 如果能夠加入新的執行緒執行任務 加入成功就直接返回
|| !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 說明提交任務失敗 任務佇列已經滿了
|| addWorker(runnable, true)) // 使用能夠使用的最大的執行緒數 (maximumPoolSize) 看是否能夠產生新的執行緒
return;
// 如果任務佇列滿了而且不能夠加入新的執行緒 則拒絕這個任務
if (!taskQueue.offer(runnable))
reject(runnable);
}
在上面的程式碼當中:
private void checkPoolState() {
if (isStopped) {
// 如果執行緒池已經停下來了,就不在向任務佇列當中提交任務了
throw new RuntimeException("thread pool has been stopped, so quit submitting task");
}
}
/**
*
* @param runnable 需要被執行的任務
* @param max 是否使用 maximumPoolSize
* @return boolean
*/
public synchronized boolean addWorker(Runnable runnable, boolean max) {
if (ct.get() >= corePoolSize && !max)
return false;
if (ct.get() >= maximumPoolSize && max)
return false;
Worker worker = new Worker(runnable);
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
thread.start();
return true;
}
這個函數其實比較簡單,只需要將傳入的Callable物件封裝成一個FutureTask物件即可,因為FutureTask實現了Callable和Runnable兩個介面,然後將這個結果返回即可,得到這個物件,再呼叫物件的 get 方法就能夠得到結果。
public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
checkPoolState();
FutureTask<V> futureTask = new FutureTask<>(task);
execute(futureTask);
return futureTask;
}
根據前面提到的各種策略的具體實現方式,具體的程式碼實現如下所示:
private void reject(Runnable runnable) throws InterruptedException {
switch (policy) {
case ABORT:
throw new RuntimeException("task queue is full");
case CALLER_RUN:
runnable.run();
case DISCARD: // 直接放棄這個任務
return;
case DISCARD_OLDEST:
// 放棄等待時間最長的任務 也就是佇列當中的第一個任務
taskQueue.poll();
execute(runnable); // 重新執行這個任務
}
}
一共兩種方式實現執行緒池關閉:
// 強制關閉執行緒池
public synchronized void stop() {
isStopped = true;
for (Worker worker : workers) {
worker.stopWorker();
}
}
public synchronized void shutDown() {
// 先表示關閉執行緒池 執行緒就不能再向執行緒池提交任務
isStopped = true;
// 先等待所有的任務執行完成再關閉執行緒池
waitForAllTasks();
stop();
}
private void waitForAllTasks() {
// 當執行緒池當中還有任務的時候 就不退出迴圈
while (taskQueue.size() > 0) {
Thread.yield();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void run() {
// 先執行傳遞過來的第一個任務 這裡是一個小的優化 讓執行緒直接執行第一個任務 不需要
// 放入任務佇列再取出來執行了
firstTask.run();
thisThread = Thread.currentThread();
while (!isStopped) {
try {
// 是否使用時間就在這裡顯示出來了
Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
if (task == null) {
int i;
boolean exit = true;
// 如果當前執行緒數大於核心執行緒數 則使用 CAS 去退出 用於保證線上程安全下的退出
// 且保證執行緒的個數不小於 corePoolSize 下面這段程式碼需要仔細分析一下
if (ct.get() > corePoolSize) {
do{
i = ct.get();
if (i <= corePoolSize) {
exit = false;
break;
}
}while (!ct.compareAndSet(i, i - 1));
if (exit) {
return;
}
}
}else {
task.run();
}
} catch (InterruptedException e) {
// do nothing
}
}
}
我們現在來仔細分析一下,執行緒退出執行緒池的時候是如何保證執行緒池當中總的執行緒數是不小於 corePoolSize 的!首先整體的框架是使用 CAS 進行實現,具體程式碼為 do ... while 操作,然後在 while 操作裡面使用 CAS 進行測試替換,如果沒有成功再次獲取 ,當執行緒池當中核心執行緒的數目小於等於 corePoolSize 的時候也需要退出迴圈,因為執行緒池當中執行緒的個數不能小於 corePoolSize 。因此使用 break 跳出迴圈的執行緒是不會退出執行緒池的。
在我們自己實現的執行緒池當中當執行緒退出的時候,workers 當中還儲存這指向這個執行緒的物件,但是當執行緒退出的時候我們還沒有在 workers 當中刪除這個物件,因此這個執行緒物件不會被垃圾回收器收集掉,但是我們這個只是一個執行緒池實現的例子而已,並不用於生產環境,只是為了幫助大家理解執行緒池的原理。
package cscore.concurrent.java.threadpoolv2;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPool {
private AtomicInteger ct = new AtomicInteger(0); // 當前在執行任務的執行緒個數
private int corePoolSize;
private int maximumPoolSize;
private long keepAliveTime;
private TimeUnit unit;
private BlockingQueue<Runnable> taskQueue;
private RejectPolicy policy;
private ArrayList<Worker> workers = new ArrayList<>();
private volatile boolean isStopped;
private boolean useTimed;
public int getCt() {
return ct.get();
}
public ThreadPool(int corePoolSize, int maximumPoolSize, TimeUnit unit, long keepAliveTime, RejectPolicy policy
, int maxTasks) {
// please add -ea to vm options to make assert keyword enable
assert corePoolSize > 0;
assert maximumPoolSize > 0;
assert keepAliveTime >= 0;
assert maxTasks > 0;
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.unit = unit;
this.policy = policy;
this.keepAliveTime = keepAliveTime;
taskQueue = new ArrayBlockingQueue<Runnable>(maxTasks);
useTimed = keepAliveTime != 0;
}
/**
*
* @param runnable 需要被執行的任務
* @param max 是否使用 maximumPoolSize
* @return boolean
*/
public synchronized boolean addWorker(Runnable runnable, boolean max) {
if (ct.get() >= corePoolSize && !max)
return false;
if (ct.get() >= maximumPoolSize && max)
return false;
Worker worker = new Worker(runnable);
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
thread.start();
return true;
}
// 下面這個方法是向執行緒池提交任務
public void execute(Runnable runnable) throws InterruptedException {
checkPoolState();
if (addWorker(runnable, false) // 如果能夠加入新的執行緒執行任務 加入成功就直接返回
|| !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 說明提交任務失敗 任務佇列已經滿了
|| addWorker(runnable, true)) // 使用能夠使用的最大的執行緒數 (maximumPoolSize) 看是否能夠產生新的執行緒
return;
// 如果任務佇列滿了而且不能夠加入新的執行緒 則拒絕這個任務
if (!taskQueue.offer(runnable))
reject(runnable);
}
private void reject(Runnable runnable) throws InterruptedException {
switch (policy) {
case ABORT:
throw new RuntimeException("task queue is full");
case CALLER_RUN:
runnable.run();
case DISCARD:
return;
case DISCARD_OLDEST:
// 放棄等待時間最長的任務
taskQueue.poll();
execute(runnable);
}
}
private void checkPoolState() {
if (isStopped) {
// 如果執行緒池已經停下來了,就不在向任務佇列當中提交任務了
throw new RuntimeException("thread pool has been stopped, so quit submitting task");
}
}
public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
checkPoolState();
FutureTask<V> futureTask = new FutureTask<>(task);
execute(futureTask);
return futureTask;
}
// 強制關閉執行緒池
public synchronized void stop() {
isStopped = true;
for (Worker worker : workers) {
worker.stopWorker();
}
}
public synchronized void shutDown() {
// 先表示關閉執行緒池 執行緒就不能再向執行緒池提交任務
isStopped = true;
// 先等待所有的任務執行完成再關閉執行緒池
waitForAllTasks();
stop();
}
private void waitForAllTasks() {
// 當執行緒池當中還有任務的時候 就不退出迴圈
while (taskQueue.size() > 0) {
Thread.yield();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Worker implements Runnable {
private Thread thisThread;
private final Runnable firstTask;
private volatile boolean isStopped;
public Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
@Override
public void run() {
// 先執行傳遞過來的第一個任務 這裡是一個小的優化 讓執行緒直接執行第一個任務 不需要
// 放入任務佇列再取出來執行了
firstTask.run();
thisThread = Thread.currentThread();
while (!isStopped) {
try {
Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
if (task == null) {
int i;
boolean exit = true;
if (ct.get() > corePoolSize) {
do{
i = ct.get();
if (i <= corePoolSize) {
exit = false;
break;
}
}while (!ct.compareAndSet(i, i - 1));
if (exit) {
return;
}
}
}else {
task.run();
}
} catch (InterruptedException e) {
// do nothing
}
}
}
public synchronized void stopWorker() {
if (isStopped) {
throw new RuntimeException("thread has been interrupted");
}
isStopped = true;
thisThread.interrupt();
}
}
}
package cscore.concurrent.java.threadpoolv2;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException, ExecutionException {
var pool = new ThreadPool(2, 5, TimeUnit.SECONDS, 10, RejectPolicy.ABORT, 100000);
for (int i = 0; i < 10; i++) {
RunnableFuture<Integer> submit = pool.submit(() -> {
System.out.println(Thread.currentThread().getName() + " output a");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 0;
});
System.out.println(submit.get());
}
int n = 15;
while (n-- > 0) {
System.out.println("Number Threads = " + pool.getCt());
Thread.sleep(1000);
}
pool.shutDown();
}
}
上面測試程式碼的輸出結果如下所示:
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
Number Threads = 5
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 3
Number Threads = 2
Number Threads = 2
Number Threads = 2
Number Threads = 2
從上面的程式碼可以看出我們實現了正確的任務實現結果,同時執行緒池當中的核心執行緒數從 2 變到了 5 ,當執行緒池當中任務佇列全部別執行完成之後,執行緒的數目重新降下來了,這確實是我們想要達到的結果。
在本篇文章當中主要給大家介紹瞭如何實現一個類似於JDK中的執行緒池,裡面有非常多的實現細節,大家可以仔細捋一下其中的流程,對執行緒池的理解將會非常有幫助。
以上就是本篇文章的所有內容了,我是LeHung,我們下期再見!!!更多精彩內容合集可存取專案:https://github.com/Chang-LeHung/CSCore
關注公眾號:一無是處的研究僧,瞭解更多計算機(Java、Python、計算機系統基礎、演演算法與資料結構)知識。