位邏輯運運算元:
&:
位與運運算元,只有兩個運算元都是true,結果才是true。
|:
位或運運算元,只有兩個運算元都是false,結果才是false。
~:
位非運運算元:如果位為0,結果是1,如果位為1,結果是0.
^:
位互斥或運算:兩個數轉為二進位制,然後從高位開始比較,如果相同則為0,不相同則為1。
位移運算:
無符號左移
無符號右移
:帶符號右移(沒有帶符號左移這種操作)
二進位制:
二進位制都是以二補數的形式表示的
正數的原碼,反碼,二補數都一樣;
要得到負數的二補數,必須先求負數的反碼,負數的反碼;負數的反碼按位元1改成0,0改成1;負數的二補數等於反碼+1
分享一個,有很多幹貨,包含netty,spring,執行緒,spring cloud等詳細講解,也有詳細的學習規劃圖,面試題整理等,我感覺在面試這塊講的非常清楚:獲取面試資料只需:點選這裡領取!!! 暗號:CSDN
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
BlockingQueue b = new ArrayBlockingQueue(100);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 500000, TimeUnit.SECONDS,
b, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
});
threadPoolExecutor.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(111111);
});
threadPoolExecutor.execute(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(2222222);
});
}
}
public class ThreadPoolExecutor extends AbstractExecutorService {
//用於儲存執行緒執行狀態和當前執行緒池執行緒執行的數量
//高3位用於代表執行緒的執行狀態,低29位代表執行緒池的執行緒最大數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//32-3=29
private static final int COUNT_BITS = Integer.SIZE - 3;
//00011111111111111111111111111111,高3位為0,低29位為1,代表執行緒池的執行緒最大數量
//參與運算用於得到執行緒的執行狀態和執行緒池執行緒的數量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//執行緒池執行的狀態,後面單獨分析
//執行緒池處於執行狀態,11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//執行緒池處於shutdown狀態,00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//執行緒池處於結束狀態,00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//執行緒池執行任務為空的時候的狀態,01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//執行緒處於徹底終止的狀態,01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
//獲取執行緒池執行狀態,c代表ctl值
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取執行緒池工作執行緒數量,c代表ctl值
private static int workerCountOf(int c) { return c & CAPACITY; }
//獲取ctl值,rs代表執行緒的執行狀態,wc代表執行緒池工作執行緒數量,形成一個32位元二進位制數
//高三位代表執行緒池執行狀態,低29位代表執行緒池工作執行緒數量
private static int ctlOf(int rs, int wc) { return rs | wc; }
//傳入ctl值和執行緒某個執行狀態,比較ctl值是否小於傳入的執行緒的某個執行狀態
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//傳入ctl值和執行緒執行狀態,比較ctl值是否大於傳入的執行緒的某個執行狀態
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
//判斷執行緒執行狀態是否是執行狀態,因為RUNNING=-1是最小的狀態值
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
//通過CAS操作將工作執行緒數+1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
//通過CAS操作將工作執行緒數-1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
//do-while迴圈可以強制讓工作執行緒數-1
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
//執行緒池的工作佇列,在構造方法中初始化
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
//儲存worker的池子
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();
//記錄執行緒池生命週期中,執行緒池執行的執行緒的最大數量
private int largestPoolSize;
//執行緒池完成任務數量
private long completedTaskCount;
//建立執行緒工廠
private volatile ThreadFactory threadFactory;
//執行緒中斷策列
private volatile RejectedExecutionHandler handler;
//在指定時間單位下,執行緒存活時間
private volatile long keepAliveTime;
//核心執行緒數
private volatile int corePoolSize;
//最大執行緒數
private volatile int maximumPoolSize;
//執行緒池滿了後的中斷策列
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
private final AccessControlContext acc;
//當從工作佇列中取不到任務時的時候,是否需要回收核心執行緒
private volatile boolean allowCoreThreadTimeOut;
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//會檢查引數,不符合條件丟擲異常
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
//工作佇列,執行緒工廠,拒絕策列不能為空
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
當我們執行execute方法提交一個執行緒的時候,首先會判斷當前執行緒池執行緒數是否超過核心執行緒數corePoolSize,若是沒有超過,則建立新執行緒,若是超過,則嘗試將Runnable提交到工作佇列workQueue中。
如果工作佇列workQueue沒有超過容量,則Runnable提交到工作佇列中,如果超過了workQueue的容量,則嘗試建立執行緒。如果此時建立的執行緒小於最大執行緒數maximumPoolSize,則建立執行緒,如果超過了maximumPoolSize,則執行拒絕策列。
public void execute(Runnable command) {
//如果runnable為空,丟擲異常
if (command == null)
throw new NullPointerException();
//獲取ctl值,該值高3位代表執行緒池執行狀態,低29位代表執行緒池當前執行執行緒數量
int c = ctl.get();
//CASE1:獲取執行緒池執行執行緒數量,如果小於核心執行緒數,則建立執行緒,addWorker傳入引數為core
//也就是說,執行緒池不是一上來就把核心執行緒建立了,必須在提交runnable任務到執行緒池的時候才一個一個建立
if (workerCountOf(c) < corePoolSize) {
//addWorker是建立執行緒的核心方法,關鍵點在Worker類的構造方法和runWorker方法的while迴圈
if (addWorker(command, true))
return;
c = ctl.get();
}
//CASE2:條件1不成立說明核心執行緒數已滿,將任務新增到阻塞佇列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//CASE3:條件1和2都不成立,說明核心執行緒已建立完全並且任務佇列已滿
//呼叫addWorker建立非核心執行緒,如果返回false,說明執行緒數達到最大執行緒數,執行拒絕策列
else if (!addWorker(command, false))
reject(command);
}
每個執行緒的建立都離不開Worker類,該類完全包裝了執行緒執行的所需要的屬性,並提供了初始化執行緒和從阻塞佇列中獲取被阻塞的執行緒並執行的一系列方法。
觀察該類程式碼發現,該類繼承了AbstractQueuedSynchronizer並實現了Runnable介面,在建立執行緒的時候,實際Thread類的構造方法包裝的就是Worker類自己(我們知道一般Runnable需要被傳入到Thread裡面的,如:Thread t = new Thread(runnable), t.start()啟動執行緒)。
而從execute方法傳過來的Runnable實現只是被封裝到了firstTask中,建立出來的Thread在執行的時候,呼叫的start方法也只是啟動了該類的runWorker方法,而真正封裝我們執行邏輯的firstTask這個Runnable類在後續呼叫中也只是執行自己的run方法而已,並不再被Thread封裝。
worker為什麼要繼承AbstractQueuedSynchronizer呢?
因為在runWork的方法內,在呼叫firstTask處理業務邏輯前,會給程式碼加上獨佔鎖,加這個獨佔鎖的目的是什麼呢?因為我們在呼叫shutDown方法的時候,並不會終止處於執行中的執行緒。shutDown方法會根據獨佔鎖來判斷當前worker是否正在工作。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//需要建立的執行緒,該執行緒封裝的Runnable是自己
final Thread thread;
//execute傳入的Runnable,該runnable並不被Thread包裝,後續呼叫自己的run方法。
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
//設定執行緒池處於執行狀態
setState(-1);
//封裝execute傳入進來的包含實際邏輯的Runnable
this.firstTask = firstTask;
//建立一個執行緒,這裡注意,執行緒封裝的Runnable是自己
//範例使用Executors.defaultThreadFactory()
this.thread = getThreadFactory().newThread(this);
}
//被thread屬性封裝後呼叫start方法後,會自動啟動該run方法,執行後續邏輯
//後續邏輯會呼叫firstTask.run()方法啟動實際業務邏輯
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
addWorker是建立執行緒的核心方法,一個worker代表一個執行緒,而workers這個全域性變數可以代表執行緒池,只有向workers裡面新增worker成功的時候,才能代表建立執行緒成功了。
addWorker在執行過程中,會根據執行緒池狀態和執行緒池數量判斷是否能建立執行緒,建立執行緒成功會將記錄執行緒池狀態和數量的ctl值+1,並將worker加入到workers裡面,更新執行緒池生命週期內執行緒池執行緒的最大數量,然後啟動執行緒執行任務。
addWorker的core引數代表是否是在建立核心執行緒,core為true代表建立核心執行緒,false代表阻塞佇列已滿,建立非核心執行緒。
返回值: true代表建立執行緒並啟動成功,false代表建立執行緒失敗。
什麼時候返回false呢? CASE1.執行緒池狀態rs>SHUTDOWN;
CASE2.執行緒池狀態為SHUTDOWN的時候,阻塞佇列沒有任務了,為空;
CASE3.執行緒池狀態為SHUTDOWN的時候,execute提交的Runnable(被封裝到firstTask裡面)不為空;
CASE4.如果是建立核心執行緒,此時已經超過核心執行緒數;如果是建立非核心執行緒,此時已經超過最大執行緒數;
CASE5.ThreadFactory建立執行緒為空,這裡一般是我們自定義執行緒工廠的時候出的問題;
private boolean addWorker(Runnable firstTask, boolean core) {
//retry程式碼除了檢查是否能建立執行緒外,還負責將ctl值+1,如果不能建立執行緒,則返回false;
retry:
for (;;) {
//獲取當前ctl值
int c = ctl.get();
//獲取當前執行緒池執行狀態
int rs = runStateOf(c);
//CASE1.執行緒池狀態rs>SHUTDOWN;返回false;
//CASE2.執行緒池狀態為SHUTDOWN的時候,阻塞佇列沒有任務了,為空;
//CASE3.執行緒池狀態為SHUTDOWN的時候,execute提交的Runnable(被封裝到firstTask裡面)不為空;
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//迴圈判斷是否能將ctl+1設定成功
for (;;) {
//獲取當前執行中的執行緒數量
int wc = workerCountOf(c);
//條件1:wc >= CAPACITY基本不可能,CAPACITY為理論上的最大執行緒數,一個5億級的數位
//CASE4.根據core引數,如果是建立核心執行緒,此時已經超過核心執行緒數,則返回false
//如果是建立非核心執行緒,此時已經超過最大執行緒數,則返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//條件成立,說明給ctl修改+1成功了,代表給執行緒數+1設定成功了,可以退出迴圈建立執行緒了
//如果在執行這段程式碼的時候,執行緒池狀態正巧被改了,這裡也會失敗,因為ctl的高3位代表的是執行緒狀態
if (compareAndIncrementWorkerCount(c))
break retry;
//如果上面設定執行緒數+1失敗,則實時獲取執行緒狀態並和當前的比較
c = ctl.get();
//狀態被改變了跳到retry再次判斷是否允許建立執行緒
if (runStateOf(c) != rs)
continue retry;
}
}
//程式碼走到這裡代表已經允許建立執行緒了
//表示建立的worker是否已經啟動,啟動也代表執行緒建立成功了
boolean workerStarted = false;
//新增worker到worker佇列是否成功的狀態
boolean workerAdded = false;
//區域性變數
Worker w = null;
try {
//構建work物件
w = new Worker(firstTask);
//獲取worker的構造方法建立的執行緒
final Thread t = w.thread;
//這裡加了這段判斷是為了防止自己實現的TreadFactory有bug導致建立執行緒失敗
if (t != null) {
//向works這個hashset裡面新增works的時候,需要全域性加鎖,以下程式碼執行緒並不安全
//該段程式碼我的理解就是為了維護largestPoolSize這個值,記錄執行緒池生命週期中,
//執行緒池執行的執行緒的最大數量
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//獲取當前執行緒池狀態
int rs = runStateOf(ctl.get());
//檢查執行緒池狀態必須是RUNNING或者處於SHUTDOWN的時候,並沒有提交具體的任務
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//防止人為定義執行緒工廠建立執行緒並啟動了start方法的情況
if (t.isAlive())
throw new IllegalThreadStateException();
//向執行緒池新增worker
workers.add(w);
//獲取執行緒池執行緒數量
int s = workers.size();
//如果執行緒池執行緒數量>記錄的值,更新執行緒池生命週期內最大執行緒記錄數
if (s > largestPoolSize)
largestPoolSize = s;
//表示向執行緒池中新增執行緒成功了
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//啟動執行緒,該方法實際會啟動worker類的run方法,然後執行runWorker方法
t.start();
//設定執行緒啟動狀態為成功
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//後面再分析
addWorkerFailed(w);
}
return workerStarted;
}
addWorker方法建立執行緒成功,Worker類的Thread會呼叫start方法啟動自己的run方法,因為Worker類實現了Runnable介面,run方法裡面呼叫了runWorker方法。實際我們execute方法傳入的Runnable被封裝到了Worker類的firstTask屬性裡面。然後在runWorker裡面呼叫run方法啟動具體的邏輯,注意這裡並沒用再用Thrad封裝Runnable了。執行緒啟動後,會一直執行While迴圈,迴圈第一次執行自己傳入的Runnable,第二次及之後則通過getTask方法從任務佇列種獲取具體的Runnable任務了。一旦While迴圈內發生異常或者getTask返回空,則會呼叫processWorkerExit執行執行緒銷燬邏輯。getTask方法獲取不到具體任務的執行緒都可被認為是空閒執行緒。
final void runWorker(Worker w) {
//wt=w.thread
Thread wt = Thread.currentThread();
//execute實際傳入的包含業務邏輯的Runnable,該Runnable不再被Thread包裝,呼叫自己的run方法
Runnable task = w.firstTask;
//參照設定為null,幫助gc
w.firstTask = null;
//先呼叫unlock方法設定當前獨佔執行緒為空,執行緒執行狀態為0
w.unlock();
//執行緒退出狀態,true代表執行緒因為異常退出了
boolean completedAbruptly = true;
try {
//執行緒被建立並啟動後就一直執行while迴圈,直到發生異常或者退出
//條件1:task != null,執行緒初創task不為空
//條件2:條件1不成立說明執行緒非初創並且核心執行緒數已滿,說明已經建立好執行緒,從佇列中取task任務
while (task != null || (task = getTask()) != null) {
//設定獨佔鎖,因為shutDown方法呼叫的時候不會立刻終止執行中的執行緒,
//會根據是否持有獨佔鎖來判斷當前worker是否處於執行狀態
w.lock();
//執行緒池處於STOP/TIDYING/TERMINATION且當前執行緒沒有設定中斷狀態
//給當前執行緒設一箇中斷狀態
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//勾點方法,留給子類實現,在執行實際業務程式碼之前
beforeExecute(wt, task);
Throwable thrown = null;
try {
//呼叫實際業務方法的邏輯
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//勾點方法,留給子類實現,在執行實際業務程式碼之後
afterExecute(task, thrown);
}
} finally {
//將task設定為空
task = null;
//每個worker完成任務數量+1
w.completedTasks++;
//釋放掉鎖,正常情況下會回到while迴圈繼續執行getTask,發生異常會執行下面的finally
//getTask為空也會退出while迴圈
w.unlock();
}
}
//如果getTask()返回空的時候,執行退出邏輯
completedAbruptly = false;
} finally {
//執行緒退出邏輯
//completedAbruptly=true因為異常退出
//completedAbruptly=false正常退出
processWorkerExit(w, completedAbruptly);
}
}
當getTask返回空的時候,執行緒可以執行銷燬邏輯了。
getTask什麼時候返回空?
1.執行緒池處於SHUTDOWN狀態,工作佇列為空的時候;
2.執行緒池處於STOP狀態以上的時候,將執行緒池執行緒數-1並返回空;
3.當工作佇列為空的時候;
注意,執行緒池的allowCoreThreadTimeOut屬性會影響getTask方法,導致getTask方法一直阻塞在workQueue.take()這裡的,這樣就不會銷燬執行緒。
1.allowCoreThreadTimeOut=true,使用非阻塞方法從佇列獲取任務
2.allowCoreThreadTimeOut=false,執行緒池執行緒數還未達到核心執行緒數上限,使用阻塞方法獲取任務,這樣就可以使得核心執行緒不會被銷燬,getTask方法一直阻塞等待獲取佇列種的任務。
3.allowCoreThreadTimeOut=false,執行緒池執行緒數達到核心執行緒數,使用非阻塞方法獲取任務
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//成立1:執行緒池處於SHUTDOWN狀態且工作佇列為空的時候
//成立2:執行緒池處於STOP狀態以上的時候,將執行緒池執行緒數-1並返回空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取執行緒池執行緒數
int wc = workerCountOf(c);
//1.allowCoreThreadTimeOut=true,使用非阻塞方法從佇列獲取任務
//2.allowCoreThreadTimeOut=false,執行緒池執行緒數還未達到核心執行緒數,使用阻塞方法獲取任務,
//3.allowCoreThreadTimeOut=false,執行緒池執行緒數達到核心執行緒數,使用非阻塞方法獲取任務
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//條件1-1:wc > maximumPoolSize,可能最大執行緒數設定的比核心執行緒數小,此時沒有空閒執行緒可以接手
//條件1-2:timed && timedOut的timedOut在poll(keepAliveTime, TimeUnit.NANOSECONDS)
//方法獲取超時的時候,迴圈第二次執行的時候才可能導致條件為true
//條件2:執行緒池還有其他執行緒,工作佇列為空返回true
//以上1和2條件成立了,任務返回為空。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//嘗試將執行緒數-1,並不一定會成功,可能被其他執行緒改過,失敗則繼續迴圈嘗試-1
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//當timed=true的時候,使用poll獲取超時候導致r=null的時候,timedOut=true,
//再次執行迴圈
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();//阻塞方式從佇列獲取,防止執行緒執行緒執行銷燬邏輯
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
該方法用於執行執行緒銷燬的邏輯。
1.加鎖,從workers佇列中移除一個worker,將執行緒池完成任務總數量+1, 釋放鎖;
2.判斷是否需要關閉執行緒池;
3.如果執行緒正常完成並退出,根據allowCoreThreadTimeOut判斷是否需要回收核心執行緒,
若allowCoreThreadTimeOut=true,只需要保證執行緒池最少有一個執行緒即可,也就是說超過1的空閒執行緒一定會被銷燬。
若allowCoreThreadTimeOut=false,線上程數未達到核心執行緒數上限的情況下,由於getTask方法的阻塞,不會執行執行緒銷燬的邏輯;當執行緒數達到核心執行緒數上限的情況,且佇列也達到上限數,這之後建立的任何執行緒在getTask方法獲取不到具體任務的情況下都會銷燬
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//因為異常退出,執行緒執行數-1
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//取出每個worker執行的任務數量,並彙總到全域性的任務執行數量中
completedTaskCount += w.completedTasks;
//將worker從池中移除
workers.remove(w);
} finally {
mainLock.unlock();
}
//嘗試關閉執行緒池並處理空閒執行緒
tryTerminate();
//獲取執行緒ctl值
int c = ctl.get();
//如果執行緒池當前狀態小於STOP狀態,說明執行緒池處於RUNNING,SHUTDOWN狀態
if (runStateLessThan(c, STOP)) {
//如果執行緒池是正常退出返回false走下面的流程
if (!completedAbruptly) {
//allowCoreThreadTimeOut表示是否允許核心執行緒銷燬
//min表示執行緒池允許的最小執行緒數,最少為1
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//條件1:min==0說明允許核心執行緒銷燬
//條件2:工作佇列不為空
if (min == 0 && ! workQueue.isEmpty())
//設定執行緒池最小執行緒數
min = 1;
//如果當前執行緒池執行緒數大於min的值,返回,這裡不管min是核心執行緒數還是1
//也就是說,超過核心執行緒的執行緒數在getTask方法從佇列取不到的時候一定會回收
//而核心執行緒是否回收會根據allowCoreThreadTimeOut屬性來判斷
if (workerCountOf(c) >= min)
return;
}
//上面從workers池中刪除了一個worker,這裡新增進去一個空任務的worker
//核心執行緒數=0的情況會執行到這裡,會維持核心執行緒數最少為1
addWorker(null, false);
}
}
嘗試關閉執行緒池方法並處理空閒執行緒,interruptIdleWorkers方法處理空閒執行緒,設定中斷狀態。每個執行緒退出都會單獨呼叫該方法。
final void tryTerminate() {
//自旋
for (;;) {
//獲取執行緒ctl值
int c = ctl.get();
//條件1:執行緒池處於RUNNING狀態說明執行緒池當前正常,直接返回
//條件2:runStateAtLeast(c, TIDYING)說明已經有執行緒使得執行緒池由TIDYING -> TERMINATED狀態
//轉換了,當前執行緒直接返回
//條件3:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())
//說明執行緒池雖然處於SHUTDOWN狀態,但工作佇列不為空,得等佇列處理完再嘗試關閉執行緒池的邏輯。
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//能走到這裡兩種情況
//1.執行緒池狀態>=STOP了
//2.執行緒池狀態於SHUTDOWN狀態,但佇列為空了
if (workerCountOf(c) != 0) {
//回收空閒的執行緒,因為執行runworer方法的時候worker會加鎖,所以沒加鎖的都是空閒的
interruptIdleWorkers(ONLY_ONE);
return;
}
//workerCountOf(c) == 0 時,會來到這裡,說明執行緒都已經銷燬了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//設定執行緒池狀態為TIDYING狀態。
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//勾點方法,等使用者實現
terminated();
} finally {
//勾點方法設定執行緒池狀態為TERMINATED狀態
ctl.set(ctlOf(TERMINATED, 0));
//喚醒呼叫 awaitTermination() 方法的執行緒。
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
處理一個空閒執行緒方法。所有處於執行中的執行緒都會加鎖(w.lock())。上面我們提過,核心執行緒被take方法阻塞的時候,我們這裡設定執行緒t.interrupt(), 會解除take的阻塞。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//執行緒沒有中斷且嘗試加鎖成功,因為所有處於執行中的執行緒都會加鎖(w.lock())
//未加鎖的說明處於空閒中了。
if (!t.isInterrupted() && w.tryLock()) {
try {
//設定執行緒中斷
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
該方法是判斷執行緒池狀態狀態是否是TERMINATED,如果是則直接返回true,否則會await掛起當前執行緒指定的時間
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
## 6.9.shutDown和shutDownNow方法分析
shutDown方法會優雅的關閉執行緒池,設定執行緒池狀態為SHUTDOWN,已經處於佇列中的任務會繼續等待執行完。
shutDownNow方法會立即關閉執行緒池,設定執行緒池狀態為STOP。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//獲取執行緒池全域性鎖
mainLock.lock();
try {
checkShutdownAccess();
//設定執行緒池狀態為SHUTDOWN
advanceRunState(SHUTDOWN);
//中斷空閒執行緒
interruptIdleWorkers();
//空方法,子類可以擴充套件
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
//釋放執行緒池全域性鎖
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
//返回值參照
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
//獲取執行緒池全域性鎖
mainLock.lock();
try {
checkShutdownAccess();
//設定執行緒池狀態為STOP
advanceRunState(STOP);
//中斷執行緒池中所有執行緒
interruptWorkers();
//匯出未處理的task
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
//返回當前任務佇列中 未處理的任務。
return tasks;
}
#7.ThreadPoolExecutor拒絕策列
預設有以下4中拒絕策列,使用者也可以實現RejectedExecutionHandler介面自定義。
CallerRunsPolicy將任務交給呼叫者執行
AbortPolicy丟擲異常
DiscardPolicy什麼都不做,直接丟棄
DiscardOldestPolicy丟棄老的,執行新的
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
//交給主執行緒執行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
//中斷拒絕
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
//直接拋棄什麼都不做
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
//丟棄老的 ,執行新的
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
如果我們想讓執行緒按核心執行緒,最大執行緒,最後再進佇列的方式初始化,應該怎麼做?
public void execute(Runnable command) {
//如果runnable為空,丟擲異常
if (command == null)
throw new NullPointerException();
//獲取ctl值,該值高3位代表執行緒池執行狀態,低29位代表執行緒池當前執行執行緒數量
int c = ctl.get();
//CASE1:獲取執行緒池執行執行緒數量,如果小於核心執行緒數,則建立執行緒,addWorker傳入引數為core
//也就是說,執行緒池不是一上來就把核心執行緒建立了,必須在提交runnable任務到執行緒池的時候才一個一個建立
if (workerCountOf(c) < corePoolSize) {
//addWorker是建立執行緒的核心方法,關鍵點在Worker類的構造方法和runWorker方法的while迴圈
if (addWorker(command, true))
return;
c = ctl.get();
}
//CASE2:條件1不成立說明核心執行緒數已滿,將任務新增到阻塞佇列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//CASE3:條件1和2都不成立,說明核心執行緒已建立完全並且任務佇列已滿
//呼叫addWorker建立非核心執行緒,如果返回false,說明執行緒數達到最大執行緒數,執行拒絕策列
else if (!addWorker(command, false))
reject(command);
}
我們在說execute方法初始化執行緒池過程中。CASE2:workQueue.offer(command)會將任務加入到佇列。所以,我們這裡只需要自定義BlockingQueue,改造offer方法,在裡面判斷,當執行緒池執行緒數還未達到最大執行緒數的時候返回false即可。
Dubbo的EagerThreadPool自定義了一個BlockingQueue,在offer()方法中,如果當前執行緒池數量小於最大執行緒池時,直接返回false,這裡就達到了調節執行緒池執行順序的目的。
分享一個,有很多幹貨,包含netty,spring,執行緒,spring cloud等詳細講解,也有詳細的學習規劃圖,面試題整理等,我感覺在面試這塊講的非常清楚:獲取面試資料只需:點選這裡領取!!! 暗號:CSDN