本文已經收錄到Github倉庫,該倉庫包含計算機基礎、Java基礎、多執行緒、JVM、資料庫、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分散式、微服務、設計模式、架構、校招社招分享等核心知識點,歡迎star~
Github地址:https://github.com/Tyson0314/Java-learning
執行緒池:一個管理執行緒的池子。
嗯,手動建立執行緒有兩個缺點
為什麼不受控?
系統資源有限,每個人針對不同業務都可以手動建立執行緒,並且建立執行緒沒有統一標準,比如建立的執行緒有沒有名字等。當系統執行起來,所有執行緒都在搶佔資源,毫無規則,混亂場面可想而知,不好管控。
頻繁手動建立執行緒為什麼開銷會大?跟new Object() 有什麼差別?
雖然Java中萬物皆物件,但是new Thread() 建立一個執行緒和 new Object()還是有區別的。
new Object()過程如下:
建立執行緒的過程如下:
建立一個執行緒大概需要1M左右的空間(Java8,機器規格2c8G)。可見,頻繁手動建立/銷燬執行緒的代價是非常大的。
corePoolSize
時,這時對於一個新提交的任務,執行緒池會建立一個執行緒去處理任務。當執行緒池裡面存活的執行緒數小於等於核心執行緒數corePoolSize
時,執行緒池裡面的執行緒會一直存活著,就算空閒時間超過了keepAliveTime
,執行緒也不會被銷燬,而是一直阻塞在那裡一直等待任務佇列的任務來執行。corePoolSize
了,並且任務佇列也滿了,假設maximumPoolSize>corePoolSize
,這時如果再來新的任務,執行緒池就會繼續建立新的執行緒來處理新的任務,知道執行緒數達到maximumPoolSize
,就不會再建立了。maximumPoolSize
,並且任務佇列也滿了,如果還有新的任務過來,那就直接採用拒絕策略進行處理。預設的拒絕策略是丟擲一個RejectedExecutionException異常。ThreadPoolExecutor 的通用建構函式:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
1、corePoolSize
:當有新任務時,如果執行緒池中執行緒數沒有達到執行緒池的基本大小,則會建立新的執行緒執行任務,否則將任務放入阻塞佇列。當執行緒池中存活的執行緒數總是大於 corePoolSize 時,應該考慮調大 corePoolSize。
2、maximumPoolSize
:當阻塞佇列填滿時,如果執行緒池中執行緒數沒有超過最大執行緒數,則會建立新的執行緒執行任務。否則根據拒絕策略處理新任務。非核心執行緒類似於臨時借來的資源,這些執行緒在空閒時間超過 keepAliveTime 之後,就應該退出,避免資源浪費。
3、BlockingQueue
:儲存等待執行的任務。
4、keepAliveTime
:非核心執行緒空閒後,保持存活的時間,此引數只對非核心執行緒有效。設定為0,表示多餘的空閒執行緒會被立即終止。
5、TimeUnit
:時間單位
TimeUnit.DAYS
TimeUnit.HOURS
TimeUnit.MINUTES
TimeUnit.SECONDS
TimeUnit.MILLISECONDS
TimeUnit.MICROSECONDS
TimeUnit.NANOSECONDS
6、ThreadFactory
:每當執行緒池建立一個新的執行緒時,都是通過執行緒工廠方法來完成的。在 ThreadFactory 中只定義了一個方法 newThread,每當執行緒池需要建立新執行緒就會呼叫它。
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;
}
public Thread newThread(Runnable runnable) {
return new MyAppThread(runnable, poolName);//將執行緒池名字傳遞給建構函式,用於區分不同執行緒池的執行緒
}
}
7、RejectedExecutionHandler
:當佇列和執行緒池都滿了的時候,根據拒絕策略處理新任務。
AbortPolicy:預設的策略,直接丟擲RejectedExecutionException
DiscardPolicy:不處理,直接丟棄
DiscardOldestPolicy:將等待佇列隊首的任務丟棄,並執行當前任務
CallerRunsPolicy:由呼叫執行緒處理該任務
如果執行緒池執行緒數量太小,當有大量請求需要處理,系統響應比較慢,會影響使用者體驗,甚至會出現任務佇列大量堆積任務導致OOM。
如果執行緒池執行緒數量過大,大量執行緒可能會同時搶佔 CPU 資源,這樣會導致大量的上下文切換,從而增加執行緒的執行時間,影響了執行效率。
CPU 密集型任務(N+1): 這種任務消耗的主要是 CPU 資源,可以將執行緒數設定為 N(CPU 核心數)+1
,多出來的一個執行緒是為了防止某些原因導致的執行緒阻塞(如IO操作,執行緒sleep,等待鎖)而帶來的影響。一旦某個執行緒被阻塞,釋放了CPU資源,而在這種情況下多出來的一個執行緒就可以充分利用 CPU 的空閒時間。
I/O 密集型任務(2N): 系統的大部分時間都在處理 IO 操作,此時執行緒可能會被阻塞,釋放CPU資源,這時就可以將 CPU 交出給其它執行緒使用。因此在 IO 密集型任務的應用中,可以多設定一些執行緒,具體的計算方法:最佳執行緒數 = CPU核心數 * (1/CPU利用率) = CPU核心數 * (1 + (IO耗時/CPU耗時))
,一般可設定為2N。
常見的執行緒池有 FixedThreadPool
、SingleThreadExecutor
、CachedThreadPool
和 ScheduledThreadPool
。這幾個都是 ExecutorService
執行緒池範例。
FixedThreadPool
固定執行緒數的執行緒池。任何時間點,最多隻有 nThreads 個執行緒處於活動狀態執行任務。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
使用無界佇列 LinkedBlockingQueue(佇列容量為 Integer.MAX_VALUE),執行中的執行緒池不會拒絕任務,即不會呼叫RejectedExecutionHandler.rejectedExecution()方法。
maxThreadPoolSize 是無效引數,故將它的值設定為與 coreThreadPoolSize 一致。
keepAliveTime 也是無效引數,設定為0L,因為此執行緒池裡所有執行緒都是核心執行緒,核心執行緒不會被回收(除非設定了executor.allowCoreThreadTimeOut(true))。
適用場景:適用於處理CPU密集型的任務,確保CPU在長期被工作執行緒使用的情況下,儘可能的少的分配執行緒,即適用執行長期的任務。需要注意的是,FixedThreadPool 不會拒絕任務,在任務比較多的時候會導致 OOM。
SingleThreadExecutor
只有一個執行緒的執行緒池。
public static ExecutionService newSingleThreadExecutor() {
return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
使用無界佇列 LinkedBlockingQueue。執行緒池只有一個執行的執行緒,新來的任務放入工作佇列,執行緒處理完任務就回圈從佇列裡獲取任務執行。保證順序的執行各個任務。
適用場景:適用於序列執行任務的場景,一個任務一個任務地執行。在任務比較多的時候也是會導致 OOM。
CachedThreadPool
根據需要建立新執行緒的執行緒池。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
如果主執行緒提交任務的速度高於執行緒處理任務的速度時,CachedThreadPool
會不斷建立新的執行緒。極端情況下,這樣會導致耗盡 cpu 和記憶體資源。
使用沒有容量的SynchronousQueue作為執行緒池工作佇列,當執行緒池有空閒執行緒時,SynchronousQueue.offer(Runnable task)
提交的任務會被空閒執行緒處理,否則會建立新的執行緒處理任務。
適用場景:用於並行執行大量短期的小任務。CachedThreadPool
允許建立的執行緒數量為 Integer.MAX_VALUE ,可能會建立大量執行緒,從而導致 OOM。
ScheduledThreadPoolExecutor
在給定的延遲後執行任務,或者定期執行任務。在實際專案中基本不會被用到,因為有其他方案選擇比如quartz
。
使用的任務佇列 DelayQueue
封裝了一個 PriorityQueue
,PriorityQueue
會對佇列中的任務進行排序,時間早的任務先被執行(即ScheduledFutureTask
的 time
變數小的先執行),如果time相同則先提交的任務會被先執行(ScheduledFutureTask
的 squenceNumber
變數小的先執行)。
執行週期任務步驟:
DelayQueue
中獲取已到期的 ScheduledFutureTask(DelayQueue.take())
。到期任務是指 ScheduledFutureTask
的 time 大於等於當前系統的時間;ScheduledFutureTask
;ScheduledFutureTask
的 time 變數為下次將要被執行的時間;ScheduledFutureTask
放回 DelayQueue
中(DelayQueue.add()
)。適用場景:週期性執行任務的場景,需要限制執行緒數量的場景。
程序是指一個記憶體中執行的應用程式,每個程序都有自己獨立的一塊記憶體空間。
執行緒是比程序更小的執行單位,它是在一個程序中獨立的控制流,一個程序可以啟動多個執行緒,每條執行緒並行執行不同的任務。
初始(NEW):執行緒被構建,還沒有呼叫 start()。
執行(RUNNABLE):包括作業系統的就緒和執行兩種狀態。
阻塞(BLOCKED):一般是被動的,在搶佔資源中得不到資源,被動的掛起在記憶體,等待資源釋放將其喚醒。執行緒被阻塞會釋放CPU,不釋放記憶體。
等待(WAITING):進入該狀態的執行緒需要等待其他執行緒做出一些特定動作(通知或中斷)。
超時等待(TIMED_WAITING):該狀態不同於WAITING,它可以在指定的時間後自行返回。
終止(TERMINATED):表示該執行緒已經執行完畢。
圖片來源:Java並行程式設計的藝術
執行緒中斷即執行緒執行過程中被其他執行緒給打斷了,它與 stop 最大的區別是:stop 是由系統強制終止執行緒,而執行緒中斷則是給目標執行緒傳送一箇中斷訊號,如果目標執行緒沒有接收執行緒中斷的訊號並結束執行緒,執行緒則不會終止,具體是否退出或者執行其他邏輯取決於目標執行緒。
執行緒中斷三個重要的方法:
1、java.lang.Thread#interrupt
呼叫目標執行緒的interrupt()
方法,給目標執行緒發一箇中斷訊號,執行緒被打上中斷標記。
2、java.lang.Thread#isInterrupted()
判斷目標執行緒是否被中斷,不會清除中斷標記。
3、java.lang.Thread#interrupted
判斷目標執行緒是否被中斷,會清除中斷標記。
private static void test2() {
Thread thread = new Thread(() -> {
while (true) {
Thread.yield();
// 響應中斷
if (Thread.currentThread().isInterrupted()) {
System.out.println("Java技術棧執行緒被中斷,程式退出。");
return;
}
}
});
thread.start();
thread.interrupt();
}
Thread
類來建立多執行緒Runnable
介面來建立多執行緒Callable
介面,通過FutureTask
介面建立執行緒。Executor
框架來建立執行緒池。繼承 Thread 建立執行緒程式碼如下。run()方法是由jvm建立完作業系統級執行緒後回撥的方法,不可以手動呼叫,手動呼叫相當於呼叫普通方法。
/**
* @author: 程式設計師大彬
* @time: 2021-09-11 10:15
*/
public class MyThread extends Thread {
public MyThread() {
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread() + ":" + i);
}
}
public static void main(String[] args) {
MyThread mThread1 = new MyThread();
MyThread mThread2 = new MyThread();
MyThread myThread3 = new MyThread();
mThread1.start();
mThread2.start();
myThread3.start();
}
}
Runnable 建立執行緒程式碼:
/**
* @author: 程式設計師大彬
* @time: 2021-09-11 10:04
*/
public class RunnableTest {
public static void main(String[] args){
Runnable1 r = new Runnable1();
Thread thread = new Thread(r);
thread.start();
System.out.println("主執行緒:["+Thread.currentThread().getName()+"]");
}
}
class Runnable1 implements Runnable{
@Override
public void run() {
System.out.println("當前執行緒:"+Thread.currentThread().getName());
}
}
實現Runnable介面比繼承Thread類所具有的優勢:
Callable 建立執行緒程式碼:
/**
* @author: 程式設計師大彬
* @time: 2021-09-11 10:21
*/
public class CallableTest {
public static void main(String[] args) {
Callable1 c = new Callable1();
//非同步計算的結果
FutureTask<Integer> result = new FutureTask<>(c);
new Thread(result).start();
try {
//等待任務完成,返回結果
int sum = result.get();
System.out.println(sum);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
class Callable1 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}
}
使用 Executor 建立執行緒程式碼:
/**
* @author: 程式設計師大彬
* @time: 2021-09-11 10:44
*/
public class ExecutorsTest {
public static void main(String[] args) {
//獲取ExecutorService範例,生產禁用,需要手動建立執行緒池
ExecutorService executorService = Executors.newCachedThreadPool();
//提交任務
executorService.submit(new RunnableDemo());
}
}
class RunnableDemo implements Runnable {
@Override
public void run() {
System.out.println("大彬");
}
}
執行緒死鎖是指兩個或兩個以上的執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象。若無外力作用,它們都將無法推進下去。
如下圖所示,執行緒 A 持有資源 2,執行緒 B 持有資源 1,他們同時都想申請對方持有的資源,所以這兩個執行緒就會互相等待而進入死鎖狀態。
下面通過例子說明執行緒死鎖,程式碼來自並行程式設計之美。
public class DeadLockDemo {
private static Object resource1 = new Object();//資源 1
private static Object resource2 = new Object();//資源 2
public static void main(String[] args) {
new Thread(() -> {
synchronized (resource1) {
System.out.println(Thread.currentThread() + "get resource1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "waiting get resource2");
synchronized (resource2) {
System.out.println(Thread.currentThread() + "get resource2");
}
}
}, "執行緒 1").start();
new Thread(() -> {
synchronized (resource2) {
System.out.println(Thread.currentThread() + "get resource2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "waiting get resource1");
synchronized (resource1) {
System.out.println(Thread.currentThread() + "get resource1");
}
}
}, "執行緒 2").start();
}
}
程式碼輸出如下:
Thread[執行緒 1,5,main]get resource1
Thread[執行緒 2,5,main]get resource2
Thread[執行緒 1,5,main]waiting get resource2
Thread[執行緒 2,5,main]waiting get resource1
執行緒 A 通過 synchronized
(resource1) 獲得 resource1 的監視器鎖,然後通過 Thread.sleep(1000)
。讓執行緒 A 休眠 1s 為的是讓執行緒 B 得到執行然後獲取到 resource2 的監視器鎖。執行緒 A 和執行緒 B 休眠結束了都開始企圖請求獲取對方的資源,然後這兩個執行緒就會陷入互相等待的狀態,這也就產生了死鎖。
死鎖產生的四個必要條件:
互斥:一個資源每次只能被一個程序使用
請求與保持:一個程序因請求資源而阻塞時,不釋放獲得的資源
不剝奪:程序已獲得的資源,在未使用之前,不能強行剝奪
迴圈等待:程序之間迴圈等待著資源
避免死鎖的方法:
start()
方法,將會建立一個新執行緒去執行run()
方法中的程式碼。run()
就像一個普通方法一樣,直接呼叫run()
的話,不會建立新執行緒。start()
方法只能呼叫一次,多次呼叫會丟擲 java.lang.IllegalThreadStateException 異常。run()
方法則沒有限制。start
用於啟動執行緒。
getPriority
獲取執行緒優先順序,預設是5,執行緒預設優先順序為5,如果不手動指定,那麼執行緒優先順序具有繼承性,比如執行緒A啟動執行緒B,那麼執行緒B的優先順序和執行緒A的優先順序相同
setPriority
設定執行緒優先順序。CPU會盡量將執行資源讓給優先順序比較高的執行緒。
interrupt
告訴執行緒,你應該中斷了,具體到底中斷還是繼續執行,由被通知的執行緒自己處理。
當對一個執行緒呼叫 interrupt() 時,有兩種情況:
如果執行緒處於被阻塞狀態(例如處於sleep, wait, join 等狀態),那麼執行緒將立即退出被阻塞狀態,並丟擲一個InterruptedException異常。
如果執行緒處於正常活動狀態,那麼會將該執行緒的中斷標誌設定為 true。不過,被設定中斷標誌的執行緒可以繼續正常執行,不受影響。
interrupt() 並不能真正的中斷執行緒,需要被呼叫的執行緒自己進行配合才行。
join
等待其他執行緒終止。在當前執行緒中呼叫另一個執行緒的join()方法,則當前執行緒轉入阻塞狀態,直到另一個程序執行結束,當前執行緒再由阻塞轉為就緒狀態。
yield
暫停當前正在執行的執行緒物件,把執行機會讓給相同或者更高優先順序的執行緒。
sleep
使執行緒轉到阻塞狀態。millis引數設定睡眠的時間,以毫秒為單位。當睡眠結束後,執行緒自動轉為Runnable狀態。
volatile
是輕量級的同步機制,volatile
保證變數對所有執行緒的可見性,不保證原子性。
volatile
變數進行寫操作的時候,JVM會向處理器傳送一條LOCK
字首的指令,將該變數所在快取行的資料寫回系統記憶體。來看看快取一致性協定是什麼。
快取一致性協定:當CPU寫資料時,如果發現操作的變數是共用變數,即在其他CPU中也存在該變數的副本,會發出訊號通知其他CPU將該變數的快取行置為無效狀態,因此當其他CPU需要讀取這個變數時,就會從記憶體重新讀取。
volatile
關鍵字的兩個作用:
指令重排序是JVM為了優化指令,提高程式執行效率,在不影響單執行緒程式執行結果的前提下,儘可能地提高並行度。Java編譯器會在生成指令系列時在適當的位置會插入
記憶體屏障
指令來禁止處理器重排序。插入一個記憶體屏障,相當於告訴CPU和編譯器先於這個命令的必須先執行,後於這個命令的必須後執行。對一個volatile欄位進行寫操作,Java記憶體模型將在寫操作後插入一個寫屏障指令,這個指令會把之前的寫入值都重新整理到記憶體。
原子性:確保執行緒互斥的存取同步程式碼;
可見性:保證共用變數的修改能夠及時可見;
有序性:有效解決重排序問題。
synchronized 同步程式碼塊的實現是通過 monitorenter
和 monitorexit
指令,其中 monitorenter
指令指向同步程式碼塊的開始位置,monitorexit
指令則指明同步程式碼塊的結束位置。當執行 monitorenter
指令時,執行緒試圖獲取鎖也就是獲取 monitor
的持有權(monitor物件存在於每個Java物件的物件頭中, synchronized 鎖便是通過這種方式獲取鎖的,也是為什麼Java中任意物件可以作為鎖的原因)。
其內部包含一個計數器,當計數器為0則可以成功獲取,獲取後將鎖計數器設為1也就是加1。相應的在執行 monitorexit
指令後,將鎖計數器設為0
,表明鎖被釋放。如果獲取物件鎖失敗,那當前執行緒就要阻塞等待,直到鎖被另外一個執行緒釋放為止
synchronized 修飾的方法並沒有 monitorenter
指令和 monitorexit
指令,取得代之的確實是ACC_SYNCHRONIZED
標識,該標識指明瞭該方法是一個同步方法,JVM 通過該 ACC_SYNCHRONIZED
存取標誌來辨別一個方法是否宣告為同步方法,從而執行相應的同步呼叫。
volatile
只能使用在變數上;而synchronized
可以在類,變數,方法和程式碼塊上。volatile
至保證可見性;synchronized
保證原子性與可見性。volatile
禁用指令重排序;synchronized
不會。volatile
不會造成阻塞;synchronized
會。相同點:
InterruptedException
不同點:
wait()
是Object超類中的方法;而sleep()
是執行緒Thread類中的方法wait()
會釋放鎖,而sleep()
並不釋放鎖wait()
依靠notify
或者notifyAll
、中斷、達到指定時間來喚醒;而sleep()
到達指定時間被喚醒wait()
需要先獲取物件的鎖,而Thread.sleep()
不用call()
,Runnable的方法是run()
;call()
方法允許丟擲異常;而Runnable介面run()
方法不能繼續上拋異常。假設有T1、T2、T3三個執行緒,你怎樣保證T2在T1執行完後執行,T3在T2執行完後執行?
可以使用join方法解決這個問題。比如線上程A中,呼叫執行緒B的join方法表示的意思就是:A等待B執行緒執行完畢後(釋放CPU執行權),在繼續執行。
程式碼如下:
public class ThreadTest {
public static void main(String[] args) {
Thread spring = new Thread(new SeasonThreadTask("春天"));
Thread summer = new Thread(new SeasonThreadTask("夏天"));
Thread autumn = new Thread(new SeasonThreadTask("秋天"));
try
{
//春天執行緒先啟動
spring.start();
//主執行緒等待執行緒spring執行完,再往下執行
spring.join();
//夏天執行緒再啟動
summer.start();
//主執行緒等待執行緒summer執行完,再往下執行
summer.join();
//秋天執行緒最後啟動
autumn.start();
//主執行緒等待執行緒autumn執行完,再往下執行
autumn.join();
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
class SeasonThreadTask implements Runnable{
private String name;
public SeasonThreadTask(String name){
this.name = name;
}
@Override
public void run() {
for (int i = 1; i <4; i++) {
System.out.println(this.name + "來了: " + i + "次");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
執行結果:
春天來了: 1次
春天來了: 2次
春天來了: 3次
夏天來了: 1次
夏天來了: 2次
夏天來了: 3次
秋天來了: 1次
秋天來了: 2次
秋天來了: 3次
守護執行緒是執行在後臺的一種特殊程序。它獨立於控制終端並且週期性地執行某種任務或等待處理某些發生的事件。在 Java 中垃圾回收執行緒就是特殊的守護執行緒。
1、使用 Object 類的 wait()/notify()。Object 類提供了執行緒間通訊的方法:wait()
、notify()
、notifyAll()
,它們是多執行緒通訊的基礎。其中,wait/notify
必須配合 synchronized
使用,wait 方法釋放鎖,notify 方法不釋放鎖。wait 是指在一個已經進入了同步鎖的執行緒內,讓自己暫時讓出同步鎖,以便其他正在等待此鎖的執行緒可以得到同步鎖並執行,只有其他執行緒呼叫了notify()
,notify並不釋放鎖,只是告訴呼叫過wait()
的執行緒可以去參與獲得鎖的競爭了,但不是馬上得到鎖,因為鎖還在別人手裡,別人還沒釋放,呼叫 wait()
的一個或多個執行緒就會解除 wait 狀態,重新參與競爭物件鎖,程式如果可以再次得到鎖,就可以繼續向下執行。
2、使用 volatile 關鍵字。基於volatile關鍵字實現執行緒間相互通訊,其底層使用了共用記憶體。簡單來說,就是多個執行緒同時監聽一個變數,當這個變數發生變化的時候 ,執行緒能夠感知並執行相應的業務。
3、使用JUC工具類 CountDownLatch。jdk1.5 之後在java.util.concurrent
包下提供了很多並行程式設計相關的工具類,簡化了並行程式設計開發,CountDownLatch
基於 AQS 框架,相當於也是維護了一個執行緒間共用變數 state。
4、基於 LockSupport 實現執行緒間的阻塞和喚醒。LockSupport
是一種非常靈活的實現執行緒間阻塞和喚醒的工具,使用它不用關注是等待執行緒先進行還是喚醒執行緒先執行,但是得知道執行緒的名字。
執行緒本地變數。當使用ThreadLocal
維護變數時,ThreadLocal
為每個使用該變數的執行緒提供獨立的變數副本,所以每一個執行緒都可以獨立地改變自己的副本,而不會影響其它執行緒。
每個執行緒都有一個ThreadLocalMap
(ThreadLocal
內部類),Map中元素的鍵為ThreadLocal
,而值對應執行緒的變數副本。
呼叫threadLocal.set()
-->呼叫getMap(Thread)
-->返回當前執行緒的ThreadLocalMap<ThreadLocal, value>
-->map.set(this, value)
,this是threadLocal
本身。原始碼如下:
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
呼叫get()
-->呼叫getMap(Thread)
-->返回當前執行緒的ThreadLocalMap<ThreadLocal, value>
-->map.getEntry(this)
,返回value
。原始碼如下:
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
threadLocals
的型別ThreadLocalMap
的鍵為ThreadLocal
物件,因為每個執行緒中可有多個threadLocal
變數,如longLocal
和stringLocal
。
public class ThreadLocalDemo {
ThreadLocal<Long> longLocal = new ThreadLocal<>();
public void set() {
longLocal.set(Thread.currentThread().getId());
}
public Long get() {
return longLocal.get();
}
public static void main(String[] args) throws InterruptedException {
ThreadLocalDemo threadLocalDemo = new ThreadLocalDemo();
threadLocalDemo.set();
System.out.println(threadLocalDemo.get());
Thread thread = new Thread(() -> {
threadLocalDemo.set();
System.out.println(threadLocalDemo.get());
}
);
thread.start();
thread.join();
System.out.println(threadLocalDemo.get());
}
}
ThreadLocal
並不是用來解決共用資源的多執行緒存取問題,因為每個執行緒中的資源只是副本,不會共用。因此ThreadLocal
適合作為執行緒上下文變數,簡化執行緒內傳參。
每個執行緒都有⼀個ThreadLocalMap
的內部屬性,map的key是ThreaLocal
,定義為弱參照,value是強參照型別。垃圾回收的時候會⾃動回收key,而value的回收取決於Thread物件的生命週期。一般會通過執行緒池的方式複用執行緒節省資源,這也就導致了執行緒物件的生命週期比較長,這樣便一直存在一條強參照鏈的關係:Thread
--> ThreadLocalMap
-->Entry
-->Value
,隨著任務的執行,value就有可能越來越多且無法釋放,最終導致記憶體漏失。
解決⽅法:每次使⽤完ThreadLocal
就調⽤它的remove()
⽅法,手動將對應的鍵值對刪除,從⽽避免記憶體漏失。
場景1
ThreadLocal 用作儲存每個執行緒獨享的物件,為每個執行緒都建立一個副本,這樣每個執行緒都可以修改自己所擁有的副本, 而不會影響其他執行緒的副本,確保了執行緒安全。
這種場景通常用於儲存執行緒不安全的工具類,典型的使用的類就是 SimpleDateFormat。
假如需求為500個執行緒都要用到 SimpleDateFormat,使用執行緒池來實現執行緒的複用,否則會消耗過多的記憶體等資源,如果我們每個任務都建立了一個 simpleDateFormat 物件,也就是說,500個任務對應500個 simpleDateFormat 物件。但是這麼多物件的建立是有開銷的,而且這麼多物件同時存在在記憶體中也是一種記憶體的浪費。可以將simpleDateFormat 物件給提取了出來,變成靜態變數,但是這樣一來就會有執行緒不安全的問題。我們想要的效果是,既不浪費過多的記憶體,同時又想保證執行緒安全。此時,可以使用 ThreadLocal來達到這個目的,每個執行緒都擁有一個自己的 simpleDateFormat 物件。
場景2
ThreadLocal 用作每個執行緒內需要獨立儲存資訊,以便供其他方法更方便地獲取該資訊的場景。每個執行緒獲取到的資訊可能都是不一樣的,前面執行的方法儲存了資訊後,後續方法可以通過 ThreadLocal 直接獲取到,避免了傳參,類似於全域性變數的概念。
比如Java web應用中,每個執行緒有自己單獨的Session
範例,就可以使用ThreadLocal
來實現。
AQS,AbstractQueuedSynchronizer
,抽象佇列同步器,定義了一套多執行緒存取共用資源的同步器框架,許多並行工具的實現都依賴於它,如常用的ReentrantLock/Semaphore/CountDownLatch
。
AQS使用一個volatile
的int型別的成員變數state
來表示同步狀態,通過CAS修改同步狀態的值。當執行緒呼叫 lock 方法時 ,如果 state
=0,說明沒有任何執行緒佔有共用資源的鎖,可以獲得鎖並將 state
加1。如果 state
不為0,則說明有執行緒目前正在使用共用變數,其他執行緒必須加入同步佇列進行等待。
private volatile int state;//共用變數,使用volatile修飾保證執行緒可見性
同步器依賴內部的同步佇列(一個FIFO雙向佇列)來完成同步狀態的管理,當前執行緒獲取同步狀態失敗時,同步器會將當前執行緒以及等待狀態(獨佔或共用 )構造成為一個節點(Node)並將其加入同步佇列並進行自旋,當同步狀態釋放時,會把首節點中的後繼節點對應的執行緒喚醒,使其再次嘗試獲取同步狀態。
ReentrantLock
內部自定義了同步器sync,在加鎖的時候通過CAS演演算法,將執行緒物件放到一個雙向連結串列中,每次獲取鎖的時候,檢查當前維護的那個執行緒ID和當前請求的執行緒ID是否 一致,如果一致,同步狀態加1,表示鎖被當前執行緒獲取了多次。
原始碼如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
按照執行緒存取順序獲取物件鎖。synchronized
是非公平鎖,Lock
預設是非公平鎖,可以設定為公平鎖,公平鎖會影響效能。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
共用式與獨佔式的最主要區別在於:同一時刻獨佔式只能有一個執行緒獲取同步狀態,而共用式在同一時刻可以有多個執行緒獲取同步狀態。例如讀操作可以有多個執行緒同時進行,而寫操作同一時刻只能有一個執行緒進行寫操作,其他操作都會被阻塞。
悲觀鎖,每次存取資源都會加鎖,執行完同步程式碼釋放鎖,synchronized
和ReentrantLock
屬於悲觀鎖。
樂觀鎖,不會鎖定資源,所有的執行緒都能存取並修改同一個資源,如果沒有衝突就修改成功並退出,否則就會繼續迴圈嘗試。樂觀鎖最常見的實現就是CAS
。
適用場景:
樂觀鎖避免了悲觀鎖獨佔物件的問題,提高了並行效能,但它也有缺點:
CAS全稱Compare And Swap
,比較與交換,是樂觀鎖的主要實現方式。CAS在不使用鎖的情況下實現多執行緒之間的變數同步。ReentrantLock
內部的AQS和原子類內部都使用了CAS。
CAS演演算法涉及到三個運算元:
只有當V的值等於A時,才會使用原子方式用新值B來更新V的值,否則會繼續重試直到成功更新值。
以AtomicInteger
為例,AtomicInteger
的getAndIncrement()
方法底層就是CAS實現,關鍵程式碼是 compareAndSwapInt(obj, offset, expect, update)
,其含義就是,如果obj
內的value
和expect
相等,就證明沒有其他執行緒改變過這個變數,那麼就更新它為update
,如果不相等,那就會繼續重試直到成功更新值。
CAS 三大問題:
ABA問題。CAS需要在操作值的時候檢查記憶體值是否發生變化,沒有發生變化才會更新記憶體值。但是如果記憶體值原來是A,後來變成了B,然後又變成了A,那麼CAS進行檢查時會發現值沒有發生變化,但是實際上是有變化的。ABA問題的解決思路就是在變數前面新增版本號,每次變數更新的時候都把版本號加一,這樣變化過程就從A-B-A
變成了1A-2B-3A
。
JDK從1.5開始提供了AtomicStampedReference類來解決ABA問題,原子更新帶有版本號的參照型別。
迴圈時間長開銷大。CAS操作如果長時間不成功,會導致其一直自旋,給CPU帶來非常大的開銷。
只能保證一個共用變數的原子操作。對一個共用變數執行操作時,CAS能夠保證原子操作,但是對多個共用變數操作時,CAS是無法保證操作的原子性的。
Java從1.5開始JDK提供了AtomicReference類來保證參照物件之間的原子性,可以把多個變數放在一個物件裡來進行CAS操作。
在JDK的並行包裡提供了幾個非常有用的並行工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種並行流程控制的手段。
CountDownLatch用於某個執行緒等待其他執行緒執行完任務再執行,與thread.join()功能類似。常見的應用場景是開啟多個執行緒同時執行某個任務,等到所有任務執行完再執行特定操作,如彙總統計結果。
public class CountDownLatchDemo {
static final int N = 4;
static CountDownLatch latch = new CountDownLatch(N);
public static void main(String[] args) throws InterruptedException {
for(int i = 0; i < N; i++) {
new Thread(new Thread1()).start();
}
latch.await(1000, TimeUnit.MILLISECONDS); //呼叫await()方法的執行緒會被掛起,它會等待直到count值為0才繼續執行;等待timeout時間後count值還沒變為0的話就會繼續執行
System.out.println("task finished");
}
static class Thread1 implements Runnable {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "starts working");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
}
}
執行結果:
Thread-0starts working
Thread-1starts working
Thread-2starts working
Thread-3starts working
task finished
CyclicBarrier(同步屏障),用於一組執行緒互相等待到某個狀態,然後這組執行緒再同時執行。
public CyclicBarrier(int parties, Runnable barrierAction) {
}
public CyclicBarrier(int parties) {
}
引數parties指讓多少個執行緒或者任務等待至某個狀態;引數barrierAction為當這些執行緒都達到某個狀態時會執行的內容。
public class CyclicBarrierTest {
// 請求的數量
private static final int threadCount = 10;
// 需要同步的執行緒數量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws InterruptedException {
// 建立執行緒池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
try {
/**等待60秒,保證子執行緒完全執行結束*/
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}
}
執行結果如下,可以看出CyclicBarrier是可以重用的:
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:3is finish
threadnum:2is finish
threadnum:1is finish
threadnum:0is finish
threadnum:5is ready
threadnum:6is ready
...
當四個執行緒都到達barrier狀態後,會從四個執行緒中選擇一個執行緒去執行Runnable。
CyclicBarrier 和 CountDownLatch 都能夠實現執行緒之間的等待。
CountDownLatch用於某個執行緒等待其他執行緒執行完任務再執行。CyclicBarrier用於一組執行緒互相等待到某個狀態,然後這組執行緒再同時執行。
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置,可用於處理更為複雜的業務場景。
Semaphore類似於鎖,它用於控制同時存取特定資源的執行緒數量,控制並行執行緒數。
public class SemaphoreDemo {
public static void main(String[] args) {
final int N = 7;
Semaphore s = new Semaphore(3);
for(int i = 0; i < N; i++) {
new Worker(s, i).start();
}
}
static class Worker extends Thread {
private Semaphore s;
private int num;
public Worker(Semaphore s, int num) {
this.s = s;
this.num = num;
}
@Override
public void run() {
try {
s.acquire();
System.out.println("worker" + num + " using the machine");
Thread.sleep(1000);
System.out.println("worker" + num + " finished the task");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
執行結果如下,可以看出並非按照執行緒存取順序獲取資源的鎖,即
worker0 using the machine
worker1 using the machine
worker2 using the machine
worker2 finished the task
worker0 finished the task
worker3 using the machine
worker4 using the machine
worker1 finished the task
worker6 using the machine
worker4 finished the task
worker3 finished the task
worker6 finished the task
worker5 using the machine
worker5 finished the task
使用原子的方式更新基本型別
AtomicInteger 類常用的方法:
public final int get() //獲取當前的值
public final int getAndSet(int newValue)//獲取當前的值,並設定新的值
public final int getAndIncrement()//獲取當前的值,並自增
public final int getAndDecrement() //獲取當前的值,並自減
public final int getAndAdd(int delta) //獲取當前的值,並加上預期的值
boolean compareAndSet(int expect, int update) //如果輸入的數值等於預期值,則以原子方式將該值設定為輸入值(update)
public final void lazySet(int newValue)//最終設定為newValue,使用 lazySet 設定之後可能導致其他執行緒在之後的一小段時間內還是可以讀到舊的值。
AtomicInteger 類主要利用 CAS (compare and swap) 保證原子操作,從而避免加鎖的高開銷。
使用原子的方式更新陣列裡的某個元素
AtomicIntegerArray 類常用方法:
public final int get(int i) //獲取 index=i 位置元素的值
public final int getAndSet(int i, int newValue)//返回 index=i 位置的當前的值,並將其設定為新值:newValue
public final int getAndIncrement(int i)//獲取 index=i 位置元素的值,並讓該位置的元素自增
public final int getAndDecrement(int i) //獲取 index=i 位置元素的值,並讓該位置的元素自減
public final int getAndAdd(int i, int delta) //獲取 index=i 位置元素的值,並加上預期的值
boolean compareAndSet(int i, int expect, int update) //如果輸入的數值等於預期值,則以原子方式將 index=i 位置的元素值設定為輸入值(update)
public final void lazySet(int i, int newValue)//最終 將index=i 位置的元素設定為newValue,使用 lazySet 設定之後可能導致其他執行緒在之後的一小段時間內還是可以讀到舊的值。
後臺(daemon)執行緒,是指在程式執行的時候在後臺提供一種通用服務的執行緒,並且這個執行緒並不屬於程式中不可或缺的部分。因此,當所有的非後臺執行緒結束時,程式也就終止了,同時會殺死程序中的所有後臺執行緒。反過來說,只要有任何非後臺執行緒還在執行,程式就不會終止。必須線上程啟動之前呼叫setDaemon()方法,才能把它設定為後臺執行緒。
注意:後臺程序在不執行finally子句的情況下就會終止其run()方法。
比如:JVM的垃圾回收執行緒就是Daemon執行緒,Finalizer也是守護執行緒。
SynchronizedMap一次鎖住整張表來保證執行緒安全,所以每次只能有一個執行緒來存取map。
JDK1.8 ConcurrentHashMap採用CAS和synchronized來保證並行安全。資料結構採用陣列+連結串列/紅黑二元樹。synchronized只鎖定當前連結串列或紅黑二元樹的首節點,支援並行存取、修改。
另外ConcurrentHashMap使用了一種不同的迭代方式。當iterator被建立後集合再發生改變就不再是丟擲ConcurrentModificationException,取而代之的是在改變時new新的資料從而不影響原有的資料 ,iterator完成後再將頭指標替換為新的資料 ,這樣iterator執行緒可以使用原來老的資料,而寫執行緒也可以並行的完成改變。
有幾種方法:
1、使用執行緒池的原生函數isTerminated();
executor提供一個原生函數isTerminated()來判斷執行緒池中的任務是否全部完成。如果全部完成返回true,否則返回false。
2、使用重入鎖,維持一個公共計數。
所有的普通任務維持一個計數器,當任務完成時計數器加一(這裡要加鎖),當計數器的值等於任務數時,這時所有的任務已經執行完畢了。
3、使用CountDownLatch。
它的原理跟第二種方法類似,給CountDownLatch一個計數值,任務執行完畢後,呼叫countDown()執行計數值減一。最後執行的任務在呼叫方法的開始呼叫await()方法,這樣整個任務會阻塞,直到這個計數值為零,才會繼續執行。
這種方式的缺點就是需要提前知道任務的數量。
4、submit向執行緒池提交任務,使用Future判斷任務執行狀態。
使用submit向執行緒池提交任務與execute提交不同,submit會有Future型別的返回值。通過future.isDone()方法可以知道任務是否執行完成。
在並行程式設計中,不管是繼承thread類還是實現runnable介面,都無法保證獲取到之前的執行結果。通過實現Callback介面,並用Future可以來接收多執行緒的執行結果。
Future表示一個可能還沒有完成的非同步任務的結果,針對這個結果可以新增Callback以便在任務執行成功或失敗後作出相應的操作。
舉個例子:比如去吃早點時,點了包子和冷盤,包子需要等3分鐘,冷盤只需1分鐘,如果是序列的一個執行,在吃上早點的時候需要等待4分鐘,但是因為你在等包子的時候,可以同時準備冷盤,所以在準備冷盤的過程中,可以同時準備包子,這樣只需要等待3分鐘。Future就是後面這種執行模式。
Future介面主要包括5個方法:
最後給大家分享一個Github倉庫,上面有大彬整理的300多本經典的計算機書籍PDF,包括C語言、C++、Java、Python、前端、資料庫、作業系統、計算機網路、資料結構和演演算法、機器學習、程式設計人生等,可以star一下,下次找書直接在上面搜尋,倉庫持續更新中~
Github地址:https://github.com/Tyson0314/java-books