Java多執行緒

2022-06-25 06:08:10

1、多執行緒概述

1.1、程序和執行緒概述

程序:作業系統中的應用程式,一個程序就是一個應用程式。程序A和程序B的記憶體獨立不共用資源。

執行緒:CPU排程的最小單元,程序的一個執行流/指定單元,一個程序可以有多個執行緒。

PS:Java程式啟動的時候,JVM就是一個程序,JVM會執行main方法,main方法就是主執行緒,同時會再啟動一個垃圾回收執行緒(守護執行緒)GC進行垃圾回收。即:Java最少有兩個執行緒並行,主執行緒 main 方法和守護執行緒GC。

1.2、執行緒之間的關係

在Java語言中,堆記憶體方法區記憶體共用。但是棧記憶體獨立,一個執行緒一個棧。假設啟動10個執行緒,會有10個棧空間,每個棧和每個棧之間,互不干擾,各自執行各自的,這就是多執行緒並行。Java中之所以有多執行緒機制,目的就是為了提高程式的處理效率。

PS:火車站,可以看做是一個程序。火車站中的每一個售票視窗可以看做是一個執行緒。
我在視窗1購票,你可以在視窗2購票,你不需要等我,我也不需要等你。所以多執行緒並行可以提高效率。

1.3、實現多執行緒的條件

多核CPU的可以真正的是實現多執行緒並行,例如4核CPU表示同一個時間點上,可以真正的有4個程序並行執行。

單核的CPU不能夠做到真正的多執行緒並行,但是可以做到給人一種「多執行緒並行」的感覺,原因是CPU的執行速度很快。對於單核的CPU來說,在某一個時間點上實際上只能處理一件事情,但是由於CPU的處理速度極快,多個執行緒之間頻繁切換執行,給別人的感覺是:多個事情同時在做!!

同時,多執行緒程式並不是同時進行的,由於CPU的執行速度太快,CPU會在不同的執行緒之間快速的切換執行,這個現象就是上下文切換,即:CPU從一個執行緒或程序切換到另一個執行緒或程序。

1.4、執行緒的生命週期


2、執行緒的實現方法

2.1、繼承Thread類

public class ThreadTest02 {
    public static void main(String[] args) {
        // 啟動執行緒
        new MyThread().start();
        // 直接呼叫run()方法
        // new MyThread().run();

        // 主執行緒執行的程式
        for(int i = 0; i < 1000; i++){
            System.out.println("主執行緒--->" + i);
        }
    }
}

class MyThread extends Thread {
    @Override
    public void run() {
        // 編寫程式,這段程式執行在分支執行緒中(分支棧)。
        for(int i = 0; i < 1000; i++){
            System.out.println("分支執行緒--->" + i);
        }
    }
}

run() 方法不會啟動執行緒,只是普通的呼叫方法而已,不會分配新的分支棧(這種方式就是單執行緒)。

start() 方法的作用是:啟動一個分支執行緒,在JVM中開闢一個新的棧空間,這段程式碼任務完成之後,瞬間就結束了。

因此start()方法只是為了開啟一個新的棧空間,只要新的棧空間開出來,start()方法就結束了,執行緒就啟動成功了。

啟動成功的執行緒會自動呼叫run()方法,並且run()方法在分支棧的棧底部(壓棧)。

run方法在分支棧的棧底部,main方法在主棧的棧底部。run和main是平級的。

呼叫run()方法記憶體圖如下

呼叫start()方法記憶體圖如下


2.2、實現Runnable介面

這種方式相對於第一種方式,只是多了一個執行緒物件進行初始化,因為Thread的有參構造可以實現,其他的地方沒有過多的變化。

/**
 * 1. 建立類實現Runnable介面
 */
class MyRunnable implements Runnable{
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println("分支執行緒->" + i);
        }
    }
}

public class CreateThread {
    public static void main(String[] args) {
        // 啟動執行緒
        new Thread(new MyRunnable()).start();
        // 主執行緒程式
        for (int i = 0; i < 100; i++) {
            System.out.println("主執行緒->" + i);
        }

        /**
         * 2. 通過匿名內部類實現
         */
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                System.out.println("分支執行緒->" + i);
            }
        }).start();
    }
}

2.3、實現Callable介面

Callable 介面類似於 Runnable,但是 Runnable 不會返回結果,並且無法丟擲經過檢查的異常,而 Callable 在不使用執行緒池的時候依賴 FutureTask 類獲取返回結果。

單個執行緒池: 使用ExecutorService、Callable、Future實現有返回結果的執行緒。

ExecutoreService提供了submit()方法,傳遞一個Callable,或Runnable,返回Future。如果Executor後臺執行緒池還沒有完成Callable的計算,這呼叫返回Future物件的get()方法,會阻塞直到計算完成。

不使用執行緒池實現

/**
 * 實現Callable介面
 */
class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() {
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += i;
        }
        return sum;
    }
}

public class CreateThread {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 啟動執行緒
        new Thread(new FutureTask<>(new MyCallable()), "方式一").start();
        // FutureTask futureTask = new FutureTask<>(new MyCallable());
        // new Thread(futureTask).start();
        // 通過futureTask.get()獲取返回值
        System.out.println(futureTask.get());

        /**
         * 通過匿名內部類實現
         */
        new Thread(new FutureTask<>(() -> {
            int sum = 0;
            for (int i = 0; i < 100; i++) {
                sum += i;
            }
            return sum;
        }));

        // 主執行緒程式
        for (int i = 0; i < 100; i++) {
            System.out.println("主執行緒->" + i);
        }
    }
}

使用單個執行緒池實現

/**
 * 實現Callable介面
 */
class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() {
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += i;
        }
        return sum;
    }
}

public class CreateThread {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 主執行緒程式
        for (int i = 0; i < 100; i++) {
            System.out.println("主執行緒->" + i);
        }

        /**
         * 使用單執行緒池實現
         */
        // 1. 建立固定大小的執行緒池物件
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        // 2. 提交執行緒任務,通過Future介面接受返回的結果
        Future<Integer> submit = executorService.submit(new MyCallable());
        // 3. 關閉執行緒池
        executorService.shutdown();
        // 4. 呼叫future.get()獲取callable執行完成的返回結果
        System.out.println(submit.get());
    }
}

三種方式的優缺點總結如下:

  • 整合Thread類
    • 優點:程式碼書寫比較簡單(實際也沒有簡單多少)
    • 缺點:由於Java的單繼承性,導致後期無法繼承其他的類,同時程式碼的耦合度比較高
  • 實現Runnable介面和Callable介面
    • 優點:適合多個相同的程式程式碼的執行緒去處理同一個資源,避免了Java單繼承的限制,程式碼可以被多個執行緒共用,程式碼和資料獨立,同時執行緒池只能放入實現Runable或callable類執行緒,不能直接放入繼承Thread的類。
    • 缺點:通過匿名內部類進行實現,雖然程式碼書寫簡單一點,但是隻適合執行緒使用一次的時候

3、執行緒排程和狀態裝換

3.1、執行緒的狀態轉換

  • 新建狀態(New):新建立了一個執行緒物件。

  • 就緒狀態(Runnable):執行緒物件建立後,其他執行緒呼叫了該物件的start()方法。該狀態的執行緒位於可執行執行緒池中,變得可執行,等待獲取CPU的使用權。

  • 執行狀態(Running):就緒狀態的執行緒獲取了CPU,執行程式程式碼。

  • 阻塞狀態(Blocked):阻塞狀態是執行緒因為某種原因放棄CPU使用權,暫時停止執行。直到執行緒進入就緒狀態,才有機會轉到執行狀態。阻塞的情況分三種:

    • 等待阻塞:執行的執行緒執行wait()方法,JVM會把該執行緒放入等待池中。(wait會釋放持有的鎖)

    • 同步阻塞:執行的執行緒在獲取物件的同步鎖時,若該同步鎖被別的執行緒佔用,則JVM會把該執行緒放入鎖池中。

    • 其他阻塞:執行的執行緒執行sleep()或join()方法,或者發出了I/O請求時,JVM會把該執行緒置為阻塞狀態。當sleep()狀態超時、join()等待執行緒終止或者超時、或者I/O處理完畢時,執行緒重新轉入就緒狀態。(注意,sleep是不會釋放持有的鎖)

  • 死亡狀態(Dead):執行緒執行完了或者因異常退出了run()方法,該執行緒結束生命週期。

3.2、執行緒排程

Java執行緒有優先順序,優先順序高的執行緒會獲得較多的執行機會,因此通過Thread類的setPriority()和getPriority()方法分別用來設定和獲取執行緒的優先順序。

JVM提供了10個執行緒優先順序,但與常見的作業系統都不能很好的對映。如果希望程式能移植到各個作業系統中,應該僅僅使用Thread類有以下三個靜態常數作為優先順序,這樣能保證同樣的優先順序採用了同樣的排程方式。其中,主程式使用的是NORM_PRIORITY,即5,同時還有MAX_PRIORITY=10和MIN_PRIORITY=1的靜態優先順序常數。

  • 執行緒睡眠:Thread.sleep(long millis)方法,使執行緒轉到阻塞狀態。millis引數設定睡眠的時間,以毫秒為單位。當睡眠結束後,就轉為就緒(Runnable)狀態。sleep()平臺移植性好。

  • 執行緒等待:Object類中的wait()方法,導致當前的執行緒等待,直到其他執行緒呼叫此物件的 notify() 方法或 notifyAll() 喚醒方法。這個兩個喚醒方法也是Object類中的方法,行為等價於呼叫 wait(0) 一樣。

  • 執行緒讓步:Thread.yield() 方法,暫停當前正在執行的執行緒物件,把執行機會讓給相同或者更高優先順序的執行緒。

  • 執行緒加入:join()方法,等待其他執行緒終止。在當前執行緒中呼叫另一個執行緒的join()方法,則當前執行緒轉入阻塞狀態,直到另一個程序執行結束,當前執行緒再由阻塞轉為就緒狀態。

  • 執行緒喚醒:Object類中的notify()方法,喚醒在此物件監視器上等待的單個執行緒。如果所有執行緒都在此物件上等待,則會選擇喚醒其中一個執行緒。選擇是任意性的,並在對實現做出決定時發生。執行緒通過呼叫其中一個 wait 方法,在物件的監視器上等待。 直到當前的執行緒放棄此物件上的鎖定,才能繼續執行被喚醒的執行緒。被喚醒的執行緒將以常規方式與在該物件上主動同步的其他所有執行緒進行競爭;例如,喚醒的執行緒在作為鎖定此物件的下一個執行緒方面沒有可靠的特權或劣勢。類似的方法還有一個notifyAll(),喚醒在此物件監視器上等待的所有執行緒。


3.2.1、執行緒停止

執行緒在正常的程式中啟動和停止,不需要額外的停止方式,會自動停止。但是有些情況下,有一些伺服執行緒還在執行,他們執行時間較長,只有當外部條件滿足時,他們才會停止。針對這樣的情況,提供瞭如下幾種停止執行緒的方式:

使用標誌位(推薦使用)

public class ThreadStopUse {
    public static void main(String[] args) {
        FlagStop flagStop = new FlagStop();
        new Thread(flagStop).start();

        for (int i = 0; i < 100; i++) {
            System.out.println("主執行緒執行的第" + i + "次");
            if (i == 90) {
                // 呼叫自己的stop方法切換標誌位,停止執行緒
                flagStop.stop();
                System.out.println("分支執行緒該停止了");
            }
        }
    }
}

class FlagStop implements Runnable {
    /**
     * 定義標誌
     */
    private volatile boolean exitFlag = true;

    /**
     * 標誌轉換
     */
    public void stop() {
        this.exitFlag = false;
    }

    @Override
    public void run() {
        int i = 0;
        while (exitFlag) {
            System.out.println("分支執行緒執行的第" + i + "次");
        }
    }
}

使用interrupted()方法(不推薦)

使用interrupted()方法來中斷執行緒有兩種情況:

  • 執行緒處於阻塞狀態時,如執行緒中使用了sleep(),同步鎖 wait(),socket的receiver,accept方法時,會使執行緒進入到阻塞狀態,當程式呼叫interrupted()方法時,會丟擲InterrupteException異常。阻塞中的那個方法丟擲異常,通過捕獲該異常,然後break跳出迴圈,從而結束該執行緒。注:不是呼叫了interrupted()方法就會結束執行緒,是捕獲到了interruptedException異常後,break 跳出迴圈後才能結束此執行緒。

  • 執行緒未處於阻塞狀態,呼叫interrupted()方法時,實際上是通過判斷執行緒的中斷標記來退出迴圈。

class InterruptedStop implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i <= 200; i++) {
            // 判斷是否被中斷,通過檢查標誌位
            if (Thread.currentThread().isInterrupted()) {
                // 處理中斷邏輯
                break;
            }
            System.out.println("i=" + i);
        }
    }
}
public class ThreadStopUse {
    public static void main(String[] args) throws InterruptedException {
        InterruptedStop interruptedStop = new InterruptedStop();
        Thread thread = new Thread(interruptedStop);
        thread.start();
        Thread.sleep(1000);
        thread.interrupt();
    }
}

注意:在上面這段程式碼中,我們增加了 Thread.isInterrupted() 來判斷當前執行緒是否被中斷了,如果是,則退出 for 迴圈,結束執行緒。

這種方式看起來與之前介紹的「使用標誌位終止執行緒」非常類似,但是在遇到 sleep() 或者 wait() 這樣的操作,我們只能通過中斷來處理了。

使用stop()方法停止(強烈不推薦)

使用Thread.stop()方法來結束執行緒的執行是很危險的,主要因為在程式呼叫Thread.stop()後會丟擲ThreadDeatherror()錯誤,並釋放子執行緒所持有的所有鎖,會導致被保護資料呈現不一致性,此過程不可控。

3.2.2、執行緒休眠

執行緒休眠是Thread.sleep(ms)方法,它的作用是讓當前執行緒進入休眠,進入「阻塞狀態」,放棄佔有CPU時間片,讓給其它執行緒使用。執行效果就是間隔特定的時間,去執行一段特定的程式碼,每隔多久執行一次。millis引數設定睡眠的時間,以毫秒為單位。當睡眠結束後,就轉為就緒(Runnable)狀態。

注意:每個物件都有一個鎖,sleep()方法不會釋放鎖。

public class ThreadSleepUse {
    public static void main(String[] args) {
        while (true) {
            try {
                Thread.sleep(1000);
                // 每隔一秒列印一下系統當前時間
                System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

3.2.3、執行緒禮讓

暫停當前正在執行的執行緒物件,但不阻塞,將執行緒從執行狀態轉為就緒狀態,把執行機會讓給相同或者更高優先順序的執行緒。讓CPU重新排程,但是禮讓不一定成功,因為當前執行緒和其他執行緒一同競爭CPU,使得所有執行緒回到同一起點,優先順序高的執行緒獲得的執行機會會多一點,這個過程不會釋放鎖。

public class ThreadYieldUse {
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            System.out.println("主執行緒執行了第" + i);
        }

        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                System.out.println(Thread.currentThread().getName() +  "執行了" + i + "次");
                if (i % 5 == 0) {
                    Thread.yield();
                    System.out.println("執行緒禮讓,重新爭搶CPU");
                }
            }
        }, "執行緒禮讓").start();
    }
}

3.2.4、執行緒加入

執行緒加入就是在當前執行緒中呼叫另一個執行緒的join()方法,則當前執行緒轉入阻塞狀態,直到另一個程序執行結束,當前執行緒再由阻塞轉為就緒狀態。

將一個執行緒合併到當前執行緒中,當前執行緒受阻塞,加入的執行緒執行直到結束,這個是無參join()方法的作用,使用join(long millis)方法則等待該執行緒終止的時間最長為 millis 毫秒;使用join(long millis, int nanos)方法則等待該執行緒終止的時間最長為 millis 毫秒 + nanos 納秒。

作用:一個執行完的執行緒需要另一個正在執行的執行緒的執行結果時

public class ThreadJoinUse {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            for (int j = 0; j < 50; j++) {
                System.out.println("VIP執行緒-" + Thread.currentThread().getName() + "執行了" + j + "次");
            }
        }, "執行緒加入");
        thread.start();

        for (int i = 0; i < 100; i++) {
            System.out.println(Thread.currentThread().getName() + "執行了" + i + "次");
            if (i == 50) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4、執行緒同步

4.1、執行緒安全和執行緒同步概述

4.1.1、執行緒安全

當多個執行緒存取一個物件時,如果不用考慮這些執行緒在執行時環境下的排程和交替執行,也不需要進行額外的同步,或者在呼叫方進行任何其他的協調操作,呼叫這個物件的行為都可以獲得正確的結果,那這個物件就是執行緒安全的。

問題:通常情況下,一個程序中的比較耗時的操作(如長迴圈、檔案上傳下載、網路資源獲取等),往往會採用多執行緒來解決。又比如實際生活中,銀行取錢問題、火車票多個售票視窗的問題,通常會涉及到並行的問題,從而需要多執行緒的技術。

當程序中有多個並行執行緒進入一個重要資料的程式碼塊時,在修改資料的過程中,很有可能引發執行緒安全問題,從而造成資料異常。例如,正常邏輯下,同一個編號的火車票只能售出一次,卻由於執行緒安全問題而被多次售出,從而引起實際業務異常。

執行緒安全問題產生的原因——共用記憶體資料,當多個執行緒同時操作同一共用資料時,導致共用資料出錯。

執行緒、主記憶體、工作記憶體三者的關係如圖:

在 Java 記憶體模型中,分為主記憶體和執行緒工作記憶體。每條執行緒有自己的工作記憶體,執行緒使用共用資料時,都是先從主記憶體中拷貝到工作記憶體,執行緒對該變數的所有操作都必須在工作記憶體中進行,而不能直接讀寫主記憶體中的變數,執行緒使用完成之後再寫入主記憶體。不同執行緒之間也無法直接存取對方工作記憶體中的變數,執行緒間變數值的傳遞均需要通過主記憶體來完成。

在多執行緒環境下,不同執行緒對同一份資料操作,就可能會產生不同執行緒中資料狀態不一致的情況,這就是執行緒安全問題的原因。

4.1.2、執行緒同步

多執行緒並行的環境下,有共用資料,並且這個資料還會被修改,此時就存線上程安全問題,怎麼解決這個問題?

要實現執行緒安全,需要保證資料操作的兩個特性:

  1. 原子性:對資料的操作不會受其他執行緒打斷,意味著一個執行緒運算元據過程中不會插入其他執行緒對資料的操作。

  2. 可見性:當執行緒修改了資料的狀態時,能夠立即被其他執行緒知曉,即資料修改後會立即寫入主記憶體,後續其他執行緒讀取時就能得知資料的變化。

以上兩個特性結合起來,其實就相當於同一時刻只能有一個執行緒去進行資料操作並將結果寫入主記憶體,這樣就保證了執行緒安全,這種機制稱為執行緒同步

執行緒同步就是執行緒不能並行,執行緒必須排隊執行,因此執行緒同步會犧牲一部分的效率,來提升安全性
執行緒排隊執行。(不能並行)。用排隊執行解決執行緒安全問題。

實現方式:

  1. 通過Synchronized關鍵字修飾程式碼塊或者方法,一個執行緒存取一個物件中的 synchronized(this) 同步程式碼塊時,其他試圖存取該物件的執行緒將被阻塞。
  2. Lock鎖,支援那些語意不同(重入、公平等)的鎖規則,可以在非阻塞式結構的上下文(包括 hand-over-hand 和鎖重排演演算法)中使用這些規則。

4.2、執行緒同步的實現方式

4.2.1 synchronized 鎖

synchronized 是 Java 中的關鍵字,是一種同步鎖。它修飾的物件有以下幾種:

  • 修飾一個程式碼塊,被修飾的程式碼塊稱為範例程式碼塊,其作用的範圍是大括號{}括起來的程式碼,鎖是 synchronized 括號裡設定的物件;如果作用在靜態方法中,則稱為靜態程式碼塊,鎖物件是當前類的位元組碼檔案;

  • 修飾一個方法,被修飾的方法稱為同步方法,其作用的範圍是整個方法,鎖這個方法所在的當前範例物件;

  • 修改一個靜態的方法,其作用的範圍是整個靜態方法,鎖是這個類的所有物件;

  • 修改一個類,其作用的範圍是 synchronized 後面括號括起來的部分,鎖是這個類的所有物件。

修飾一個程式碼塊

一個執行緒存取一個物件中的 synchronized(this) 同步程式碼塊時,其他試圖存取該物件的執行緒將被阻塞。

public class ThreadSafety {
    public static void main(String[] args) {
        System.out.println("使用關鍵字synchronized");
        SyncThread syncThread = new SyncThread();
        new Thread(syncThread, "SyncThread1").start();
        new Thread(syncThread, "SyncThread2").start();
    }
}

class SyncThread implements Runnable {
    private static int count = 0;

    @Override
    public void run() {
        synchronized (this) {
            for (int i = 0; i < 5; i++) {
                try {
                    System.out.println("執行緒名:" + Thread.currentThread().getName() + ":" + (count++));
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        // 其他邏輯
    }
}

當兩個並行執行緒(thread1 和 thread2)存取同一個物件(syncThread)中的 synchronized 程式碼塊時,在同一時刻只能有一個執行緒得到執行,另一個執行緒受阻塞,必須等待當前執行緒執行完這個程式碼塊以後才能執行該程式碼塊。Thread1 和 thread2 是互斥的,因為在執行 synchronized 程式碼塊時會鎖定當前的物件,只有執行完該程式碼塊才能釋放該物件鎖,下一個執行緒才能執行並鎖定該物件。

注:synchronized 只鎖定物件,多個執行緒要實現同步,所以執行緒必須以同一個 Runnable 物件為執行物件,即:()中的物件要是同一個

這時如果建立了兩個 SyncThread 的物件 syncThread1 和 syncThread2,執行緒 thread1 執行的是 syncThread1 物件中的 synchronized 程式碼(run),而執行緒 thread2 執行的是 syncThread2 物件中的 synchronized 程式碼(run);我們知道 synchronized 鎖定的是物件,這時會有兩把鎖分別鎖定 syncThread1 物件和 syncThread2 物件,而這兩把鎖是互不干擾的,不形成互斥,所以兩個執行緒可以同時執行。

當一個執行緒存取物件的一個 synchronized(this) 同步程式碼塊時,另一個執行緒仍然可以存取該物件中的非 synchronized(this) 同步程式碼塊。

如果 synchronized 作用在靜態方法中,修飾一塊程式碼,則稱為靜態程式碼塊,鎖物件是當前類的位元組碼檔案。

class SyncThread implements Runnable {
    private static int count = 0;
	
    /**
     * synchronized作用在靜態方法中,鎖物件實當前類的位元組碼檔案
     */
    public static void save() {
        synchronized (SyncThread.class) {
            count++;
        }
        // 其他操作
    }
}

修飾一個方法

Synchronized 修飾一個方法很簡單,就是在方法的前面加 synchronized, synchronized 修飾方法和修飾一個程式碼塊類似,只是作用範圍不一樣,修飾程式碼塊是大括號括起來的範圍,而修飾方法範圍是整個函數。

class SyncThread implements Runnable {
    private static int account = 100;

    /**
     * synchronized修飾一個方法,被修飾的方法稱為同步方法,其作用的範圍是整個方法,鎖物件為這個方法所在的當前範例物件
     * @param money
     */
    public synchronized void draw(Integer money) {
        account -= money;
    }
}

在用synchronized修飾方法時要注意以下幾點:

  1. synchronized 關鍵字不能繼承。 雖然可以使用 synchronized 來定義方法,但 synchronized 並不屬於方法定義的一部分,因此,synchronized 關鍵字不能被繼承。如果在父類別中的某個方法使用了 synchronized 關鍵字,而在子類中覆蓋了這個方法,在子類中的這個方法預設情況下並不是同步的,而必須顯式地在子類的這個方法中加上 synchronized 關鍵字才可以。

  2. 當然,還可以在子類方法中呼叫父類別中相應的方法,這樣雖然子類中的方法不是同步的,但子類呼叫了父類別的同步方法,因此,子類的方法也就相當於同步了。

  3. 在定義介面方法時不能使用 synchronized 關鍵字。

  4. 構造方法不能使用 synchronized 關鍵字,但可以使用 synchronized 程式碼塊來進行同步。

修飾靜態方法

靜態方法是屬於類的而不屬於物件的,synchronized 修飾的靜態方法鎖定的是這個類的所有物件,該類的所有物件用 synchronized 修飾的靜態方法的用的是同一把鎖。

修飾一個類

效果和 synchronized 修飾靜態方法是一樣的,synchronized 作用於一個類時,是給這個類加鎖,該類的所有物件用的是同一把鎖。


4.2.2 Lock 鎖

Lock 和 ReadWriteLock 鎖簡介

從JDK 5.0開始,Java提供了更強大的執行緒同步機制——通過顯式定義同步鎖物件來實現同步,Lock 和 ReadWriteLock 是兩大鎖的根介面,Lock 代表實現類是 ReentrantLock(可重入鎖),ReadWriteLock(讀寫鎖)的代表實現類是 ReentrantReadWriteLock。

  • Lock 介面支援那些語意不同(重入、公平等)的鎖規則,可以在非阻塞式結構的上下文(包括 hand-over-hand 和鎖重排演演算法)中使用這些規則,是控制多個執行緒對共用資源進行存取的工具。Lock鎖提供了對共用資源的獨佔存取,每次只能有一個執行緒對Lock物件加鎖,執行緒開始存取共用資源之前應先獲得Lock物件。主要的實現是 ReentrantLock,ReentrantLock類實現了Lock,它擁有與synchronized相同的並行性和記憶體語意,在實現執行緒安全的控制中,比較常用的是ReentrantLock,可以顯式加鎖、釋放鎖。

  • ReadWriteLock 介面以類似方式定義了一些讀取者可以共用而寫入者獨佔的鎖。此包只提供了一個實現,即 ReentrantReadWriteLock。但程式設計師可以建立自己的、適用於非標準要求的實現。

Lock 鎖與 synchronized 鎖比較

  • synchronized 是隱式鎖,出了作用域自動釋放,鎖的控制和釋放是在 synchronized 同步程式碼塊的開始和結束位置。而 Lock 是顯示鎖,鎖的開啟和關閉都是手動的,實現同步時,鎖的獲取和釋放可以在不同的程式碼塊、不同的方法中。

  • Lock 只有程式碼塊鎖,而 synchronized 有程式碼塊鎖和方法鎖。

  • Lock 介面提供了試圖獲取鎖的 tryLock() 方法,在呼叫 tryLock() 獲取鎖失敗時返回 false,這樣執行緒可以執行其它的操作而不至於使執行緒進入休眠。tryLock() 方法可傳入一個 long 型的時間引數,允許在一定的時間內來獲取鎖。

  • Lock 介面的實現類 ReentrantReadWriteLock 提供了讀鎖和寫鎖,允許多個執行緒獲得讀鎖、而只能有一個執行緒獲得寫鎖,讀鎖和寫鎖不能同時獲得。實現了讀和寫的分離,這一點在需要並行讀的應用中非常重要,如 lucene 允許多個執行緒讀取索引資料進行查詢但只能有一個執行緒負責索引資料的構建。

  • 基於以上幾點,使用 lock 鎖,JVM會花費更少的時候來排程執行緒,因此效能較好,同時有更好的可延伸性(提供更多的子類)。

Lock 獨有特徵

  • 嘗試非阻塞的獲取鎖:當前執行緒嘗試獲取鎖,如果這一時刻鎖沒有被其他執行緒獲取到,則成功獲取並持有鎖。

  • 能被中斷的獲取鎖:獲取到鎖的執行緒能夠響應中斷,當獲取到鎖的執行緒被中斷時,中斷異常將會被丟擲,同時鎖會被釋放。

  • 超時獲取鎖:在指定的截止時間之前獲取鎖,超過截止時間後仍舊無法獲取則返回。

Lock 鎖的使用場景

如果一個程式碼塊被 synchronized 關鍵字修飾,當一個執行緒獲取了對應的鎖,並執行該程式碼塊時,其他執行緒便只能一直等待直至佔有鎖的執行緒釋放鎖。

事實上,佔有鎖的執行緒釋放鎖一般會是以下三種情況之一:

  • 佔有鎖的執行緒執行完了該程式碼塊,然後釋放對鎖的佔有;

  • 佔有鎖執行緒執行發生異常,此時 JVM 會讓執行緒自動釋放鎖;

  • 佔有鎖執行緒進入 WAITING 狀態從而釋放鎖,例如在該執行緒中呼叫wait()方法等。

以下三種場景只能用 Lock:

  1. 使用 synchronized 關鍵字的情形下,假如佔有鎖的執行緒由於要等待 IO 或者其他原因(比如呼叫sleep方法)被阻塞了,但是又沒有釋放鎖,那麼其他執行緒就只能一直等待,別無他法。這會極大影響程式執行效率。因此,就需要有一種機制可以不讓等待的執行緒一直無期限地等待下去(比如只等待一定的時間 (解決方案:tryLock(long time, TimeUnit unit)) 或者 能夠響應中斷 (解決方案 :lockInterruptibly())),這種情況可以通過 Lock 解決。

  2. 當多個執行緒讀寫檔案時,讀操作和寫操作會發生衝突現象,寫操作和寫操作也會發生衝突現象,但是讀操作和讀操作不會發生衝突現象。但是如果採用 synchronized 關鍵字實現同步的話,就會導致一個問題,即當多個執行緒都只是進行讀操作時,也只有一個執行緒在可以進行讀操作,其他執行緒只能等待鎖的釋放而無法進行讀操作。因此,需要一種機制來使得當多個執行緒都只是進行讀操作時,執行緒之間不會發生衝突(解決方案:ReentrantReadWriteLock) 。

  3. 我們可以通過 Lock 得知執行緒有沒有成功獲取到鎖(解決方案:ReentrantLock) ,但這個是 synchronized 無法辦到的。

Lock 鎖的簡單使用

public class LockUse {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();
        new Thread(ticket).start();
        new Thread(ticket).start();
        new Thread(ticket).start();
    }
}

class Ticket implements Runnable {
    private static Integer ticketNums = 10;
    /**
     * 宣告可重入鎖
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * 不加鎖的情況下,執行緒不安全,因此可以使用Lock進行顯示的加鎖和解鎖,鎖lock必須緊跟try程式碼塊,且unlock要放到finally第一行。
     */
    @Override
    public void run() {
        while (true) {
            // 加鎖,鎖lock必須緊跟try程式碼塊,且unlock要放到finally第一行。
            lock.lock();
            try {
                // lock.lock(); 可以出現在這個位置,但是不建議,因為如果在獲取鎖時發生了異常,異常丟擲的同時也會導致鎖無法被釋放;
                if (ticketNums > 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 模擬買票,票自減
                    System.out.println(ticketNums--);
                }
            } finally {
                // 必須放到第一行
                lock.unlock();
            }
        }
    }
}

5、執行緒通訊

5.1、生產者和消費者問題

場景:兩個共用固定大小緩衝區的執行緒——即所謂的「生產者」和「消費者」——在實際執行時會發生的問題。生產者的主要作用是生成一定量的資料放到緩衝區中,然後重複此過程。與此同時,消費者也在緩衝區消耗這些資料。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入資料,消費者也不會在緩衝區中空時消耗資料。

分析:

  • 對於生產者,沒有生產產品前,要通知消費者等待,生產產品後,通知消費者消費。

  • 對於消費者,消費後,通知生產者生產新的產品消費。

Java提供的解決執行緒通訊問題的方法,即:等待/喚醒機制

  • wait(), 表示執行緒一直等待,直到其他執行緒通知,與sleep不同,會釋放鎖。

  • notify() ,喚醒一個處於等待狀態的執行緒

  • notifyAll(), 喚醒同一個物件上所有呼叫wait()方法的執行緒,優先順序高的執行緒優先排程

注意:以上方法只能在同步方法或者同步程式碼塊中使用,否則丟擲異常,IlleagalMonitorStateException

方式:

  1. 管道法:採用並行共同作業模型,加入「緩衝區」,生產者將生產好的資料放入緩衝區,消費者從緩衝區拿出資料。

  2. 訊號燈:在生產者與消費者之間傳遞訊號的一個標誌。如當生產者或消費者執行緒完成自己的工作,等待另一個執行緒進行時,通過修改訊號值來通知對方:我的事情做完了,該你了。另一者獲取訊號的變化後便會做出對應的行為。在這個過程中,訊號值一直被反覆更改,直到所有執行緒均執行完畢。

5.2、管道法

5.2.1、產品、生產者和消費者

class Product {
    /**
     * 產品編號
     */
    Integer productId;

    public Product(Integer productId) {
        this.productId = productId;
    }
}

/**
 * 生產者
 */
class Production extends Thread {
    Buffers buffer;

    public Production(Buffers buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            try {
                buffer.push(new Product(i));
                System.out.println("生產了" + i + "個商品");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 消費者
 */
class Customer extends Thread {
    Buffers buffer;

    public Customer(Buffers buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            try {
                System.out.println("消費了-->" + buffer.commodity().productId + "個產品");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

5.2.2、緩衝區

緩衝區的大小設定之後,設定一個計數器,生產者和消費者通過計數器去進行產品的生產和消費。生產者生產產品時,首先根據計數器去判斷緩衝區是否已經滿了,滿了的話就等待,然後通知消費者進行消費,如果沒有滿的話,就繼續往裡面生產產品。消費者消費的時候,也是通過判斷緩衝區中是否有產品存在,如果存在的話就消費,否則等待生產者進行生產,整個生產和消費的過程都是針對緩衝區進行的。

class Buffers {
    /**
     * 設定容器大小,產品最大數量
     */
    Product[] product = new Product[10];

    /**
     * 計數器
     */
    private int count = 0;

    /**
     * 生產者生產品
     *
     * @param products
     * @throws InterruptedException
     */
    public synchronized void push(Product products) throws InterruptedException {
        // 如果容器滿了,就等待消費者消費
        if (count == product.length) {
            // 通知消費者消費,生產者等待,wait(),表示執行緒一直等待,直到其它執行緒通知,與sleep不同,會釋放鎖
            this.wait();
        }
        // 如果沒滿,就丟入產品
        product[count] = products;
        count++;

        // 通知消費者進行消費,notify(),喚醒同一個物件上所有呼叫wait()方法的執行緒,優先順序高的執行緒優先排程
        this.notifyAll();
    }

    /**
     * 消費者消費產品
     *
     * @return
     * @throws InterruptedException
     */
    public synchronized Product commodity() throws InterruptedException {
        // 判斷是否有產品可以消費
        if (count == 0) {
            // 消費者等待,等待生產者生產
            this.wait();
        }
        // 消費者進行消費
        count--;
        Product products = product[count];

        // 消費完後,通知生產者生產
        this.notifyAll();

        return products;
    }
}

5.3、訊號燈

5.3.1、廚師和顧客

/**
 * 生產者:廚師
 */
class Cook extends Thread {
    Food food;

    public Cook(Food food) {
        this.food = food;
    }

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            try {
                food.make("涼皮");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 消費者:顧客
 */
class Judge extends Thread {
    Food food;

    public Judge(Food food) {
        this.food = food;
    }

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            try {
                food.eat();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

5.3.2、訊號處理

使用訊號法時,需要設定一個標誌位,通過修改標誌位的方式,使得生產者和消費者進行協同工作。當標誌位為true時,生產者進行生產,消費者等待,所以在製作食物的方法中,首先讓消費者(顧客)進行等待,等待生產者(廚師)進行食物製作,廚師製作完成之後,通知顧客吃飯,同時修改標誌位。顧客吃飯的時候,廚師等待,顧客吃完後,通知廚師繼續做飯,同時修改標誌位,使得兩個執行緒有序的進行協同工作。

class Food {
    /**
     * 設定標誌位,true為廚師烹飪食物,顧客等待,false為廚師等待,顧客吃飯
     */
    boolean flag = true;

    /**
     * 食物
     */
    String foodName;

    /**
     * 烹飪食物
     *
     * @param foodName 食物
     * @throws InterruptedException
     */
    public synchronized void make(String foodName) throws InterruptedException {
        // 如果flag為false則廚師等待顧客吃飯,生產者廚師等待,消費者顧客進行吃飯
        if (!flag) {
            this.wait();
        }
        System.out.println("廚師做了一道" + foodName);

        // 喚醒消費者消費
        this.notifyAll();
        // 將廚師做的菜傳遞給總的菜類
        this.foodName = foodName;
        // 讓flag為false,則消費者消費
        this.flag = !this.flag;
    }

    /**
     * 消費者吃飯
     *
     * @throws InterruptedException
     */
    public synchronized void eat() throws InterruptedException {
        // flag為true則顧客等待廚師做飯,消費者等待,生產者生產
        if (flag) {
            this.wait();
        }
        System.out.println("顧客吃了" + foodName);
        // 喚醒,喚醒生產者(廚師)做菜
        this.notifyAll();
        // 使flag為true,讓生產者繼續生產
        this.flag = !this.flag;
    }
}

6、執行緒池

6.1、執行緒池概述

6.1.1、執行緒池的基本概念

在物件導向程式設計中,建立和銷燬物件是很費時間的,對於執行緒來說也是如此,尤其是當執行緒中執行的是簡單任務的話,則大部分的時間都花費線上程的建立和銷燬上。

因此為了解決這種資源浪費的情況,使用池化技術——執行緒池,本質上是一種物件池,用於管理執行緒資源,對執行緒進行復用,一個執行緒執行完當前任務後並不馬上銷燬,而是從任務佇列中取出一個任務繼續執行。即在任務執行前,需要從執行緒池中拿出執行緒來執行,在任務執行完成之後,需要把執行緒放回執行緒池。通過執行緒的這種反覆利用機制,可以有效地避免直接建立執行緒所帶來的壞處。這種做法提高了執行緒的利用率,也減少了系統開銷。

執行緒池作用就是限制系統中執行執行緒的數量。根據系統的環境情況,可以自動或手動設定執行緒數量,達到執行的最佳效果;少了浪費了系統資源,多了造成系統擁擠效率不高。用執行緒池控制執行緒數量,其他執行緒排隊等候。一個任務執行完畢,再從佇列的中取最前面的任務開始執行。若佇列中沒有等待程序,執行緒池的這一資源處於等待。當一個新任務需要執行時,如果執行緒池中有等待的工作執行緒,就可以開始執行了;否則進入等待佇列。

6.1.2、執行緒池的優缺點

優點

  • 降低資源消耗:重用存在的執行緒,減少物件建立銷燬的開銷。

  • 提高響應速度:可有效的控制最大並行執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。當任務到達時,任務可以不需要的等到執行緒建立就能立即執行。

  • 提高執行緒的可管理性:執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。

  • 附加功能:提供定時執行、定期執行、單執行緒、並行數控制等功能。

缺點

  • 頻繁的執行緒建立和銷燬會佔用更多的CPU和記憶體,對GC產生比較大的壓力

  • 執行緒太多,執行緒切換帶來的開銷將不可忽視。同時執行緒太少,多核CPU得不到充分利用,是一種浪費

6.1.3、執行緒池的狀態

執行緒池的5種狀態:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED

  • RUNNING:-1 << COUNT_BITS,即高三位為111,該狀態的執行緒池會接收新任務,並處理阻塞佇列中的任務;RUNNING也是執行緒池的初始狀態。

  • SHUTDOWN:0 << COUNT_BITS,即高三位為000,該狀態的執行緒池不會接收新任務,但會處理阻塞佇列中的任務;呼叫shutdown()方法將狀態轉換至 SHUTDOWN。

  • STOP:1 << COUNT_BITS,即高3位為001,該狀態的執行緒不會接收新任務,也不會處理阻塞佇列中的任務,而且會中斷正在執行的任務;

  • TINYING:2 << COUNT_BITS,即高3為010,表示所有任務已經終止,workerCount是0,執行緒過度到tinying狀態需要執行terminated()方法;執行緒池在SHUTDOWN狀態且阻塞佇列為空並且執行緒池中執行的任務也為空時,就會由 SHUTDOWN轉換為TIDYING。

  • TERMINATED:3 << COUNT_BITS,即高3位為011,表示terminated()方法完成後的狀態。

6.1.4、執行緒池的實現原理


通過上圖,我們看到了執行緒池的主要處理流程。我們的關注點在於,任務提交之後是怎麼執行的。大致如下:

  1. 判斷核心執行緒池是否已滿,如果不是,則建立執行緒執行任務。

  2. 如果核心執行緒池滿了,判斷佇列是否滿了,如果佇列沒滿,將任務放在佇列中。

  3. 如果佇列滿了,則判斷執行緒池是否已滿,如果沒滿,建立執行緒執行任務。

  4. 如果執行緒池也滿了,則按照拒絕策略對任務進行處理。


6.2、執行緒池的使用

6.2.1 Executors工具類(不推薦使用)

Executors是一個執行緒池工廠,提供了很多的工廠方法,我們來看看它大概能建立哪些執行緒池。

  • 建立單一執行緒的執行緒池:ExecutorService newSingleThreadExecutor();這是一個始終都只有一個執行緒的池子,所有的任務都通過一個執行緒來執行,若多個任務被提交到此執行緒池,那麼會被快取到佇列(佇列長度為Integer.MAX_VALUE),當執行緒空閒的時候,按照FIFO的方式進行處理。

  • 建立固定數量的執行緒池:ExecutorService newFixedThreadPool(int nThreads);建立一個具有固定執行緒數的執行緒池,當所有執行緒都在執行任務時,新提交的任務會一直提交到阻塞佇列中。若多個任務被提交到此執行緒池,則會有如下處理過程:

    • 如果執行緒的數量未達到指定數量,則建立執行緒來執行任務

    • 如果執行緒池的數量達到了指定數量,並且有執行緒是空閒的,則取出空閒執行緒執行任務

    • 如果沒有執行緒是空閒的,則將任務快取到佇列(佇列長度為Integer.MAX_VALUE)。當執行緒空閒的時候,按照FIFO的方式進行處理

  • 建立帶快取的執行緒池:ExecutorService newCachedThreadPool();這種方式建立的執行緒池,核心執行緒池的長度為0,執行緒池最大長度為Integer.MAX_VALUE。由於本身使用SynchronousQueue作為等待佇列的緣故,導致往佇列裡面每插入一個元素,必須等待另一個執行緒從這個佇列刪除一個元素,會根據執行緒任務的數量來進行執行緒的建立和釋放。

  • 建立定時排程的執行緒池:ScheduledExecutorService newScheduledThreadPool(int corePoolSize);和上面3個工廠方法返回的執行緒池型別有所不同,它返回的是ScheduledThreadPoolExecutor型別的執行緒池。平時我們實現定時排程功能的時候,可能更多的是使用第三方類庫,比如:quartz等。但是對於更底層的功能,我們仍然需要了解。

    1. scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit),定時排程,每個排程任務會至少等待period的時間,如果任務執行的時間超過period,則等待的時間為任務執行的時間。

    2. scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit),定時排程,第二個任務執行的時間 = 第一個任務執行時間 + delay。

    3. schedule(Runnable command, long delay, TimeUnit unit),定時排程,延遲delay後執行,且只執行一次。

我們寫一個例子來看看如何使用定時排程:

public class ThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        // 定時排程,每個排程任務會至少等待`period`的時間,如果任務執行的時間超過`period`,則等待的時間為任務執行的時間
        executor.scheduleAtFixedRate(() -> {
            try {
                Thread.sleep(10000);
                System.out.println(System.currentTimeMillis() / 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 0, 2, TimeUnit.SECONDS);

        // 定時排程,第二個任務執行的時間 = 第一個任務執行時間 + `delay`
        executor.scheduleWithFixedDelay(() -> {
            try {
                Thread.sleep(5000);
                System.out.println(System.currentTimeMillis() / 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 0, 2, TimeUnit.SECONDS);

        // 定時排程,延遲`delay`後執行,且只執行一次
        executor.schedule(() -> System.out.println("5 秒之後執行 schedule"), 5, TimeUnit.SECONDS);
    }
}

注意: 通過閱讀底層原始碼可以看出,四種常見的執行緒池都直接或間接的繼承自ThreadPoolExecutor類,而《阿里巴巴Java開發手冊》中則強制執行緒池不允許使用 Executors 去建立,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式則必須更加明確執行緒池的執行規則,從而規避資源耗盡的風險。


6.2.2、ThreadPoolExecutor手動設定

理論上,我們可以通過Executors來建立執行緒池,這種方式非常簡單。但正是因為簡單,所以限制了執行緒池的功能。比如:無長度限制的佇列,可能因為任務堆積導致OOM,這是非常嚴重的bug,應儘可能地避免。同時,根據《阿里巴巴Java開發手冊》中則強制執行緒池不允許使用 Executors 去建立,而是通過 ThreadPoolExecutor 的方式,因此歸根結底,還是需要我們通過更底層的方式來建立執行緒池。

從Executors的底層實現上不難看出,其中的幾個方法都使用了 ThreadPoolExecutor 的預設設定,拋開定時排程的執行緒池不管,ThreadPoolExecutor最底層的構造方法卻只有一個。那麼,我們就從這個構造方法著手分析。

public ThreadPoolExecutor(int corePoolSize,                     // 核心執行緒數
                          int maximumPoolSize,                  // 最大執行緒數
                          long keepAliveTime,                   // 最長存活時間
                          TimeUnit unit,                        // 存活時間單位
                          BlockingQueue<Runnable> workQueue,    // 阻塞佇列
                          ThreadFactory threadFactory,          // 執行緒工廠
                          RejectedExecutionHandler handler) {   // 飽和策略
    /*
     * 使用兩個if語句進行引數合法性判斷
     */
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)                  
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

由構造方法可知,ThreadPoolExecutor 類的構造引數總共有7個,我們逐一進行分析。

  • corePoolSize:執行緒池中的核心執行緒數。當提交一個任務時,執行緒池建立一個新執行緒執行任務,直到當前執行緒數等於corePoolSize, 即使有其他空閒執行緒能夠執行新來的任務, 也會繼續建立執行緒;如果當前執行緒數為corePoolSize,繼續提交的任務被儲存到阻塞佇列中,等待被執行。

  • maximumPoolSize:執行緒池中的最大執行緒數。如果當前阻塞佇列滿了,且繼續提交任務,則建立新的執行緒執行任務,直到當前執行緒數等於maximumPoolSize則停止建立;當阻塞佇列是無界佇列時,maximumPoolSize則不起作用, 因為無法提交至核心執行緒池的執行緒會一直持續地放入 workQueue(阻塞佇列)。

  • keepAliveTime:空閒時間,當執行緒池數量超過核心執行緒數時,多餘的空閒執行緒存活的時間,即:這些執行緒多久被銷燬。預設情況下,該引數只線上程數大於corePoolSize(核心執行緒數)時才有用, 超過這個時間的空閒執行緒將被終止。

  • unit:空閒時間的單位,可以是毫秒、秒、分鐘、小時和天等等。

  • workQueue:等待(阻塞)佇列,執行緒池中的執行緒數超過核心執行緒數時,任務將放在等待佇列,等待佇列預設是 BlockingQueue 型別的,同時JDK內部自帶的主要有以下幾種:

    • ArrayBlockingQueue:基於陣列結構的有界阻塞佇列,按FIFO( First Input First Output,即先進先出、先來先服務)排序任務;

    • BlockingQueue workQueue:基於連結串列結構的阻塞佇列,按FIFO排序任務,吞吐量通常要高於ArrayBlockingQuene;

    • LinkedBlockingQueue:基於連結串列實現的阻塞佇列,佇列可以有界,也可以無界。

    • SynchronousQuene:一個不儲存元素的阻塞佇列(即只有一個位置),每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene;

    • PriorityBlockingQuene:具有優先順序的無界阻塞佇列;

  • threadFactory:執行緒工廠,我們可以使用它來建立一個執行緒,通過自定義的執行緒工廠可以給每個新建的執行緒設定一個具有識別度的執行緒名。預設為DefaultThreadFactory,Executors的實現使用了預設的執行緒工廠 DefaultThreadFactory。

  • handler:拒絕策略(飽和策略),當執行緒池和等待佇列都滿了之後,需要通過該物件的回撥函數進行回撥處理。即:如果繼續提交任務,必須採取一種策略處理該任務,執行緒池提供了4種策略:

    • AbortPolicy:直接丟擲異常,預設策略;

    • CallerRunsPolicy:用呼叫者所在的執行緒來執行任務;

    • DiscardOldestPolicy:丟棄阻塞佇列中靠最前的任務,並執行當前任務;

    • DiscardPolicy:直接丟棄任務;

通常情況下,我們需要指定阻塞佇列的上界(比如1024)。另外,如果執行的任務很多,我們可能需要將任務進行分類,然後將不同分類的任務放到不同的執行緒池中執行。

四種拒絕策略各有優劣,比較常用的是DiscardPolicy,但是這種策略有一個弊端就是任務執行的軌跡不會被記錄下來。所以,我們往往需要實現自定義的拒絕策略, 通過實現RejectedExecutionHandler介面的方式。

簡單實現

public class ThreadPool {
    public static void main(String[] args) {
        /**
         * 初始化一個指定的執行緒池,核心執行緒2個,最大執行緒5個,銷燬時間1秒,阻塞佇列使用ArrayBlockingQueue
         */
        ExecutorService executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("beforeExecute is called:呼叫執行之前被呼叫");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("afterExecute is called:呼叫執行之後被呼叫");
            }

            @Override
            protected void terminated() {
                System.out.println("terminated is called:終止呼叫");
            }
        };

        // 提交任務
        executor.submit(() -> System.out.println("this is a task"));
        // 關閉執行緒池
        executor.shutdown();
    }
}

6.2.3、提交任務

ExecutorService總共提供了兩種任務提交的方法,分別是execute()方法和submit()方法,主要區別如下:

  • execute()方法提交的任務,必須實現Runnable介面,該方式提交的任務不能獲取返回值,因此無法判斷任務是否執行成功。而submit()方法既可以提交Runnable型別的任務,也可以提交Callable型別的任務,會有一個型別為Future的返回值,但當任務型別為Runnable時,返回值為null。

  • execute()方法如果遇到異常會直接丟擲,而submit()方法不會直接丟擲,只有在使用Future的get方法獲取返回值時,才會丟擲異常。

  • 如果提交的任務不需要一個結果的話直接用execute()會提升很多效能。如果你需要的是一個空結果,那麼submit(yourRunnable)與submit(yourRunnable,null)是等價的!

public class TaskSubmit {
    public static void main(String[] args) {
        // 建立執行緒池
        ExecutorService executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        // 只能提交Runnable任務
        executor.execute(() -> System.out.println("execute()方法只能提交Runnable任務"));

        // 既可以提交Runnable任務,又可以提交Callable任務,只是前者返回null,後者返回值
        Future<Integer> callableFuture = executor.submit(() -> 1 + 1);
        Future<?> runnableFuture = executor.submit(() -> System.out.println("Runnable任務會返回null"));
        try {
            // 只有獲取返回值的時候才需要處理異常
            System.out.println(callableFuture.get());
            System.out.println(runnableFuture.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }
}

6.2.4、關閉執行緒池

ExecutorService提供了shutDown()和shutDownNow()兩個函數來關閉執行緒池,底層還是通過逐個呼叫執行緒的interrupt()函數來實現中斷執行緒從而關閉執行緒池的。

  • shutdown函數會把執行緒池的狀態則立刻變成SHUTDOWN狀態。此時,則不能再往執行緒池中新增任何任務,否則將會丟擲RejectedExecutionException異常。但是,此時執行緒池不會立刻退出,直到新增到執行緒池中的任務都已經處理完成,才會退出。(即將當前所有執行緒任務執行完畢再銷燬執行緒池)

  • shutdownNow方法會先將執行緒池狀態修改為STOP,然後呼叫執行緒池裡的所有執行緒的interrupt方法,並把工作佇列中尚未來得及執行的任務清空到一個List中返回,getTask()方法返回null,從而執行緒退出 。但是ShutdownNow()並不代表執行緒池就一定立即就能退出,它可能必須要等待所有正在執行的任務都執行完成了才能退出。(即直接銷燬執行緒池,不會考慮是否有執行緒任務再執行)