【Java】執行緒池梳理

2023-01-06 12:01:21

【Java】執行緒池梳理

前言

執行緒池:本質上是一種物件池,用於管理執行緒資源。在任務執行前,需要從執行緒池中拿出執行緒來執行。在任務執行完成之後,需要把執行緒放回執行緒池。通過執行緒的這種反覆利用機制,可以有效地避免直接建立執行緒所帶來的壞處。

優點:1、降低資源的消耗。執行緒本身是一種資源,建立和銷燬執行緒會有CPU開銷;建立的執行緒也會佔用一定的記憶體;2、提高任務執行的響應速度。任務執行時,可以不必等到執行緒建立完之後再執行;3、提高執行緒的可管理性。執行緒不能無限制地建立,需要進行統一的分配、調優和監控。

缺點:1、頻繁的執行緒建立和銷燬會佔用更多的CPU和記憶體;2、頻繁的執行緒建立和銷燬會對GC產生比較大的壓力;3、執行緒太多,執行緒切換帶來的開銷將不可忽視;4、執行緒太少,多核CPU得不到充分利用,是一種浪費。

流程

  • 判斷核心執行緒池是否已滿,如果不是,則建立執行緒執行任務;
  • 如果核心執行緒池滿了,判斷佇列是否滿了,如果佇列沒滿,將任務放在佇列中;
  • 如果佇列滿了,則判斷執行緒池是否已滿,如果沒滿,建立執行緒執行任務;
  • 如果執行緒池也滿了,則按照拒絕策略對任務進行處理。

方式

入門級例子

package cn.com.codingce.juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                System.out.println("Thread id is " + Thread.currentThread().getId());
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

在這個例子中,首先建立了一個固定長度為5的執行緒池。然後使用迴圈的方式往執行緒池中提交了10個任務,每個任務休眠1秒。在任務休眠之前,將任務所在的執行緒id進行列印輸出。

Thread id is 11
Thread id is 13
Thread id is 12
Thread id is 15
Thread id is 14
Thread id is 11
Thread id is 13
Thread id is 15
Thread id is 14
Thread id is 12

Executors

Executors是一個執行緒池工廠,提供了很多的工廠方法。

// 建立單一執行緒的執行緒池
public static ExecutorService newSingleThreadExecutor();
// 建立固定數量的執行緒池
public static ExecutorService newFixedThreadPool(int nThreads);
// 建立帶快取的執行緒池
public static ExecutorService newCachedThreadPool();
// 建立定時排程的執行緒池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 建立流式(fork-join)執行緒池
public static ExecutorService newWorkStealingPool();

newSingleThreadExecutor

建立一個單執行緒的執行緒池,若多個任務被提交到此執行緒池,那麼會被快取到佇列(佇列長度為Integer.MAX_VALUE ),可保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

private static void createSingleThreadPool() {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 10; i++) {
        final int index = i;
        executorService.execute(() -> {
            // 獲取執行緒名稱,預設格式:pool-1-thread-1
            System.out.println(new Date() + " " + Thread.currentThread().getName() + " " + index);
            // 等待2秒
            try {
                sleep(2000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

output

Mon Jan 02 11:49:58 CST 2023 pool-1-thread-1 0
Mon Jan 02 11:50:00 CST 2023 pool-1-thread-1 1
Mon Jan 02 11:50:02 CST 2023 pool-1-thread-1 2
Mon Jan 02 11:50:04 CST 2023 pool-1-thread-1 3
Mon Jan 02 11:50:06 CST 2023 pool-1-thread-1 4
Mon Jan 02 11:50:08 CST 2023 pool-1-thread-1 5
Mon Jan 02 11:50:10 CST 2023 pool-1-thread-1 6
Mon Jan 02 11:50:12 CST 2023 pool-1-thread-1 7
Mon Jan 02 11:50:14 CST 2023 pool-1-thread-1 8
Mon Jan 02 11:50:16 CST 2023 pool-1-thread-1 9

因為只有一個執行緒,所以執行緒名均相同,且是每隔2秒按順序輸出的。

newFixedThreadPool

建立一個固定大小的執行緒池,可控制並行的執行緒數,超出的執行緒會在佇列中等待。和建立單一執行緒的執行緒池類似,只是可以並行處理任務的執行緒數更多一些。若多個任務被提交到此執行緒池,會有下面的處理過程。

  • 如果執行緒的數量未達到指定數量,則建立執行緒來執行任務;
  • 如果執行緒池的數量達到了指定數量,並且有執行緒是空閒的,則取出空閒執行緒執行任務;
  • 如果沒有執行緒是空閒的,則將任務快取到佇列(佇列長度為Integer.MAX_VALUE)。當執行緒空閒的時候,按照FIFO的方式進行處理
private static void createFixedThreadPool() {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 10; i++) {
        final int index = i;
        executorService.execute(() -> {
            // 獲取執行緒名稱,預設格式:pool-1-thread-1
            System.out.println(new Date() + " " + Thread.currentThread().getName() + " " + index);
            // 等待2秒
            try {
                sleep(2000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

output

Mon Jan 02 11:49:10 CST 2023 pool-1-thread-2 1
Mon Jan 02 11:49:10 CST 2023 pool-1-thread-1 0
Mon Jan 02 11:49:10 CST 2023 pool-1-thread-3 2
Mon Jan 02 11:49:12 CST 2023 pool-1-thread-2 3
Mon Jan 02 11:49:12 CST 2023 pool-1-thread-3 5
Mon Jan 02 11:49:12 CST 2023 pool-1-thread-1 4
Mon Jan 02 11:49:14 CST 2023 pool-1-thread-1 6
Mon Jan 02 11:49:14 CST 2023 pool-1-thread-2 7
Mon Jan 02 11:49:14 CST 2023 pool-1-thread-3 8
Mon Jan 02 11:49:16 CST 2023 pool-1-thread-2 9

因為執行緒池大小是固定的,這裡設定的是3個執行緒,所以執行緒名只有3個。因為執行緒不足會進入佇列等待執行緒空閒,所以紀錄檔間隔2秒輸出。

newCachedThreadPool

建立一個可快取的執行緒池,若執行緒數超過處理所需,快取一段時間後會回收,若執行緒數不夠,則新建執行緒。這種方式建立的執行緒池,核心執行緒池的長度為0,執行緒池最大長度為Integer.MAX_VALUE。由於本身使用SynchronousQueue作為等待佇列的緣故,導致往佇列裡面每插入一個元素,必須等待另一個執行緒從這個佇列刪除一個元素。

private static void createCachedThreadPool() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 10; i++) {
        final int index = i;
        executorService.execute(() -> {
            // 獲取執行緒名稱,預設格式:pool-1-thread-1
            System.out.println(new Date() + " " + Thread.currentThread().getName() + " " + index);
            // 等待2秒
            try {
                sleep(2000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

output

Mon Jan 02 11:56:03 CST 2023 pool-1-thread-8 7
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-3 2
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-1 0
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-5 4
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-9 8
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-6 5
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-2 1
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-4 3
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-7 6
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-10 9

因為初始執行緒池沒有執行緒,而執行緒不足會不斷新建執行緒,所以執行緒名都是不一樣的。

newScheduledThreadPool

建立一個週期性的執行緒池,支援定時及週期性執行任務。

private static void createScheduledThreadPool() {
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
    System.out.println(new Date() + " 提交任務");
    for (int i = 0; i < 10; i++) {
        final int index = i;
        executorService.schedule(() -> {
            // 獲取執行緒名稱,預設格式:pool-1-thread-1
            System.out.println(new Date() + " " + Thread.currentThread().getName() + " " + index);
            // 等待2秒
            try {
                sleep(2000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 3, TimeUnit.SECONDS);
    }
}

Output

Mon Jan 02 11:59:19 CST 2023 提交任務
Mon Jan 02 11:59:22 CST 2023 pool-1-thread-1 0
Mon Jan 02 11:59:22 CST 2023 pool-1-thread-2 1
Mon Jan 02 11:59:22 CST 2023 pool-1-thread-3 2
Mon Jan 02 11:59:24 CST 2023 pool-1-thread-2 4
Mon Jan 02 11:59:24 CST 2023 pool-1-thread-1 5
Mon Jan 02 11:59:24 CST 2023 pool-1-thread-3 3
Mon Jan 02 11:59:26 CST 2023 pool-1-thread-2 6
Mon Jan 02 11:59:26 CST 2023 pool-1-thread-1 8
Mon Jan 02 11:59:26 CST 2023 pool-1-thread-3 7
Mon Jan 02 11:59:28 CST 2023 pool-1-thread-3 9

因為設定了延遲3秒,所以提交後3秒才開始執行任務。因為這裡設定核心執行緒數為3個,而執行緒不足會進入佇列等待執行緒空閒,所以紀錄檔間隔2秒輸出。

newWorkStealingPool(jdk1.8新增)

建立一個含有足夠多執行緒的執行緒池,來維持相應的並行級別,它會通過工作竊取的方式,使得多核的 CPU 不會閒置,總會有活著的執行緒讓 CPU 去執行。

工作竊取概念(Work stealing):工作竊取不是什麼 Java 獨有的東西,.NET 的 TPL 庫早就存在好幾年了。所謂工作竊取,指的是閒置的執行緒去處理本不屬於它的任務。每個處理器核,都有一個佇列儲存著需要完成的任務。對於多核的機器來說,當一個核對應的任務處理完畢後,就可以去幫助其他的核處理任務。

private static void createNewWorkStealingPool() {
    ExecutorService forkJoin = Executors.newWorkStealingPool();
    forkJoin.execute(() -> {
        System.out.println("i====>" + 1 + " " + Thread.currentThread().getId());

    });
    forkJoin.execute(() -> {
        System.out.println("i====>" + 2 + " " + Thread.currentThread().getId());

    });
    forkJoin.execute(() -> {
        System.out.println("i====>" + 3 + " " + Thread.currentThread().getId());

    });
    forkJoin.execute(() -> {
        System.out.println("i====>" + 4 + " " + Thread.currentThread().getId());

    });
    forkJoin.execute(() -> {
        System.out.println("i====>" + 5 + " " + Thread.currentThread().getId());
    });
}

output

i====>1 11
i====>2 11
i====>3 12
i====>4 12
i====>5 12

ThreadPoolExecutor

理論上,可以通過Executors來建立執行緒池,這種方式非常簡單。但正是因為簡單,所以限制了執行緒池的功能。比如:無長度限制的佇列,可能因為任務堆積導致OOM,這是非常嚴重的bug,應儘可能地避免。怎麼避免?歸根結底,還是需要通過更底層的方式來建立執行緒池。

ThreadPoolExecutor提供了好幾個構造方法,但是最底層的構造方法卻只有一個。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {}

這個構造方法有7個引數,逐一來進行分析。

  • corePoolSize,執行緒池中的核心執行緒數;
  • maximumPoolSize,執行緒池中的最大執行緒數;
  • keepAliveTime,空閒時間,當執行緒池數量超過核心執行緒數時,多餘的空閒執行緒存活的時間,即:這些執行緒多久被銷燬;
  • unit,空閒時間的單位,可以是毫秒、秒、分鐘、小時和天,等等;
  • workQueue,等待佇列,執行緒池中的執行緒數超過核心執行緒數時,任務將放在等待佇列,它是一個BlockingQueue型別的物件;
    • ArrayBlockingQueue,佇列是有界的,基於陣列實現的阻塞佇列;
    • LinkedBlockingQueue,佇列可以有界,也可以無界。基於連結串列實現的阻塞佇列;
    • SynchronousQueue,不儲存元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作將一直處於阻塞狀態。該佇列也是Executors.newCachedThreadPool()的預設佇列;
    • PriorityBlockingQueue,帶優先順序的無界阻塞佇列。
  • threadFactory,執行緒工廠,可以使用它來建立一個執行緒;Executors的實現使用了預設的執行緒工廠-DefaultThreadFactory。它的實現主要用於建立一個執行緒,執行緒的名字為pool-{poolNum}-thread-{threadNum}
  • handler,拒絕策略,當執行緒池和等待佇列都滿了之後,需要通過該物件的回撥函數進行回撥處理。
    • AbortPolicy:丟棄任務,拋執行時RejectedExecutionException異常;
    • CallerRunsPolicy:在呼叫者執行緒執行任務;
    • DiscardPolicy:忽視,任務直接丟棄,什麼都不會發生;
    • DiscardOldestPolicy:從佇列中踢出最先進入佇列(最後一個執行)的任務(最舊的那個任務),再嘗試執行當前任務。

執行緒池的執行規則如下:1、當執行緒數小於核心執行緒數時,建立執行緒;2、當執行緒數大於等於核心執行緒數,且任務佇列未滿時,將任務放入任務佇列;3、當執行緒數大於等於核心執行緒數,且任務佇列已滿。若執行緒數小於最大執行緒數,建立執行緒。若執行緒數等於最大執行緒數,丟擲異常,拒絕任務。

private static void createThreadPool() {
    ExecutorService executorService = new ThreadPoolExecutor(2, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(5, true), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    for (int i = 0; i < 10; i++) {
        final int index = i;
        executorService.execute(() -> {
            // 獲取執行緒名稱,預設格式:pool-1-thread-1
            System.out.println(new Date() + " " + Thread.currentThread().getName() + " " + index);
            // 等待2秒
            try {
                sleep(2000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    executorService.shutdown();
}

output

Mon Jan 02 12:34:11 CST 2023 pool-1-thread-1 0
Mon Jan 02 12:34:11 CST 2023 pool-1-thread-2 1
Mon Jan 02 12:34:11 CST 2023 pool-1-thread-3 7
Mon Jan 02 12:34:11 CST 2023 pool-1-thread-5 9
Mon Jan 02 12:34:11 CST 2023 pool-1-thread-4 8
Mon Jan 02 12:34:13 CST 2023 pool-1-thread-4 2
Mon Jan 02 12:34:13 CST 2023 pool-1-thread-1 3
Mon Jan 02 12:34:13 CST 2023 pool-1-thread-2 4
Mon Jan 02 12:34:13 CST 2023 pool-1-thread-5 5
Mon Jan 02 12:34:13 CST 2023 pool-1-thread-3 6

因為核心執行緒數為2,佇列大小為5,存活時間1分鐘,所以流程是第0-1號任務來時,陸續建立2個執行緒,然後第2-6號任務來時,因為無執行緒可用,均進入了佇列等待,第7-9號任務來時,沒有空閒執行緒,佇列也滿了,所以陸續又建立了3個執行緒。所以你會發現7-9號任務反而是先執行的。又因為各任務只需要2秒,而執行緒存活時間有1分鐘,所以執行緒進行了複用,所以總共只建立了5個執行緒。

如何正確設定執行緒池的引數:CPU密集型:corePoolSize = CPU核數 + 1;IO密集型:corePoolSize = CPU核數 * 2。

提交任務的幾種方式:往執行緒池中提交任務,主要有兩種方法,execute()submit()submit()用於提交一個需要返回果的任務。該方法返回一個Future物件,通過呼叫這個物件的get()方法,就能獲得返回結果。get()方法會一直阻塞,直到返回結果返回。另外,也可以使用它的過載方法get(long timeout, TimeUnit unit),這個方法也會阻塞,但是在超時時間內仍然沒有返回結果時,將丟擲異常TimeoutException

public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Future<Long> future = executor.submit(() -> {
        System.out.println("task is executed");
        return System.currentTimeMillis();
    });
    System.out.println("task execute time is: " + future.get());
}

output

task is executed
task execute time is: 1672634764296

執行緒池監控

  • ThreadPoolExecutor自帶:
    • long getTaskCount():獲取已經執行或正在執行的任務數;
    • long getCompletedTaskCount():獲取已經執行的任務數;
    • int getLargestPoolSize():獲取執行緒池曾經建立過的最大執行緒數,根據這個引數,可以知道執行緒池是否滿過;
    • int getPoolSize():獲取執行緒池執行緒數;
    • int getActiveCount():獲取活躍執行緒數(正在執行任務的執行緒數)。
  • ThreadPoolExecutor自定義處理:
    • protected void beforeExecute(Thread t, Runnable r):任務執行前被呼叫;
    • protected void afterExecute(Runnable r, Throwable t):任務執行後被呼叫;
    • protected void terminated():執行緒池結束後被呼叫。

關閉執行緒池:1、shutdown()會將執行緒池狀態置為SHUTDOWN,不再接受新的任務,同時會等待執行緒池中已有的任務執行完成再結束;2、shutdownNow()會將執行緒池狀態置為SHUTDOWN,對所有執行緒執行interrupt()操作,清空佇列,並將佇列中的任務返回回來。關閉執行緒池涉及到兩個返回boolean的方法,isShutdown()isTerminated,分別表示是否關閉和是否終止。

注意

  • 儘量使用手動的方式建立執行緒池,避免使用Executors工廠類;
  • 根據場景,合理設定執行緒池的各個引數,包括執行緒池數量、佇列、執行緒工廠和拒絕策略;
  • 在調執行緒池submit()方法的時候,一定要儘量避免任務執行異常被吞掉的問題。