阻塞佇列在 Android 中有很多應用,比如:
阻塞佇列支援兩個核心方法,分別為:
在 Java 中,阻塞佇列的實現類有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue 等等。這些佇列通過使用 Lock 和 Condition 來保證佇列的執行緒安全。
是當阻塞佇列中沒有資料的時候,讀執行緒將被阻塞;當阻塞佇列已滿的時候,寫執行緒將被阻塞。阻塞佇列通過內部鎖來控制讀和寫操作的相互存取以及有序性,保證在多執行緒並行的時候,執行插入或讀取操作是廣泛可接受的,而不會引起任何問題。
執行緒池是維護一個工作執行緒的執行緒容器,用於優化執行緒的建立和銷燬帶來的開銷。下面簡單介紹一下執行緒池的底層實現原理。
ThreadPoolExecutor
建構函式賦值並建立核心執行緒池;execute()
方法將任務存放到阻塞佇列中,如果有空閒執行緒,則取出最先進入佇列的任務開始執行,否則加入等待中的佇列中;以下是執行緒池最基本的四類引數。
核心執行緒數(corePoolSize):執行緒池中的基本執行緒數。當任務提交後,分類討論:
最大執行緒數(maximumPoolSize):執行緒池中允許存在的最大執行緒數。當緩衝佇列滿時,對於大於 corePoolSize 的任務將會此項啟動更多的 Thread 來處理使用者請求,針對有界和無界不同表現。如有等待佇列功能滿了後會執行RejectedExecutionException
。在我看來這個設定很雞肋,往往在它小於corepoolsize
時候有表現,大於情況視大於多少對執行緒池複用機制上友好而已。有不對的歡迎評論區討論下;
佇列容量(workQueue):等待任務的佇列容量。在呼叫 execute() 方法時,阻塞佇列可以用以下三種型別之一:
空閒執行緒銷燬時間(keepAliveTime):執行緒池中沒有任務執行時,即空閒狀態下的執行緒沒事可做,這些空閒的執行緒會自動的銷燬,首先由coreThreadHandle(正常獲得鎖的執行緒)完成剩下工作,然後 wait coreHandle 釋放鎖資源,休眠到內部佇列不為空或超時返回,如果休眠超時則表示該執行緒超時了,在低於 corePoolSize 的情況下 除非設定allowCoreThreadTimout=true,否則永遠不會發生;
public class TestThreadPoll implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在執行...");
}
}
public static void main(String[] args) {
// 建立一個基本的執行緒池
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
Runnable task = new TestThreadPoll();
executor.execute(task);
}
executor.shutdown();
}
上面的建立了一個基本的執行緒池,並進行了執行緒數的新增。
當執行緒池中所有的執行緒都在執行任務時,如果新提交的任務需要被執行,那麼這個新的任務會進入到一個阻塞佇列中排隊等待。而ThreadPoolExecutor中有四種常見的阻塞佇列:SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue。
其中SynchronousQueue是沒有任何儲存功能的阻塞佇列,它只負責一個任務的傳遞,即前一個任務將後一個任務提交給執行緒池後直接結束,而不是進入等待佇列。由於該佇列沒有儲存空間,所以當一個執行緒提交任務時,該佇列會尋找與之對應的另一個執行緒來接收這個任務。如果此時執行緒池處於飽和狀態,這個任務就有可能被丟棄。
LinkedBlockingQueue則是一個帶儲存功能的無界佇列,也就是說,這個佇列可以一直往裡新增新的元素。因此,它的長度限制僅僅是由記憶體大小來控制,如果你的阻塞佇列需要儲存非常多的任務,那麼最好選用這個型別的阻塞佇列。
ArrayBlockingQueue也是一個帶儲存功能的有界佇列,它的長度是預設好的,如果你試圖往這個佇列中新增超過預設長度的任務,那麼新增的任務就必須要等待其他任務完成或者被移除後才能加入佇列了,換而言之,在這種佇列中,無法滿足新任務的請求時會出現阻塞情況。
PriorityBlockingQueue則是一個支援優先順序排序的無界佇列,也就是說,任務可以通過實現Comparable介面來設定一個佇列中的優先順序關係。當使用優先順序佇列時,可以為不同的任務設定不同的優先順序,程式會按照任務的優先順序順序執行具體的任務。
以下是一個簡單的自實現執行緒池範例:
public class MyThreadPool {
private final BlockingQueue<Runnable> workQueue;
private final WorkerThread[] workers;
public MyThreadPool(int nThreads) {
this.workQueue = new LinkedBlockingQueue<>();
this.workers = new WorkerThread[nThreads];
for (int i = 0; i < nThreads; i++) {
workers[i] = new WorkerThread();
workers[i].start();
}
}
public void execute(Runnable task) {
synchronized (workQueue) {
workQueue.add(task);
workQueue.notify();
}
}
private class WorkerThread extends Thread {
@Override
public void run() {
while(true) {
Runnable task;
synchronized (workQueue) {
while (workQueue.isEmpty()) {
try {
workQueue.wait();
} catch (InterruptedException e) {
return;
}
}
task = workQueue.poll();
}
try {
task.run();
} catch(Throwable t) {
// error handling code
}
}
}
}
}
這是一個最簡單的執行緒池實現,其核心思想就是:在.WorkerThread類中定義了一個死迴圈,負責從任務佇列中取出任務並執行。如果目前沒有任務,則等待新任務到來。
使用時初始化執行緒池:
MyThreadPool pool = new MyThreadPool(2);
之後將需要做的任務加入執行緒池中:
pool.execute(new Runnable() {
@Override
public void run() {
// 執行具體的任務程式碼
}
});
至此,自實現的執行緒池範例就完成了。
execute()
是Java中Executor介面的一個方法,它在執行傳遞給它的Runnable任務時使用。Executor框架提供了一種將任務提交給執行緒池以非同步執行的方式。
當我們想要某段程式碼在非同步環境中執行,又不需要知道每個任務何時完成時(所需時間可能會非常不同)的情況下,就可以使用這個框架。該框架管理和複用執行緒,以避免耗費建立他們的代價,並且在執行完任務後返回執行緒到執行緒池。
execute()
方法的目的是在預設池中安排一個任務來執行,由於這個方法只是將任務提交給執行緒池並立即返回,因此不能保證任務已經執行。這個方法只有在向執行緒池中新增任務時需要使用,例如對於沒有返回值的簡單操作或前置條件檢查。
以下是execute()
的方法語法:
void execute(Runnable command);
引數:
command
:Runnable物件,該介面定義了需要線上程上執行的任務。實現:
該方法直接將任務提交給主執行器以非同步執行。然後返回而不等待任務的結束。如果需要使用結果,則呼叫submit
方法。
//newFixedThreadPool 被過載用來覆蓋預設設定,指定核心執行緒數和最大執行緒數相同。
//此範例將執行緒池大小限制為 5 個工作執行緒。
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
Runnable worker = new ExampleRunnable("" + i);
executor.execute(worker);
}
// 當所有可執行任務都完成後關閉執行緒池
executor.shutdown();
// 阻塞當前執行緒直到關閉操作完成
while (!executor.isTerminated()) { }
System.out.println("Finished all threads");
以上程式碼中,新的執行緒池有預設的限制,而不需要明確地宣告執行緒池的大小或能力 。
執行緒池固定大小,當佇列中沒有可以在工作執行緒上執行的任務時(即阻止列表為空),則該任務將繼續等待直到可用。
在每個可用工作執行緒上使用while迴圈反覆啟動一個Runnable任務並讓其執行到池的關閉標誌被設定
執行緒池被關閉之後,isTerminated()方法返回true,並退出while迴圈。