30行自己寫並行工具類(Semaphore, CyclicBarrier, CountDownLatch)是什麼體驗?

2022-07-23 06:01:06

30行自己寫並行工具類(Semaphore, CyclicBarrier, CountDownLatch)是什麼體驗?

前言

在本篇文章當中首先給大家介紹三個工具Semaphore, CyclicBarrier, CountDownLatch該如何使用,然後仔細剖析這三個工具內部實現的原理,最後會跟大家一起用ReentrantLock實現這三個工具。

並行工具類的使用

CountDownLatch

CountDownLatch最主要的作用是允許一個或多個執行緒等待其他執行緒完成操作。比如我們現在有一個任務,有\(N\)個執行緒會往陣列data[N]當中對應的位置根據不同的任務放入資料,在各個執行緒將資料放入之後,主執行緒需要將這個陣列當中所有的資料進行求和計算,也就是說主執行緒在各個執行緒放入之前需要阻塞住!在這樣的場景下,我們就可以使用CountDownLatch

上面問題的程式碼:

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

    public static int[] data = new int[10];

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(10);

        for (int i = 0; i < 10; i++) {
            int temp = i;
            new Thread(() -> {
                Random random = new Random();
                data[temp] = random.nextInt(100001);
                latch.countDown();
            }).start();
        }

        // 只有函數 latch.countDown() 至少被呼叫10次
        // 主執行緒才不會被阻塞
        // 這個10是在CountDownLatch初始化傳遞的10
        latch.await();
        System.out.println("求和結果為:" + Arrays.stream(data).sum());
    }
}

在上面的程式碼當中,主執行緒通過呼叫latch.await();將自己阻塞住,然後需要等他其他執行緒呼叫方法latch.countDown()只有這個方法被呼叫的次數等於在初始化時給CountDownLatch傳遞的引數時,主執行緒才會被釋放。

CyclicBarrier

CyclicBarrier它要做的事情是,讓一 組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行。我們通常也將CyclicBarrier稱作路障

範例程式碼:

public class CycleBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(5);

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "開始等待");
                    // 所有執行緒都會呼叫這行程式碼
                    // 在這行程式碼呼叫的執行緒個數不足5
                    // 個的時候所有的執行緒都會阻塞在這裡
                    // 只有到5的時候,這5個執行緒才會被放行
                    // 所以這行程式碼叫做同步點 
                    barrier.await();
                    // 如果有第六個執行緒執行這行程式碼時
                    // 第六個執行緒也會被阻塞 知道第10
                    // 執行緒執行這行程式碼 6-10 這5個執行緒
                    // 才會被放行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "等待完成");
            }).start();
        }
    }
}

我們在初始化CyclicBarrier物件時,傳遞的數位為5,這個數位表示只有5個執行緒到達同步點的時候,那5個執行緒才會同時被放行,而如果到了6個執行緒的話,第一次沒有被放行的執行緒必須等到下一次有5個執行緒到達同步點barrier.await()時,才會放行5個執行緒。

  • 比如剛開始的時候5個執行緒的狀態如下,同步點還沒有5個執行緒到達,因此不會放行。

  • 當有5個執行緒或者更多的執行緒到達同步點barrier.await()的時候,才會放行5個執行緒,注意是5個執行緒,如果有多的執行緒必須等到下一次集合5個執行緒才會進行又一次放行,也就是說每次只放行5個執行緒,這也是它叫做CyclicBarrier(迴圈路障)的原因(因為每次放行5個執行緒,放行完之後重新計數,直到又有5個新的執行緒到來,才再次放行)。

Semaphore

Semaphore號誌)通俗一點的來說就是控制能執行某一段程式碼的執行緒數量,他可以控制程式的並行量!

semaphore.acquire

\(\mathcal{R}\)

semaphore.release

比如在上面的acquirerelease之間的程式碼\(\mathcal{R}\)就是我們需要控制的程式碼,我們可以通過號誌控制在某一個時刻能有多少個執行緒執行程式碼\(\mathcal{R}\)。在號誌內部有一個計數器,在我們初始化的時候設定為\(N\),當有執行緒呼叫acquire函數時,計數器需要減一,呼叫release函數時計數器需要加一,只有當計數器大於0時,執行緒呼叫acquire時才能夠進入程式碼塊\(\mathcal{R}\),否則會被阻塞,只有執行緒呼叫release函數時,被阻塞的執行緒才能被喚醒,被喚醒的時候計數器會減一。

範例程式碼:

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore mySemaphore = new Semaphore(5);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "準備進入臨界區");
                try {
                    mySemaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "已經進入臨界區");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "準備離開臨界區");
                mySemaphore.release();
                System.out.println(Thread.currentThread().getName() + "已經離開臨界區");
            }).start();
        }
    }
}

自己動手寫並行工具類

在這一小節當中主要使用ReentrantLock實現上面我們提到的三個並行工具類,因此你首先需要了解ReentrantLock這個工具。ReentrantLock中有兩個主要的函數lockunlock,主要用於臨界區的保護,在同一個時刻只能有一個執行緒進入被lockunlock包圍的程式碼塊。除此之外你還需要了解ReentrantLock.newCondition函數,這個函數會返回一個條件變數Condition,這個條件變數有三個主要的函數awaitsignalsignalAll,這三個函數的作用和效果跟Object類的waitnotifynotifyAll一樣,在閱讀下文之前,大家首先需要了解他們的用法。

  • 哪個執行緒呼叫函數condition.await,那個執行緒就會被掛起。
  • 如果執行緒呼叫函數conditon.signal,則會喚醒一個被condition.await函數阻塞的執行緒。
  • 如果執行緒呼叫函數conditon.signalAll,則會喚醒所有被condition.await函數阻塞的執行緒。

CountDownLatch

我們在使用CountDownLatch時,會有執行緒呼叫CountDownLatchawait函數,其他執行緒會呼叫CountDownLatchcountDown函數。在CountDownLatch內部會有一個計數器,計數器的值我們在初始化的時候可以進行設定,執行緒每呼叫一次countDown函數計數器的值就會減一。

  • 如果線上程在呼叫await函數之前,計數器的值已經小於或等於0時,呼叫await函數的執行緒就不會阻塞,直接放行。
  • 如果線上程在呼叫await函數之前,計數器的值大於0時,呼叫await函數的執行緒就會被阻塞,當有其他執行緒將計數器的值降低為0時,那麼這個將計數器降低為0執行緒就需要使用condition.signalAll()函數將其他所有被await阻塞的函數喚醒。
  • 執行緒如果想阻塞自己的話可以使用函數condition.await(),如果某個執行緒在進入臨界區之後達到了喚醒其他執行緒的條件,我們則可以使用函數condition.signalAll()喚醒所有被函數await阻塞的執行緒。

上面的規則已經將CountDownLatch的整體功能描述清楚了,為了能夠將程式碼解釋清楚,我將對應的文字解釋放在了程式碼當中:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyCountDownLatch {
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private int curValue;

    public MyCountDownLatch(int targetValue) {
        // 我們需要有一個變數去儲存計數器的值
        this.curValue = targetValue;
    }

    public void countDown() {
        // curValue 是一個共用變數
        // 我們需要用鎖保護起來
        // 因此每次只有一個執行緒進入 lock 保護
        // 的程式碼區域
        lock.lock();
        try {
            // 每次執行 countDown 計數器都需要減一
            // 而且如果計數器等於0我們需要喚醒哪些被
            // await 函數阻塞的執行緒
            curValue--;
            if (curValue <= 0)
                condition.signalAll();
        }catch (Exception ignored){}
        finally {
            lock.unlock();
        }
    }

    public void await() {
        lock.lock();
        try {
            // 如果 curValue 的值大於0
            // 則說明 countDown 呼叫次數還不夠
            // 需要將執行緒掛起 否則直接放行
            if (curValue > 0)
                // 使用條件變數 condition 將執行緒掛起
                condition.await();
        }catch (Exception ignored){}
        finally {
            lock.unlock();
        }
    }
}

可以使用下面的程式碼測試我們自己寫的CountDownLatch

public static void main(String[] args) throws InterruptedException {
    MyCountDownLatch latch = new MyCountDownLatch(5);
    for (int i = 0; i < 3; i++) {
        new Thread(() -> {
            latch.countDown();
            System.out.println(Thread.currentThread().getName() + "countDown執行完成");
        }).start();
    }

    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                latch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() +  "latch執行完成");
        }).start();
    }
}

CyclicBarrier

CyclicBarrier有一個路障(同步點),所有的執行緒到達路障之後都會被阻塞,當被阻塞的執行緒個數達到指定的數目的時候,就需要對指定數目的執行緒進行放行。

  • CyclicBarrier當中會有一個資料threadCount,表示在路障需要達到這個threadCount個執行緒的時候才進行放行,而且需要放行threadCount個執行緒,這裡我們可以迴圈使用函數condition.signal()去喚醒指定個數的執行緒,從而將他們放行。如果執行緒需要將自己阻塞住,可以使用函數condition.await()
  • CyclicBarrier當中需要有一個變數currentThreadNumber,用於記錄當前被阻塞的執行緒的個數。
  • 使用者還可以給CyclicBarrier傳入一個Runnable物件,當放行的時候需要執行這個Runnable物件,你可以新開一個執行緒去執行這個Runnable物件,或者讓喚醒其他執行緒的這個執行緒執行Runnable物件。

根據上面的CyclicBarrier要求,寫出的程式碼如下(分析和解釋在註釋當中):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyCyclicBarrier {

    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private int threadCount;
    private int currentThreadNumber;
    private Runnable runnable;

    public MyBarrier(int count) {
        threadCount = count;
    }

    /**
     * 允許傳入一個 runnable 物件
     * 當放行一批執行緒的時候就執行這個 runnable 函數
     * @param count
     * @param runnable
     */
    public MyBarrier(int count, Runnable runnable) {
        this(count);
        this.runnable = runnable;
    }
    
    public void await() {
        lock.lock();
        currentThreadNumber++;
        try {
            // 如果阻塞的執行緒數量不到 threadCount 需要進行阻塞
            // 如果到了需要由這個執行緒喚醒其他執行緒
            if (currentThreadNumber == threadCount) {
                // 放行之後需要重新進行計數
                // 因為放行之後 condition.await();
                // 阻塞的執行緒個數為 0
                currentThreadNumber = 0;
                if (runnable != null) {
                    new Thread(runnable).start();
                }
                // 喚醒 threadCount - 1 個執行緒 因為當前這個執行緒
                // 已經是在執行的狀態 所以只需要喚醒 threadCount - 1
                // 個被阻塞的執行緒
                for (int i = 1; i < threadCount; i++)
                    condition.signal();
            }else {
                // 如果數目還沒有達到則需要阻塞執行緒
                condition.await();
            }
        }catch (Exception ignored){}
        finally {
            lock.unlock();
        }
    }

}

下面是測試我們自己寫的路障的程式碼:

public static void main(String[] args) throws InterruptedException {
    MyCyclicBarrier barrier = new MyCyclicBarrier(5, () -> {
        System.out.println(Thread.currentThread().getName() + "開啟一個新執行緒");
        for (int i = 0; i < 1; i++) {
            System.out.println(i);
        }
    });

    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "進入阻塞");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            barrier.await();
            System.out.println(Thread.currentThread().getName() + "阻塞完成");
        }).start();
    }
}

Semaphore

Semaphore可以控制執行某一段臨界區程式碼的執行緒數量,在Semaphore當中會有兩個計數器semCountcurCount

  • semCount表示可以執行臨界區程式碼的執行緒的個數。
  • curCount表示正在執行臨界區程式碼的執行緒的個數。

這個工具實現起來也並不複雜,具體分析都在註釋當中:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MySemaphore {

    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private int semCount;
    private int curCount;

    public MySemaphore(int semCount) {
        this.semCount = semCount;
    }

    public void acquire() {
        lock.lock();
        try {
            // 正在執行臨界區程式碼的執行緒個數加一
            curCount++;
            // 如果執行緒個數大於指定的能夠執行的執行緒個數
            // 需要將當前這個執行緒阻塞起來
            // 否則直接放行
            if (curCount > semCount) {
                condition.await();
            }
        }catch (Exception ignored) {}
        finally {
            lock.unlock();
        }
    }

    public void release() {
        lock.lock();
        try {
            // 執行緒執行完臨界區的程式碼
            // 將要離開臨界區 因此 curCount 
            // 需要減一
            curCount--;
            // 如果有執行緒阻塞需要喚醒被阻塞的執行緒
            // 如果沒有被阻塞的執行緒 這個函數執行之後
            // 對結果也不會產生影響 因此在這裡不需要進行
            // if 判斷
            condition.signal();
            // signal函數只對在呼叫signal函數之前
            // 被await函數阻塞的執行緒產生影響 如果
            // 某個執行緒呼叫 await 函數在 signal 函數
            // 執行之後,那麼前面那次 signal 函數呼叫
            // 不會影響後面這次 await 函數
        }catch (Exception ignored){}
        finally {
            lock.unlock();
        }
    }
}

使用下面的程式碼測試我們自己寫的MySemaphore

public static void main(String[] args) {
    MySemaphore mySemaphore = new MySemaphore(5);
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            mySemaphore.acquire();
            System.out.println(Thread.currentThread().getName() + "已經進入臨界區");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            mySemaphore.release();
            System.out.println(Thread.currentThread().getName() + "已經離開臨界區");
        }).start();
    }
}

總結

在本文當中主要給大家介紹了三個在並行當中常用的工具類該如何使用,然後介紹了我們自己實現三個工具類的細節,其實主要是利用條件變數實現的,因為它可以實現執行緒的阻塞和喚醒,其實只要大家瞭解條件變數的使用方法,和三種工具的需求大家也可以自己實現一遍。

以上就是本文所有的內容了,希望大家有所收穫,我是LeHung,我們下期再見!!!(記得點贊收藏哦!)


更多精彩內容合集可存取專案:https://github.com/Chang-LeHung/CSCore

關注公眾號:一無是處的研究僧,瞭解更多計算機(Java、Python、計算機系統基礎、演演算法與資料結構)知識。