阻塞佇列的原理及應用

2023-09-06 12:01:39

阻塞佇列是一種常用的並行程式設計工具,它能夠在多執行緒環境下提供一種安全而高效的資料傳輸機制。本文將介紹阻塞佇列的原理和使用場景,並通過範例演示其在多執行緒程式設計中的應用。

一、什麼是阻塞佇列

阻塞佇列是一種特殊的佇列,它具有以下幾個特點:

  1. 阻塞特性:當佇列為空時,從佇列中獲取元素的操作將會被阻塞,直到佇列中有新的元素被新增;當佇列已滿時,向佇列中新增元素的操作將會被阻塞,直到佇列中有空的位置,這就是等待喚醒機制。
  2. 執行緒安全:阻塞佇列內部通過鎖或其他同步機制來保證多執行緒環境下的資料一致性。
  3. 有界性:阻塞佇列可以設定容量上限,當佇列滿時,後續的元素將無法新增。
  4. 公平性:阻塞佇列可以選擇公平或非公平的策略來決定執行緒的獲取順序。公平佇列會按照執行緒的請求順序進行處理(執行緒按先來後到順序排隊獲取元素),而非公平佇列則允許新的執行緒插隊執行(執行緒競爭)。比如:SynchronousQueue。

阻塞佇列常用於解決生產者-消費者問題,它能夠有效地銜接生產者和消費者之間的速度差異,提供一種協調和安全的資料互動方式。
阻塞佇列底層一般採用陣列和連結串列這兩種資料結構儲存元素,ArrayBlockingQueue和PriorityBlockingQueue底層都是採用陣列儲存的,但是ArrayBlockingQueue是必須指定陣列大小,不能擴容,而PriorityBlockingQueue可以進行動態擴容(擴容的最大長度也是Integer.MAX_VALUE),LinkedBlockingQueue底層是連結串列結構儲存,雖然是連結串列,但是也有長度限制,預設是Integer.MAX_VALUE,一般認為的無界阻塞佇列,其實最大的佇列長度也就是Integer.MAX_VALUE。

二、阻塞佇列的核心方法

  • 新增
方法 描述 是否阻塞
add方法 往佇列尾部新增元素,內部是呼叫offer方法
put方法 往佇列尾部新增元素,如果佇列已滿,則阻塞等待
offer方法 往佇列尾部新增元素,如果佇列已滿,則返回false,不會阻塞
  • 獲取
方法 描述 是否阻塞
take方法 take方法:移除並返回佇列頭部的元素,如果佇列為空,則阻塞等待
poll方法 移除並返回佇列頭部的元素,如果佇列為空,則返回null,不會阻塞
peek方法 返回佇列頭部的元素(不移除),如果佇列為空,則返回null,不會阻塞

三、常見的阻塞佇列實現


通過圖中可以看到,BlockingQueue整合了Queue介面的功能,有多種子類實現,常用的如下:

  1. ArrayBlockingQueue:基於陣列實現的有界阻塞佇列,它的容量在建立時指定,並且不能動態擴充套件。
  2. LinkedBlockingQueue:基於連結串列實現的有界阻塞佇列,連結串列的長度可以通過建構函式顯式指定,如果使用預設的建構函式,則預設大小是Integer.MAX_VALUE。
  3. PriorityBlockingQueue:基於優先順序堆排序實現的阻塞佇列(可擴容),元素按照優先順序順序進行排序。
  4. SynchronousQueue:不儲存元素的阻塞佇列,每個插入操作都必須等待一個相應的刪除操作,反之亦然。

四、阻塞佇列的原理

常用的阻塞佇列,比如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue底層都是採用ReentrantLock鎖來實現執行緒的互斥,而ReentrantLock底層採用了AQS框架實現執行緒佇列的同步,執行緒的阻塞是呼叫LockSupport.park實現,喚醒是呼叫LockSupport.unpark實現,具體可以看我之前的文章,SynchronousQueue底層雖然沒有用AQS框架,但也用的是LockSupport實現執行緒的阻塞與喚醒。
一文讀懂LockSupport
AQS原始碼分析
阻塞佇列的原理可以通過兩個關鍵元件來解釋:鎖和條件變數。

阻塞佇列使用鎖來保護共用資源,控制執行緒的互斥存取。在佇列為空或已滿時,執行緒需要等待相應的條件滿足才能繼續執行。

  • 條件變數

條件變數是鎖的一個補充,在某些特定的條件下,執行緒會進入等待狀態。當條件滿足時,其他執行緒會通過呼叫條件變數的喚醒方法(比如signal()或signalAll())來通知等待的執行緒進行下一步操作。
當一個執行緒試圖從空的阻塞佇列中獲取元素時,它會獲取佇列的鎖,並檢查佇列是否為空。如果為空,這個執行緒將進入等待狀態,直到其他執行緒向佇列中插入元素並通過條件變數喚醒它。當一個執行緒試圖向已滿的阻塞佇列插入元素時,它會獲取佇列的鎖,並檢查佇列是否已滿。如果已滿,這個執行緒將進入等待狀態,直到其他執行緒從佇列中獲取元素並通過條件變數喚醒它。
接下來我們看下阻塞佇列的獲取元素和插入元素的核心程式碼:
ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue的帶阻塞的插入和獲取方法都是基於ReentrantLock鎖+條件變數的等待和通知來實現的。
主要看看ArrayBlockingQueue帶阻塞的插入和獲取元素的主要方法吧。

/**
 * 插入元素,帶阻塞
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    // 這裡使用的是ReentrantLock鎖
    final ReentrantLock lock = this.lock;
    // 獲取鎖並支援響應中斷,注意:獲取鎖的過程中不響應中斷,是在獲取到鎖後根據當前執行緒的中斷標識來處理。
    lock.lockInterruptibly();
    try {
        // 元素大小等於陣列長度時阻塞,說明放滿了,生產者需要暫停,阻塞在條件變數上,等待被喚醒
        while (count == items.length)
            notFull.await();
        // 放入元素到陣列指定的下標處
        enqueue(e);
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

/**
 * 插入元素,喚醒等待獲取元素的執行緒
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
	// 放入元素後,通知消費執行緒繼續獲取元素
    notEmpty.signal();
}

/**
 * 獲取元素,帶阻塞
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 陣列無元素時阻塞,阻塞在條件變數上,等待被喚醒
        // 元素大小等於0時阻塞,說明陣列被取空了,消費者需要暫停,阻塞在條件變數上,等待被喚醒
        while (count == 0)
            notEmpty.await();
        // 移除元素並返回
        return dequeue();
    } finally {
        lock.unlock();
    }
}



/**
 * 移除元素並返回
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    // 陣列時迴圈使用的,取元素的index到達陣列長度時,下次需要從第0個位置
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 移除元素後,通知消費者執行緒可以繼續放入元素
    notFull.signal();
    return x;
}

SynchronousQueue不儲存元素,插入和刪除是配套使用的,它的插入和刪除有公平和非公平之分,公平是通過內部類TransferQueue實現的,非公平是通過TransferStack實現的,具體可以看transfer方法,最終會呼叫LockSupport.park實現執行緒阻塞,LockSupport.unpark實現執行緒繼續執行,這個就不貼程式碼了。

五、阻塞佇列的使用場景

  1. 生產者-消費者模型:阻塞佇列能夠很好地平衡生產者和消費者之間的速度差異,既能保護消費者不會消費到空資料,也能保護生產者不會造成佇列溢位,能夠有效地解耦生產者和消費者,提高系統的穩定性和吞吐量。
  2. 執行緒池:線上程池中,阻塞佇列可以作為任務緩衝區,將待執行的任務放入佇列中,由執行緒池中的工作執行緒按照一定的策略進行執行。
  3. 同步工具:阻塞佇列還可以作為一種同步工具,在多執行緒環境下實現執行緒之間的共同作業。
  4. 資料緩衝:阻塞佇列可以用作資料緩衝區,當生產者的速度大於消費者的速度時,資料可以先儲存在佇列中,等待消費者處理
  5. 事件驅動程式設計:阻塞佇列可以用於事件驅動的程式設計模型,當事件發生時,將事件物件放入佇列中,由消費者進行處理

六、阻塞佇列的使用

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) {
        // 建立一個容量為10的ArrayBlockingQueue
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
//        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
//        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(10);
        // 建立生產者執行緒
        Thread producerThread = new Thread(() -> {
            try {
                for (int i = 0; i <= 5; i++) {
                    // 將資料放入佇列
                    queue.put(i);
                    System.out.println(Thread.currentThread().getName() + "Produced: " + i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 建立消費者執行緒
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i <= 5; i++) {
                    // 從佇列中取出資料
                    int num = queue.take();
                    System.out.println(Thread.currentThread().getName() + "Consumed: " + num);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 啟動生產者和消費者執行緒
        producerThread.start();
        consumerThread.start();
        // 等待執行緒執行完畢
        try {
            producerThread.join();
            consumerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

執行輸出:

Thread-0Produced: 0
Thread-1Consumed: 0
Thread-0Produced: 1
Thread-1Consumed: 1
Thread-0Produced: 2
Thread-0Produced: 3
Thread-1Consumed: 2
Thread-0Produced: 4
Thread-0Produced: 5
Thread-1Consumed: 3
Thread-1Consumed: 4
Thread-1Consumed: 5

阻塞佇列的使用比較簡單,這裡是個簡單的使用例子,可設定合適的佇列大小和生產者消費者休眠時間來偵錯阻塞等待和喚醒通知。使用阻塞佇列可解決多執行緒並行存取資料安全問題,也能方便的實現執行緒間的協調工作。

總結

通過了解阻塞佇列的原理和使用場景,我們可以更好地應對多執行緒程式設計中的並行問題,提高程式碼的可維護性和可延伸性。阻塞佇列作為一種常見的並行程式設計工具,能夠幫助我們實現高效的資料傳輸和執行緒共同作業,為我們的應用程式提供更好的效能和可靠性保障。希望本文能夠為讀者對阻塞佇列的理解和應用提供一些幫助。