阻塞佇列是一種常用的並行程式設計工具,它能夠在多執行緒環境下提供一種安全而高效的資料傳輸機制。本文將介紹阻塞佇列的原理和使用場景,並通過範例演示其在多執行緒程式設計中的應用。
阻塞佇列是一種特殊的佇列,它具有以下幾個特點:
阻塞佇列常用於解決生產者-消費者問題,它能夠有效地銜接生產者和消費者之間的速度差異,提供一種協調和安全的資料互動方式。
阻塞佇列底層一般採用陣列和連結串列這兩種資料結構儲存元素,ArrayBlockingQueue和PriorityBlockingQueue底層都是採用陣列儲存的,但是ArrayBlockingQueue是必須指定陣列大小,不能擴容,而PriorityBlockingQueue可以進行動態擴容(擴容的最大長度也是Integer.MAX_VALUE),LinkedBlockingQueue底層是連結串列結構儲存,雖然是連結串列,但是也有長度限制,預設是Integer.MAX_VALUE,一般認為的無界阻塞佇列,其實最大的佇列長度也就是Integer.MAX_VALUE。
方法 | 描述 | 是否阻塞 |
---|---|---|
add方法 | 往佇列尾部新增元素,內部是呼叫offer方法 | 否 |
put方法 | 往佇列尾部新增元素,如果佇列已滿,則阻塞等待 | 是 |
offer方法 | 往佇列尾部新增元素,如果佇列已滿,則返回false,不會阻塞 | 否 |
方法 | 描述 | 是否阻塞 |
---|---|---|
take方法 | take方法:移除並返回佇列頭部的元素,如果佇列為空,則阻塞等待 | 是 |
poll方法 | 移除並返回佇列頭部的元素,如果佇列為空,則返回null,不會阻塞 | 否 |
peek方法 | 返回佇列頭部的元素(不移除),如果佇列為空,則返回null,不會阻塞 | 否 |
通過圖中可以看到,BlockingQueue整合了Queue介面的功能,有多種子類實現,常用的如下:
常用的阻塞佇列,比如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue底層都是採用ReentrantLock鎖來實現執行緒的互斥,而ReentrantLock底層採用了AQS框架實現執行緒佇列的同步,執行緒的阻塞是呼叫LockSupport.park實現,喚醒是呼叫LockSupport.unpark實現,具體可以看我之前的文章,SynchronousQueue底層雖然沒有用AQS框架,但也用的是LockSupport實現執行緒的阻塞與喚醒。
一文讀懂LockSupport
AQS原始碼分析
阻塞佇列的原理可以通過兩個關鍵元件來解釋:鎖和條件變數。
阻塞佇列使用鎖來保護共用資源,控制執行緒的互斥存取。在佇列為空或已滿時,執行緒需要等待相應的條件滿足才能繼續執行。
條件變數是鎖的一個補充,在某些特定的條件下,執行緒會進入等待狀態。當條件滿足時,其他執行緒會通過呼叫條件變數的喚醒方法(比如signal()或signalAll())來通知等待的執行緒進行下一步操作。
當一個執行緒試圖從空的阻塞佇列中獲取元素時,它會獲取佇列的鎖,並檢查佇列是否為空。如果為空,這個執行緒將進入等待狀態,直到其他執行緒向佇列中插入元素並通過條件變數喚醒它。當一個執行緒試圖向已滿的阻塞佇列插入元素時,它會獲取佇列的鎖,並檢查佇列是否已滿。如果已滿,這個執行緒將進入等待狀態,直到其他執行緒從佇列中獲取元素並通過條件變數喚醒它。
接下來我們看下阻塞佇列的獲取元素和插入元素的核心程式碼:
ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue的帶阻塞的插入和獲取方法都是基於ReentrantLock鎖+條件變數的等待和通知來實現的。
主要看看ArrayBlockingQueue帶阻塞的插入和獲取元素的主要方法吧。
/**
* 插入元素,帶阻塞
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
// 這裡使用的是ReentrantLock鎖
final ReentrantLock lock = this.lock;
// 獲取鎖並支援響應中斷,注意:獲取鎖的過程中不響應中斷,是在獲取到鎖後根據當前執行緒的中斷標識來處理。
lock.lockInterruptibly();
try {
// 元素大小等於陣列長度時阻塞,說明放滿了,生產者需要暫停,阻塞在條件變數上,等待被喚醒
while (count == items.length)
notFull.await();
// 放入元素到陣列指定的下標處
enqueue(e);
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 插入元素,喚醒等待獲取元素的執行緒
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 放入元素後,通知消費執行緒繼續獲取元素
notEmpty.signal();
}
/**
* 獲取元素,帶阻塞
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 陣列無元素時阻塞,阻塞在條件變數上,等待被喚醒
// 元素大小等於0時阻塞,說明陣列被取空了,消費者需要暫停,阻塞在條件變數上,等待被喚醒
while (count == 0)
notEmpty.await();
// 移除元素並返回
return dequeue();
} finally {
lock.unlock();
}
}
/**
* 移除元素並返回
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 陣列時迴圈使用的,取元素的index到達陣列長度時,下次需要從第0個位置
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 移除元素後,通知消費者執行緒可以繼續放入元素
notFull.signal();
return x;
}
SynchronousQueue不儲存元素,插入和刪除是配套使用的,它的插入和刪除有公平和非公平之分,公平是通過內部類TransferQueue實現的,非公平是通過TransferStack實現的,具體可以看transfer方法,最終會呼叫LockSupport.park實現執行緒阻塞,LockSupport.unpark實現執行緒繼續執行,這個就不貼程式碼了。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
// 建立一個容量為10的ArrayBlockingQueue
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
// BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(10);
// 建立生產者執行緒
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i <= 5; i++) {
// 將資料放入佇列
queue.put(i);
System.out.println(Thread.currentThread().getName() + "Produced: " + i);
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 建立消費者執行緒
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i <= 5; i++) {
// 從佇列中取出資料
int num = queue.take();
System.out.println(Thread.currentThread().getName() + "Consumed: " + num);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 啟動生產者和消費者執行緒
producerThread.start();
consumerThread.start();
// 等待執行緒執行完畢
try {
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
執行輸出:
Thread-0Produced: 0
Thread-1Consumed: 0
Thread-0Produced: 1
Thread-1Consumed: 1
Thread-0Produced: 2
Thread-0Produced: 3
Thread-1Consumed: 2
Thread-0Produced: 4
Thread-0Produced: 5
Thread-1Consumed: 3
Thread-1Consumed: 4
Thread-1Consumed: 5
阻塞佇列的使用比較簡單,這裡是個簡單的使用例子,可設定合適的佇列大小和生產者消費者休眠時間來偵錯阻塞等待和喚醒通知。使用阻塞佇列可解決多執行緒並行存取資料安全問題,也能方便的實現執行緒間的協調工作。
通過了解阻塞佇列的原理和使用場景,我們可以更好地應對多執行緒程式設計中的並行問題,提高程式碼的可維護性和可延伸性。阻塞佇列作為一種常見的並行程式設計工具,能夠幫助我們實現高效的資料傳輸和執行緒共同作業,為我們的應用程式提供更好的效能和可靠性保障。希望本文能夠為讀者對阻塞佇列的理解和應用提供一些幫助。