原始碼基於JDK8
文章1.5w字,非常硬核
話說,王多魚給好友胖子錢讓其投資,希望虧得血本無歸。胖子開了一個外賣店賣國宴,主打高階,外賣小哥都是自己僱傭,並且開邁巴赫送外賣。最開始胖子覺得這麼貴的外賣,就僱傭100個外賣員(核心執行緒
)夠了,併購買了100臺邁巴赫。但是隨後王多魚讓他搞活動——顧客說說自己的虧錢經歷就可以免費吃外賣。隨即店鋪大火,100個外賣員都送不過來(核心執行緒打滿了
),胖子就把外賣放在桌子上,按照先後順序擺放好,等外賣員送完一單再送桌子上的外賣(阻塞佇列
)。但是慢慢的桌子上都放不下了(阻塞佇列滿了
),胖子只好臨時僱員外賣員(非核心執行緒
)送外賣。僱傭到20個臨時外賣員,還是不夠送外賣,最終決定每天限量,系統提示今日活動太火爆,讓使用者無法下單(拒絕策略
)。最終活動結束了,臨時僱傭的外賣員也都解僱了(回收非核心執行緒
)
我們把外賣員看作執行緒,多魚外賣店就是執行緒池,使用者點的外賣就是任務。執行緒池就是使用池化技術,維護執行緒,並使用這些執行緒執行任務的執行器(Executor)
。結合例子我們看下執行緒池的執行流程圖
降低資源消耗:
(如果多魚外賣店,每次都來一個外賣臨時僱傭一個,臨時釋出招聘的開銷大,隨後解聘的開銷也大)
通過池化技術重複利用已建立的執行緒,降低執行緒建立和銷燬造成的損耗。java執行緒和作業系統是一對一的對映關係,新建或者銷燬一個執行緒都存在資源的消耗
提高響應速度:
(當多魚外賣店沒有做活動的時候,來一個任務,100個核心外賣員可以立馬送出外賣)
任務到達時,一定情況下
無需等待執行緒建立即可立即執行。
提高執行緒的可管理性:
(多魚外賣店為什麼要100個人配100個車,就是要減少外賣員交替使用車輛送外賣帶來的上車下車開銷,並且店主可以控制外賣員的數量,並且弄出績效制度doge)
執行緒是稀缺資源,如果無限制建立,不僅會消耗系統資源,還會因為執行緒的不合理分佈導致資源排程失衡,降低系統的穩定性。使用執行緒池可以進行統一的分配、調優和監控。使用過多的執行緒會導致執行緒上下文切換更多,從而導致在儲存「現場」和恢復「現場」的開銷激增。
提供更多更強大的功能:
(多魚外賣店可以要求外賣員在送外賣到顧客家的時候,祝顧客新年快樂)
執行緒池具備可拓展性,允許開發人員向其中增加更多的功能。比如延時定時執行緒池ScheduledThreadPoolExecutor,就允許任務延期執行或定期執行
多魚外賣店,是一個員工一個車輛,假如外賣店就10臺車輛,那麼還需要僱傭100人麼?
顯然是不需要的,但是如果外賣員每天工作12小時,期間休息和吃飯佔用4小時,在休息和吃飯的時間內是不會使用到車輛的。那麼這時候我們應該僱傭 10(車輛數) + 10*4(空閒時間)/(12-4)(有效工作時間) = 15人
,這樣我們讓這些快遞員輪班,A吃飯的時候,B送貨。
但是還需要考慮到,難道外賣車就不送去保養麼,外賣車不也得加油。
10個車沒必要僱傭100人
好比CPU只有10核心,在計算密集型任務中(把外賣員吃飯看作IO操作,計算密集就如同外賣員絲毫不停歇努力配送中),那麼僱傭10人左右即可
10(車輛數) + 10*4(空閒時間)/(12-4)(有效工作時間) = 15人
這就是 執行緒數 = CPU 核心數 +CPU 核心數 x(IO耗時/CPU計算耗時)
。看看這個公式,難道IO密集型的任務就設定核心執行緒數為CPU核心x2
麼?不見得,如果IO耗時和CPU耗時不是1比1,IO耗時比例更高,那麼應該設定的更高一點
難道外賣車就不送去保養麼,外賣車不也得加油
這個可以理解為,CPU還得處理系統的其他計算,並非100%專注於當前這個執行緒池,所以核心執行緒數的設定需要考慮到 CPU利用率
最終核心執行緒數的設定,是一個需要壓測,需要實際資料去偵錯的,勉強只能給出
執行緒數 = (CPU 核心數 +CPU 核心數 x (IO耗時/CPU計算耗時))x cpu利用率
的理論公式
阻塞佇列就是故事中的"桌子",它基於AQS Condition
實現等待喚醒模式,線上程池中主要利用阻塞佇列佇列為空,獲取任務的執行緒將阻塞,成功提交任務到阻塞佇列將喚醒被阻塞的執行緒
的特性,之所以阻塞就是避免執行緒無意義的自旋浪費CPU。
阻塞佇列在juc包下具備很多實現,下面我們介紹幾種常用的阻塞佇列
ArrayBlockingQueue
基於陣列的有界佇列LinkedBlockingQeque
基於連結串列的有界佇列(預設容量是int型別最大)PriorityBlockingQueue
優先阻塞佇列,基於陣列實現的堆,並且具有阻塞佇列的特性DelayQueue
基於優先佇列實現的無界阻塞佇列,元素只有在其延遲到期時才能被取出SynchronousQueue
不儲存元素的阻塞佇列,每個插入操作都必須等待另一個執行緒的相應刪除操作如果我們選擇無界阻塞佇列LinkedBlockingQeque
,意味著最大執行緒數基本上沒用了,因為任務會一直塞到佇列直到達到int型別最大,這時候往往意味著OOM
如果選擇有界阻塞佇列,並且指定的容量太小,那麼意味著執行緒池在任務很多的時候,阻塞佇列將立馬塞滿,開始建立非核心執行緒,甚至直到觸發拒絕策略。
如果指定的容量太大,意味著很多工堆積,任務得不到及時執行。
另外還有SynchronousQueue
,它可以簡單看作容量為0的阻塞佇列。
PriorityBlockingQueue
和DelayQueue
都是基於堆實現,可以快速獲得堆頂元素,我們使用PriorityBlockingQueue
需要傳入比較器,或者任務本身就是Comparable
。
可以看出阻塞佇列的選擇,需要考慮到任務對及時性的要求,也要考慮到,峰值的時候任務有多。
最大執行緒數,是核心執行緒數打滿,阻塞佇列塞滿,然後會去建立 最大執行緒數 - 核心執行緒數
個非核心執行緒執行任務,但是非核心執行緒在存活時間內,如果拿不到任務,將被回收(如同多魚外賣活動結束,非核心外賣員沒有外賣送,自然被解僱)
最大執行緒數如果設定太小,那麼可能不能勝任大量任務,最後任務將被拒接策略處理。如果太大,並不意味著效率一定提高,因為執行緒的排程依賴於cpu排程。此引數的設定需要考慮到系統的效能(cpu不行設定太大也沒有意義),任務是否接受被拒絕策略處理,以及任務峰值等。
每個系統都有它的效能瓶頸,當任務是在太多,核心執行緒打滿,阻塞佇列塞滿,最大執行緒打滿,這時候繼續提交任務將觸發拒絕策略。JUC中提供了以下策略
CallerRunsPolicy
由提交任務的執行緒執行任務,如果執行緒池關閉了,那麼一聲不吭的拋棄任務
這個拒絕策略很有意思,從某種程度上說,它有點阻塞的意思,當需要提高任務的執行緒執行任務的時候,意味著提高任務執行緒的方法將不能立即返回,從而避擴音高任務繼續提交其他任務。
AbortPolicy
直接丟擲RejectedExecutionException
,執行緒池預設的拒絕策略
DiscardPolicy
悄無聲息的忽略任務 什麼都不做忽略任務
DiscardOldestPolicy
如果執行緒池沒有被關閉那麼丟棄佇列頭部的任務,然後提交此任務。
這個拒絕策略,會丟棄佇列頭部任務,然後再次呼叫執行緒池提交任務的方法,有點遞迴的意思,需要注意:丟棄佇列頭部任務,並再次提交任務並不是一個原子操作,這種拒絕策略會遞迴的呼叫提交任務的方法直到任務入隊
在自己系統中,觸發拒絕策略往往需要我們做好記錄,甚至提醒開發人員調優執行緒池。具體使用什麼拒絕策略需要看業務需求。
當任務有限或者提交不頻繁時,最終執行緒池中的執行緒將無任務執行。為了減少系統資源消耗,在存活時間內如果一直接收不到任務的話,執行緒將被回收。通常存活時間的設定只對非核心執行緒有效,但是如果呼叫allowCoreThreadTimeOut(true)
那麼核心執行緒也將被回收
那麼核心執行緒是否應該被回收呢?如果業務上這個執行緒池被呼叫的十分不頻繁,或許回收核心執行緒也是不錯的選擇,但是如果經常間歇性有任務需要執行且要求效率儘可能高,這時候如果核心執行緒被回收了,執行緒池又將new新的執行緒,會降低執行緒池的執行效率。
那麼存活時間如何設定呢?還是得依賴於業務,看業務需要執行緒池的時間間隔,取一個粗略估計值。
執行緒池建立執行緒最終使用呼叫ThreadFactory
進行,通常需要我們指定下執行緒的名稱,推薦使用ThreadFactoryBuilder
方便對執行緒的命名進行定義
@Async
spring的@Async
註解標註在spring bean的方法上,將被AsyncAnnotationBeanPostProcessor
代理原始物件,活的非同步增強的效果,其核心還是向執行緒池中提交任務。
建議使用此註解的時候,指定自己的執行緒池(註解中可以指定使用執行緒池bean的名稱)這樣可以讓不同型別的業務使用不同的執行緒池,如果IO密集和CPU密集使用一個執行緒池,且發生等待佇列中IO任務排在CPU密集任務前面,就如同墨跡的人在你前面排隊,會對效率有所影響
且AsyncAnnotationBeanPostProcessor
是一個BeanPostProcessor
並不是一個SmartInstantiationAwareBeanPostProcessor
,如果發生迴圈依賴需要注意代理物件的方法可能不具備非同步能力(而且呼叫的時候必須使用代理物件去呼叫,this.
,或者直接呼叫無非同步能力)
@EventListener
spring的@EventListener
標註的方法,會被EventListenerMethodProcessor
(BeanFactoryPostProcessor實現類
),在所有單例bean範例化後,將所有bean中標有@EventListener
註解的方法和bean包裝成ApplicationListener
,註冊到ApplicationContext
中(一般最終註冊到SimpleApplicationEventMulticaster(事件多播器)
中),如果我們為SimpleApplicationEventMulticaster
設定了一個執行緒池,它將非同步的回撥ApplicationListener
(反射呼叫bean對應的方法)
注意這裡的坑,如果非同步意味著事務可能會失效,spring還有一個@TransactionalEventListener
註解,可以指定在事務提交前等等階段去響應事件,其中@TransactionalEventListener
的fallbackExecution
可以指定,是否事務同步管理器中沒有事務(事務同步管理器基於ThreadLocal,非同步使用其他執行緒將導致事務失效,這時候事務管理器就是沒有事務的狀態)也繼續響應事件。
CompletableFuture
這是並行大師doug lea編寫的進行非同步or同步任務編排的一個工具類,如果不指定執行緒池那麼將使用公共執行緒池(執行緒數預設為CPU 核心數量減1)。如果使用者都不使用自定義的執行緒池,很容易造成大量任務堆積,反而降低執行效率。通常建議不同業務型別使用不同的執行緒池,並設定合適的執行緒池引數
@schedule
註解依賴於ScheduledAnnotationBeanPostProcessor
,它是一個BeanPostProcessor,在每一個單例bean 範例化的時候,會掃描是否存在此註解,如果存在那麼解析並記錄。在所有單例bean 範例化後,會將bean和其方法,以及解析的註解資訊包裝稱一個任務,提交到執行緒池中。
常用的有以下幾種
newFixedThreadPool
固定數目工作執行緒,無界任務阻塞佇列(可以容納int最大個任務)的執行緒池——容易oom,如果請求量大容易操作阻塞佇列積壓過多工造成oomnewSingleThreadExecutor
單執行緒,無界任務阻塞佇列的執行緒池newCachedThreadPool
,支援工作執行緒數達到Integer.MAX_VALUE
,且空閒時間達到60秒那麼就會被回收,使用的是SynchronousQueue
不會容納任何任務,每一個任務提交之後都必須有另外一個執行緒獲取任務——執行緒多並不意味著效率高,上下文的切換,執行緒的new 和消耗都是消耗大量資源的,支援Integer.MAX_VALUE
個執行緒顯然也是不符合實際的基本上程式設計規範都要求我們自己定義執行緒池引數,Executors
中的執行緒池多少都有點問題,建議開發人員使用ThreadPoolExecutor
構造方法結合業務實際設定引數後使用。
public interface Executor {
void execute(Runnable command);
}
Executor
的作用的是把任務和任務將如何執行進行解耦
(直接使用Thread我們需要自己把業務邏輯些在runnable中傳入,然後start,任務邏輯和任務的執行耦合緊密),其中只有一個方法execute
,但是其實現類,可能是同步的直接呼叫Runnable#run
,也可能是非同步開啟執行緒執行。
ExecutorService
實現了Executor
介面,提供管理自身生命週期的方法,其submit
方法生成 Future
來跟蹤一個或多個非同步任務的進度的方法,還提供了批次提交任務的方法。
方法 | 描述 |
---|---|
void shutdown() | 關閉執行器,如果還有任務沒有執行完,那麼任務還會執行,但是不會接受新的任務,如果已經處於關閉狀態還去呼叫此方法,不會有任何效果 |
List<Runnable> shutdownNow() |
嘗試停止所有正在執行的任務,返回等待執行但未執行的任務,停止執行任務通常是通過呼叫對應執行緒的interrupt 方法,如果執行緒自己不響應中斷,那麼無濟於事,任務還是會繼續執行 |
boolean isShutdown() | 如果已經被關閉那麼返回true,通常呼叫shutdown 和 shutdownNow 後可能存線上程在執行任務,但是還是返回true |
boolean isTerminated() | 如果所有任務在關閉後都已完成,則返回 true。請注意,除非呼叫了 shutdown 或 shutdownNow,且所以任務都結束了,否則 isTerminated 永遠不會為真 |
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException | 呼叫執行緒進入阻塞等待直到關閉當前ExcutorServuce,或者發生超時,或者當前執行緒被中斷。 |
<T> Future<T> submit(Callable<T> task) |
提供一個具備返回值的任務,返回一個Future 表示是此任務的非同步執行結果。 |
<T> Future<T> submit(Runnable task, T result) |
和submit(Callable) 類似,但是其非同步返回結果在執行完後返回結果是入參result |
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException |
批次提交一批任務,阻塞直到所有任務都完成or者任務執行失敗或者當前執行緒被中斷 |
批次提交一批任務,阻塞直到所有任務都完成or任務執行失敗或者當前執行緒被中斷,or指定的時間超時 | |
提交一批任務,等待其中一個執行完成,或者直到當前執行緒被中斷,返回時會取消沒有執行完的任務 | |
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException |
提交一批任務,等待其中一個執行完成,or到當前執行緒被中斷,or等待時間超時,返回時會取消沒有執行完的任務 |
AbstractExecutorService
提供了Runnable
,Callable
適配成RunnableFuture
(一般適配成實現類FutureTask
),還實現了ExecutorService
的submit
,invokeAny
,以及invokeAll
。是對ExecutorService
的抽象實現,有點模板方法的意思。
執行緒池使用一個AtomicInteger
型別的屬性,同時記錄執行緒池狀態和當前執行緒池中執行緒的數量。高3位標識執行緒池的狀態 低29位標識執行緒池工作執行緒個數
屬性名 | 型別 | 解釋 |
---|---|---|
workQueue | BlockingQueue<Runnable> |
儲存待處理任務的阻塞佇列 |
mainLock | ReentrantLock | 鎖,執行緒池用一個set儲存所有執行緒,一個int儲存最大的執行緒數,修改的時候使用這個鎖保證執行緒安全 |
workers | HashSet<Worker> |
包含池中所有工作執行緒的集合。僅在持有 mainLock 時存取 |
termination | Condition | 呼叫awaitTermination的執行緒在此等待佇列上等待。執行緒終止的時候也會使用此喚醒等待的執行緒 |
largestPoolSize | int | 程池中存在的最大的工作執行緒數。僅在持有 mainLock 時存取。 |
completedTaskCount | long | 完成任務的計數器。僅在工作執行緒終止時更新。 |
threadFactory | ThreadFactory | 所有執行緒都是使用這個工廠建立的 |
handler | RejectedExecutionHandler | 拒絕策略,佇列也無法容納任務,且達到最大執行緒數的時候呼叫此策略方法 |
keepAliveTime | long | 工作執行緒多久(納秒)沒有執行任務將被回收,(一般針對非核心執行緒,也可以用於核心執行緒的回收) |
allowCoreThreadTimeOut | boolean | 如果為 false(預設),核心執行緒即使在空閒時也保持活動狀態。如果為true,核心執行緒超過keepAliveTime納秒沒有工作將被回收。 |
corePoolSize | int | 核心執行緒數,如果池中執行緒數小於核心執行緒數,那麼接受新任務總是new一個執行緒 |
maximumPoolSize | int | 當核心執行緒數達到,阻塞佇列塞滿,將新增maximumPoolSize-corePoolSize 個執行緒處理任務 |
執行緒池中存在一些位運算,本文不會分析這些位運算
excute方法接受一個Runnale,submit方法也是基於excute實現的,這是執行緒池原始碼中的核心。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//ctl `高3位標識執行緒池的狀態 低29位標識執行緒池工作執行緒個數`
int c = ctl.get();
//如果當前工作執行緒總數小於核心執行緒
if (workerCountOf(c) < corePoolSize) {
//那麼會嘗試新增一個核心執行緒執行當前任務
//addWorker第一個引數是任務,第二個引數是是否核心執行緒,返回是否新增成功
//如果新增任務成功那麼直接返回
if (addWorker(command, true))
return;
//新增失敗那麼重新獲取執行緒總數和執行緒池狀態
c = ctl.get();
}
//如果是執行狀態 且加入到了任務佇列
if (isRunning(c) && workQueue.offer(command)) {
//如果新增成功 重新獲取程總數和執行緒池狀態
int recheck = ctl.get();
//如果發現不是執行狀態嘗試刪除任務
if (!isRunning(recheck) && remove(command))
//如果成功從佇列刪除了任務,那麼呼叫拒絕策略
reject(command);
//如果執行緒池中的執行緒為0那麼新增一個非核心執行緒,保證佇列中的任務會被執行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果佇列滿了,或者說不是running 那麼新增一個非核心執行緒
//如果新增非核心失敗 那麼呼叫拒絕策略
else if (!addWorker(command, false))
reject(command);
}
整個程式碼看下來並沒有很複雜,其中addWorker方法便是新增執行緒執行任務,成功返回true 失敗返回false。excute方法最大的難點就是 執行緒安全問題
(存在並行呼叫excute方法的可能)我們來一起品一品doug lea是如何解決的。
當執行緒數小於核心執行緒數(if (workerCountOf(c) < corePoolSize)
)
會嘗試呼叫addWorker(command, true)
新增一個核心執行緒執行任務,乍一看這裡存線上程安全問題,因為if (workerCountOf(c) < corePoolSize)
和addWorker(command, true)
不是一個原子操作,可能A執行緒正在addWorker
,B執行緒搶先一步addWorker
成功達到了核心執行緒數,如果A繼續成功那麼核心執行緒數將被突破。doug lea的解決辦法在addWorker
方法中
接著看,如果addWorker(command, true)
失敗,會再次呼叫 c = ctl.get()
,因為此時要麼核心執行緒數被突破,要麼執行緒池狀態發生變更,需要重新整理下區域性變數c
執行緒數達到核心執行緒數 if (isRunning(c) && workQueue.offer(command))
當執行緒數達到核心執行緒數,會首先看執行緒是否是執行狀態,然後workQueue.offer(command))
將任務放入阻塞佇列中。這裡對應了shutdown stop
等狀態下,執行緒池是不接受新任務的。但是需要注意 if (isRunning(c) && workQueue.offer(command))
不是一個原子操作,可能放入到阻塞佇列的過程中,執行緒狀態被更改了,doug lea解決辦法就是,如果放入到阻塞佇列後,可以從佇列中刪除任務,說明任務沒有被拿去執行,那麼拒絕任務。
如果刪除任務失敗了,並且執行緒池中的工作執行緒為0個,那麼會新增一個執行緒去執行任務,保證這個放入到佇列中的任務,一定會被執行到。
如果阻塞佇列滿,或者執行緒池不是running狀態 !addWorker(command, false)
會新增一個非核心執行緒去執行任務,如果新增非核心任務失敗,說明已經達到了最大執行緒數,那麼會呼叫拒絕策略
為什麼不是running還會addWorker
方法,不怕shutDown狀態還接受了一個任務麼?——addWorker中會對狀態再次進行判斷,保證了這種情況不會發生
addWorker
addWorker
接受兩個引數——任務,和是否核心執行緒。這個方法程式碼很精彩,使用cas樂觀鎖 + ReentrantLock
提高執行效率。
我們思考一個問題,修改工作執行緒計數,new 一個工作執行緒,將執行緒放入HashSet<Worker>
中,啟動工作執行緒,這四步中有哪些步驟是執行緒不安全的?
顯然是——修改工作執行緒計數,和將執行緒放入HashSet<Worker>
中是執行緒不安全。雖然新建執行緒呼叫構造方法涉及到記憶體空間的分配,但是jvm無論是使用指標指標碰撞,還是空閒連結串列,還是執行緒本地分配空間,都會為我們保證這一步是執行緒安全的
那麼我們是否需要鎖住整個addWorker
方法暱?顯然不需要,至少new 一個工作執行緒這一步是不需要加鎖的。接下來我們看看doug lea是如何巧妙減低鎖的粒度,提高執行效率的。
方法很長,我們具體解析的時候分多個部分
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//=========自旋部分開始=================
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果大於等於SHUTDOWN 且 執行緒池不是SHUTDOWN 說明是STOP TIDYING TERMINATED這幾種都是不接受新任務的
//大於等於SHUTDOWN 且佇列是空,這時候也不接受新任務,執行緒池即將關閉
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//自選
for (;;) {
int wc = workerCountOf(c);
//如果大於(2^29)-1 直接不可新增執行緒,ctl 高三位狀態低29位執行緒數 再多表示不了了
// 如果表示新增核心執行緒 且大於核心執行緒 或者非核心但是大於最大執行緒數 返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas增加 工作執行緒數 這裡只是更新ctl 不是真的增加一個執行緒
//這樣增加成功了才能退出 保證了執行緒數不會超過閾值
if (compareAndIncrementWorkerCount(c))
break retry;
//如果增加失敗了重新看下狀態,狀態改變了,那麼重新自旋
//cas失敗了,狀態沒變也會自選
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//=========自旋部分結束=================
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建一個執行緒
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//上鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//執行緒池狀態
int rs = runStateOf(ctl.get());
//如果小於 SHUTDOWN 說明是RUNNING
//或者是SHUTDOWN 但是沒有任務執行,說明是為了執行佇列中的任務或者預熱執行緒池
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//加到set集合
workers.add(w);
int s = workers.size();
//更新最大執行緒數
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//解鎖
mainLock.unlock();
}
//如果加入了set 啟動worker
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果沒有啟動 說明執行緒池已經不接受新任務了,或者其他奇奇怪怪的異常
if (! workerStarted)
//嘗試減少工作執行緒數 並且嘗試關閉執行緒池
addWorkerFailed(w);
}
//返回worker是否啟動了
return workerStarted;
}
修改工作執行緒數,這一步doug lea使用自旋+cas的方式
外層for中的第一個if
rs 是執行緒池的執行狀態,看下這個if中哪些情況addWorker會直接返回false
首先rs>=SHUTDOWN
必須成立,這就意味著執行緒池處於SHUTDOWN,STOP, TIDYING ,TERMINATED這幾個狀態之一
接下來需要滿足以下情況之一執行緒池就無法新增工作執行緒
rs != SHUTDOWN
這意味著執行緒池是STOP or TIDYING or TERMINATED
狀態,這幾個狀態都不可新增工作執行緒
rs == SHUTDOWN
&&firstTask != null
這對應了 執行緒池處於SHUTDOWN,不會接受新提交的任務(firstTask != null
是excute 方法入參提交的任務)
rs == SHUTDOWN
&&firstTask == null
&& workQueue.isEmpty
這是意味著佇列中所有任務都執行成功了,當前呼叫的時候也不是提交新任務,接下來執行緒池將轉變為STOP,不需要新增新執行緒去處理佇列中的任務
內部for迴圈
這裡分別看三個if
內部for迴圈第一個if
,防止突破核心執行緒數,或者最大執行緒數
可以看到如果執行執行緒大於CAPACITY(2的29方-1,因為前3為表示狀態)
那麼直接無法新增執行緒
如果新增的是核心執行緒,那麼不能大於核心執行緒數
如果新增的是非核心執行緒數,那麼不能突破最大執行緒數
內部for迴圈 cas 新增工作執行緒數量
這裡compareAndIncrementWorkerCount
方法使用cas
更新工作執行緒數。
我們要考慮下,第一個if 和這裡的compareAndIncrementWorkerCount
會不會出現 第一個if確認不會突破執行緒數,但是準備執行第二個if的時候,其他執行緒新增了一個執行緒,然後第二個if還是成功cas增大執行緒數
的情況
其實是不會的,我們要看下c
這個區域性變數是第一層for迴圈,進來的時候獲取的,並沒有在第一個if 和第二個if 中去更新c
,所有如果真發生這種情況,cas會失敗。cas失敗的話,會重新整理c,然後會由內部for迴圈第一個if
確保不會突破執行緒數,如果cas成功那麼會去真正新建工作執行緒
如果執行緒池狀態變化
這時候會跳到外層迴圈,由外層for中的第一個if
判斷狀態
這裡很牛逼,太牛逼了!
我們說過,new 一個工作執行緒的過程,是不需要加速鎖,jvm保證new的過程分配記憶體執行緒安全。所有doug lea,讓這部分可以並行進行 (值得借鑑)。
這裡新增的Worker物件,Worker是一個內部類,後面我們分析Worker是如何執行的時候,再看其內部結構。
HashSet<Worker>
等變數接下來需要維護HashSet<Worker> workers
, int largestPoolSize
,並啟動工作執行緒。largestPoolSize
記錄了執行緒池曾經同時具備多少個執行緒,並使用一個HashSet儲存工作執行緒
首先會上鎖,然後重新檢查下執行緒池的狀態 (確保處於執行,執行可以接受新任務,新增工作執行緒,或者處於SHUTDOWN,但是不能是提交新任務)然後將維護HashSet<Worker> workers
, int largestPoolSize
這些屬性,然後解鎖。整個流程很簡單,但是沒什麼doug lea要再次檢查一次執行緒池執行狀態暱?
因為上面的雙層for,到這裡的上鎖,並非一個原子操作,可能在此期間由另外一個執行緒呼叫了關閉執行緒池的方法。
可以看到只有worker被加到HashSet<Worker> workers
後才會,執行工作執行緒
在此方法的finally
中,如果worker啟動失敗,會呼叫addWorkerFailed
這裡從工作執行緒集合中刪除工作執行緒,自旋cas減少工作執行緒數目,嘗試關閉執行緒池(這個方法內部會判斷執行緒池狀態,並不是嘗試關就一定會關)這一步就是上面操作的回滾。
上面我們看了,提交任務到執行緒池的流程,下面我們看執行緒池中的工作執行緒是如何處理任務的
Worker這個類繼承了AQS實現了Runnable介面,繼承Runnalbe 比較好理解,畢竟Worker的職責就是從阻塞佇列中不斷獲取任務執行。但是為什麼Worker為什麼要繼承AQS暱?(這部分需要有AQS的基礎,推薦學習JUC原始碼學習筆記1——AQS獨佔模式和ReentrantLock)
屬性 | 描述 |
---|---|
Thread thread | 執行Worker#run方法的執行緒,從ThreadFactory中建立而來 |
Runnable firstTask | 當呼叫addWorker傳入一個任務的時候,firstTask記錄初始任務 |
long completedTasks | 當前工作執行緒完成的任務數量 |
首先設定AQS狀態為-1,然後呼叫執行緒工廠建立一個執行緒,且Runnable為自己,那麼這個執行緒啟動將執行Worker#run
方法
Worker的執行呼叫了執行緒池的runWorker方法,我們先忽略Worker中對中斷的處理,專注於Worker從佇列拿任務執行,然後執行的流程
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 忽略,後續解釋這裡的作用
boolean completedAbruptly = true;
try {
//不斷從佇列中執行任務,如果firstTask不為null 那麼這裡直接先執行firstTask
while (task != null || (task = getTask()) != null) {
w.lock();//忽略,後續解釋這裡的作用
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); //忽略,後續解釋這裡的作用
try {
//勾點方法 可以進行擴充套件
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//勾點方法 可以進行擴充套件
afterExecute(task, thrown);
}
} finally {
task = null;
//記錄工作執行緒完成工作數
w.completedTasks++;
w.unlock(); //忽略,後續解釋這裡的作用
}
}
completedAbruptly = false;
} finally {
//如果工作執行緒執行的時候 丟擲了異常 那麼來到這裡,做善後工作
//completedAbruptly = true => 我們提交的任務,其業務邏輯丟擲了異常
processWorkerExit(w, completedAbruptly);
}
}
可以看到工作執行緒的職責,就是在While迴圈中不斷的從阻塞佇列那任務,然後呼叫beforeExecute
,然後執行我們向執行緒池中提交的任務,執行我們的業務邏輯,然後呼叫afterExecute
。如果執行過程中出現了異常或者當前執行緒長時間沒有拿到任務——getTask返回null,那麼會呼叫processWorkerExit
進行「善後工作」,此執行緒將被回收。
那麼getTask什麼時候會返回null
此方法負責從阻塞佇列中獲取任務,使用阻塞佇列的poll方法,或者使用take方法,前者可以指定超時時長,如果超過時長沒有獲取到任務,那麼返回null,0後者不會超時,如果沒有任務一直等待,二者都是對中斷敏感的(中斷在喚醒之前,那麼9重新獲取阻塞佇列的鎖之後丟擲中斷異常,中斷在喚醒之後,重新獲取鎖後恢復中斷標識)(推薦學習:JUC原始碼學習筆記3——AQS等待佇列和CyclicBarrier,BlockingQueue)。
private Runnable getTask() {
//獲取任務是否超時
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果執行緒池為STOP TIDYING TERMINATED 那麼cas減小執行緒數 return null
//如果SHUTDOWN 但是佇列存在任務 不會cas減少,那麼不會return
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//如果允許核心執行緒超時被回收 那麼為true 或者工作執行緒大於核心執行緒數會沒有任務的時候會減少到核心執行緒數
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果工作執行緒大於最大核心數 或者 允許過期且獲取任務超時
if ((wc > maximumPoolSize || (timed && timedOut))
//如果佇列不是空至少保證wc大於1 那麼減少後工作執行緒至少為1
&& (wc > 1 || workQueue.isEmpty())) {
//CAS 減少工作執行緒數
if (compareAndDecrementWorkerCount(c))
return null;
//如果CAS失敗那麼繼續自旋
continue;
}
try {
//在`allowCoreThreadTimeOut = true(允許核心執行緒過期)`或者`工作執行緒數>核心執行緒數`的時候會使用超時poll獲取任務
//反之使用無限期阻塞take方法獲取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
//獲取到任務 那麼直接返回任務
return r;
//反之說明超時沒有獲取到任務
timedOut = true;
} catch (InterruptedException retry) {
//如果被中斷那麼把超時置為false 繼續自旋
timedOut = false;
}
}
getTask方法,整體是一個自旋,自旋返回的情況,要麼是執行緒池的狀態導致不需要繼續處理佇列中的任務,要麼是佇列中執行緒在存活時間內還沒有獲取到任務。
第一if
這裡if成立的的情況有兩種,成立後會減少工作執行緒數,並返回null
執行緒池處於 stop ,tidying,terminated
這種情況下一般是使用者呼叫了shutdownNow,這個方法導致執行緒池進入stop,並且返回沒有執行的任務
所以這時候是不需要去處理執行緒池中的任務的
執行緒池處於shutdown 且佇列沒有任務
shutdown狀態不處理新任務,但是處理佇列中的任務,既然佇列都沒有任務了,那麼可以返回null。
第二個if
是否允許執行緒過期
,在allowCoreThreadTimeOut = true(允許核心執行緒過期)
或者工作執行緒數>核心執行緒數
的時候會為true是否從阻塞佇列中拿任務超時,拿任務的時間超過了keepAliveTime
接下來我們看下這個if成立的條件,和對應的意義
工作執行緒數超過了最大執行緒數,且工作執行緒數大於1
可以看作doug lea寫兜底機制,反之工作執行緒數突破最大執行緒數,導致資源枯竭
工作執行緒數超過了最大執行緒數,且佇列是空
同上
允許超時,且發生超時沒有拿到任務,且工作執行緒數大於1
允許超時要麼是核心執行緒允許過期,要麼是工作執行緒數大於核心執行緒數,這時候工作執行緒長時間沒有拿到任務,將返回null。之所以要求工作執行緒數大於1,是要確保佇列中的任務有一個執行緒可以執行
允許超時,且發生超時沒有拿到任務,且佇列是空
基本同上
命中條件,那麼會cas減少工作執行緒數量,成功那麼返回null,這裡compareAndDecrementWorkerCount
沒有自旋,因為這裡失敗了,會continue,說明存在多個執行緒將被回收,如果同時回收了,可能執行緒池直接沒有執行緒執行佇列中的任務了
從佇列中獲取任務
這裡可以看出允許核心執行緒過期,和存活時間的作用。核心執行緒和非核心執行緒並沒有特殊標記記錄,而是如果不允許核心執行緒過期,那麼在工作執行緒數小於等於核心執行緒的時候使用無限期take 保證核心執行緒沒有任務至少阻塞於阻塞佇列中,而不是返回null 導致核心執行緒過期
如果工作執行緒數大於核心執行緒數,或者允許核心執行緒過期,那麼使用超時等待poll方法,這時候超過存活時間就返回null,執行緒將被「善後」
如果超時沒有拿到任務,這時候timedOut 會為true,將繼續自旋並可能命中第一個if
或者第二個if
導致執行緒被回收
如果成功獲取到任務,那麼返回任務進行執行
如果在阻塞佇列中獲取的時候被中斷,那麼timedOut = false
並且不響應中斷
在使用者執行緒
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果是由於使用者業務邏輯錯誤,那麼是沒有減少執行緒數的
decrementWorkerCount();//自旋+cas減少
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//獲取鎖
try {
//更新完成的任務數
completedTaskCount += w.completedTasks;
//從HashSet中移除
workers.remove(w);
} finally {
mainLock.unlock();
}
//嘗試終止執行緒池
tryTerminate();
int c = ctl.get();
//如果執行緒是running 或者 shutdown
if (runStateLessThan(c, STOP)) {
//不是由使用者異常導致的
if (!completedAbruptly) {
//執行緒最少數
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//確保最少有一個
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//不足一個
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//只要執行緒是running 或者shutdown都確儲存在一個執行緒可以執行佇列中的任務
//或者使用使用者業務邏輯錯誤,導致的異常,那麼補上一個執行緒
addWorker(null, false);
}
}
程式碼總共分為四步:
如果是使用者業務邏輯錯誤,那麼自旋+cas減少工作執行緒數
因為正常由於getTask返回null的情況,在getTask中就已經完成了減少工作執行緒數的操作
更新completedTaskCount和HashSet<Worker>
更新completedTaskCount就是把當前工作執行緒完成的任務數加和
然後更新HashSet<Worker>
嘗試終止執行緒池
如果是使用者業務邏輯錯誤導致的異常,那麼補上一個執行緒。如果是由於長時間沒有任務,但是回收這個執行緒後,佇列又有任務了,那麼確保執行緒池中有一個執行緒可以處理任務。
這都是建立在 執行緒池為running 或者 shutdown的情況下,因為其他狀態佇列中的任務都不需要去執行。
如果不是使用者業務錯誤,工作執行緒由於等待超時進入,且執行緒池是running 或者shutdown的時候,會增加一個執行緒,這就是執行緒池的保活
(這哪裡是保活啊,這是替身)
關閉執行緒池,如果還有任務沒有執行完,那麼任務還會執行,但是執行緒池將不接受新任務。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//檢查許可權相關
checkShutdownAccess();
//確保狀態至少為SHUTDOWN
advanceRunState(SHUTDOWN);
//中斷所有的空閒工作執行緒
interruptIdleWorkers();
//勾點函數 可以自行擴充套件
onShutdown();
} finally {
mainLock.unlock();
}
//嘗試終止執行緒池
tryTerminate();
}
advanceRunState 使用自旋+cas確保狀態至少為shutdown,因為存在其他執行緒呼叫shutdownNow,設定狀態為stop的可能
interruptIdleWorkers
呼叫了interruptIdleWorkers(false),表示中斷所有空閒的工作執行緒(tryLock成功表示工作執行緒空閒,這部分在Worker 與中斷
章節中詳細解釋)
onShutdown
勾點方法,可以自行實現進行擴充套件
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//自旋+cas 確保狀態為stop
advanceRunState(STOP);
//中斷所有已經啟動的工作執行緒,那怕這個工作執行緒在處理任務
interruptWorkers();
//將剩餘的任務從佇列中倒出來,吐給使用者
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//嘗試終止執行緒池
tryTerminate();
return tasks;
}
advanceRunState 使用自旋+cas確保狀態至少為stop,因為存在其他執行緒呼叫過shutdownNow並且執行緒池將終結(觸發了tryTerminate)設定為Tidying 或者Terminate的可能
interruptWorkers
對每一個worker呼叫interruptIfStarted
,只要工作執行緒啟動了(滿足getState>=0)那麼進進行中斷
drainQueue
呼叫阻塞佇列的drainTo
方法將任務吐出來,如果呼叫完還有任務,那麼使用遍歷 + 刪除的方式進行清理
/**
* 嘗試判斷是否滿足執行緒池中止條件,如果滿足條件,將其推進到最後的TERMINATED狀態
* 注意:必須在任何可能觸發執行緒池中止的場景下呼叫(例如工作執行緒退出,或者SHUTDOWN狀態下佇列工作佇列為空等)
/**
* 嘗試判斷是否滿足執行緒池中止條件,如果滿足條件,將其推進到最後的TERMINATED狀態
* 注意:必須在任何可能觸發執行緒池中止的場景下呼叫(例如工作執行緒退出,或者SHUTDOWN狀態下佇列工作佇列為空等)
* */
final void tryTerminate() {
for (;;) {
int currentCtl = this.ctl.get();
if (isRunning(currentCtl)
|| runStateAtLeast(currentCtl, TIDYING)
|| (runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty())) {
return;
}
// 有兩種場景會走到這裡
// 1 執行了shutdown方法(runState狀態為SHUTDOWN),工作執行緒都空閒導致,
// 2 執行了shutdownNow方法(runState狀態為STOP)
if (workerCountOf(currentCtl) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
// 執行緒池狀態runState為SHUTDOWN或者STOP,且存活的工作執行緒個數已經為0了
// 雖然前面的interruptIdleWorkers是一個一箇中斷idle執行緒的,但實際上有的工作執行緒是因為別的原因退出的(恰好workerCountOf為0了)
// 所以這裡是可能存在並行的,因此通過mainLock加鎖防止並行,避免重複的terminated方法呼叫和termination.signalAll方法呼叫
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// cas的設定ctl的值為TIDYING+工作執行緒個數0(防止與別的地方ctl並行更新)
if (ctl.compareAndSet(currentCtl, ctlOf(TIDYING, 0))) {
try {
// cas成功,呼叫terminated勾點函數
terminated();
} finally {
// 無論terminated勾點函數是否出現異常
// cas的設定ctl的值為TERMINATED最終態+工作執行緒個數0(防止與別的地方ctl並行更新)
ctl.set(ctlOf(TERMINATED, 0));
// 通知使用awaitTermination方法等待執行緒池關閉的其它執行緒(通過termination.await等待)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// 如果上述對ctl變數的cas操作失敗了,則進行重試,再來一次迴圈
// else retry on failed CAS
}
}
第一個if
isRunning(currentCtl)
為true,說明執行緒池還在執行中,不可以關閉執行緒池
runStateAtLeast(currentCtl, TIDYING)
當前執行緒池狀態已經大於等於TIDYING了,說明之前別的執行緒可能已經執行過tryTerminate,且通過了這個if校驗,不用重複執行了
(runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty()))
當前執行緒池是SHUTDOWN狀態,但工作佇列中還有任務沒處理完,也不滿足中止條件,這時候不能關閉,還需要處理佇列中的任務
工作佇列沒有任務的時候,這些執行緒getTask為null,就會呼叫processWorkerExist
也會呼叫到tryTerminate
,這時候滿足條件將自動關閉執行緒池
第二個if
來到這個if需要滿足 執行了shutdown方法(runState狀態為SHUTDOWN),且當前工作執行緒已經空了
or 執行了shutdownNow方法(runState狀態為STOP)
如果工作執行緒數不等於0,這裡會呼叫interruptIdleWorkers
中斷一個空閒的執行緒。
這個被中斷的執行緒會getTask方法返回null->processWorkerExit->tryTerminate
,這時候這個執行緒也會中斷一個空閒的執行緒,從而達到一個接一個的終止,優雅的關閉資源
修改狀態,喚醒由於 呼叫awaitTermination
而被阻塞的執行緒
這裡上鎖的原因是,也許執行緒是一個個停止的,然後突然有一個工作執行緒執行業務邏輯出現異常,呼叫processWorkerExit
,也呼叫到tryTerminate
,恰好執行緒數為0,出現並行
修改狀態,呼叫勾點方法,喚醒阻塞的執行緒
首先cas狀態到Tidying,工作執行緒數為0,然後呼叫terminated
勾點方法,然後設定為terminated,並且喚醒阻塞在termination
上的執行緒
程式碼不復雜,但是需要有AQS Condition的知識,才知道為什麼這裡會阻塞呼叫執行緒
JUC原始碼學習筆記3——AQS等待佇列和CyclicBarrier,BlockingQueue
此方法會提前讓執行緒池工作執行緒數到達核心執行緒數,這樣的好處相當於10個外賣員等待接單,一旦單子(任務)提交,立馬得到執行,減少了新建執行緒的耗時
submit 底層還是依賴excute ,但是它會先將任務包裝FutureTask,方便呼叫者來控制任務的執行,取消,獲取非同步執行結果。FutureTask本身就是一個任務,也是非同步執行的結果 ,FutureTask就如同一個紐帶,連線了任務 和 任務的結果
(FutureTask 學習: JUC原始碼學習筆記7——FutureTask原始碼解析,人生亦如是,run起來才有結果)
這裡我們主要分析,worker為什麼需要繼承AQS,以及Worker中state代表什麼,worker在不同工作狀態被中斷會如何
構造的時候為-1
runWorker對狀態的變更
unlock會呼叫Worker的tryRelease,設定為0
lock會呼叫Worker的 tryAcquire,cas修改state從0到1,如果失敗會阻塞在AQS同步佇列中
我們可以看到 state =1 意味著worker在執行業務邏輯
,state=0意味著worker處於空閒
,
shutdown 會中斷空閒的執行緒,並對空閒執行緒進行回收。怎麼識別一個執行緒是空閒執行緒暱,怎麼讓空閒執行緒被回收暱?
上面講shutdown
方法時候,我們瞭解到 shutdown 首先自旋+cas 確保執行緒池狀態到達 SHUTDOWN,然後呼叫interruptIdleWorkers
中斷空閒執行緒,這個方法會呼叫到interruptIdleWorkers(false)
其中的false表示中斷所有空閒執行緒,而不是一個
這裡需要品一品,為什麼tryLock成功,就意味著當前工作執行緒是空閒的。上面我們說到工作執行緒執行任務的時候會先執行lock,改變狀態為1,然後開始執行業務邏輯,這裡的tryLock會cas狀態從0到1,如果成功了,意味著cas的這一瞬間工作執行緒是空閒的。
這是工作執行緒也許阻塞與getTask方法,也可能剛剛拿到任務,準備lock但是比shutdown慢。
工作執行緒阻塞與getTask
中斷工作執行緒,會導致原本阻塞與阻塞佇列的執行緒丟擲中斷異常
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//3 發現 執行緒池是shutdown ,如果這時候阻塞佇列還沒任務,那麼會自旋減少工作執行緒數,返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//1.從這裡 丟擲中斷異常
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
//2.在這裡被捕獲
//將繼續自旋,來到3
timedOut = false;
}
}
}
順著程式碼中的1,2,3看,最終getTask返回null,執行緒會從runWorker
中的while迴圈退出,執行processWorkerExist
,從而實現空閒執行緒的回收
準備lock 但是 比shutdown慢
這時候,執行緒從getTask剛剛拿到任務,但是準備lock,被shutdown方法強佔先機,導致lock獲取鎖失敗,而阻塞與鎖,只有等shutdown釋放自己worker這把鎖才能返回,但是這時候工作執行緒被中斷了。
注意這時候返回以及被中斷,doug lea不能讓這個中斷帶到使用者的業務邏輯中,因為這樣會影響到業務邏輯(使用者程式碼中根據中斷也許有不同的邏輯)所以有下面這段程式碼
這段程式碼的作用是,如果執行緒池停止了(stop tidying terminated)那麼一定確保工作執行緒被中斷,但是如果不是那麼一定確保執行緒不被中斷
這段程式碼,我做了一點點排版調整,邏輯不變,如下
//1. (runStateAtLeast(ctl.get(), STOP) ||
//2. (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&&
//3. !wt.isInterrupted()
其中1,2是或的關係,3和(1或2)是且的關係。如果整個為true 那麼會中斷當前下次你,我們詳細分析下
如果執行緒池停止,那麼1 為true,如果這時候工作執行緒沒有中斷,那麼工作執行緒會被中斷
如果執行緒沒有停止,這是1為false,來到2,首先Thread.interrupted()
清除中斷標誌,返回之前有沒有被中斷。如果執行緒池沒有停止,但是之前被中斷了,這裡會清除中斷標識,這樣實現了 ——執行緒池沒有停止,那麼確保執行緒不被中斷。
如果之前被中斷,那麼說明是shutdown ,或者 shutdownNow,或者使用者業務邏輯進行的中斷,這時候且 runStateAtLeast(ctl.get(), STOP)
成立,那麼說明執行緒需要中斷,那麼這是再次進行中斷(整體為true if中的邏輯就是中斷)
有趣的是,為什麼doug lea寫兩次runStateAtLeast(ctl.get(), STOP)
?
這是(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
不是一個原子操作,可能我剛清除中斷了,這時候shutdownNow,成功改變狀態為STOP,這時候,其實需要對執行緒進行中斷(在原始碼註釋中doug lea稱之為清除中斷和shutdownNow的 race(競賽)
十分生動形象了)
至此我們理解了 中斷對於工作執行緒的意義,其中關鍵的一點是,中斷能讓阻塞於阻塞佇列中的執行緒,重新自旋從而來檢查執行緒池狀態,達到如果shutdown,shutdownNow執行了,工作執行緒會從阻塞,到自旋檢查執行緒池狀態從而讓getTask返回null,達到工作執行緒回收的目的(doug lea 牛逼!)
shutdownNow,不關工作執行緒是空閒還是執行都會進行中斷,而且這個中斷會傳播到我們提交的業務邏輯中
shutdownNow 會首先改變狀態為stop然後呼叫interruptWorkers
,這個方法會呼叫每一個Worker的interruptIfStarted
可以看到只要state >=0 都可能被中斷,只有Worker剛new出的來的時候是-1,一旦執行runWorker,首先第一個事情就是修改狀態為0,這時候就可能被shutdownNow中斷。
這中斷會讓空閒的執行緒從getTask返回null,然後執行緒進入回收。讓剛拿到任務準備執行的執行緒將中斷帶到業務邏輯中,讓正在執行業務邏輯執行緒被中斷(為什麼能帶到業務邏輯中?見4.2 shutdown 與工作執行緒的互動 中的 準備lock 但是 比shutdown慢
)。
這個中斷可以看作是執行緒池和我們業務邏輯的通訊 —— 爺關閉了,你好自為之
如果當前執行緒池有一百個執行緒,我上來一個shutdownNow, 讓執行緒池關閉,我能立馬回收一百個執行緒麼。顯然是不行的,也不能說我執行緒池先修改為停止狀態,執行緒愛咋咋地,這種不負責任的行為也是不行的,執行緒池需要等待池中所有工作執行緒為0,才能停止自己。
那我們來看看doug lea如何實現優雅停
tryTerminate
方法 在新增worker 失敗
,或者shutdown執行
,或者shutdownNow執行
等情況的時候,會被呼叫
所以上來就是一個判斷,如果執行緒池為執行,那麼不能停止;如果已經是TIDYING說明有執行緒已經將執行緒池停止了,不需要再次執行;如果是shutdown但是佇列有任務,那麼需要執行佇列中的任務,也不能停止執行緒池。
精彩的在於 workerCountOf(c) != 0
這是會中斷一個空閒的執行緒,為什麼只中斷一個啊,為什麼不都中斷?
如果全部中斷,這些執行緒都會從getTask中拿到null 然後呼叫processWorkerExist,然後並行執行terminate,從某種程度上cpu遭了殃,不夠優雅。
中斷一個可以讓其中一個空閒執行processWorkerExist 然後呼叫tryTerminate
,繼續執行一個空閒的執行緒,然後迴圈往復,直到所有工作執行緒呼叫processWorkerExist 進行回收後,才能到下面修改狀態為TIDYING的邏輯。
doug lea 在原始碼註釋中 說 中斷一個空閒執行緒,確保訊號的傳播
就是這個意思,doug lea 牛逼
執行緒池往往提交任務,等操作都是並行呼叫的,doug lea如何實現執行緒安全 和 高效率
首先doug lea 使用一個自旋 +cas的操作,確保成功增加了工作執行緒數後,才能繼續建立執行緒的操作,並且這個自旋判斷了執行緒池狀態是否能接受新任務,是否能新建工作執行緒,相當於一把自旋鎖,避免阻塞掛起的效能消耗。如果成功實現了工作執行緒數的增加,就如同佔據的名額,接下來使用執行緒工廠建立執行緒的步驟是不加鎖的,提高了並行。將執行緒放入worker集合 使用了ReentrantLock ,啟動執行緒的操作又是不加鎖的,通過這種縮小鎖的粒度的思想,提高並行執行效率。
執行緒池 的 shutdownNow和shutdown的區別
前者會修改執行緒池狀態為stop並中斷所有啟動的執行緒(工作執行緒剛新建的使用state = -1,呼叫runWorker首先設定狀態為0,視為已經啟動,如果開始執行任務那麼修改,如果執行任務首先cas修改狀態為1)所有state >=0 的執行緒都會被中斷,且中斷可以在使用者定義的任務中感知到,並且會把任務佇列中的任務通過阻塞佇列drainTo
方法倒出來給使用者。
後者會修改執行緒池狀態為shutdown,然後中斷所有空閒的執行緒,使用tryLock cas修改狀態從0到1,如果成功視為工作執行緒為空閒。
執行緒池停止的時候,如何確保所有工作執行緒回收後才停止執行緒池本身
tryTerminate方法負責停止執行緒池,會檢查工作執行緒數,如果不為0,那麼中斷一個空閒的執行緒。中斷工作執行緒的作用會讓阻塞於getTask方法的工作執行緒,重新自旋,從而判斷執行緒池狀態,如果停止那麼返回null,如果shutdown且阻塞佇列為空,也返回null,從而讓工作執行緒從runWorker方法while結束,執行processWorkerExist進行執行緒回收,processWorkerExist方法又會呼叫到tryTerminate,繼續中斷一個空閒執行緒,直到工作執行緒數為0,這時候才會修改狀態為TIDYING,然後執行terminated
方法,然後設定狀態為terminated狀態。