20.AQS家族的「外門弟子」:CyclicBarrier

2023-06-20 12:01:22

關注王有志,一個分享硬核Java技術的互金摸魚俠
歡迎你加入Java人的提桶跑路群共同富裕的Java人

今天我們來學習AQS家族的「外門弟子」:CyclicBarrier。

為什麼說CyclicBarrier是AQS家族的「外門弟子」呢?那是因為CyclicBarrier自身和內部類Generation並沒有繼承AQS,但在原始碼的實現中卻深度依賴AQS家族的成員ReentrantLock。就像修仙小說中,大家族會區分外門和內門,外門弟子通常會藉助內門弟子的名聲行事,CyclicBarrier正是這樣,因此算是AQS家族的「外門弟子」。在實際的面試中,CyclicBarrier的出現的次數較少,通常會出現在與CountDownLatch比較的問題當中

今天我們就逐步拆解CyclicBarrier,來看看它與CountDownLatch之間到底有什麼差別。

CyclicBarrier是什麼?

先從CyclicBarrier的名字開始入手,Cyclic是形容詞,譯為「迴圈的,週期的」,Barrier是名詞,譯為「屏障,柵欄」,組合起來就是「迴圈的屏障」,那麼該怎麼理解「迴圈的屏障」呢?我們來看CyclicBarrier的註釋是怎麼解釋的:

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
CyclicBarrier是一種同步輔助工具,允許一組執行緒等待彼此到達共同的屏障點。

The barrier is called cyclic because it can be re-used after the waiting threads are released.
因為在等待執行緒釋放後可以重複使用,所以屏障被稱為迴圈屏障。

看起來與CountDownLatch有些相似,我們通過一張圖來展示下CyclicBarrier是怎樣工作的:

部分執行緒到達屏障後,會在屏障處等待,只有全部執行緒都到達屏障後,才會繼續執行。如果以CountDownLatch中越野徒步來舉例的話,把老闆拿掉,選手之間的互相等待,就是CyclicBarrier了。

另外,註釋中說CyclicBarrier是「re-used」,即可重複使用的。回想一下CountDownLatch的實現,並未做任何重置計數器的工作,即當CountDownLatch的計數減為0後不能恢復,也就是說CountDownLatch的功能是一次性的

Tips:實際上,可以用CountDownLatch實現類似於CyclicBarrier的功能。

CyclicBarrier怎麼用?

我們用沒有老闆參加的越野徒步來舉例,部分先到的選手要等待後到的選手一起吃午飯,用CyclicBarrier來實現的程式碼是這樣的:

// 初始化CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);

for (int i = 0; i < 10; i++) {
  int finalI = i;
  new Thread(() -> {
    try {
      TimeUnit.SECONDS.sleep((finalI + 1));
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    try {
      System.out.println("選手[" + finalI + "]到達終點,等待其他選手!!!");
      
      // 執行緒在屏障點處等待
      cyclicBarrier.await();
      
      System.out.println("選手[" + finalI + "]開始吃午飯啦!!!");
    } catch (InterruptedException | BrokenBarrierException e) {
      throw new RuntimeException(e);
    }
  }).start();
}

用法和CountDownLatch很相似,建構函式設定CyclicBarrier需要多少個執行緒達到屏障後統一行動,區別是CyclicBarrier在每個執行緒中都呼叫了CyclicBarrier#await,而我們在使用CountDownLatch時只在主執行緒中呼叫了一次CountDownLatch#await

那CountDownLatch可以線上程中呼叫CountDownLatch#await嗎?答案是可以的,這樣使用的效果和CyclicBarrier是一樣的:

CountDownLatch countDownLatch = new CountDownLatch(10);

for (int i = 0; i < 10; i++) {
  int finalI = i;
  new Thread(() -> {
    try {
      TimeUnit.SECONDS.sleep((finalI + 1));
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    System.out.println("選手[" + finalI + "]到達終點!!!");
    countDownLatch.countDown();
    try {
      countDownLatch.await();
      System.out.println("選手[" + finalI + "]開始吃午飯啦!!!");
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }).start();
}

通過上面的例子,我們不難想到CyclicBarrier#await方法是同時具備了CountDownLatch#countDown方法和CountDownLatch#await方法的能力,即執行了計數減1,又執行了暫停執行緒

CyclicBarrier是怎麼實現的?

我們先整體認識一下CyclicBarrier:

CyclicBarrier的內部結構比CountDownLatch複雜一些,除了我們前面提到的藉助AQS的「內門弟子」ReentrantLock型別的lock和Condition型別的trip外,CyclicBarrier還有兩個「特別」的地方:

  • 內部類Generation,直譯過來是「代」,它起到什麼作用?

  • Runnable型別的成員變數barrierCommand,它又做了些什麼?

其餘的部分,大部分可以在CountDownLatch中找到對應的方法,或者通過名稱我們就很容易得知它們的作用。

CyclicBarrier的構造方法

CyclicBarrier提供了兩個(實際是一個)構造方法:

// 需要到達屏障的執行緒數
private final int parties;

// 所有執行緒都到達後執行的動作
private final Runnable barrierCommand;

// 計數器
private int count;

public CyclicBarrier(int parties) {
  this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) {
    throw new IllegalArgumentException();
  }
  this.parties = parties;
  this.count = parties;
  this.barrierCommand = barrierAction;
}

第二個建構函式接收了兩個引數:

  • parties:表示需要多少個執行緒到達屏障處呼叫CyclicBarrier#await

  • barrierAction:所有執行緒到達屏障後執行的動作。

構造方法的程式碼一如既往的簡單,只有一處比較容易產生疑惑,partiescount有什麼區別?

首先來看成員變數的宣告,parties使用了final,表明它是不可變的物件,代表CyclicBarrier需要幾個執行緒共同到達屏障處;而count是計數器,初始值是parties,隨著到達屏障處的執行緒數量增多count會逐步減少至0。

CyclicBarrier的內部類Generation

private static class Generation {
  Generation() {}  
  
  boolean broken;
}

Generation用於標記CyclicBarrier的當前代,Doug Lea是這麼解釋它的作用的:

Each use of the barrier is represented as a generation instance. The generation changes whenever the barrier is tripped, or is reset.
每次使用屏障(CyclicBarrier)都需要一個Generation範例。無論是通過屏障還是重置屏障,Generation都會發生改變。

Generation中的broken用於標記當前的CyclicBarrier是否被打破,預設為false,值為true時表示當前CyclicBarrier已經被打破,此時CyclicBarrier不能正常使用,需要呼叫CyclicBarrier#reset方法重置CyclicBarrier的狀態。

CyclicBarrier#await方法

前面我們猜測CyclicBarrier#await方法即實現了計數減1,又實現了執行緒等待的功能,下面我們就通過原始碼來驗證我們的想法:

public int await() throws InterruptedException, BrokenBarrierException {
  try {
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe);
  }
}

public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  return dowait(true, unit.toNanos(timeout));
}

兩個過載方法都指向了CyclicBarrier#dowait方法:

private int dowait(boolean timed, long nanos)  throws InterruptedException, BrokenBarrierException, TimeoutException {
  // 使用ReentrantLock
  final ReentrantLock lock = this.lock;
  lock.lock();
  
  try {
    // 第2部分
    // 獲取CyclicBarrier的當前代,並檢查CyclicBarrier是否被打破
    final Generation g = generation;
    if (g.broken) {
      throw new BrokenBarrierException();
    }
    
    // 執行緒被中斷時,呼叫breakBarrier方法
    if (Thread.interrupted()) {
      breakBarrier();
      throw new InterruptedException();
    }
    
    // 第3部分
    //計數器減1
    int index = --count;
    // 計數器為0時表示所有執行緒都到達了,此時要做的就是喚醒等待中的執行緒
    if (index == 0) {
      boolean ranAction = false;
      try {
        // 執行喚醒前的操作
        final Runnable command = barrierCommand;
        if (command != null) {
          command.run();
        }
        ranAction = true;
        // CyclicBarrier進入下一代
        nextGeneration();
        return 0;
      } finally {
        if (!ranAction) {
          breakBarrier();
        }
      }
    }
    
    // 第4部分
    // 只有部分執行緒到達屏障處的情況
    for (;;) {
      try {
        //呼叫等待邏輯)
        if (!timed) {
          trip.await();
        } else if (nanos > 0L) {
          nanos = trip.awaitNanos(nanos);
        }
      } catch (InterruptedException ie) {
        // 執行緒被中斷時,呼叫breakBarrier方法
        if (g == generation && ! g.broken) {
          breakBarrier();
          throw ie;
        } else {
          Thread.currentThread().interrupt();
        }
      }
      if (g.broken) {
        throw new BrokenBarrierException();
      }
      // 如果不是當前代,返回計數器的值
      if (g != generation) {
        return index;
      }
      // 如果等待超時,呼叫breakBarrier方法
      if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
      }
    }
  } finally {
    lock.unlock();
  }
}

CyclicBarrier#dowait方法看起來很長,但如果拆成3部分來看邏輯並不複雜:

  • 第1部分:CyclicBarrier與執行緒的狀態校驗;

  • 第2部分:當計數器減1後值為0時,喚醒所有等待中的執行緒;

  • 第3部分:當計數器減1後值不為0時,執行緒進入等待狀態。

先來看第1部分,CyclicBarrier與執行緒的狀態校驗的部分,先是判斷CyclicBarrier是否被打破,接著判斷當前執行緒是否為中斷狀態,如果是則呼叫CyclicBarrier#breakBarrier方法:

private void breakBarrier() {
  generation.broken = true;
  count = parties;
  trip.signalAll();
}

CyclicBarrier#breakBarrier方法非常簡單,只做了3件事:

  • 標記CyclicBarrier被打破;

  • 重置CyclicBarrier的計數器;

  • 喚醒全部等待中的執行緒。

也就是說,一旦有個執行緒標記為中斷狀態,都會直接打破CyclicBarrier的屏障。

我們先跳過第2部分的喚醒邏輯,直接來看第3部分執行緒進入等待狀態的邏輯。根據timed引數選擇呼叫Condition不同的等待方法,隨後是對異常的處理和執行緒中斷狀態的處理,同樣是呼叫CyclicBarrier#breakBarrier,標記CyclicBarrier不可用。執行緒進入等待狀態的邏輯並不複雜,本質上是通過AQS的Condition來實現的。

最後來看第2部分喚醒所有等待中執行緒的操作,根據計數器是否為0判斷是否需要進行喚醒。如果需要喚醒,最後一個執行CyclicBarrier#await的執行緒執行barrierCommand(此時尚未執行任何執行緒喚醒的操作),做通過屏障前的處理操作,接著呼叫CyclicBarrier#nextGeneration方法:

private void nextGeneration() {
  trip.signalAll();
  count = parties;
  generation = new Generation();
}

CyclicBarrier#nextGeneration方法也做了3件事:

  • 喚醒所有Condition上等待的執行緒;

  • 重置CyclicBarrier的計數器;

  • 建立新的Generation物件。

很符合進入「下一代」的名字,先喚醒「上一代」所有等待中的執行緒,然後重置CyclicBarrier的計數器,最後更新CyclicBarrier的Generation物件,對CyclicBarrier進行重置工作,讓CyclicBarrier進入下一個紀元。

到這裡我們不難發現,CyclicBarrier自身只做了維護計數器和重置計數器的工作,而保證互斥性和執行緒的等待與喚醒則是依賴AQS家族的成員完成的:

  • ReentrantLock保證了同一時間只有一個執行緒可以執行CyclicBarrier#await,即同一時間只有一個執行緒可以維護計數器;

  • Condition為CyclicBarrier提供了條件等待佇列,完成了執行緒的等待與喚醒的工作。

CyclicBarrier#reset方法

最後我們來看CyclicBarrier#reset方法:

public void reset() {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    // 主動打破CyclicBarrier
    breakBarrier();
    // 使CyclicBarrier進入下一代
    nextGeneration();
  } finally {
    lock.unlock();
  }
}

CyclicBarrier#reset方法都是老面孔,先是CyclicBarrier#breakBarrier打破上一代CyclicBarrier,既然要重新開始就不要再「懷念」過去了;最後呼叫CyclicBarrier#nextGeneration開始新的時代。需要注意的是,這裡加鎖的目的是為了保證執行CyclicBarrier#reset時,沒有任何執行緒正在執行CyclicBarrier#await方法。

好了,到這裡CyclicBarrier的核心內容我們就一起分析完了,剩下的方法就非常簡單了,相信通過名字大家就可以瞭解它們的作用,並猜到它們的實現了。

TipsCyclicBarrier#getNumberWaiting中加了鎖,這是為什麼?

CountDownLatch和Cyclicbarrier有什麼區別?

最後的部分,我們來解答下開篇時的面試題,CountDownLatch和Cyclicbarrier有什麼區別?

第1點:CyclicBarrier可以重複使用,CountDownLatch不能重複使用

無論是正常使用結束,還是呼叫CyclicBarrier#reset方法,Cyclicbarrier都可以重置內部的計數器

第2點:Cyclicbarrier只阻塞呼叫CyclicBarrier#await方法的執行緒,而CountDownLatch可以阻塞任意一個或多個執行緒

CountDownLatch將計數減1與阻塞拆分成了CountDownLatch#countDownCountDownLatch#await兩個方法,而Cyclicbarrier只通過CyclicBarrier#await完成兩步操作。如果在同一個執行緒中連續CountDownLatch#countDownCountDownLatch#await則實現了與CyclicBarrier#await方法相同的功能。

結語

好了,今天就到這裡結束了。如果本文對你有幫助的話,請多多點贊支援。最後歡迎大家關注分享硬核技術的金融摸魚俠王有志,我們下次再見!