前言:本文是《Java程式設計思想》讀書筆記系列的最後一章,本章的內容很多,需要細讀慢慢去理解,文中的範例最好在自己電腦上多執行幾次,相關範例完整程式碼放在碼雲上了,碼雲地址:https://gitee.com/reminis_com/thinking-in-java
並行程式設計使我們可以將程式劃分為多個分離的、獨立執行的任務。通過使用多執行緒機制,這些獨立任務(也被稱為子任務)中的每一個都將由執行執行緒來驅動。一個執行緒就是在程序中的一個單一的順序控制流,因此,單個程序可以擁有多個並行執行的任務,但是你的程式使得每個任務都好像有其自己的CPU一樣。其底層機制是切分CPU時間,但通常你不需要考慮它。
執行緒模型為程式設計帶來了便利,它簡化了在單一程式中同時交織在一起的多個操作的處理。在使用執行緒時,CPU將輪流給每個任務分配其佔用時間「。每個任務都覺得自己在一直佔用CPU,但事實上CPU時間是劃分成片段分配給了所有的任務(例外情況是程式確實執行在多個CPU之上)。執行緒的一大好處是可以使你從這個層次抽身出來,即程式碼不必知道它是執行在具有一個還是多個CPU的機器上。所以,使用執行緒機制,是一種建立透明的、可延伸的程式的方法,如果程式執行的太慢,為機器增添一個CPU就很容易的加快程式的執行速度。多工和多執行緒往往是使用多處理器系統的最合理方式。
執行緒可以驅動任務,因此你需要一種描述任務的方式。
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月03日 10:14
*
* 演示通過實現Runnable介面定義任務
*
* <p>
* 顯示發射之前的倒計時
* </p>
*/
public class LiftOff implements Runnable {
protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {
}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" + (countDown > 0 ? countDown : "LiftOff!") + "). ";
}
@Override
public void run() {
while (countDown-- > 0) {
System.out.print(status());
Thread.yield();
}
}
} ///:~
識別符號id可以用來區分任務的多個範例,它是final的,因為它一旦被初始化之後就不希望被修改。任務的run()方法通常總會有某種形式的迴圈,使得任務一直執行下去直到不再需要,所以要設定跳出迴圈的條件(有一種選擇是直接從run)返回)。通常,run()被寫成無限迴圈的形式,這就意味著,除非有某個條件使得run()終止,否則它將永遠執行下去(在本章後面將會看到如何安全地終止執行緒)。
在run()中對靜態方法Thread.yield()的呼叫是對執行緒排程器(Java執行緒機制的一部分,可以將CPU從一個執行緒轉移給另一個執行緒)的一種建議,它在宣告:"我已經執行完生命週期中最重要的部分了,此刻正是切換給其他任務執行一段時間的大好時機。」這完全是選擇性的,但是這裡使用它是因為它會在這些範例中產生更加有趣的輸出∶你更有可能會看到任務換進換出的證據。
在下面的範例中,這個任務的run()不是由單獨的執行緒驅動的,它是在main()中直接呼叫的(實際上,這裡仍舊使用了執行緒,即總是分配給main()的那個執行緒)∶
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月03日 10:23
*/
public class MainThread {
public static void main(String[] args) {
LiftOff launch = new LiftOff();
launch.run();
}
} /* Output:
#0(9). #0(8). #0(7). #0(6). #0(5). #0(4). #0(3). #0(2). #0(1). #0(LiftOff!).
*///:~
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月03日 10:29
*
* 將Runnable物件轉變為工作任務的傳統方式是把它交給一個Thread構造器
*/
public class BasicThreads {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("Waiting for LiftOff");
}
} /* Output:
Waiting for LiftOff
#0(9). #0(8). #0(7). #0(6). #0(5). #0(4). #0(3). #0(2). #0(1). #0(LiftOff!).
*///:~
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Mr.Sun
* @date 2022年09月03日 16:04
*
* 使用Executor
*/
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}/* Output:
#0(9). #1(9). #2(9). #3(9). #0(8). #4(9). #1(8). #2(8). #3(8). #0(7). #4(8). #1(7). #2(7). #3(7). #0(6). #4(7). #1(6). #2(6). #3(6). #0(5). #1(5). #4(6). #3(5). #1(4). #2(5). #4(5). #0(4). #1(3). #2(4). #3(4). #0(3). #4(4). #1(2). #2(3). #3(3). #0(2). #4(3). #1(1). #2(2). #3(2). #0(1). #4(2). #1(LiftOff!). #2(1). #3(1). #0(LiftOff!). #4(1). #2(LiftOff!). #3(LiftOff!). #4(LiftOff!).
*///:~
你可以很容易地將上面範例中的CachedThreadPool替換為不同型別的Executor。
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Mr.Sun
* @date 2022年09月03日 16:10
*/
public class FixedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}
有了FixedThreadPool,你就可以一次性預先執行代價高昂的執行緒分配,因而也就可以限制執行緒的數量了。這可以節省時間,因為你不用為每個任務都固定地付出建立執行緒的開銷。在事件驅動的系統中,需要執行緒的事件處理器,通過直接從池中獲取執行緒,也可以如你所願地儘快得到服務。你不會濫用可獲得的資源,因為FixedThreadPool使用的Thread物件的數量是有界的。
注意,在任何執行緒池中,現有執行緒在可能的情況下,都會被自動複用。
儘管本書將使用CachedThreadPool,但是也應該考慮在產生執行緒的程式碼中使用FixedThreadPool。CachedThreadPool在程式執行過程中通常會建立與所需數量相同的執行緒,然後在它回收舊執行緒時停止建立新執行緒,因此它是合理的Executor的首選。只有當這種方式會引發問題時,你才需要切換到FixedThreadPool。
SingleThreadExecutor就像是執行緒數量為1的FixedThreadPool。這對於你希望在另一個執行緒中連續執行的任何事物(長期存活的任務)來說,都是很有用的,例如監聽進入的通訊端連線的任務。它對於希望線上程中執行的短任務也同樣很方便,例如,更新本地或遠端紀錄檔的小任務,或者是事件分發執行緒。
如果向SingleThreadExecutor提交了多個任務,那麼這些任務將排隊,每個任務都會在下一個任務開始之前執行結束,所有的任務將使用相同的執行緒。在下面的範例中,你可以看到每個任務都是按照它們被提交的順序,並且是在下一個任務開始之前完成的。因此,SingleThread-Executor會序列化所有提交給它的任務,並會維護它自己(隱藏)的懸掛任務佇列。
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Mr.Sun
* @date 2022年09月03日 16:17
*/
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
} /* Output:
#0(9). #0(8). #0(7). #0(6). #0(5). #0(4). #0(3). #0(2). #0(1). #0(LiftOff!). #1(9). #1(8). #1(7). #1(6). #1(5). #1(4). #1(3). #1(2). #1(1). #1(LiftOff!). #2(9). #2(8). #2(7). #2(6). #2(5). #2(4). #2(3). #2(2). #2(1). #2(LiftOff!). #3(9). #3(8). #3(7). #3(6). #3(5). #3(4). #3(3). #3(2). #3(1). #3(LiftOff!). #4(9). #4(8). #4(7). #4(6). #4(5). #4(4). #4(3). #4(2). #4(1). #4(LiftOff!).
*///:~
作為另一個範例,假設你有大量的執行緒,那它們執行的任務將使用檔案系統。你可以用SingleThreadExecutor來執行這些執行緒,以確保任意時刻在任何執行緒中都只有唯一的任務在執行。在這種方式中,你不需要在共用資源上處理同步(同時不會過度使用檔案系統)。有時更好的解決方案是在資源上同步(你將在本章稍後學習),但是SingleThreadExecutor可以讓你省去只是為了維持某些事物的原型而進行的各種協調努力。通過序列化任務,你可以消除對序列化物件的需求。
package concurrency;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author Mr.Sun
* @date 2022年09月03日 16:23
*
* 從任務中產生返回值
*/
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
List<Future<String>> futureList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
futureList.add(exec.submit(new TaskWithResult(i)));
}
for (Future<String> result : futureList) {
try {
// get() 會阻塞知道任務完成
System.out.println(result.get());
} catch (InterruptedException e) {
System.out.println(e);
return;
} catch (ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
}
}
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
@Override
public String call() throws Exception {
return "result of TaskWithResult " + id;
}
} /* Output:
result of TaskWithResult 0
result of TaskWithResult 1
result of TaskWithResult 2
result of TaskWithResult 3
result of TaskWithResult 4
result of TaskWithResult 5
result of TaskWithResult 6
result of TaskWithResult 7
result of TaskWithResult 8
result of TaskWithResult 9
*///:~
submit()方法會產生Future物件,它用Callable返回結果的特定型別進行了引數化。你可以用isDone()方法來查詢Future是否已經完成。當任務完成時,它具有一個結果,你可以呼叫get()方法來獲取該結果。你也可以不用isDone()進行檢查就直接呼叫get(),在這種情況下,get()將阻塞,直至結果準備就緒。你還可以在試圖呼叫get()來獲取結果之前,先呼叫具有超時的get(),或者呼叫isDone()來檢視任務是否完成。
由於執行緒的本質特性,使得你不能捕獲從執行緒中逃逸的異常。下面的任務總是丟擲一個異常,該異常會傳播到run()方法的外部,並且main()展示了當你執行它時所發生的事情:
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Mr.Sun
* @date 2022年09月03日 17:00
*
* 執行緒丟擲異常
*/
public class ExceptionThread implements Runnable {
@Override
public void run() {
throw new RuntimeException();
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
exec.shutdown();
}
}
執行結果如下圖:
將main()的主體放到try-catch子句中是沒有作用的,如下:
我們發現使用try-catch後,異常仍未被捕獲。
為了解決這個問題,我們要修改Executor產生執行緒的方式。Thread.UncaughtExceptionHandler是Java SE5中的新介面,它允許你在每個Thread物件上都附著一個例外處理器。Thread.UncaughtExceptionHandler.uncaughtException()會線上程因未捕獲的異常而臨近死亡時被呼叫。為了使用它,我們建立了一個新型別的ThreadFactory,它將在每個新建立的Thread物件上附著一個Thread.UncaughtExceptionHandler。我們將這個工廠傳遞給Executors建立新的ExecutorService的方法:
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* @author Mr.Sun
* @date 2022年09月03日 17:10
*
* 為每個Thread物件附著一個例外處理器
*/
public class CaptureUncaughtException {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
exec.execute(new ExceptionThread2());
}
}
class ExceptionThread2 implements Runnable {
@Override
public void run() {
Thread t = Thread.currentThread();
System.out.println("run() by " + t);
System.out.println("eh = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}
class HandlerThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread");
Thread t = new Thread(r);
System.out.println("created " + t);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
System.out.println("eh = " + t.getUncaughtExceptionHandler());
return t;
}
} /* Output:
concurrency.HandlerThreadFactory@1540e19d creating new Thread
created Thread[Thread-0,5,main]
eh = concurrency.MyUncaughtExceptionHandler@677327b6
run() by Thread[Thread-0,5,main]
eh = concurrency.MyUncaughtExceptionHandler@677327b6
caught java.lang.RuntimeException
*///:~
在程式中新增了額外的跟蹤機制,用來驗證工廠建立的執行緒會傳遞給UncaughtExceptionHandler。你可以看到,未捕獲的異常是通過uncaughtException來捕獲的。
考慮下面的例子,其中一個任務產生產生偶數,而其他任務消費這些數位。而這些消費者任務的唯一工作就是校驗偶數的有效性
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月03日 21:49
*
* <p>
* 建立一個名為IntGenerator的抽象類,它包含EventChecker必須瞭解的必不可少的方法,即一個next()方法,和一個可以執行復原的方法
* </p>
*/
public abstract class IntGenerator {
private volatile boolean canceled = false;
public abstract int next();
/**
* 修改canceled標誌的狀態
*/
public void cancel() {
canceled = true;
}
/**
* 檢視該物件是否已被複原
*/
public boolean isCanceled() {
return canceled;
}
}
任何IntGenerator 都可以使用下面的EventChecker類來測試:
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Mr.Sun
* @date 2022年09月03日 21:52
*
* 校驗偶數的有效性
*/
public class EventChecker implements Runnable {
private IntGenerator generator;
private final int id;
public EventChecker(IntGenerator generator, int id) {
this.generator = generator;
this.id = id;
}
@Override
public void run() {
while (!generator.isCanceled()) {
int val = generator.next();
if (val % 2 != 0) {
System.out.println(val + " not event");
// 取消 事件檢查
generator.cancel();
}
}
}
public static void test(IntGenerator gp, int count) {
System.out.println("按 Ctrl + C 退出");
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < count; i++) {
exec.execute(new EventChecker(gp, i));
}
exec.shutdown();
}
public static void test(IntGenerator gp) {
// count 預設值
test(gp, 10);
}
}
注意,在本例中可以被複原的類不是Runnable,而所有依賴於IntGenerator物件的EvenChecker任務將測試它,以檢視它是否已經被複原,正如你在run()中所見。通過這種方式,共用公共資源(IntGenerator)的任務可以觀察該資源的終止訊號。這可以消除所謂競爭條件,即兩個或更多的任務競爭響應某個條件,因此產生衝突或不一致結果的情況。你必須仔細考慮並防範並行系統失敗的所有可能途徑,例如,一個任務不能依賴於另一個任務,因為任務關閉的順序無法得到保證。這裡,通過使任務依賴於非任務物件,我們可以消除潛在的競爭條件。
test()方法通過啟動大量使用相同的IntGenerator的EvenChecker,設定並執行對任何型別的IntGenerator的測試。如果IntGenerator引發失敗,那麼test()將報告它並返回,否則,你必須按下Ctrl-C來終止它。
我們看到的第一個IntGenerator有一個可以產生一系列數值的next():
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月03日 22:02
*/
public class EventGenerator extends IntGenerator {
private int currentEventVal = 0;
@Override
public int next() {
++currentEventVal;
// 可能出現資源競爭問題的地方
++currentEventVal;
return currentEventVal;
}
public static void main(String[] args) {
EventChecker.test(new EventGenerator());
}
}
一個任務有可能在另一個任務執行第一個對currentEvenValue的遞增操作之後,但是沒有執行第二個操作之前,呼叫next()方法。這將使這個值處於「不恰當」的狀態。為了證明這是可能發生的,EvenChecker.test()建立了一組EvenChecker物件,以連續地讀取並輸出同一個EvenGenerator,並測試檢查每個數值是否都是偶數。如果不是,就會報告錯誤,而程式也將關閉。
有一點很重要,那就是要注意到遞增程式自身也需要多個步驟,並且在遞增過程中任務可能會被執行緒機制掛起——也就是說,在Java中,遞增不是原子性的操作。因此,如果不保護任務,即使單一的遞增也不是安全的。
前面的範例展示了使用執行緒時的一個基本問題∶你永遠都不知道一個執行緒何時在執行。想象一下,你坐在桌邊手拿叉子,正要去叉盤子中的最後一片食物,當你的叉子就要夠著它時,這片食物突然消失了,因為你的執行緒被掛起了,而另一個餐者進入並吃掉了它。這正是在你編寫並行程式時需要處理的問題。對於並行工作,你需
某種方式來防止兩個任務存取相同的資源,至少在關鍵階段不能出現這種情況。
防止這種衝突的方法就是當資源被一個任務使用時,在其上加鎖。第一個存取某項資源的任務必須鎖定這項資源,使其他任務在其被解鎖之前,就無法存取它了,而在其被解鎖之時,另一個任務就可以鎖定並使用它,以此類推。
基本上所有的並行模式在解決執行緒衝突問題的時候,都是採用序列化存取共用資源的方案。這意味著在給定時刻只允許一個任務存取共用資源。通常這是通過在程式碼前面加上一條鎖語句來實現的,這就使得在一段時間內只有一個任務可以執行這段程式碼。因為鎖語句產生了一種互相排斥的效果,所以這種機制常常稱為互斥量(mutex)。
考慮一下屋子裡的浴室多個人(即多個由執行緒驅動的任務)都希望能單獨使用浴室(即共用資源)。為了使用浴室,一個人先敲門,看看是否能使用。如果沒人的話,他就進入浴室並鎖上門。這時其他人要使用浴室的話,就會被「阻擋」,所以他們要在浴室門口等待,直到浴室可以使用。當浴室使用完畢,就該把浴室給其他人使用了(別的任務就可以存取資源了),這個比喻就有點不太準確了。事實上,人們並沒有排隊,我們也不能確定誰將是下一個使用浴室的人,因為執行緒排程機制並不是確定性的。實際情況是∶等待使用浴室的人們簇擁在浴室門口,當鎖住浴室門的那個人開啟鎖準備離開的時候,離門最近的那個人可能進入浴室。如前所述,可以通過yield()和setPriority()來給執行緒排程器提供建議,但這些建議未必會有多大效果,這取決於你的具體平臺和JVM實現。
Java以提供關鍵字synchronized的形式,為防止資源衝突提供了內建支援。當任務要執行被synchronized關鍵字保護的程式碼片段的時候,它將檢查鎖是否可用,然後獲取鎖,執行程式碼,釋放鎖。共用資源一般是以物件形式存在的記憶體片段,但也可以是檔案、輸入/輸出埠,或者是印表機。要控制對共用資源的存取,得先把它包裝進一個物件。然後把所有要存取這個資源的方法標記為synchronized。如果某個任務處於一個對標記為synchronized的方法的呼叫中,那麼在這個執行緒從該方法返回之前,其他所有要呼叫類中任何標記為synchronized方法的執行緒都會被阻塞。
在生成偶數的程式碼中,你已經看到了,你應該將類的資料成員都宣告為private的,而且只能通過方法來存取這些資料;所以可以把方法標記為synchronized來防止資源衝突。下面是宣告synchronized方法的方式∶
synchronized void f() { /* ... */ }
synchronized void g() { /* ... */ }
所有物件都自動含有單一的鎖(也稱為監視器)。當在物件上呼叫其任意synchronized方法的時候,此物件都被加鎖,這時該物件上的其他synchronized方法只有等到前一個方法呼叫完畢並釋放了鎖之後才能被呼叫。對於前面的方法,如果某個任務對物件呼叫了f(),對於同一個物件而言,就只能等到f()呼叫結束並釋放了鎖之後,其他任務才能呼叫f()和g()。所以,對於某個特定物件來說,其所有synchronized方法共用同一個鎖,這可以被用來防止多個任務同時存取被編碼為物件記憶體。
注意,在使用並行時,將域設定為private是非常重要的,否則,synchronized關鍵字就不能防止其他任務直接存取域,這樣就會產生衝突。
一個任務可以多次獲得物件的鎖。如果一個方法在同一個物件上呼叫了第二個方法,後者又呼叫了同一物件上的另一個方法,就會發生這種情況。JVM負責跟蹤物件被加鎖的次數。如果一個物件被解鎖(即鎖被完全釋放),其計數變為0。在任務第一次給物件加鎖的時候,計數變為1。每當這個相同的任務在這個物件上獲得鎖時,計數都會遞增。顯然,只有首先獲得了鎖的任務才能允許繼續獲取多個鎖。每當任務離開一個synchronized方法,計數遞減,當計數為零的時候,鎖被完全釋放,此時別的任務就可以使用此資源。
針對每個類,也有一個鎖(作為類的Class物件的一部分);所以synchronized static方法可以在類的範圍內防止對static資料的並行存取。
同步控制EventGenerator,通過在EventGenerator.java中加入synchronized 關鍵字,可以防止不希望的執行緒存取:
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月04日 10:49
*
* 同步控制EventGenerator
*/
public class SyncEventGenerator extends IntGenerator {
private int currentCountVal = 0;
@Override
public synchronized int next() {
++currentCountVal;
Thread.yield();
++currentCountVal;
return currentCountVal;
}
public static void main(String[] args) {
EventChecker.test(new SyncEventGenerator());
}
}
對Thread.yield()的呼叫被插入到了兩個遞增操作之間,以提高在currentEvenValue是奇數狀態時上下文切換的可能性。因為互斥可以防止多個任務同時進入臨界區,所以這不會產生任何失敗。但是如果失敗將會發生,呼叫yield()是一種促使其發生的有效方式。
第一個進入next()的任務將獲得鎖,任何其他試圖獲取鎖的任務都將從其開始嘗試之時被阻塞,直至第一個任務釋放鎖。通過這種方式,任何時刻只有一個任務可以通過由互斥量看護的程式碼。
Java SE5的java.util.concurrent類庫還包含有定義在java.util.concurrent.locks中的顯式的互斥機制。Lock物件必須被顯式地建立、鎖定和釋放。因此,它與內建的鎖形式相比,程式碼缺乏優雅性。但是,對於解決某些型別的問題來說,它更加靈活。下面用顯式的Lock重寫的是SyncEventGenerator.java:
package concurrency;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Mr.Sun
* @date 2022年09月04日 11:13
*
* 使用顯示的Lock物件重寫SyncEventGenerator
*/
public class MutexEventGenerator extends IntGenerator{
private int currentCountVal = 0;
private Lock lock = new ReentrantLock();
@Override
public int next() {
lock.lock();
try {
++currentCountVal;
Thread.yield();
++currentCountVal;
return currentCountVal;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
EventChecker.test(new MutexEventGenerator());
}
}
MutexEvenGenerator新增了一個被互斥呼叫的鎖,並使用lock()和unlock()方法在next()內部建立了臨界資源。當你在使用Lock物件時,將這裡所示的慣用法內部化是很重要的:緊接著的對lock()的呼叫,你必須放置在finally子句中帶有unlock()的try-finally語句中。注意,return 語句必須在try子句中出現,以確保unlock()不會過早發生,從而將資料暴露給了第二個任務。
儘管try-finally所需的程式碼比synchronized關鍵字要多,但是這也代表了顯式的Lock物件的優點之一。如果在使用synchronized關鍵字時,某些事物失敗了,那麼就會丟擲一個異常。但是你沒有機會去做任何清理工作,以維護系統使其處於良好狀態。有了顯式的Lock物件,你就可以使用finally子句將系統維護在正確的狀態了。
大體上,當你使用synchronized關鍵字時,需要寫的程式碼量更少,並且使用者錯誤出現的可能性也會降低,因此通常只有在解決特殊問題時,才使用顯式的Lock物件。例如,用synchronized 關鍵字不能嘗試著獲取鎖且最終獲取鎖會失敗,或者嘗試著獲取鎖一段時間,然後放棄它,要實現這些,你必須使用concurrent類庫∶
package concurrency;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Mr.Sun
* @date 2022年09月04日 11:20
*
* 演示使用Lock物件嘗試獲取鎖
*/
public class AttemptLocking {
private ReentrantLock lock = new ReentrantLock();
public void unTimed () {
boolean captured = lock.tryLock();
try {
System.out.println("tryLock(): " + captured);
} finally {
// 獲取到鎖才能執行釋放鎖操作,否則會出現IllegalMonitorStateException異常
if (captured) {
lock.unlock();
}
}
}
public void timed () {
boolean captured = false;
try {
captured = lock.tryLock(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
System.out.println("lock.tryLock(2, TimeUnit.MINUTES): " + captured);
} finally {
if (captured) {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
final AttemptLocking al = new AttemptLocking();
al.unTimed(); // True -- 鎖是可用的
al.timed(); // True -- 鎖是可用的
// 現在建立一個單獨的任務來獲取鎖
new Thread() {
{setDaemon(true);}
@Override
public void run() {
al.lock.lock();
System.out.println("acquired");
}
}.start();
// 給第二個任務一個機會
Thread.sleep(100);
al.unTimed(); // false -- lock grabbed by task
al.timed(); // false -- lock grabbed by task
}
} /* Output:
tryLock(): true
lock.tryLock(2, TimeUnit.MINUTES): true
acquired
tryLock(): false
lock.tryLock(2, TimeUnit.MINUTES): false
*///:~
ReentrantLock允許你嘗試著獲取但最終未獲取鎖,這樣如果其他人已經獲取了這個鎖,那你就可以決定離開去執行其他一些事情,而不是等待直至這個鎖被釋放,就像在untimed()方法中所看到的。在timed()中,做出了嘗試去獲取鎖,該嘗試可以在2秒之後失敗(注意,使用了Java SE5的TimeUnit類來指定時間單位)。在main()中,作為匿名類而建立了一個單獨的Thread,它將獲取鎖,這使得untimed()和timed()方法對某些事物將產生競爭。
顯式的Lock物件在加鎖和釋放鎖方面,相對於內建的synchronized鎖來說,還賦予了你更細粒度的控制力。這對於實現專有同步結構是很有用的,例如用於遍歷連結列表中的節點的節節傳遞的加鎖機制(也稱為鎖耦合),這種遍歷程式碼必須在釋放當前節點的鎖之前捕獲下一個節點的鎖。
volatile關鍵字確保了應用中的可視性。如果你將一個域宣告為volatile的,那麼只要對這個域產生了寫操作,那麼所有的讀操作就都可以看到這個修改。即便使用了本地快取,情況也確實如此,volatile域會立即被寫入到主記憶體中,而讀取操作就發生在主記憶體中。
理解原子性和易變性是不同的概念這一點很重要。在非volatile域上的原子操作不必重新整理到主記憶體中去,因此其他讀取該域的任務也不必看到這個新值。如果多個任務在同時存取某個域,那麼這個域就應該是volatile的,否則,這個域就應該只能經由同步來存取。同步也會導致向主記憶體中重新整理,因此如果一個域完全由synchronized方法或語句塊來防護,那就不必將其設定為是volatile的。
使用volatile而不是synchronized的唯一安全的情況是類中只有一個可變的域。再次提醒,你的第一選擇應該是使用synchronized關鍵字,這是最安全的方式,而嘗試其他任何方式都是有風險的。
Java SE5引入了諸如AtomicInteger、AtomicLong、AtomicReference等特殊的原子性變數類,它們提供下面形式的原子性條件更新操作∶
boolean compareAndSet(expectedValue, updateValue);
這些類被調整為可以使用在某些現代處理器上的可獲得的,並且是在機器級別上的原子性,因此在使用它們時,通常不需要擔心。對於常規程式設計來說,它們很少會派上用場,但是在涉及效能調優時,它們就大有用武之地了。例如,我們可以使用AtomicInteger來重寫MutexEventGenerator.java∶
package concurrency;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Mr.Sun
* @date 2022年09月04日 15:17
*
* 使用AtomicInteger來重寫MutexEventGenerator
*/
public class AtomicEventGenerator extends IntGenerator {
private AtomicInteger currentEventVal = new AtomicInteger(0);
@Override
public int next() {
return currentEventVal.addAndGet(2);
}
public static void main(String[] args) {
EventChecker.test(new AtomicEventGenerator());
}
}
所有其他形式的同步通過使用AtomicInteger得到了根除。
應該強調的是,Atomic類被設計用來構建javautil.concurrent中的類,因此只有在特殊情況下才在自己的程式碼中使用它們,即便使用了也需要確保不存在其他可能出現的問題。通常依賴於鎖要更安全一些(要麼是synchronized關鍵字,要麼是顯式的Lock物件)。
有時,你只是希望防止多個執行緒同時存取方法內部的部分程式碼而不是防止存取整個方法。通過這種方式分離出來的程式碼段被稱為臨界區(critical section),它也使用synchronized關鍵字建立。這裡,synchronized被用來指定某個物件,此物件的鎖被用來對花括號內的程式碼進行同步控制∶
synchronized(syncObject) {
// This code can be accessed
// by only one task at a time 1
}
這也被稱為同步拉制塊;在進入此段程式碼前,必須得到syncObject物件的鎖。如果其他執行緒已經得到這個鎖,那麼就得等到鎖被釋放以後,才能進入臨界區。
通過使用同步控制塊,而不是對整個方法進行同步控制,可以使多個任務存取物件的時間效能得到顯著提高,下面的例子比較了這兩種同步控制方法。此外,它也演示瞭如何把一個非保護型別的類,在其他類的保護和控制之下,應用於多執行緒的環境∶
package concurrency;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Mr.Sun
* @date 2022年09月04日 15:27
*
* 臨界區 測試
* <p>
* 演示瞭如何把一個非執行緒安全的類,在其他類的保護和控制之下,應用於多執行緒的環境
* </p>
*/
public class CriticalSelection {
public static void main(String[] args) {
PairManager pman1 = new PairManager1(),
pman2 = new PairManager2();
testApproaches(pman1, pman2);
}
// 測試兩種不同的方法
static void testApproaches(PairManager pman1, PairManager pman2) {
ExecutorService exec = Executors.newCachedThreadPool();
PairManipulator
pm1 = new PairManipulator(pman1),
pm2 = new PairManipulator(pman2);
PairChecker
pcheck1 = new PairChecker(pman1),
pcheck2 = new PairChecker(pman2);
exec.execute(pm1);
exec.execute(pm2);
exec.execute(pcheck1);
exec.execute(pcheck2);
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch(InterruptedException e) {
System.out.println("Sleep interrupted");
} finally {
exec.shutdown();
}
System.out.println("pm1: " + pm1 + "\npm2: " + pm2);
System.exit(0);
}
}
class PairChecker implements Runnable {
private PairManager pm;
public PairChecker(PairManager pm) {
this.pm = pm;
}
@Override
public void run() {
while(true) {
pm.checkCounter.incrementAndGet();
pm.getPair().checkState();
}
}
}
class PairManipulator implements Runnable {
private PairManager pm;
public PairManipulator(PairManager pm) {
this.pm = pm;
}
@Override
public void run() {
while(true) {
pm.increment();
}
}
public String toString() {
return "Pair: " + pm.getPair() + " checkCounter = " + pm.checkCounter.get();
}
}
// 執行緒安全的Pair
abstract class PairManager {
AtomicInteger checkCounter = new AtomicInteger(0);
protected Pair p = new Pair();
private List<Pair> storage = Collections.synchronizedList(new ArrayList<>());
public synchronized Pair getPair() {
// 複製一份以確保原件的安全:
return new Pair(p.getX(), p.getY());
}
// 假設這是一個耗時的操作
protected void store(Pair p) {
storage.add(p);
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException ignore) {}
}
public abstract void increment();
}
// 同步整個方法
class PairManager1 extends PairManager {
@Override
public synchronized void increment() {
p.incrementX();
p.incrementY();
store(getPair());
}
}
// 同步臨界區程式碼塊
class PairManager2 extends PairManager {
@Override
public void increment() {
Pair temp;
synchronized(this) {
p.incrementX();
p.incrementY();
temp = getPair();
}
store(temp);
}
}
// 非執行緒安全
class Pair {
private int x, y;
public Pair(int x, int y) {
this.x = x;
this.y = y;
}
public Pair() {
this(0, 0);
}
public int getX() {
return x;
}
public int getY() {
return y;
}
public void incrementX () {
// 自增操作不是執行緒安全的
x++;
}
public void incrementY () {
// 自增操作不是執行緒安全的
y++;
}
@Override
public String toString() {
return "x: " + x + ", y: " + y ;
}
public class PairValueNotEqualException extends RuntimeException {
public PairValueNotEqualException() {
super("一對不相等的值:" + Pair.this);
}
}
// 任意不變數 —— 兩個變數必須相等
public void checkState() {
if(x != y) {
throw new PairValueNotEqualException();
}
}
} /* Output: (Sample)
pm1: Pair: x: 15, y: 15 checkCounter = 272565
pm2: Pair: x: 16, y: 16 checkCounter = 3956974
*///:~
正如註釋中註明的,Pair不是執行緒安全的,因為它的約束條件(雖然是任意的)需要兩個變數要維護成相同的值。此外,如本章前面所述,自增加操作不是執行緒安全的,並且因為Pair類沒有任何方法被標記為synchronized,所以不能保證一個Pair物件在多執行緒程式中不會被破壞。
你可以想象一下這種情況∶某人交給你一個非執行緒安全的Pair類,而你需要在一個執行緒環境中使用它。通過建立PairManager類就可以實現這一點,PairManager類持有一個Pair物件並控制對它的一切存取。注意唯一的public方法是getPair(),它是synchronized的。對於抽象方法increment(),對increment()的同步控制將在實現的時候進行處理。
至於PairManager類的結構,它的一些功能在基礎類別中實現,並且其一個或多個抽象方法在派生類中定義,這種結構在設計模式中稱為模板方法。設計模式使你得以把變化封裝在程式碼裡;在此,發生變化的部分是模板方法increment()。在PairManager1中,整個increment()方法是被同步控制的;但在PairManager2中,increment()方法使用同步控制塊進行同步。注意,synchronized關鍵字不屬於方法特徵簽名的組成部分,所以可以在覆蓋方法的時候加上去。
store()方法將一個Pair物件新增到了synchronized ArrayList中,所以這個操作是執行緒安全的。因此,該方法不必進行防護,可以放置在PairManager2的synchronized語句塊的外部。
PairManipulator被建立用來測試兩種不同型別的PairManager,其方法是在某個任務中呼叫increment(),而PairChecker則在另一個任務中執行。為了跟蹤可以執行測試的頻度,PairChecker在每次成功時都遞增checkCounter。在main()中建立了兩個PairManipulator物件,並允許它們執行一段時間,之後每個PairManipulator的結果會得到展示。
儘管每次執行的結果可能會非常不同,但一般來說,對於PairChecker的檢查頻率,PairManagerl.increment()不允許有PairManager2.increment()那樣多。後者採用同步控制塊進行同步,所以物件不加鎖的時間更長。這也是寧願使用同步控制塊而不是對整個方法進行同步控制的典型原因∶使得其他執行緒能更多地存取(在安全的情況下儘可能多)。
synchronized塊必須給定一個在其上進行同步的物件,並且最合理的方式是,使用其方法正在被呼叫的當前物件∶synchronized(this),這正是PairManager2所使用的方式。在這種方式中,如果獲得了synchronized塊上的鎖,那麼該物件其他的synchronized方法和臨界區就不能被呼叫了。因此,如果在this上同步,臨界區的效果就會直接縮小在同步的範圍內。
有時必須在另一個物件上同步,但是如果你要這麼做,就必須確保所有相關的任務都是在同一個物件上同步的。下面的範例演示了兩個任務可以同時進入同一個物件,只要這個物件上的方法是在不同的鎖上同步的即可∶
package concurrency;
import java.util.Objects;
/**
* @author Mr.Sun
* @date 2022年09月04日 16:19
*
* 在其它物件上同步
*/
public class SyncObject {
public static void main(String[] args) {
final DualSync ds = new DualSync();
new Thread() {
@Override
public void run() {
ds.f();
}
}.start();
ds.g();
}
}
class DualSync {
private Object obj = new Object();
public synchronized void f() {
for (int i = 0; i < 5; i++) {
System.out.println("f()");
Thread.yield();
}
}
public void g() {
synchronized (obj) {
for (int i = 0; i < 5; i++) {
System.out.println("g()");
Thread.yield();
}
}
}
} /* Output: (Sample)
g()
g()
f()
g()
f()
g()
f()
g()
f()
f()
*///:~
DualSync.f()(通過同步整個方法)在this同步,而g()有一個在syncObject上同步的synchronized塊。因此,這兩個同步是互相獨立的。通過在main()中建立呼叫f()的Thread對這一點進行了演示,因為main()執行緒是被用來呼叫g()的。從輸出中可以看到,這兩個方式在同時執行,因此任何一個方法都沒有因為對另一個方法的同步而被阻塞。
防止任務在共用資源上產生衝突的第二種方式是根除對變數的共用。執行緒本地儲存是一種自動化機制,可以為使用相同變數的每個不同的執行緒都建立不同的儲存。因此,如果你有5個執行緒都要使用變數x所表示的物件,那執行緒本地儲存就會生成5個用於x的不同的儲存塊。主要是,它們使得你可以將狀態與執行緒關聯起來。
建立和管理執行緒本地儲存可以由java.lang.ThreadLocal類來實現,如下所示∶
package concurrency;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月04日 16:29
*
* 建立和管理執行緒本地儲存可以使用ThreadLocal來實現
*/
public class ThreadLocalVariableHolder {
private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
private Random random = new Random(47);
@Override
protected Integer initialValue() {
return random.nextInt(10000);
}
};
public static void increment () {
value.set(value.get() + 1);
}
public static int get() {
return value.get();
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Accessor(i));
}
TimeUnit.MILLISECONDS.sleep(1000);
exec.shutdown();
System.exit(0);
}
}
class Accessor implements Runnable {
private final int id;
public Accessor(int id) {
this.id = id;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}
@Override
public String toString() {
return "#" + id + ": " + ThreadLocalVariableHolder.get();
}
} /* Output: (Sample)
#0: 9259
#1: 556
#2: 6694
#3: 1862
#4: 962
#0: 9260
#1: 557
#2: 6695
#3: 1863
#4: 963
...
*///:~
ThreadLocal物件通常當作靜態域儲存。在建立ThreadLocal時,你只能通過get()和set()方法來存取該物件的內容,其中,get()方法將返回與其執行緒相關聯的物件的副本,而set()會將引數插入到為其執行緒儲存的物件中,並返回儲存中原有的物件。increment()和get()方法在ThreadLocalVariableHolder中演示了這一點。注意,increment()和get()方法都不是synchronized 的,因為ThreadLocal保證不會出現競爭條件。
當執行這個程式時,你可以看到每個單獨的執行緒都被分配了自己的儲存,因為它們每個都需要跟蹤自己的計數值,即便只有一個ThreadLocalVariableHolder物件。
在某些情況下,任務必須突然地終止,首先,讓我們觀察一個範例,它不僅演示了終止問題,而且還是一個資源共用的範例:
在這個模擬程式中,花園委員會希望瞭解每天通過多個大門進入公園的總人數。每個大門都有一個十字轉門或者某種其他形式的計數器,並且任何一個十字轉門的計數值遞增時,就表示公園中的總人數的共用計數值也會遞增。
package concurrency;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月04日 20:11
*
* 裝飾花園:演示終結任務
* <p>
* 在這個模擬程式中,花園委員會希望瞭解每天通過多個大門進入公園的總人數。
* 每個大門都有一個十字轉門或者某種其他形式的計數器,並且任何一個十字轉門的計數值遞增時,就表示公園中的總人數的共用計數值也會遞增。
* </p>
*/
public class OrnamentalGarden {
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Entrance(i));
}
TimeUnit.SECONDS.sleep(3);
Entrance.cancel();
exec.shutdown();
if (!exec.awaitTermination(250, TimeUnit.MILLISECONDS)) {
System.out.println("某些任務未被終止!");
}
System.out.println("Total: " + Entrance.getTotalCount());
System.out.println("Sum of Entrances: " + Entrance.sumEntrances());
}
}
class Count {
private int count = 0;
private Random ran = new Random(47);
// 刪除synchronized關鍵字以檢視計數失敗
public synchronized int increment() {
int temp = count;
if (ran.nextBoolean()) {
Thread.yield();
}
return count = ++temp;
}
public synchronized int value () {
return count;
}
}
class Entrance implements Runnable {
private static Count count = new Count();
private static List<Entrance> entranceList = new ArrayList<>();
private int number = 0;
private final int id;
private static volatile boolean canceled = false;
/**
* 原子操作
*/
public static void cancel() {
canceled = true;
}
public Entrance(int id) {
this.id = id;
// 將此任務保留在列表中, 也防止無用任務被垃圾回收
entranceList.add(this);
}
@Override
public void run() {
while (! canceled) {
synchronized (this) {
++ number;
}
System.out.println(this + " Total: " + count.increment());
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
System.out.println("sleep interrupted");
}
}
System.out.println("Stopping " + this);
}
public synchronized int getValue() { return number; }
@Override
public String toString() {
return "Entrance " + id + ": " + getValue();
}
public static int getTotalCount() {
return count.value();
}
public static int sumEntrances() {
int sum = 0;
for(Entrance entrance : entranceList)
sum += entrance.getValue();
return sum;
}
} /* Output: (Sample)
Entrance 0: 1 Total: 1
Entrance 2: 1 Total: 3
Entrance 1: 1 Total: 2
Entrance 4: 1 Total: 5
Entrance 3: 1 Total: 4
Entrance 2: 2 Total: 6
Entrance 4: 2 Total: 7
Entrance 0: 2 Total: 8
...
Entrance 3: 29 Total: 143
Entrance 0: 29 Total: 144
Entrance 4: 29 Total: 145
Entrance 2: 30 Total: 147
Entrance 1: 30 Total: 146
Entrance 0: 30 Total: 149
Entrance 3: 30 Total: 148
Entrance 4: 30 Total: 150
Stopping Entrance 2: 30
Stopping Entrance 1: 30
Stopping Entrance 0: 30
Stopping Entrance 3: 30
Stopping Entrance 4: 30
Total: 150
Sum of Entrances: 150
*///:~
這裡使用單個的Count物件來跟蹤花園參觀者的主計數值,並且將其當作Entrance類中的一個靜態域進行儲存。Count.increment()和Count.value()都是synchronized的,用來控制對count域的存取。increment()方法使用了Random物件,目的是在從把count讀取到temp中,到遞增temp並將其儲存回count的這段時間裡,有大約一半的時間產生讓步。如果你將increment()上的synchronized關鍵字註釋掉,那麼這個程式就會崩潰,因為多個任務將同時存取並修改count(yield()會使問題更快地發生)。
每個Entrance任務都維護著一個本地值number,它包含通過某個特定入口進入的參觀者的數量。這提供了對count物件的雙重檢査,以確保其記錄的參觀者數量是正確的。Entrance.run()只是遞增number和count物件,然後休眠100毫秒。
因為Entrance.canceled是一個volatile布林標誌,而它只會被讀取和賦值(不會與其他域組合在一起被讀取),所以不需要同步對其的存取,就可以安全地操作它。如果你對諸如此類的情況有任何疑慮,那麼最好總是使用synchronized。這個程式在以穩定的方式關閉所有事物方面還有一些小麻煩,其部分原因是為了說明在終止多執行緒程式時你必須相當小心,而另一部分原因是為了演示interrupt()的值,稍後你將學習有關這個值的知識。
在3秒鐘之後,main()向Entrance傳送static cancel()訊息,然後呼叫exec物件的shutdown。方法,之後呼叫exec上的awaitTermination()方法。ExecutorService.awaitTermination()等待每個任務結束,如果所有的任務在超時時間達到之前全部結束,則返回true,否則返回false,表示不是所有的任務都已經結束了。儘管這會導致每個任務都退出其run()方法,並因此作為任務而終止,但是Entrance物件仍舊是有效的,因為在構造器中,每個Entrance物件都儲存在稱為entranceList的靜態List
當這個程式執行時,你將看到,在人們通過十字轉門時,將顯示總人數和通過每個入口的人數。如果移除Count.increment()上面的synchronized宣告,你將會注意到總人數與你的期望有差異,每個十字轉門統計的人數將與count中的值不同。只要用互斥來同步對Count的存取,問題就可以解決了。請記住,Count.increment()通過使用temp和yield(),增加了失敗的可能性。在真正的執行緒問題中,失敗的可能性從統計學角度看可能非常小,因此你可能很容易就掉進了輕信所有事物都將正確工作的陷阱裡。就像在上面的範例中,有些還未發生的問題就有可能會隱藏起來,因此在複審並行程式碼時,要格外地仔細。
前面範例中的Entrance.run()在其迴圈中包含對sleep()的呼叫。我們知道,sleep()最終將喚醒,而任務也將返回迴圈的開始部分,去檢查canceled標誌,從而決定是否跳出迴圈。但是,sleep()一種情況,它使任務從執行狀態變為被阻塞狀態,而有時你必須終止被阻塞的任務。
一個執行緒可以處於以下四種狀態之一:
1)新建(new)∶當執行緒被建立時,它只會短暫地處於這種狀態。此時它已經分配了必需的系統資源,並執行了初始化。此刻執行緒已經有資格獲得CPU時間了,之後排程器將把這個執行緒轉變為可執行狀態或阻塞狀態。
2)就緒(Runnable)∶在這種狀態下,只要排程器把時間片分配給執行緒,執行緒就可以執行。也就是說,在任意時刻,執行緒可以執行也可以不執行。只要排程器能分配時間片給執行緒,它就可以執行;這不同於死亡和阻塞狀態。
3)阻塞(Blocked)∶執行緒能夠執行,但有某個條件阻止它的執行。當執行緒處於阻塞狀態時,排程器將忽略執行緒,不會分配給執行緒任何CPU時間。直到執行緒重新進入了就緒狀態,它才有可能執行操作。
4)死亡(Dead)∶處於死亡或終止狀態的執行緒將不再是可排程的,並且再也不會得到CPU時間,它的任務已結束,或不再是可執行的。任務死亡的通常方式是從run()方法返回,但是任務的執行緒還可以被中斷,你將要看到這一點。
一個任務進入阻塞狀態,可能有如下原因∶
1)通過呼叫sleep(milliseconds)使任務進入休眠狀態,在這種情況下,任務在指定的時間內不會執行。
2)你通過呼叫wait()使執行緒掛起。直到執行緒得到了notify()或notifyAll()訊息(或者在Java SE5的java.util.concurrent類庫中等價的signal()或signalAll()訊息),執行緒才會進入就緒狀態。我們將在稍後的小節中驗證這一點。
3)任務在等待某個輸入/輸出完成。
4)任務試圖在某個物件上呼叫其同步控制方法,但是物件鎖不可用,因為另一個任務已經獲取了這個鎖。
Thread類包含interrupt()方法,因此你可以終止被阻塞的任務,這個方法將設定執行緒的中斷狀態。如果一個執行緒已經被阻塞,或者試圖執行一個阻塞操作,那麼設定這個執行緒的中斷狀態將丟擲InterruptedException。當丟擲該異常或者該任務呼叫Thread.interrupted()時,中斷狀態將被複位。正如你將看到的,Thread.interrupted()提供了離開run()迴圈而不丟擲異常的第二種方式。
為了呼叫interrupt(),你必須持有Thread物件。你可能已經注意到了,新的concurrent類庫似乎在避免對Thread物件的直接操作,轉而儘量通過Executor來執行所有操作。如果你在Executor上呼叫shutdownNow(),那麼它將傳送一個interrupt()呼叫給它啟動的所有執行緒。這麼做是有意義的,因為當你完成工程中的某個部分或者整個程式時,通常會希望同時關閉某個特定Executor的所有任務。然而,你有時也會希望只中斷某個單一任務。如果使用Executor,那麼通過呼叫submit()而不是executor()來啟動任務,就可以持有該任務的上下文。submit()將返回一個泛型Future<?>,其中有一個未修飾的引數,因為你永遠都不會在其上呼叫get()——持有這種Future的關鍵在於你可以在其上呼叫cancel(),並因此可以使用它來中斷某個特定任務。如果你將true傳遞給cancel(),那麼它就會擁有在該執行緒上呼叫interrupt()以停止這個執行緒的許可權。因此,cancel()是一種中斷由Executor啟動的單個執行緒的方式。
package concurrency;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月05日 10:33
*
* 演示interrupt()的使用
*/
public class Interrupting {
private static ExecutorService exex = Executors.newCachedThreadPool();
static void test(Runnable r) throws InterruptedException {
Future<?> future = exex.submit(r);
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Interrupting " + r.getClass().getName());
future.cancel(true);
System.out.println("Interrupting sent to " + r.getClass().getName());
}
public static void main(String[] args) throws Exception {
test(new SleepBlocked());
test(new IOBlocked(System.in));
test(new SynchronizedBlocked());
TimeUnit.SECONDS.sleep(3);
System.out.println("Aborting with System.exit(0)");
System.exit(0);
}
}
class SleepBlocked implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
System.out.println("InterruptedException");
}
System.out.println("Exiting SleepBlocked.run()");
}
}
class IOBlocked implements Runnable {
private InputStream in;
public IOBlocked(InputStream in) {
this.in = in;
}
@Override
public void run() {
System.out.println("Waiting for read(): ");
try {
in.read();
} catch (IOException e) {
System.out.println("Interrupted from blocked I/O");
}
System.out.println("Exiting IOBlocked.run()");
}
}
class SynchronizedBlocked implements Runnable {
public synchronized void f() {
// 永不釋放鎖,一直佔有
while (true) {
// Thread.yield();
}
}
public SynchronizedBlocked() {
new Thread() {
@Override
public void run() {
f(); // 鎖會一直被當前執行緒佔用
}
}.start();
}
@Override
public void run() {
System.out.println("Trying to call f()");
f();
System.out.println("Exiting SynchronizedBlocked.run()");
}
} /* Output: (95% match)
Interrupting concurrency.SleepBlocked
InterruptedException
Exiting SleepBlocked.run()
Interrupting sent to concurrency.SleepBlocked
Waiting for read():
Interrupting concurrency.IOBlocked
Interrupting sent to concurrency.IOBlocked
Trying to call f()
Interrupting concurrency.SynchronizedBlocked
Interrupting sent to concurrency.SynchronizedBlocked
Aborting with System.exit(0)
*///:~
上面的每個任務都表示了一種不同型別的阻塞。SleepBlock是可中斷的阻塞範例,而IOBlocked和SynchronizedBlocked是不可中斷的阻塞範例。這個程式證明I/O和在synchronized 塊上的等待是不可中斷的,但是通過瀏覽程式碼,你也可以預見到這一點——無論是I/O還是嘗試呼叫synchronized方法,都不需要任何InterruptedException處理器。
前兩個類很簡單直觀∶在第一個類中run()方法呼叫了sleep(),而在第二個類中呼叫了read()。但是,為了演示SynchronizedBlock,我們必須首先獲取鎖。這是通過在構造器中建立匿名的Thread類的範例來實現的,這個匿名Thread類的物件通過呼叫f()獲取了物件鎖(這個執行緒必須有別於為SynchronizedBlock驅動run()的執行緒,因為一個執行緒可以多次獲得某個物件鎖)。由於f()永遠都不返回,因此這個鎖永遠不會釋放,而SynchronizedBlock.run()在試圖呼叫f(),並阻塞以等待這個鎖被釋放。
從輸出中可以看到,你能夠中斷對sleep()的呼叫(或者任何要求丟擲InterruptedException 的呼叫)。但是,你不能中斷正在試圖獲取synchronized鎖或者試圖執行I/O操作的執行緒。這有點令人煩惱,特別是在建立執行I/O的任務時,因為這意味著I/O具有鎖住你的多執行緒程式的潛在可能。特別是對於基於Web的程式,這更是關乎利害。
對於這類問題,有一個略顯笨拙但是有時確實行之有效的解決方案,即關閉任務在其上發生阻塞的底層資源∶
package concurrency;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月05日 11:01
*
* 中斷I/O操作的執行緒:關閉任務在其上發生阻塞的底層資源
*/
public class CloseResource {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
InputStream in = new Socket("localhost", 8080).getInputStream();
exec.execute(new IOBlocked(in));
exec.execute(new IOBlocked(System.in));
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Shutting down all threads");
exec.shutdownNow();
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + in.getClass().getName());
in.close();
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + System.in.getClass().getName());
System.in.close();
}
} /* Output: (85% match)
Waiting for read():
Waiting for read():
Shutting down all threads
Closing java.net.SocketInputStream
Interrupted from blocked I/O
Exiting IOBlocked.run()
Closing java.io.BufferedInputStream
Exiting IOBlocked.run()
*///:~
有兩種形式的wait()。第一種版本接受毫秒數作為引數,含義與sleep()方法裡引數的意思相同,都是指「在此期間暫停」。但是與sleep()不同的是,對於wait()而言∶
1)在wait()期間物件鎖是釋放的。而呼叫sleep()的時候鎖並沒有被釋放,呼叫yield()也屬於這種情況。
2)可以通過notify()、notifyAll(),或者令時間到期,從wait()中恢復執行。
第二種,也是更常用形式的wait()不接受任何引數。這種wait()將無限等待下去,直到執行緒接收到notify()或者notifyAll()訊息。
wait()、notify()以及notifyAll()有一個比較特殊的方面,那就是這些方法是基礎類別Object的一部分,而不是屬於Thread的一部分。儘管開始看起來有點奇怪——僅僅針對執行緒的功能卻作為通用基礎類別的一部分而實現,不過這是有道理的,因為這些方法操作的鎖也是所有物件的一部分。所以,你可以把wait()放進任何同步控制方法裡,而不用考慮這個類是繼承自Thread還是實現了Runnable介面。實際上,只能在同步控制方法或同步控制塊裡呼叫wait()、notify()和notifyAll() (因為不用操作鎖,所以sleep()可以在非同步控制方法裡呼叫)。如果在非同步控制方法裡呼叫這些方法,程式能通過編譯,但執行的時候,將得到IlegalMonitorStateException異常,並伴隨著一些含糊的訊息,比如「當前執行緒不是擁有者」。訊息的意思是,呼叫wait()、notify()和notifyAll()的任務在呼叫這些方法前必須"擁有"(獲取)物件的鎖。
可以讓另一個物件執行某種操作以維護其自己的鎖。要這麼做的話,必須首先得到物件的鎖。比如,如果要向物件x傳送notifyAll(),那麼就必須在能夠取得x的鎖的同步控制塊中這麼做∶
synchronized(x) {
x.notifyAll();
}
讓我們看一個簡單的範例,WaxOMatic.java有兩個過程∶一個是將蠟塗到Car上,一個是拋光它。拋光任務在塗蠟任務完成之前,是不能執行其工作的,而塗蠟任務在塗另一層蠟之前,必須等待拋光任務完成。WaxOn和WaxOff都使用了Car物件,該物件在這些任務等待條件變化的時候,使用wait()和notifyAll()來掛起和重新啟動這些任務∶
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月05日 15:25
*/
public class WaxOMatic {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow(); // 中斷所有任務
}
}
class Car {
private boolean waxOn = false;
// 塗蠟
public synchronized void waxed() {
waxOn = true; // 準備拋光
notifyAll();
}
// 拋光
public synchronized void buffed() {
waxOn = false; // 為塗另外一層蠟做準備
notifyAll();
}
// 等待塗蠟
public synchronized void waitForWaxing() throws InterruptedException {
while (!waxOn) {
wait();
}
}
// 等待拋光
public synchronized void waitForBuffing() throws InterruptedException {
while (waxOn) {
wait();
}
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car car) {
this.car = car;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
System.out.print("Wax on! ");
TimeUnit.MILLISECONDS.sleep(200);
// 給車塗蠟
car.waxed();
// 塗蠟完成,準備拋光
car.waitForBuffing();
}
} catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax on task");
}
}
class WaxOff implements Runnable {
private Car car;
public WaxOff(Car car) {
this.car = car;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
car.waitForWaxing();
System.out.print("Wax off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax off task");
}
} /* Output:
Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Exiting via interrupt
Ending Wax off task
Exiting via interrupt
Ending Wax on task
*///:~
這裡,Car有一個單一的布林屬性waxOn,表示塗蠟-拋光處理的狀態。
在waitForWaxing()中將檢查waxOn標誌,如果它為false,那麼這個呼叫任務將通過呼叫wait()而被掛起。這個行為發生在synchronized方法中這一點很重要,因為在這樣的方法中,任務已經獲得了鎖。當你呼叫wait()時,執行緒被掛起,而鎖被釋放。鎖被釋放這一點是本質所在,因為為了安全地改變物件的狀態(例如,將waxOn改變為true,如果被掛起的任務要繼續執行,就必須執行該動作),其他某個任務就必須能夠獲得這個鎖。在本例中,如果另一個任務呼叫waxed()來表示「是時候該乾點什麼了」,那麼就必須獲得這個鎖,從而將waxOn改變為true。之後,waxed()呼叫notifyAll(),這將喚醒在對wait()的呼叫中被掛起的任務。為了使該任務從wait()中喚醒,它必須首先重新獲得當它進入wait()時釋放的鎖。在這個鎖變得可用之前,這個任務是不會被喚醒的。
WaxOn.run()表示給汽車打蠟過程的第一個步驟,因此它將執行它的操作∶呼叫sleep()以模擬需要塗蠟的時間,然後告知汽車塗蠟結束,並呼叫waitForBuffing(),這個方法會用一個wait()呼叫來掛起這個任務,直至WaxOff任務呼叫這輛車的buffed(),.從而改變狀態並呼叫notifyAll() 為止。另一方面,WaxOff.run()立即進入waitForWaxing(),並因此而被掛起,直至WaxOn塗完蠟並且waxed()被呼叫。在執行這個程式時,你可以看到當控制權在兩個任務之間來回互相傳遞時,這個兩步驟過程在不斷地重複。在5秒鐘之後,interrupt()會中止這兩個執行緒;當你呼叫某個ExecutorService的shutdownNow()時,它會呼叫所有由它控制的執行緒的interrupt()。
使用notify()而不是notifyAll()是一種優化。使用notify()時,在眾多等待同一個鎖的任務中只有一個會被喚醒,因此如果你希望使用notify(),就必須保證被喚醒的是恰當的任務。另外,為了使用notify(),所有任務必須等待相同的條件,因為如果你有多個任務在等待不同的條件,那麼你就不會知道是否喚醒了恰當的任務。如果使用notify(),當條件發生變化時,必須只有一個任務能夠從中受益。最後,這些限制對所有可能存在的子類都必須總是起作用的。如果這些規則中有任何一條不滿足,那麼你就必須使用notifyAll()而不是notify()。
在有關Java的執行緒機制的討論中,有一個令人困惑的描述∶notifyAll()將喚醒「所有正在等待的任務」。這是否意味著在程式中任何地方,任何處於wait()狀態中的任務都將被任何對notifyAll()的呼叫喚醒呢?在下面的範例中,與Task2相關的程式碼說明了情況並非如此——事實上,當notifyAll()因某個特定鎖而被呼叫時,只有等待這個鎖的任務才會被喚醒∶
package concurrency;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月05日 16:00
*
* notifyAll()並非喚醒所有「正在等待」的任務,而是隻會喚醒等待 呼叫notifyAll()某個特定鎖的任務
*/
public class NotifyVsNotifyAll {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Task());
}
exec.execute(new Task2());
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
boolean prod = true;
@Override
public void run() {
if (prod) {
System.out.print("\nnotify() ");
Task.blocker.prod();
prod = false;
} else {
System.out.print("\nnotifyAll() ");
Task.blocker.prodAll();
prod = true;
}
}
}, 400, 400);
TimeUnit.SECONDS.sleep(5);
timer.cancel();
System.out.println("\nTimer canceled");
TimeUnit.MILLISECONDS.sleep(500);
System.out.print("Task2.blocker.prodAll() ");
Task2.blocker.prodAll();
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("\nShutting down");
exec.shutdownNow(); // 中斷所有任務
}
}
class Blocker {
synchronized void waitingCall() {
try {
while (!Thread.interrupted()) {
wait();
System.out.println(Thread.currentThread() + " ");
}
} catch (InterruptedException e) {
}
}
synchronized void prod() {
notify();
}
synchronized void prodAll() {
notifyAll();
}
}
class Task implements Runnable {
static Blocker blocker = new Blocker();
@Override
public void run() {
blocker.waitingCall();
}
}
class Task2 implements Runnable {
// 一個單獨的blocker物件
static Blocker blocker = new Blocker();
@Override
public void run() {
blocker.waitingCall();
}
} /* Output:
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-5,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-5,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-5,5,main]
Timer canceled
Task2.blocker.prodAll() Thread[pool-1-thread-6,5,main]
Shutting down
*///:~
Task和Task2每個都有其自己的Blocker物件,因此每個Task物件都會在Task.blocker上阻塞,而每個Task2都會在Task2.blocker上阻塞。在main()中,java.util.Timer物件被設定為每4/10秒執行一次run()方法,而這個run()方法將經由「激勵」方法交替地在Task.blocker上呼叫notify()和notifyAll()。
從輸出中你可以看到,即使存在Task2.blocker上阻塞的Task2物件,也沒有任何在Task.blocker上的notify()或notifyAll()呼叫會導致Task2物件被喚醒。與此類似,在main()的結尾,呼叫了timer的cancel(),即使計時器被複原了,前5個任務也依然在執行,並仍舊在它們對Task.blocker.waitingCall()的呼叫中被阻塞。對Task2.blocker.prodAll()的呼叫所產生的輸出不包括任何在Task.blocker中的鎖上等待的任務。
請考慮這樣一個飯店,它有一個廚師和一個服務員。這個服務員必須等待廚師準備好膳食。當廚師準備好時,他會通知服務員,之後服務員上菜,然後返回繼續等待。這是一個任務共同作業的範例∶廚師代表生產者,而服務員代表消費者。兩個任務必須在膳食被生產和消費時進行握手,而系統必須以有序的方式關閉。下面是對這個敘述建模的程式碼∶
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月05日 16:34
*
* 生產者消費者範例
*/
public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
final WaitPerson waitPerson = new WaitPerson(this);
final Chef chef = new Chef(this);
public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args) {
new Restaurant();
}
}
class Meal {
private final int orderNum;
public Meal(int orderNum) {
this.orderNum = orderNum;
}
@Override
public String toString() {
return "Meal " + orderNum;
}
}
// 服務員 - 消費者
class WaitPerson implements Runnable {
private Restaurant restaurant;
public WaitPerson(Restaurant restaurant) {
this.restaurant = restaurant;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal == null) {
wait(); // 等待廚師製作食物
}
}
System.out.println("Waitperson got " + restaurant.meal);
synchronized(restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll(); // Ready for another
}
}
} catch (InterruptedException e) {
System.out.println("WaitPerson interrupted");
}
}
}
// 廚師 - 生產者
class Chef implements Runnable {
private final Restaurant restaurant;
private int count = 0;
public Chef(Restaurant restaurant) {
this.restaurant = restaurant;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
if (restaurant.meal != null) {
wait(); // 等待被消費
}
}
if (++count == 10) {
System.out.println("Out of food, closing");
restaurant.exec.shutdownNow();
}
System.out.print("order up! ");
synchronized (restaurant.waitPerson) {
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
System.out.println("Chef interrupted");
}
}
} /* Output:
order up! Waitperson got Meal 1
order up! Waitperson got Meal 2
order up! Waitperson got Meal 3
order up! Waitperson got Meal 4
order up! Waitperson got Meal 5
order up! Waitperson got Meal 6
order up! Waitperson got Meal 7
order up! Waitperson got Meal 8
order up! Waitperson got Meal 9
Out of food, closing
WaitPerson interrupted
order up! Chef interrupted
*///:~
Restaurant是WaitPerson和Chef的焦點,他們都必須知道在為哪個Restaurant工作,因為他們必須和這家飯店的「餐窗」打交道,以便放置或拿取膳食restaurant.meal。在run()中,WaitPerson進入wait()模式,停止其任務,直至被Chef的notifyAll()喚醒。由於這是一個非常簡單的程式,因此我們知道只有一個任務將在WaitPerson的鎖上等待∶即WaitPerson任務自身。出於這個原因,理論上可以呼叫notify()而不是notifyAll()。但是,在更復雜的情況下,可能會有多個任務在某個特定物件鎖上等待,因此你不知道哪個任務應該被喚醒。因此,呼叫notifyAll() 要更安全一些,這樣可以喚醒等待這個鎖的所有任務,而每個任務都必須決定這個通知是否與自己相關。
一旦Chef送上Meal並通知WaitPerson,這個Chef就將等待,直至WaitPerson收集到訂單並通知Chef,之後Chef就可以燒下一份Meal了。
wait()和notifyAll()方法以一種非常低階的方式解決了任務互操作問題,即每次互動時都握手。在許多情況下,你可以瞄向更高的抽象級別,使用同步佇列來解決任務共同作業問題,同步佇列在任何時刻都只允許一個任務插入或移除元素。在java.util.concurrent.BlockingQueue介面中提供了這個佇列,這個介面有大量的標準實現。你通常可以使用LinkedBlockingQueue,它是一個無界佇列,還可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限數量的元素。
如果消費者任務試圖從佇列中獲取物件,而該佇列此時為空,那麼這些佇列還可以掛起消費者任務,並且當有更多的元素可用時恢復消費者任務。阻塞佇列可以解決非常大量的問題,而其方式與wait()和notifyAll()相比,則簡單並可靠得多。
考慮下面這個使用BlockingQueue的範例,有一臺機器具有三個任務∶一個製作吐司、一個給吐司抹黃油,另一個在抹過黃油的吐司上塗果醬。我們可以通過各個處理過程之間的BlockingQueue來執行這個吐司製作程式∶
package concurrency.juc;
import container.Stack;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 9:53
*
* 製作吐司 -> 塗黃油 -> 塗果醬 -> 咀嚼消費
*/
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(),
butterQueue = new ToastQueue(),
finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
// 製作吐司
exec.execute(new Toaster(dryQueue));
// 為製作好的吐司塗上黃油
exec.execute(new Butterer(dryQueue, butterQueue));
// 為塗上黃油的吐司塗抹果醬
exec.execute(new Jammer(butterQueue, finishedQueue));
// 將塗抹好果醬的吐司給使用者咀嚼消費
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
class Toast {
public enum Status {DRY, BUFFERED, JAMMED}
private Status status = Status.DRY;
private final int id;
public Toast(int id) {
this.id = id;
}
// 塗抹黃油
public void butter() {
status = Status.BUFFERED;
}
// 塗果醬
public void jam() {
status = Status.JAMMED;
}
public Status getStatus() {
return status;
}
public int getId() {
return id;
}
@Override
public String toString() {
return "Toast " + id + ": " + status;
}
}
class ToastQueue extends LinkedBlockingQueue<Toast> { }
class Toaster implements Runnable {
private final ToastQueue toastQueue;
private int count = 0;
private final Random random = new Random(47);
public Toaster(ToastQueue toastQueue) {
this.toastQueue = toastQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(500));
// 製作吐司
Toast t = new Toast(count++);
System.out.println(t);
// 插入吐司佇列
toastQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("Toaster Interrupted");
}
System.out.println("Toaster off");
}
}
// 給吐司塗黃油
class Butterer implements Runnable {
private final ToastQueue dryQueue, butteredQueue;
public Butterer(ToastQueue dry, ToastQueue buttered) {
dryQueue = dry;
butteredQueue = buttered;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
// 若剛製作的吐司佇列為空,則阻塞等待
Toast t = dryQueue.take();
// 不為空,則為該吐司塗上黃油
t.butter();
System.out.println(t);
// 將塗好黃油的吐司放進黃油吐司佇列中
butteredQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Butterer interrupted");
}
System.out.println("Butterer off");
}
}
// 給塗黃油過後的吐司塗上果醬
class Jammer implements Runnable {
private final ToastQueue butteredQueue, finishedQueue;
public Jammer(ToastQueue buttered, ToastQueue finished) {
butteredQueue = buttered;
finishedQueue = finished;
}
public void run() {
try {
while(!Thread.interrupted()) {
// 若已經塗黃油的吐司佇列為空,則阻塞等待
Toast t = butteredQueue.take();
// 獲取到塗抹黃油的吐司後,在塗上果醬
t.jam();
System.out.println(t);
// 放入已製作完成的土司佇列 中
finishedQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Jammer interrupted");
}
System.out.println("Jammer off");
}
}
// 消費吐司
class Eater implements Runnable {
private final ToastQueue finishedQueue;
private int counter = 0;
public Eater(ToastQueue finished) {
finishedQueue = finished;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
// 若已完成的吐司佇列為空,則阻塞等待
Toast t = finishedQueue.take();
// 驗證吐司是否按順序進行,所有吐司是否已經加了果醬
if(t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {
System.out.println(">>>> Error: " + t);
System.exit(1);
} else
System.out.println("咀嚼! " + t);
}
} catch(InterruptedException e) {
System.out.println("Eater interrupted");
}
System.out.println("Eater off");
}
}
Toast是一個使用enum值的優秀範例。注意,這個範例中沒有任何顯式的同步(即使用Lock 物件或synchronized關鍵字的同步),因為同步由佇列(其內部是同步的)和系統的設計隱式地管理了——每片Toast在任何時刻都只由一個任務在操作。因為佇列的阻塞,使得處理過程將被自動地掛起和恢復。你可以看到由BlockingQueue產生的簡化十分明顯。在使用顯式的wait()和notifyAll()時存在的類和類之間的耦合被消除了,因為每個類都只和它的BlockingQuene通訊。
我們已經知道,任務可以變成阻塞狀態,所以就可能出現這種情況:某個任務在等待另一個任務,而後者又在等待別的任務,這樣一直下去,直到這個鏈條上的任務又在等待第一個任務釋放鎖。這得到了一個任務之間相互等待的連續迴圈,沒有哪個執行緒能繼續,這被稱之為死鎖。
一個經典的死鎖例證就是哲學將就餐問題:給問題的基本描述是指定五個哲學家,這些哲學將花部分時間思考,花部分時間就餐。當他們思考的時候,不需要任何共用資源;當他們就餐的時候,將使用有限數量的餐具。問題中引入的難點是:作為哲學家,他們很窮,他們每人只買得起一根筷子。他們圍坐在桌子周圍,每人之間放一根筷子。當一個哲學家要就餐的時候,這個哲學家就必須同時得到左邊和右邊的筷子,如果一個哲學家左邊或右邊已經有人在使用筷子了,那麼這個哲學家就必須等待,直至可得到必需的筷子。
public class Chopstick {
// 筷子是否被拿走
private boolean taken = false;
// 拿筷子
public synchronized void take() throws InterruptedException {
while (taken) {
wait();
}
taken = true;
}
// 歸還筷子
public synchronized void drop() {
taken = false;
notifyAll();
}
}
任何兩個Philosopher(哲學家)都不能成功take()同一根筷子。另外,如果一根Chopstick(筷子)已經被某個Philosopher獲得,那麼另一個Philosopher可以wait(),直至這根Chopstick的當前持有者呼叫drop()使其可用為止。
當一個Philosopher任務呼叫take()時,這個Philosopher將等待,直至taken標誌變為false(直至當前持有Chopstick的Philosopher釋放它)。然後這個任務會將taken標誌設定為true,以表示現在由新的Philosopher持有這根Chopstick。當這個Philosopher使用完這根Chopstick時,它會呼叫drop()來修改標誌的狀態,並notifyAll()所有其他的Philosopher,這些Philosopher中有些可能就在wait()這根Chopstick。
public class Philosopher implements Runnable {
// 左邊的筷子
private final Chopstick left;
// 右邊的筷子
private final Chopstick right;
private final int id;
// 權重因子
private final int ponderFactor;
private Random random = new Random(47);
public Philosopher(Chopstick left, Chopstick right, int id, int ponderFactor) {
this.left = left;
this.right = right;
this.id = id;
this.ponderFactor = ponderFactor;
}
private void pause() throws InterruptedException {
if (ponderFactor == 0) {
return;
}
TimeUnit.MILLISECONDS.sleep(random.nextInt(ponderFactor * 250));
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
System.out.println(this + " thinking");
pause();
// 哲學家餓了
System.out.println(this + " grabbing right");
right.take();
System.out.println(this + " grabbing left");
left.take();
System.out.println(this + " eating");
pause();
right.drop();
left.drop();
}
} catch (InterruptedException e) {
System.out.println(this + " exiting via interrupt");
}
}
@Override
public String toString() {
return "Philosopher " + id;
}
}
在Philosopher.run()中,每個Philosopher只是不斷地思考和吃飯。如果PonderFactor不為0,則pause()方法會休眠(sleeps)一段隨機的時間。通過使用這種方式,你將看到Philosopher會在思考上花掉一段隨機化的時間,然後嘗試著獲取(take())右邊和左邊的Chopstick,隨後在吃飯上再花掉一段隨機化的時間,之後重複此過程。
現在我們可以建立這個程式的將會產生死鎖的版本了∶
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 11:28
*
* 死鎖範例: 哲學家進餐問題
*
* {args: 0 5 timeout}
*/
public class DeadLockDiningPhilosopher {
public static void main(String[] args) throws Exception {
int ponder = 5;
if (args.length > 0 ) {
ponder = Integer.parseInt(args[0]);
}
int size = 5;
if (args.length > 1) {
size = Integer.parseInt(args[1]);
}
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
for (int i = 0; i < size; i++) {
sticks[i] = new Chopstick();
}
for (int i = 0; i < size; i++) {
exec.execute(new Philosopher(sticks[i], sticks[(i+1) % size], i, ponder));
}
if (args.length == 3 && args[2].equals("timeout")) {
TimeUnit.SECONDS.sleep(10);
} else {
System.out.println("Press 'Enter' to quit");
System.in.read();
}
exec.shutdownNow();
}
}
你會發現,如果Philosopher花在思考上的時間非常少,那麼當他們想要進餐時,全都會在Chopstick上產生競爭,而死鎖也就會更快地發生。
第一個命令列引數可以調整ponder因子,從而影響每個Philosopher花費在思考上的時間長度。如果有許多Philosopher,或者他們花費很多時間去思考,那麼儘管存在死鎖的可能,但你可能永遠也看不到死鎖。值為0的命令列引數傾向於使死鎖儘快發生。
注意,Chopstick物件不需要內部識別符號,它們是由在陣列sticks中的位置來標識的。每個Philosopher構造器都會得到一個對左邊和右邊Chopstick物件的參照。除了最後一個Philosopher,其他所有的Philosopher都是通過將這個Philosopher定位於下一對Chopstick物件之間而被初始化的,而最後一個Philosopher右邊的Chopstick是第0個Chopstick,這樣這個迴圈表也就結束了。因為最後一個Philosopher坐在第一個Philosopher的右邊,所以他們會共用第0個Chopstick。現在,所有的Philosopher都有可能希望進餐,從而等待其臨近的Philosopher放下它們的Chopstick。這將使程式死鎖。
如果Philosopher花費更多的時間去思考而不是進餐(使用非0的ponder值,或者大量的Philosopher),那麼他們請求共用資源(Chopstick)的可能性就會小許多,這樣你就會確信該程式不會死鎖,儘管它們並非如此。這個範例相當有趣,因為它演示了看起來可以正確執行,但實際上會死鎖的程式。
要修正死鎖問題,你必須明白,當以下四個條件同時滿足時,就會發生死鎖∶
1)互斥條件。任務使用的資源中至少有一個是不能共用的。這裡,一根Chopstick一次就只能被一個Philosopher使用。
2)至少有一個任務它必須持有一個資源且正在等待獲取一個當前被別的任務持有的資源。也就是說,要發生死鎖,Philosopher必須拿著一根Chopstick並且等待另一根。
3)資源不能被任務搶佔,任務必須把資源釋放當作普通事件。Philosopher很有禮貌,他們不會從其他Philosopher那裡搶Chopstick。
4)必須有迴圈等待,這時,一個任務等待其他任務所持有的資源,後者又在等待另一個任務所持有的資源,這樣一直下去,直到有一個任務在等待第一個任務所持有的資源,使得大家都被鎖住。在DeadlockDiningPhilosophers.java中,因為每個Philosopher都試圖先得到右邊的Chopstick,然後得到左邊的Chopstick,所以發生了迴圈等待。
因為要發生死鎖的話,所有這些條件必須全部滿足;所以要防止死鎖的話,只需破壞其中一個即可。在程式中,防止死鎖最容易的方法是破壞第4個條件。有這個條件的原因是每個Philosopher都試圖用特定的順序拿Chopstick∶先右後左。正因為如此,就可能會發生「每個人都拿著右邊的Chopstick,並等待左邊的Chopstick」的情況,這就是迴圈等待條件。然而,如果最後一個Philosopher被初始化成先拿左邊的Chopstick,後拿右邊的Chopstick,那麼這個Philosopher將永遠不會阻止其右邊的Philosopher拿起他們的Chopstick。在本例中,這就可以防止迴圈等待。這只是問題的解決方法之一,也可以通過破壞其他條件來防止死鎖(具體細節請參考更高階的討論執行緒的書籍)∶
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 11:52
*
* 解決哲學家就餐的死鎖問題:通過修改迴圈等待條件,即修改最後一個哲學家先拿走左手邊的筷子,再拿走右手邊的筷子
* {args: 5 5 timeout}
*/
public class FixedDiningPhilosopher {
public static void main(String[] args) throws Exception {
int ponder = 5;
if (args.length > 0 ) {
ponder = Integer.parseInt(args[0]);
}
int size = 5;
if (args.length > 1) {
size = Integer.parseInt(args[1]);
}
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
for (int i = 0; i < size; i++) {
sticks[i] = new Chopstick();
}
for (int i = 0; i < size; i++) {
// 若不是最後一個哲學家,則都先拿走右手邊的筷子
if (i < (size - 1)) {
exec.execute(new Philosopher(sticks[i], sticks[(i+1)], i, ponder));
} else {
// 最後一個哲學家先拿起左手邊的筷子
exec.execute(new Philosopher(sticks[0], sticks[(i)], i, ponder));
}
}
if (args.length == 3 && args[2].equals("timeout")) {
TimeUnit.SECONDS.sleep(10);
} else {
System.out.println("Press 'Enter' to quit");
System.in.read();
}
exec.shutdownNow();
}
}
通過確保最後一個Philosopher先拿起和放下左邊的Chopstick,我們可以移除死鎖,從而使這個程式平滑地執行。
Java SE5的java.util.concurrent引入了大量設計用來解決並行問題的新類,學習使用它們將有助於你編寫出更加簡單而健壯的並行程式。
它被用來同步一個或多個任務,強制它們等待由其他任務執行的一組操作完成。
你可以向CountDownLatch物件設定一個初始計數值,任何在這個物件上呼叫wait()的方法都將阻塞,直至這個計數值到達0。其他任務在結束其工作時,可以在該物件上呼叫countDown()來減小這個計數值。CountDownLatch被設計為只觸發一次,計數值不能被重置。如果你需要能夠重置計數值的版本,則可以使用CyclicBarrier。
呼叫countDown()的任務在產生這個呼叫時並沒有被阻塞,只有對await()的呼叫會被阻塞,直至計數值到達0。
CountDownLatch的典型用法是將一個程式分為n個互相獨立的可解決任務,並建立值為0的CountDownLatch。當每個任務完成時,都會在這個鎖存器上呼叫countDown()。等待問題被解決的任務在這個鎖存器上呼叫await(),將它們自己攔住,直至鎖存器計數結束。下面是演示這種技術的一個框架範例∶
package concurrency;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 15:02
*
* 倒計時鎖存器 使用範例
*/
public class CountDownLatchDemo {
static final int SIZE = 100;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
// 所有任務都必須共用一個CountDownLatch物件
CountDownLatch latch = new CountDownLatch(SIZE);
for (int i = 0; i < 10; i++) {
exec.execute(new WaitingTask(latch));
}
for (int i = 0; i < SIZE; i++) {
exec.execute(new TaskPortion(latch));
}
System.out.println("latch all tasks");
// 當所有任務都完成的時候退出程式
exec.shutdown();
}
}
class TaskPortion implements Runnable {
private static int counter = 0;
private final int id = counter++;
private static Random random = new Random(47);
private final CountDownLatch latch;
public TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
doWork();
latch.countDown();
} catch (InterruptedException e) {
}
}
public void doWork() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
System.out.println(this + "completed");
}
@Override
public String toString() {
return String.format("%1$-3d", id);
}
}
class WaitingTask implements Runnable {
private static int counter = 0;
private final int id = counter++;
private final CountDownLatch latch;
public WaitingTask(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
System.out.println("Latch barrier passed for " + this);
} catch (InterruptedException e) {
System.out.println(this + " interrupted");
}
}
@Override
public String toString() {
return String.format("WaitingTask %1$-3d", id);
}
}
TaskPortion將隨機地休眠一段時間,以模擬這部分工作的完成,而WaitingTask表示系統中必須等待的部分,它要等待問題的初始化部分完成為止。所有任務都是用在main()中定義的同一個單一的CountDownLatch。
CyclicBarrier適用於這樣的情況∶你希望建立一組任務,它們並行地執行工作,然後在進行下一個步驟之前等待,直至所有任務都完成(看起來有些像join0)。它使得所有的並行任務都將在柵欄處列隊,因此可以一致地向前移動。這非常像CountDownLatch,只是CountDownLatch是隻觸發一次的事件,而CyclicBarrier可以多次重用。
下面以賽馬遊戲為範例,演示CyclicBarrier的使用:
package concurrency;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
/**
* @author Mr.Sun
* @date 2022年09月06日 15:22
*
* 演示CyclicBarrier的使用
*/
public class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nhorses, final int pause) {
barrier = new CyclicBarrier(nhorses, new Runnable() {
@Override
public void run() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < FINISH_LINE; i++) {
s.append("="); // 跑道上的柵欄
}
System.out.println(s);
for (Horse horse : horses) {
System.out.println(horse.tracks());
}
for (Horse horse : horses) {
if (horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + "won!");
exec.shutdownNow();
return;
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
System.out.println("barrier-action sleep interrupted");
}
}
}
});
for (int i = 0; i < nhorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
if (args.length > 0) {
int n = new Integer(args[0]);
nHorses = n > 0 ? n : nHorses;
}
if (args.length > 1) {
int p = new Integer(args[1]);
pause = p > -1 ? p : pause;
}
new HorseRace(nHorses, pause);
}
}
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private Random random = new Random(47);
private CyclicBarrier barrier;
public Horse(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
strides += random.nextInt(3);
}
barrier.await();
}
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
public synchronized int getStrides() {
return strides;
}
@Override
public String toString() {
return "Horse " + id + " ";
}
public String tracks() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < getStrides(); i++) {
s.append("*");
}
s.append(id);
return s.toString();
}
}
可以向CyclicBarrier提供一個"柵欄動作",它是一個Runnable,當計數值到達0時自動執行————這是CyclicBarrier和CountDownLatch之間的另一個區別。這裡,柵欄動作是作為匿名內部類建立的,它被提交給了CyclicBarrier的構造器。
我試圖讓每匹馬都列印自己,但是之後的顯示順序取決於工作管理員。CyclicBarrier使得每匹馬都要執行為了向前移動所必需執行的所有工作,然後必須在柵欄處等待其他所有的馬都準備完畢。當所有的馬都向前移動時,CyclicBarrier將自動呼叫Runnable柵欄動作任務,按順序顯示馬和終點線的位置。
一旦所有的任務都越過了柵欄,它就會自動地為下一回合比賽做好準備。
為了展示這個非常簡單的動畫效果,你需要將控制檯視窗的尺寸調整為小到只有馬時,才會展示出來。
這是一個無界的BlockingQueue,用於放置實現了Delayed介面的物件,其中的物件只能在其到期時才能從隊中取走。這種佇列是有序的,即隊頭物件的延遲到期的時間最長。如果沒有任何延遲到期,那麼就不會有任何頭元素,並且poll()將返回mull(正因為這樣,你不能將mull放置到這種佇列中)。
下面是一個範例,其中的Delayed物件自身就是任務,而DelayedTaskConsumer將最「緊急」的任務(到期時間最長的任務)從佇列中取出,然後執行它。注意,這樣DelayQueue就成為了優先順序佇列的一種變體∶
package concurrency.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* @author Mr.Sun
* @date 2022年09月06日 16:08
*
* 延遲佇列演示
*/
public class DelayQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue = new DelayQueue<>();
// Fill with tasks that have random delays:
for(int i = 0; i < 20; i++) {
queue.put(new DelayedTask(rand.nextInt(5000)));
}
// Set the stopping point
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence = new ArrayList<>();
public DelayedTask(int delayMillisecond) {
this.delta = delayMillisecond;
this.trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}
@Override
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask) arg;
if (trigger < that.trigger) {
return -1;
}
if (trigger > that.trigger) {
return 1;
}
return 0;
}
@Override
public void run() {
System.out.print(this + " ");
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
}
@Override
public String toString() {
return String.format("[%1$-4d]", delta) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
public static class EndSentinel extends DelayedTask {
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}
@Override
public void run() {
for(DelayedTask pt : sequence) {
System.out.print(pt.summary() + " ");
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}
@Override
public void run() {
try {
while(!Thread.interrupted())
q.take().run(); // Run task with the current thread
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished DelayedTaskConsumer");
}
} /* Output:
[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15 [3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19 [4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6 (0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000)
[5000] Task 20 Calling shutdownNow()
Finished DelayedTaskConsumer
*///:~
DelayedTask包含一個稱為sequence的List
Delayed介面有一個方法名為getDelay(),它可以用來告知延遲到期有多長時間,或者延遲在多長時間之前已經到期。這個方法將強制我們去使用TimeUnit類,因為這就是引數型別。這會產生一個非常方便的類,因為你可以很容易地轉換單位而無需作任何宣告。例如,delta的值是以毫秒為單位儲存的,但是Java SE5的方法System.nanoTime()產生的時間則是以納秒為單位的。你可以轉換delta的值,方法是宣告它的單位以及你希望以什麼單位來表示,就像下面這樣∶
NANOSECONDS.convert(delta, MILLISECONDS);
在getDelay()中,希望使用的單位是作為unit引數傳遞進來的,你使用它將當前時間與觸發時間之間的差轉換為呼叫者要求的單位,而無需知道這些單位是什麼(這是策略設計模式的一個簡單範例,在這種模式中,演演算法的一部分是作為引數傳遞進來的)。
為了排序,Delayed介面還繼承了Comparable介面,因此必須實現compareTo(),使其可以產生合理的比較。toString()和summary()提供了輸出格式化,而巢狀的EndSentinel類提供了一種關閉所有事物的途徑,具體做法是將其放置為佇列的最後一個元素。
注意,因為DelayedTaskConsumer自身是一個任務,所以它有自己的Thread,它可以使用這個執行緒來執行從隊中獲取的所有任務。由於任務是按照佇列優先順序的順序執行的,因此在本例中不需要啟動任何單獨的執行緒來執行DelayedTask。
從輸出中可以看到,任務建立的順序對執行順序沒有任何影響,任務是按照所期望的延遲順序執行的。
這是一個很基礎的優先順序佇列,它具有可阻塞的讀取操作。下面是一個範例,其中在優先順序佇列中的物件是按照優先順序順序從佇列中出現的任務。PrioritizedTask被賦予了一個優先順序數位,以此來提供這種順序∶
package concurrency.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 16:35
*
* 演示優先順序阻塞佇列
*/
public class PriorityBlockingQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
}
}
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
private Random random = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence = new ArrayList<>();
public PrioritizedTask(int priority) {
this.priority = priority;
sequence.add(this);
}
@Override
public int compareTo(PrioritizedTask arg) {
return priority < arg.priority ? 1 : (priority > arg.priority ? -1 :0);
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println(this);
}
@Override
public String toString() {
return String.format("[%1$-3d]", priority) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;
public EndSentinel(ExecutorService e) {
super(-1); // 此程式中的最低優先順序
exec = e;
}
@Override
public void run() {
int count = 0;
for(PrioritizedTask pt : sequence) {
System.out.print(pt.summary());
if(++count % 5 == 0) {
System.out.println();
}
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class PrioritizedTaskProducer implements Runnable {
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
queue = q;
exec = e; // Used for EndSentinel
}
@Override
public void run() {
// 無界佇列,不會被阻塞
// 用隨機優先順序快速填充
for(int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(rand.nextInt(10)));
Thread.yield();
}
// 最高優先順序工作的任務
try {
for(int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}
// 新增作業時,優先順序最低的優先:
for(int i = 0; i < 10; i++) {
queue.add(new PrioritizedTask(i));
}
// 停止所有任務的哨兵:
queue.add(new PrioritizedTask.EndSentinel(exec));
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished PrioritizedTaskProducer");
}
}
class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> q;
public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
this.q = q;
}
@Override
public void run() {
try {
while(!Thread.interrupted())
// 使用當前執行緒執行任務
q.take().run();
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished PrioritizedTaskConsumer");
}
}
與前一個範例相同,PrioritizedTask物件的建立序列被記錄在sequence List中,用於和實際的執行順序比較。run()方法將休眠一小段隨機的時間,然後列印物件資訊,而EndSentinel提供了和前面相同的功能,要確保它是佇列中最後一個物件。
PrioritizedTaskProducer和PrioritizedTaskComsumer通過PriorityBlockinQueue彼此連線。因為這種佇列的阻塞特性提供了所有必需的同步,所以你應該注意到了,這裡不需要任何顯式的同步——不必考慮當你從這種佇列中讀取時,其中是否有元素,因為這個佇列在沒有元素時,將直接阻塞讀取者。
正常的鎖(來自concurrent.locks或內建的synchronized鎖)在任何時刻都只允許一個任務存取一項資源,而計數號誌允許n個任務同時存取這個資源。你還可以將號誌看作是在向外分發使用資源的「許可證」,儘管實際上沒有使用任何許可證物件。
作為一個範例,請考慮物件池的概念,它管理著數量有限的物件,當要使用物件時可以簽出它們,而在使用者使用完畢時,可以將它們籤回。這種功能可以被封裝到一個泛型類中∶
package concurrency.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
/**
* @author Mr.Sun
* @date 2022年09月06日 17:13
*
* 演示 Semaphore
*/
public class Pool<T> {
private int size;
private List<T> items = new ArrayList<>();
private volatile boolean[] checkedOut;
private Semaphore available;
public Pool(Class<T> classObject, int size) {
this.size = size;
checkedOut = new boolean[size];
// 載入具有可檢出物件的池
available = new Semaphore(size, true);
for (int i = 0; i < size; i++) {
try {
// Assumes a default constructor:
items.add(classObject.newInstance());
} catch(Exception e) {
throw new RuntimeException(e);
}
}
}
public T checkOut() throws InterruptedException {
available.acquire();
return getItem();
}
public void checkIn(T x) {
if(releaseItem(x)) {
available.release();
}
}
private synchronized T getItem() {
for(int i = 0; i < size; ++i)
if(!checkedOut[i]) {
checkedOut[i] = true;
return items.get(i);
}
return null; // 號誌阻止到達這裡
}
private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
if(index == -1) {
return false; // Not in the list
}
if(checkedOut[index]) {
checkedOut[index] = false;
return true;
}
return false; // Wasn't checked out
}
}
在這個簡化的形式中,構造器使用newInstance0來把物件載入到池中。如果你需要一個新物件,那麼可以呼叫checkOut(),並且在使用完之後,將其遞交給checkIn()。
boolean型別的陣列checkedOut可以跟蹤被簽出的物件,並且可以通過getItem()和releaseItem()方法來管理。而這些都將由Semaphore型別的available來加以確保,因此,在checkOut()中,如果沒有任何號誌許可證可用(這意味著在池中沒有更多的物件了),available 將阻塞呼叫過程。在checkIn()中,如果被簽入的物件有效,則會向號誌返回一個許可證。
為了建立一個範例,我們可以使用Fat,這是一種建立代價高昂的物件型別,因為它的構造器執行起來很耗時∶
package concurrency.juc;
/**
* @author Mr.Sun
* @date 2022年09月06日 17:22
*/
public class Fat {
private volatile double d; // 防止優化
private static int counter = 0;
private final int id = counter++;
public Fat() {
// 耗時,可中斷的操作
for(int i = 1; i < 10000; i++) {
d += (Math.PI + Math.E) / (double)i;
}
}
public void operation() {
System.out.println(this);
}
@Override
public String toString() {
return "Fat id: " + id;
}
}
我們在池中管理這些物件,以限制這個構造器所造成的影響。我們可以建立一個任務,它將簽出Fat物件,持有一段時間之後再將它們簽入,以此來測試Pool這個類∶
package concurrency.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 17:25
*/
public class SemaphoreDemo {
final static int SIZE = 25;
public static void main(String[] args) throws Exception {
final Pool<Fat> pool = new Pool<>(Fat.class, SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < SIZE; i++) {
exec.execute(new CheckoutTask<>(pool));
}
System.out.println("All CheckoutTasks created");
List<Fat> list = new ArrayList<>();
for(int i = 0; i < SIZE; i++) {
Fat f = pool.checkOut();
System.out.print(i + ": main() thread checked out ");
f.operation();
list.add(f);
}
Future<?> blocked = exec.submit(new Runnable() {
@Override
public void run() {
try {
// Semaphore prevents additional checkout,
// so call is blocked:
pool.checkOut();
} catch(InterruptedException e) {
System.out.println("checkOut() Interrupted");
}
}
});
TimeUnit.SECONDS.sleep(2);
blocked.cancel(true); // Break out of blocked call
System.out.println("Checking in objects in " + list);
for(Fat f : list) {
pool.checkIn(f);
}
for(Fat f : list) {
pool.checkIn(f); // Second checkIn ignored
}
exec.shutdown();
}
}
// 從池中檢出資源的任務:
class CheckoutTask<T> implements Runnable {
private static int counter = 0;
private final int id = counter++;
private Pool<T> pool;
public CheckoutTask(Pool<T> pool) {
this.pool = pool;
}
@Override
public void run() {
try {
T item = pool.checkOut();
System.out.println(this + "checked out " + item);
TimeUnit.SECONDS.sleep(1);
System.out.println(this +"checking in " + item);
pool.checkIn(item);
} catch(InterruptedException e) {
// Acceptable way to terminate
}
}
@Override
public String toString() {
return "CheckoutTask " + id + " ";
}
}
在main()中,建立了一個持有Fat物件的Pool,而一組CheckoutTask則開始操練這個Pool。然後,main()執行緒簽出池中的Fat物件,但是並不簽入它們。一旦池中所有的物件都被簽出,Semaphore將不再允許執行任何簽出操作。blocked的run()方法因此會被阻塞,2秒鐘之後,cancel()方法被呼叫,以此來掙脫Future的束縛。注意,冗餘的簽入將被Pool忽略。
Exchanger是在兩個任務之間交換物件的柵欄。當這些任務進入柵欄時,它們各自擁有一個物件,當它們離開時,它們都擁有之前由物件持有的物件。Exchanger的典型應用場景是∶一個任務在建立物件,這些物件的生產代價很高昂,而另一個任務在消費這些物件。通過這種方式,可以有更多的物件在被建立的同時被消費。