ThreadPool實現機制

2023-03-22 18:01:44

Android中阻塞佇列的應用有哪些

阻塞佇列在 Android 中有很多應用,比如:

  1. 執行緒池:執行緒池任務的執行就是基於一個阻塞佇列,如果執行緒池任務已滿,則任務需要等待阻塞佇列中的其他任務完成。
  2. Handler 訊息佇列:Handler 的訊息佇列也是一種阻塞佇列。handler傳送訊息時,首先將訊息加入到訊息佇列中,在空閒狀態下 MessageQueue 佇列是阻塞的狀態,直到佇列不為空,Looper 開始輪詢 MessageQueue 裡面的 Message。

阻塞佇列支援兩個核心方法,分別為:

  • put(E e):將元素插入隊尾,支援阻塞式插入,在容量無限制的情況下一直等待直到佇列有空閒位置。
  • take(): 獲取且移除此佇列的頭部,在佇列為空時,阻塞等待這個頭部可用,並返回被取出的元素。

在 Java 中,阻塞佇列的實現類有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue 等等。這些佇列通過使用 Lock 和 Condition 來保證佇列的執行緒安全。

阻塞佇列的原理

是當阻塞佇列中沒有資料的時候,讀執行緒將被阻塞;當阻塞佇列已滿的時候,寫執行緒將被阻塞。阻塞佇列通過內部鎖來控制讀和寫操作的相互存取以及有序性,保證在多執行緒並行的時候,執行插入或讀取操作是廣泛可接受的,而不會引起任何問題。

執行緒池是維護一個工作執行緒的執行緒容器,用於優化執行緒的建立和銷燬帶來的開銷。下面簡單介紹一下執行緒池的底層實現原理。

執行緒池執行流程

  1. 在初始化時,執行緒池會通過ThreadPoolExecutor建構函式賦值並建立核心執行緒池;
  2. execute()方法將任務存放到阻塞佇列中,如果有空閒執行緒,則取出最先進入佇列的任務開始執行,否則加入等待中的佇列中;
  3. 當佇列長度達到閾值時,不再繼續增加,此時若仍有新任務提交,則以上空閒執行緒處理任務;若新增的執行緒數仍超過最大執行緒數時,則拒絕該任務。

引數說明:

以下是執行緒池最基本的四類引數。

  • 核心執行緒數(corePoolSize):執行緒池中的基本執行緒數。當任務提交後,分類討論:

    • 若執行緒池中的執行緒數小於 corePoolSize,那麼即使執行緒池中還有空餘的執行緒,也會直接新增一個新執行緒去處理當前的任務;
    • 若執行緒池已經有了 corePoolSize 個執行緒,那麼任務就被加入阻塞佇列進行緩衝,等待有空閒的執行緒來處理;
    • 一次性只能建立 corePoolSize 個執行緒。
  • 最大執行緒數(maximumPoolSize):執行緒池中允許存在的最大執行緒數。當緩衝佇列滿時,對於大於 corePoolSize 的任務將會此項啟動更多的 Thread 來處理使用者請求,針對有界和無界不同表現。如有等待佇列功能滿了後會執行RejectedExecutionException。在我看來這個設定很雞肋,往往在它小於corepoolsize時候有表現,大於情況視大於多少對執行緒池複用機制上友好而已。有不對的歡迎評論區討論下;

  • 佇列容量(workQueue):等待任務的佇列容量。在呼叫 execute() 方法時,阻塞佇列可以用以下三種型別之一:

    • FIFO(先進先出)佇列,使用java.util.concurrent.LinkedBlockingQueue,預設無界限大小。因此 newFixedThreadPool() 和 newSingleThreadExecutor() 的執行緒數最多也可能會達到 Integer.MAX_VALUE,會導致OOM等問題。但你可指定其大小。
    • LIFO(後進先出)佇列,使用java.util.concurrent.LinkedBlockingDeque,特點是新任務插到隊尾上,但在 JDK7 中不再使用這個佇列,因為它是不符合 Java 應用程式的行為模式的;
    • 優先順序佇列,使用java.util.concurrent.PriorityBlockingQueue 類實現,按照元素等級來排序,低等級物件會先被獲取,對於高等級的任務可以提前執行。
  • 空閒執行緒銷燬時間(keepAliveTime):執行緒池中沒有任務執行時,即空閒狀態下的執行緒沒事可做,這些空閒的執行緒會自動的銷燬,首先由coreThreadHandle(正常獲得鎖的執行緒)完成剩下工作,然後 wait coreHandle 釋放鎖資源,休眠到內部佇列不為空或超時返回,如果休眠超時則表示該執行緒超時了,在低於 corePoolSize 的情況下 除非設定allowCoreThreadTimout=true,否則永遠不會發生;

範例程式碼

public class TestThreadPoll implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "正在執行...");
    }
}

  public static void main(String[] args) {
      // 建立一個基本的執行緒池
      ExecutorService executor = Executors.newCachedThreadPool();

      for (int i = 0; i < 10; i++) {
          Runnable task = new TestThreadPoll();
          executor.execute(task);
      }
      executor.shutdown();
  }

上面的建立了一個基本的執行緒池,並進行了執行緒數的新增。

執行緒池排隊機制

當執行緒池中所有的執行緒都在執行任務時,如果新提交的任務需要被執行,那麼這個新的任務會進入到一個阻塞佇列中排隊等待。而ThreadPoolExecutor中有四種常見的阻塞佇列:SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue。

其中SynchronousQueue是沒有任何儲存功能的阻塞佇列,它只負責一個任務的傳遞,即前一個任務將後一個任務提交給執行緒池後直接結束,而不是進入等待佇列。由於該佇列沒有儲存空間,所以當一個執行緒提交任務時,該佇列會尋找與之對應的另一個執行緒來接收這個任務。如果此時執行緒池處於飽和狀態,這個任務就有可能被丟棄。

LinkedBlockingQueue則是一個帶儲存功能的無界佇列,也就是說,這個佇列可以一直往裡新增新的元素。因此,它的長度限制僅僅是由記憶體大小來控制,如果你的阻塞佇列需要儲存非常多的任務,那麼最好選用這個型別的阻塞佇列。

ArrayBlockingQueue也是一個帶儲存功能的有界佇列,它的長度是預設好的,如果你試圖往這個佇列中新增超過預設長度的任務,那麼新增的任務就必須要等待其他任務完成或者被移除後才能加入佇列了,換而言之,在這種佇列中,無法滿足新任務的請求時會出現阻塞情況。

PriorityBlockingQueue則是一個支援優先順序排序的無界佇列,也就是說,任務可以通過實現Comparable介面來設定一個佇列中的優先順序關係。當使用優先順序佇列時,可以為不同的任務設定不同的優先順序,程式會按照任務的優先順序順序執行具體的任務。

自實現一個執行緒池範例

以下是一個簡單的自實現執行緒池範例:

public class MyThreadPool {
    private final BlockingQueue<Runnable> workQueue;
    private final WorkerThread[] workers;

    public MyThreadPool(int nThreads) {
        this.workQueue = new LinkedBlockingQueue<>();
        this.workers = new WorkerThread[nThreads];

        for (int i = 0; i < nThreads; i++) {
            workers[i] = new WorkerThread();
            workers[i].start();
        }
    }

    public void execute(Runnable task) {
        synchronized (workQueue) {
            workQueue.add(task);
            workQueue.notify();
        }
    }

    private class WorkerThread extends Thread {
        @Override
        public void run() {
            while(true) {
                Runnable task;
                synchronized (workQueue) {
                    while (workQueue.isEmpty()) {
                        try {
                            workQueue.wait();
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                    task = workQueue.poll();
                }
                try {
                    task.run();
                } catch(Throwable t) {
                    // error handling code
                }
            }
        }
    }
}

這是一個最簡單的執行緒池實現,其核心思想就是:在.WorkerThread類中定義了一個死迴圈,負責從任務佇列中取出任務並執行。如果目前沒有任務,則等待新任務到來。

使用時初始化執行緒池:

MyThreadPool pool = new MyThreadPool(2);

之後將需要做的任務加入執行緒池中:

pool.execute(new Runnable() {
    @Override
    public void run() {
        // 執行具體的任務程式碼
    }
});

至此,自實現的執行緒池範例就完成了。

execute框架解讀

execute()是Java中Executor介面的一個方法,它在執行傳遞給它的Runnable任務時使用。Executor框架提供了一種將任務提交給執行緒池以非同步執行的方式。
當我們想要某段程式碼在非同步環境中執行,又不需要知道每個任務何時完成時(所需時間可能會非常不同)的情況下,就可以使用這個框架。該框架管理和複用執行緒,以避免耗費建立他們的代價,並且在執行完任務後返回執行緒到執行緒池。
execute()方法的目的是在預設池中安排一個任務來執行,由於這個方法只是將任務提交給執行緒池並立即返回,因此不能保證任務已經執行。這個方法只有在向執行緒池中新增任務時需要使用,例如對於沒有返回值的簡單操作或前置條件檢查。
以下是execute()的方法語法:

void execute(Runnable command);

引數:

  • command:Runnable物件,該介面定義了需要線上程上執行的任務。

實現:
該方法直接將任務提交給主執行器以非同步執行。然後返回而不等待任務的結束。如果需要使用結果,則呼叫submit方法。

//newFixedThreadPool 被過載用來覆蓋預設設定,指定核心執行緒數和最大執行緒數相同。
//此範例將執行緒池大小限制為 5 個工作執行緒。
ExecutorService executor = Executors.newFixedThreadPool(5);

for (int i = 0; i < 10; i++) {
    Runnable worker = new ExampleRunnable("" + i);
    executor.execute(worker);
}
// 當所有可執行任務都完成後關閉執行緒池
executor.shutdown();
// 阻塞當前執行緒直到關閉操作完成
while (!executor.isTerminated()) { }

System.out.println("Finished all threads");

以上程式碼中,新的執行緒池有預設的限制,而不需要明確地宣告執行緒池的大小或能力 。

執行緒池固定大小,當佇列中沒有可以在工作執行緒上執行的任務時(即阻止列表為空),則該任務將繼續等待直到可用。

在每個可用工作執行緒上使用while迴圈反覆啟動一個Runnable任務並讓其執行到池的關閉標誌被設定

執行緒池被關閉之後,isTerminated()方法返回true,並退出while迴圈。