深入理解並行程式設計同步工具類

2022-10-11 15:01:21

大家好,我是陶朱公Boy。

今天跟大家分享一個並行程式設計領域中的一個知識點——同步工具類。

我將結合一個真實線上案例作為背景來展開講解這一知識點。給大家講清楚什麼是同步工具類、適合的場景、解決了什麼問題、各個實現方案的對比。希望對大家理解同步工具類這個知識點有所幫助。

我們先看一個案例:

需求描述

圖一:邏輯架構圖

有一個線上「臉部辨識」的應用,應用首次啟動要求多執行緒並行將儲存在DB中的人臉資料(512位元的double型別陣列)載入到本地應用快取中,主執行緒需要等待所有子執行緒完成任務後,才能繼續執行餘下的業務邏輯(比如載入dubbo元件)。

拿到這個需求,大家不妨先思考一下,如果讓你來實現,你打算怎麼做?思考點是什麼?

需求分析

讓我們一起來分析一下這個需求:

首先這個需求是應用首次啟動,需要用多執行緒並行執行任務的,充分利用CPU的多核機制,加快整體任務的處理速度。

其次大家先可以看下上述圖一,多執行緒並行執行下,主執行緒需要等待所有子執行緒完成任務後才能繼續執行餘下的業務邏輯。

要實現這個需求,我們就要思考一下看有沒有一種機制能讓主執行緒等待其他子執行緒完成任務後,它再繼續執行它餘下的業務邏輯?

方案實現

★方案一:Thread.join()

什麼是join?

join方法是Thread類內部的一個方法,是一種一個執行緒等待另一個或多個執行緒完成任務的機制。

基本語意:

如果一個執行緒A呼叫了thread.join()方法,那麼當前執行緒A需要等待thread執行緒完成任務後,才能從thread.join()阻塞處返回。

範例程式碼:

 public class JoinDemo {
 ​
   public static void main(String[] args) throws InterruptedException {
 ​
     Thread thread0=new Thread(()->{
       System.out.println(Thread.currentThread().getName()+"==start");
       try {
         Thread.sleep((long) (Math.random() * 10000));
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
       System.out.println(Thread.currentThread().getName()+"==end");
 ​
     });
 ​
     Thread thread1=new Thread(()->{
       System.out.println(Thread.currentThread().getName()+"==start");
       try {
         Thread.sleep((long) (Math.random() * 10000));
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
       System.out.println(Thread.currentThread().getName()+"==end");
 ​
     });
     thread0.start();
     thread1.start();
     thread1.join();
     System.out.println("main 1...");
     thread0.join();
     System.out.println("main 0...");
 ​
     System.out.println("====all finish===");
 ​
 ​
   }
 }

結果列印:

 

原理:

原始碼解析:

從原始碼細節來看(為了方便陳述,我們假設有一個執行緒A呼叫thread.join()),我們說執行緒A持有了thread物件的一把鎖,while迴圈判斷thread執行緒是否存活,如果返回false,表示thread執行緒任務尚未結束,那麼執行緒A就會被掛起,釋放鎖,執行緒狀態進入等待狀態,等待被喚醒。

而喚醒的更多細節是在thread執行緒退出時,底層呼叫exit方法,詳見hotspot關於thread.cpp檔案中JavaThread::exit部分。如下(倒數第二行):

 void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
   assert(this == JavaThread::current(), "thread consistency check");
   ...
   // Notify waiters on thread object. This has to be done after exit() is called
   // on the thread (if the thread is the last thread in a daemon ThreadGroup the
   // group should have the destroyed bit set before waiters are notified).
   ensure_join(this);
   assert(!this->has_pending_exception(), "ensure_join should have cleared");
   ...
 ​
 ​
 static void ensure_join(JavaThread* thread) {
   // We do not need to grap the Threads_lock, since we are operating on ourself.
   Handle threadObj(thread, thread->threadObj());
   assert(threadObj.not_null(), "java thread object must exist");
   ObjectLocker lock(threadObj, thread);
   // Ignore pending exception (ThreadDeath), since we are exiting anyway
   thread->clear_pending_exception();
   // Thread is exiting. So set thread_status field in java.lang.Thread class to TERMINATED.
   java_lang_Thread::set_thread_status(threadObj(), java_lang_Thread::TERMINATED);
   //這裡是清除native執行緒,這個操作會導致isAlive()方法返回false
   java_lang_Thread::set_thread(threadObj(), NULL);
   //喚醒等待在thread物件上的所有執行緒  lock.notify_all(thread);  // Ignore pending exception (ThreadDeath), since we are exiting anyway
   thread->clear_pending_exception();
 }

 

方案二:閉鎖(CountDownLatch)

什麼是閉鎖?

閉鎖是一種同步工具類,可以延遲執行緒進度直到其達到終止狀態。

閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前,這扇門一直是關閉的,並且沒有任何執行緒能通過,直到到達結束狀態時,這扇門將會永久開啟。
閉鎖用來確保某些任務直到其他任務都完成後才繼續執行。

基本語意:

countDownLatch的建構函式接收一個int型別的引數作為計數器,比如你傳入了引數N,那意思就是需要等待N個點完成。當我們呼叫countDown方法時,這個計數器就會減1,await方法會一直阻塞主執行緒,直到N變0為止。

原理:

 

適用場景:

像應用程式首次啟動,主執行緒需要等待其他子執行緒完成任務後,才能做餘下事情,並且是一次性的。 像作者文章開始處提的這個需求,其實比較適合用CountDownLatch這個方案,主執行緒必須等到子執行緒的任務完成,才能進一步載入其他元件,比如dubbo。

範例程式碼:

 public class CountDownLatchDemo {
     public static void main(String[] args) {
         ExecutorService service = Executors.newFixedThreadPool(3);
         final CountDownLatch latch = new CountDownLatch(3);
         for (int i = 0; i < 3; i++) {
             Runnable runnable = new Runnable() {
                 @Override
                 public void run() {
                     try {
                         System.out.println("子執行緒" + Thread.currentThread().getName() + "開始執行");
                         //睡眠個幾十毫秒
                         Thread.sleep((long) (Math.random() * 10000));
                         System.out.println("子執行緒" + Thread.currentThread().getName() + "執行完成");
                         latch.countDown();//當前執行緒呼叫此方法,則計數減一
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                     }
                 }
             };
             service.execute(runnable);
         }
         try {
             System.out.println("主執行緒" + Thread.currentThread().getName() + "等待子執行緒執行完成...");
             latch.await(5,TimeUnit.SECONDS);//阻塞當前執行緒,直到計數器的值為0
             System.out.println("阻塞完畢!主執行緒" + Thread.currentThread().getName() + "繼續執行業務邏輯...");
             service.shutdownNow();
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
 ​
     }
 }
 結果列印:子執行緒pool-1-thread-1開始執行
 子執行緒pool-1-thread-2開始執行
 子執行緒pool-1-thread-3開始執行
 主執行緒main等待子執行緒執行完成...
 子執行緒pool-1-thread-2執行完成
 子執行緒pool-1-thread-1執行完成
 子執行緒pool-1-thread-3執行完成
 阻塞完畢!主執行緒main繼續執行業務邏輯...

原始碼解析:

 /**
  * 靜態內部類,自定義同步器元件
  */
 private final Sync sync;
  
 /**
  * 只有一個構造方法,接收一個count值
  */
 public CountDownLatch(int count) {
     // count值不能小於0
     if (count < 0) throw new IllegalArgumentException("count < 0");
     // 自定義一個同步元件;通過繼承AQS元件實現;
     this.sync = new Sync(count);
 }
 private static final class Sync extends AbstractQueuedSynchronizer {
     private static final long serialVersionUID = 4982264981922014374L;
  
     Sync(int count) {
         // 使用構造函傳遞的引數值count作為同步狀態值。
         setState(count);
     }
  
     /** 獲取當前的count值 */
     int getCount() {
         return getState();
     }
  
     /**共用式獲取同步狀態<br>
      * 這是AQS的模板方法acquireShared、acquireSharedInterruptibly等方法內部將會呼叫的方法,
      * 由子類實現,這個方法的作用是嘗試獲取一次共用鎖,對於AQS來說,
      * 此方法返回值大於等於0,表示獲取共用鎖成功,反之則獲取共用鎖失敗,
      * 而在這裡,實際上就是判斷count是否等於0,執行緒能否向下執行
      */
     protected int tryAcquireShared(int acquires) {
         // 此處判斷state的值是否為0,也就是判斷count是否為0,
         // 若count為0,返回1,表示獲取鎖成功,此時執行緒將不會阻塞,正常執行
         // 若count不為0,則返回-1,表示獲取鎖失敗,執行緒將會被阻塞
         // 從這裡我們已經可以看出CountDownLatch的實現方式了
         return (getState() == 0) ? 1 : -1;
     }
  
     /**共用式釋放同步狀態<br>
      * 此方法的作用是用來釋放AQS的共用鎖,返回true表示釋放成功,反之則失敗
      * 此方法將會在AQS的模板方法releaseShared中被呼叫,
      * 在CountDownLatch中,這個方法用來減小count值
      */
     protected boolean tryReleaseShared(int releases) {
         // 使用死迴圈不斷嘗試釋放鎖
         for (;;) {
             // 首先獲取當前state的值,也就是count值
             int c = getState();
             /**若count值已經等於0,則不能繼續減小了,於是直接返回false
             /* 為什麼返回的是false,因為等於0表示之前等待的那些執行緒已經被喚醒了,            *若返回true,AQS會嘗試喚醒執行緒,若返回false,則直接結束,所以
             * 在沒有執行緒等待的情況下,返回false直接結束是正確的            */
             if (c == 0)
                 return false;
             // 若count不等於0,則將其-1
             int nextc = c-1;
             // compareAndSetState的作用是將count值從c,修改為新的nextc
             // 此方法基於CAS實現,保證了操作的原子性
             if (compareAndSetState(c, nextc))
                 // 若nextc == 0,則返回的是true,表示已經沒有鎖了,執行緒可以執行了,
                 // 若nextc > 0,則表示執行緒還需要繼續阻塞,此處將返回false
                 return nextc == 0;
         }
     }
 ​
 }

我們看下範例程式碼中關於latch.countDown()方法原始碼部分:

 /**
  * 此方法的作用就是將count的值-1,如果count等於0了,就喚醒等待的執行緒
  */
 public void countDown() {
     // 這裡直接呼叫sync的releaseShared方法,這個方法的實現在AQS中,也是AQS提供的模板方法,
     // 這個方法的作用是當前執行緒釋放鎖,若釋放失敗,返回false,若釋放成功,則返回false,
     // 若鎖被釋放成功,則當前執行緒會喚醒AQS同步佇列中第一個被阻塞的執行緒,讓他嘗試獲取鎖
     // 對於CountDownLatch來說,釋放鎖實際上就是讓count - 1,只有當count被減小為0,
     // 鎖才是真正被釋放,執行緒才能繼續向下執行
     sync.releaseShared(1);
 }
 /**
 * 共用式的釋放同步狀態
 */
 public final boolean releaseShared(int arg) {
     // 呼叫tryReleaseShared嘗試釋放鎖,這個方法已經由Sycn重寫,請回顧上面對此方法的分析
     // 若tryReleaseShared返回true,表示count經過這次釋放後,等於0了,於是執行doReleaseShared
     if (tryReleaseShared(arg)) {
         // 這個方法的作用是喚醒AQS的同步佇列中,正在等待的第一個執行緒
         // 而我們分析acquireSharedInterruptibly方法時已經說過,
         // 若一個執行緒被喚醒,檢測到count == 0,會繼續喚醒下一個等待的執行緒
         // 也就是說,這個方法的作用是,在count == 0時,喚醒所有等待的執行緒
         doReleaseShared();
         return true;
     }
     return false;
 }

接下來我們看下另一個比較重要的方法即await方法部分原始碼:

 // 此方法用來讓當前執行緒阻塞,直到count減小為0才恢復執行
 public void await() throws InterruptedException {
     // 這裡直接呼叫sync的acquireSharedInterruptibly方法,這個方法定義在AQS中
     // 方法的作用是嘗試獲取共用鎖,若獲取失敗,則執行緒將會被加入到AQS的同步佇列中等待
     // 直到獲取成功為止。且這個方法是會響應中斷的,執行緒在阻塞的過程中,若被其他執行緒中斷,
     // 則此方法會通過丟擲異常的方式結束等待。
     sync.acquireSharedInterruptibly(1);
 }
 ​
 /**
 *此方法是AQS中提供的一個模板方法,用以獲取共用鎖,並且會響應中斷 */
 public final void acquireSharedInterruptibly(int arg)
     throws InterruptedException {
     // 首先判斷當前執行緒釋放被中斷,若被中斷,則直接丟擲異常結束
     if (Thread.interrupted())
         throw new InterruptedException();
     
     // 呼叫tryAcquireShared方法嘗試獲取鎖,這個方法被Sycn類重寫了,
     // 若count == 0,則這個方法會返回1,表示獲取鎖成功,則這裡會直接返回,執行緒不會被阻塞;否則返回-1
     // 若count < 0,將會執行下面的doAcquireSharedInterruptibly方法,
     // 此處請去檢視Sync中tryAcquireShared方法的實現
     if (tryAcquireShared(arg) < 0)
         // 下面這個方法的作用是,執行緒獲取鎖失敗,將會加入到AQS的同步佇列中阻塞等待,
         // 直到成功獲取到鎖,而此處成功獲取到鎖的條件就是count == 0,若當前執行緒在等待的過程中,
         // 成功地獲取了鎖,則它會繼續喚醒在它後面等待的執行緒,也嘗試獲取鎖,
         // 這也就是說,只要count == 0了,則被阻塞的執行緒都能恢復執行
         doAcquireSharedInterruptibly(arg);
 ​
 }

從原始碼細節來看,我們知道CountDownLatch底層是繼承了AQS框架,是一個自定義同步元件。

AQS的狀態變數被它當做了一個所謂的計數器實現。主執行緒呼叫await方法後,發現state的值不等於0,進入同步佇列中阻塞等待。子執行緒每次呼叫countDown方法後,計數器減一,直到為0。這時會喚醒處於阻塞狀態的主執行緒,然後主執行緒就會從await方法出返回。

方案三:柵欄(CyclicBarrier)

什麼是柵欄?

CyclicBarrier字面意思是可迴圈(Cyclic)使用的柵欄(Barrier)。它的意思是讓一組執行緒到達一個柵欄時被阻塞,直到最後一個耗時較長的執行緒完成任務後也到達柵欄時,柵欄才會開啟,此時所有被柵欄攔截的執行緒才會繼續執行。

基本語意:

CyclicBarrier有一個預設構造方法:CyclicBarrier(int parties),引數parties表示被柵欄攔截的執行緒數量。

每個執行緒呼叫await()方法告訴柵欄我已經到達柵欄,然後當前執行緒就會被阻塞,直到以下任一情況發生時,當前執行緒從await方法處返回。

  • 最後一個執行緒到達
  • 其他執行緒中斷當前執行緒
  • 其他執行緒等待柵欄超時;通過呼叫await帶超時時間的方法。
    await(long timeout, TimeUnit unit)
  • 其他一些執行緒在此屏障上呼叫重置

原理:

在CyclicBarrier的內部定義了一個Lock物件,每當一個執行緒呼叫await方法時,將攔截的執行緒數減1,然後判斷剩餘攔截數是否為初始值parties,如果不是,進入Lock物件的條件佇列等待。如果是,執行barrierAction物件的Runnable方法,然後將鎖的條件佇列中的所有執行緒放入鎖等待佇列中,這些執行緒會依次的獲取鎖、釋放鎖。

適用場景:

1)實現多人遊戲,直到所有玩家都加入才能開始。

2)經典場景:多執行緒計算資料,然後彙總結算結果場景。(比如一個Excel有多份sheet資料,開啟多執行緒,每個執行緒處理一個sheet,最終將每個sheet的計算結果進行彙總)

範例程式碼:

 public class CyclicBarrierTest2 {
     static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new CalculateResult());
 ​
     public static void main(String[] args) {
         new Thread(() -> {
             try {
                 System.out.println("執行緒A處理完sheet0資料...總計100");
                 cyclicBarrier.await();
             } catch (Exception e) {
 ​
             }
 ​
         }).start();
 ​
         try {
             System.out.println("執行緒B處理完sheet1資料...總計200");
             cyclicBarrier.await();
         } catch (Exception e) {
 ​
         }
     }
 ​
     static class CalculateResult implements Runnable {
 ​
         @Override
         public void run() {
             System.out.println("【匯匯流排程】開始統計各個子執行緒的計算結果...,總計300");
 ​
         }
     }
 }

響應結果列印:

 執行緒B處理完sheet1資料...總計200
 執行緒A處理完sheet0資料...總計100
 【匯匯流排程】開始統計各個子執行緒的計算結果...總計300

 

方案四:號誌(Semaphore)

什麼是號誌?

號誌是用來控制同時存取特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源。

基本語意:

從Semaphore的構造方法Semaphore(int permits)來看,入參permits表示可用的許可數量。如果我們在方法內部執行操作前先執行了acquire()方法,那麼當前執行緒就會嘗試去獲取可用的許可,如果獲取不到,就會被阻塞(或者中途被其他執行緒中斷),直到有可用的許可為止。

執行release()方法意味著會釋放許可給Semaphore。此時許可數量就會加一。

使用場景:

Semaphore在有限公共資源場景下,應用比較廣泛,比如資料庫連線池場景。

大家可以想象一下,比如我們平時在用的C3P0、druid等資料庫連線池,因為資料庫連線數是有限制的,面對突如其來的激增流量,一下子把有限的連線數量給佔完了,那沒有獲取到可用的連線的執行緒咋辦?是直接失敗嗎?

我們期望的效果是讓這些沒獲取到連線的執行緒先暫時阻塞一會,而不是立即失敗,這樣一旦有可用的連線,這些被阻塞的執行緒就可以獲取到連線而繼續工作。

範例程式碼:

 public class BoundedHashSet<T> {
 ​
     private final Set<T> set;
 ​
     private final Semaphore sem;
 ​
     public BoundedHashSet(int bound) {
         this.set = Collections.synchronizedSet(new HashSet<T>());
         this.sem = new Semaphore(bound);
     }
 ​
     public boolean add(T o) throws InterruptedException {
         sem.acquire();
 ​
         boolean wasAdded = false;
 ​
         try {
             wasAdded = set.add(o);//如果元素已存在,返回false;否則true
             return wasAdded;
         } catch (Exception e) {
 ​
         } finally {
             if (!wasAdded) {//如果元素已經存在,則釋放許可給號誌
                 sem.release();
             }
         }
         return wasAdded;
     }
 ​
     public boolean remove(Object o) {
         boolean wasRemoved = set.remove(o);
         if (wasRemoved) {
             sem.release();//從容器中移除元素後,需要釋放許可給號誌。
         }
         return wasRemoved;
     }
 ​
 ​
 }

總結

上述需求的實現方案我例舉了join、CountDownLatch、CyclicBarrierSemaphore這幾種。

期間也介紹了每種方案的實現原理、適用場景、原始碼解析。它們語意上有一些相似的地方,但差異性也很明顯,接下來我們詳細對它們進行一下對比。

首先我們說當前執行緒呼叫t.join()儘管能達到當前執行緒等待執行緒t完成任務的業務語意。但細緻的區別是join方法呼叫後必須要等到t執行緒完成它的任務後,當前執行緒才能從阻塞出返回。而CountDownLatch、CyclicBarrier顯然提供了更細粒度的控制。像CountDownLatch只要主執行緒將countDownLatch範例物件傳遞給子執行緒,子執行緒在方法內部某個地方執行latch.countDownLatch(),每呼叫一次計數器就會減1,直到為0,最後主執行緒就能感知到並從await阻塞出返回,不需要等到任務的完成。

其次我們說在當前執行緒方法內部,一旦出現超過2個join方法,整體程式碼就會變的很髒、可讀性降低。反觀JUC分裝的CountDownLatch、CyclicBarrier等元件,通過對共用範例的操作(可以把這個範例傳給子執行緒,然後子執行緒任務執行的時候呼叫相應方法,比如latch.countDown()) 顯得更加清晰、優雅。

最後比較一下CyclicBarrier和CountDownLatch的差異性。比起CountDownLatch顯然CyclicBarrier功能更多,比如支援reset方法。CountDownLatch的計數器只能使用一次,而CyclicBarrier可以多次使用,只要呼叫reset方法即可。(比如CyclicBarrier典型的資料統計場景,因為中途可能部分執行緒統計出錯或外部資料的訂正,可能需要重新再來一次計算,那麼這個時候,CountDownLatch無能為力,而CyclicBarrier只要子執行緒呼叫reset方法即可)。

而Semaphore往往用來針對多執行緒並行存取指定有限資源的場景,比如資料庫連線池場景。

 

寫到最後

如果這篇文章你看了對你有幫助或啟發,麻煩關注、點贊一下作者。你的肯定是作者創作源源不斷的動力。

公眾號

歡迎大家關注我的公眾號:【陶朱公Boy

裡面不僅彙集了硬核的乾貨技術、還彙集了像左耳朵耗子、張朝陽總結的高效學習方法論、職場升遷竅門、軟技能。希望能輔助你到達你的夢想之地!

加群

同時作者還建了一個技術交流群,網際網路寒冬,大家一起抱團取暖!關注公眾號後回覆」加群「,拉你入群。與眾多高手一起切磋、交流。相信肯定會有所收穫!