Java 多執行緒:鎖(三)

2022-09-11 18:01:50

Java 多執行緒:鎖(三)

作者:Grey

原文地址:

部落格園:Java 多執行緒:鎖(三)

CSDN:Java 多執行緒:鎖(三)

StampedLock

StampedLock其實是對讀寫鎖的一種改進,它支援在讀同時進行一個寫操作,也就是說,它的效能將會比讀寫鎖更快。

更通俗的講就是在讀鎖沒有釋放的時候是可以獲取到一個寫鎖,獲取到寫鎖之後,讀鎖阻塞,這一點和讀寫鎖一致,唯一的區別在於讀寫鎖不支援在沒有釋放讀鎖的時候獲取寫鎖

StampedLock 有三種模式:

  • 悲觀讀:允許多個執行緒獲取悲觀讀鎖。

  • 寫鎖:寫鎖和悲觀讀是互斥的。

  • 樂觀讀:無鎖機制,類似於資料庫中的樂觀鎖,它支援在不釋放樂觀讀的時候是可以獲取到一個寫鎖。

參考: 有沒有比讀寫鎖更快的鎖?

範例程式碼:

悲觀讀 + 寫鎖:

package git.snippets.juc;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.StampedLock;
import java.util.logging.Logger;

// 悲觀讀 + 寫鎖
public class StampedLockPessimistic {
    private static final Logger log = Logger.getLogger(StampedLockPessimistic.class.getName());
    private static final StampedLock lock = new StampedLock();
    //快取中儲存的資料
    private static final Map<String, String> mapCache = new HashMap<>();
    //模擬資料庫儲存的資料
    private static final Map<String, String> mapDb = new HashMap<>();

    static {
        mapDb.put("zhangsan", "你好,我是張三");
        mapDb.put("sili", "你好,我是李四");
    }

    private static void getInfo(String name) {
        //獲取悲觀讀
        long stamp = lock.readLock();
        log.info("執行緒名:" + Thread.currentThread().getName() + " 獲取了悲觀讀鎖" + "    使用者名稱:" + name);
        try {
            if ("zhangsan".equals(name)) {
                log.info("執行緒名:" + Thread.currentThread().getName() + " 休眠中" + "    使用者名稱:" + name);
                Thread.sleep(3000);
                log.info("執行緒名:" + Thread.currentThread().getName() + " 休眠結束" + "    使用者名稱:" + name);
            }
            String info = mapCache.get(name);
            if (null != info) {
                log.info("在快取中獲取到了資料");
                return;
            }
        } catch (InterruptedException e) {
            log.info("執行緒名:" + Thread.currentThread().getName() + " 釋放了悲觀讀鎖");
            e.printStackTrace();
        } finally {
            //釋放悲觀讀
            lock.unlock(stamp);
        }

        //獲取寫鎖
        stamp = lock.writeLock();
        log.info("執行緒名:" + Thread.currentThread().getName() + " 獲取了寫鎖" + "    使用者名稱:" + name);
        try {
            //判斷一下快取中是否被插入了資料
            String info = mapCache.get(name);
            if (null != info) {
                log.info("獲取到了寫鎖,再次確認在快取中獲取到了資料");
                return;
            }
            //這裡是往資料庫獲取資料
            String infoByDb = mapDb.get(name);
            //將資料插入快取
            mapCache.put(name, infoByDb);
            log.info("快取中沒有資料,在資料庫獲取到了資料");
        } finally {
            //釋放寫鎖
            log.info("執行緒名:" + Thread.currentThread().getName() + " 釋放了寫鎖" + "     使用者名稱:" + name);
            lock.unlock(stamp);
        }
    }

    public static void main(String[] args) {
//執行緒1
        Thread t1 = new Thread(() -> {
            getInfo("zhangsan");
        });

        //執行緒2
        Thread t2 = new Thread(() -> {
            getInfo("lisi");
        });

        //執行緒啟動
        t1.start();
        t2.start();

        //執行緒同步
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

樂觀讀:

package git.snippets.juc;

import java.util.concurrent.locks.StampedLock;
import java.util.logging.Logger;

// 樂觀寫
public class StampedLockOptimistic {
    private static final Logger log = Logger.getLogger(StampedLockOptimistic.class.getName());
    private static final StampedLock lock = new StampedLock();
    private static int num1 = 1;
    private static int num2 = 1;

    /**
     * 修改成員變數的值,+1
     *
     * @return
     */
    private static int sum() {
        log.info("求和方法被執行了");
        //獲取樂觀讀
        long stamp = lock.tryOptimisticRead();
        int cnum1 = num1;
        int cnum2 = num2;
        log.info("獲取到的成員變數值,cnum1:" + cnum1 + "   cnum2:" + cnum2);
        try {
            //休眠3秒,目的是為了讓其他執行緒修改掉成員變數的值。
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //判斷在執行期間是否存在寫操作   true:不存在   false:存在
        if (!lock.validate(stamp)) {
            log.info("存在寫操作!");
            //存在寫鎖
            //升級悲觀讀鎖
            stamp = lock.readLock();
            try {
                log.info("升級悲觀讀鎖");
                cnum1 = num1;
                cnum2 = num2;
                log.info("重新獲取了成員變數的值=========== cnum1=" + cnum1 + "    cnum2=" + cnum2);
            } finally {
                //釋放悲觀讀鎖
                lock.unlock(stamp);
            }
        }
        return cnum1 + cnum2;
    }

    //使用寫鎖修改成員變數的值
    private static void updateNum() {
        long stamp = lock.writeLock();
        try {
            num1 = 2;
            num2 = 2;
        } finally {
            lock.unlock(stamp);
        }
    }


    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            int sum = sum();
            log.info("求和結果:" + sum);
        });
        t1.start();
        //休眠1秒,目的為了讓執行緒t1能執行到獲取成員變數之後
        Thread.sleep(1000);
        updateNum();
        t1.join();
        log.info("執行完畢");

    }

}

使用 StampedLock 的注意事項

  1. 看名字就能看出來StampedLock不支援重入鎖。

  2. 它適用於讀多寫少的情況,如果不是這種情況,請慎用,效能可能還不如synchronized

  3. StampedLock的悲觀讀鎖、寫鎖不支援條件變數。

  4. 千萬不能中斷阻塞的悲觀讀鎖或寫鎖,如果呼叫阻塞執行緒的interrupt(),會導致cpu飆升,如果希望StampedLock支援中斷操作,請使用readLockInterruptibly(悲觀讀鎖)與writeLockInterruptibly(寫鎖)。

CountDownLatch

類似門閂的概念,可以替代join,但是比join靈活,因為一個執行緒裡面可以多次countDown,但是join一定要等執行緒完成才能執行。

其底層原理是:呼叫await()方法的執行緒會利用AQS排隊,一旦數位減為0,則會將AQS中排隊的執行緒依次喚醒。

程式碼如下:

package git.snippets.juc;

import java.util.concurrent.CountDownLatch;

/**
 * CountDownLatch可以用Join替代
 */
public class CountDownLatchAndJoin {
    public static void main(String[] args) {
        useCountDownLatch();
        useJoin();
    }

    public static void useCountDownLatch() {
        // use countdownlatch
        long start = System.currentTimeMillis();
        Thread[] threads = new Thread[100000];
        CountDownLatch latch = new CountDownLatch(threads.length);

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                int result = 0;
                for (int i1 = 0; i1 < 1000; i1++) {
                    result += i1;
                }
                // System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result);
                latch.countDown();
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();

        System.out.println("end latch down, time is " + (end - start));

    }

    public static void useJoin() {
        long start = System.currentTimeMillis();

        // use join
        Thread[] threads = new Thread[100000];

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                int result = 0;
                for (int i1 = 0; i1 < 1000; i1++) {
                    result += i1;
                }
                // System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result);
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long end = System.currentTimeMillis();

        System.out.println("end join, time is " + (end - start));
    }
}

CyclicBarrier

類似柵欄,類比:滿了20個乘客就發車 這樣的場景。

比如:一個程式可能收集如下來源的資料:

  1. 資料庫

  2. 網路

  3. 檔案

程式可以並行執行,用執行緒操作1,2,3,然後操作完畢後再合併, 然後執行後續的邏輯操作,就可以使用CyclicBarrier

程式碼如下:

package git.snippets.juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * CyclicBarrier範例:滿員發車
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @since 1.8
 */
public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(20, () -> {
            System.out.println("滿了20,發車");
        });
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore

表示號誌,有如下兩個操作:

s.acquire() 號誌減1

s.release()號誌加1

到 0 以後,就不能執行了,這個可以用於限流

底層原理是:如果沒有執行緒許可可用,則執行緒阻塞,並通過 AQS 來排隊,可以通過release()方法來釋放許可,當某個執行緒釋放了某個許可後,會從 AQS 中正在排隊的第一個執行緒依次開始喚醒,直到沒有空閒許可。

Semaphore 使用範例:有N個執行緒來存取,我需要限制同時執行的只有號誌大小的執行緒數。

程式碼如下:

package git.snippets.juc;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Semaphore用於限流
 */
public class SemaphoreUsage {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(1);
        new Thread(() -> {
            try {
                semaphore.acquire();
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 1 executed");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }).start();

        new Thread(() -> {
            try {
                semaphore.acquire();
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 2 executed");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }).start();
    }
}

Semaphore可以有公平非公平的方式進行設定。

SemaphoreCountDownLatch的區別?

Semaphore 是號誌,可以做限流,限制 n 個執行緒並行,釋放一個執行緒後就又能進來一個新的執行緒。

CountDownLatch 是閉鎖,帶有阻塞的功能,必須等到 n 個執行緒都執行完後,被阻塞的執行緒才能繼續往下執行。

Guava RateLimiter

採用令牌桶演演算法,用於限流

範例程式碼如下

package git.snippets.juc;

import com.google.common.util.concurrent.RateLimiter;
import java.util.List;
import java.util.concurrent.Executor;

/**
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2021/4/21
 * @since
 */
public class RateLimiterUsage {
    //每秒只發出2個令牌
    static final RateLimiter rateLimiter = RateLimiter.create(2.0);
    static void submitTasks(List<Runnable> tasks, Executor executor) {
        for (Runnable task : tasks) {
            rateLimiter.acquire(); // 也許需要等待
            executor.execute(task);
        }
    }
}

注:上述程式碼需要引入 Guava 包

Phaser(Since jdk1.7)

遺傳演演算法,可以用這個結婚的場景模擬: 假設婚禮的賓客有 5 個人,加上新郎和新娘,一共 7 個人。 我們可以把這 7 個人看成 7 個執行緒,有如下步驟要執行。

  1. 到達婚禮現場

  2. 吃飯

  3. 離開

  4. 擁抱(只有新郎和新娘執行緒可以執行)

每個階段執行完畢後才能執行下一個階段,其中擁抱階段只有新郎新娘這兩個執行緒才能執行。

以上需求,我們可以通過 Phaser 來實現,具體程式碼和註釋如下:

package git.snippets.juc;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class PhaserUsage {
    static final Random R = new Random();
    static WeddingPhaser phaser = new WeddingPhaser();

    static void millSleep() {
        try {
            TimeUnit.MILLISECONDS.sleep(R.nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        // 賓客的人數
        final int guestNum = 5;
        // 新郎和新娘
        final int mainNum = 2;
        phaser.bulkRegister(mainNum + guestNum);
        for (int i = 0; i < guestNum; i++) {
            new Thread(new Person("賓客" + i)).start();
        }
        new Thread(new Person("新娘")).start();
        new Thread(new Person("新郎")).start();
    }

    static class WeddingPhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase) {
                case 0:
                    System.out.println("所有人到齊");
                    return false;
                case 1:
                    System.out.println("所有人吃飯");
                    return false;
                case 2:
                    System.out.println("所有人離開");
                    return false;
                case 3:
                    System.out.println("新郎新娘擁抱");
                    return true;
                default:
                    return true;
            }
        }
    }

    static class Person implements Runnable {
        String name;

        Person(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            // 先到達婚禮現場
            arrive();
            // 吃飯
            eat();
            // 離開
            leave();
            // 擁抱,只保留新郎和新娘兩個執行緒可以執行
            hug();
        }

        private void arrive() {
            millSleep();
            System.out.println("name:" + name + " 到來");
            phaser.arriveAndAwaitAdvance();
        }

        private void eat() {
            millSleep();
            System.out.println("name:" + name + " 吃飯");
            phaser.arriveAndAwaitAdvance();
        }

        private void leave() {
            millSleep();
            System.out.println("name:" + name + " 離開");
            phaser.arriveAndAwaitAdvance();
        }

        private void hug() {
            if ("新娘".equals(name) || "新郎".equals(name)) {
                millSleep();
                System.out.println("新娘新郎擁抱");
                phaser.arriveAndAwaitAdvance();
            } else {
                phaser.arriveAndDeregister();
            }
        }
    }
}

Exchanger

用於執行緒之間交換資料,exchange()方法是阻塞的,所以要兩個exchange行為都執行到才會觸發交換。

package git.snippets.juc;

import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;

/**
 * Exchanger用於兩個執行緒之間交換變數
 */
public class ExchangerUsage {
    static Exchanger<String> semaphore = new Exchanger<>();

    public static void main(String[] args) {

        new Thread(() -> {
            String s = "T1";
            try {
                s = semaphore.exchange(s);
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 1(T1) executed, Result is " + s);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            String s = "T2";
            try {
                s = semaphore.exchange(s);
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 2(T2) executed, Result is " + s);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}

LockSupport

其他鎖的底層用的是AQS

原先讓執行緒等待需要wait/await,現在僅需要LockSupport.park()

原先叫醒執行緒需要notify/notifyAll,現在僅需要LockSupport.unpark(), LockSupport.unpark()還可以叫醒指定執行緒,

範例程式碼:

package git.snippets.juc;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/**
 * 阻塞指定執行緒,喚醒指定執行緒
 */
public class LockSupportUsage {
    public static void main(String[] args) {
        Thread t = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    if (i == 5) {
                        LockSupport.park();
                    }
                    if (i == 8) {
                        LockSupport.park();
                    }
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t.start();
        // unpark可以先於park呼叫
        //LockSupport.unpark(t);
        try {
            TimeUnit.SECONDS.sleep(8);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        LockSupport.unpark(t);
        System.out.println("after 8 seconds");
    }
}

實現一個監控元素的容器

實現一個容器,提供兩個方法

// 向容器中增加一個元素
void add(T t);
// 返回容器大小
int size();

有兩個執行緒,執行緒1新增10個元素到容器中,執行緒2實現監控元素的個數,當個數到5個時,執行緒2給出提示並結束

方法 1. 使用wait + notify實現

方法 2. 使用CountDownLatch實現

方法 3. 使用LockSupport實現

程式碼如下:

package git.snippets.juc;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// 實現一個容器,提供兩個方法,add,size,有兩個執行緒,
// 執行緒1新增10個元素到容器中,
// 執行緒2實現監控元素的個數,
// 當個數到5個時,執行緒2給出提示並結束
public class MonitorContainer {

    public static void main(String[] args) {

        useLockSupport();
        // useCountDownLatch();
        // useNotifyAndWait();
    }


    /**
     * 使用LockSupport
     */
    private static void useLockSupport() {
        System.out.println("use LockSupport...");
        Thread adder;
        List<Object> list = Collections.synchronizedList(new ArrayList<>());
        Thread finalMonitor = new Thread(() -> {
            LockSupport.park();
            if (match(list)) {
                System.out.println("filled 5 elements size is " + list.size());
                LockSupport.unpark(null);
            }
        });
        adder = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                increment(list);
                if (match(list)) {
                    LockSupport.unpark(finalMonitor);
                }
            }
        });
        adder.start();
        finalMonitor.start();
    }

    /**
     * 使用CountDownLatch
     */
    private static void useCountDownLatch() {
        System.out.println("use CountDownLatch...");
        List<Object> list = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(5);
        Thread adder = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                increment(list);
                if (i <= 4) {
                    latch.countDown();
                }
            }
        });
        Thread monitor = new Thread(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (match(list)) {
                System.out.println("filled 5 elements");
            }
        });
        adder.start();
        monitor.start();
    }

    /**
     * notify + wait 實現
     */
    private static void useNotifyAndWait() {
        System.out.println("use notify and wait...");
        List<Object> list = Collections.synchronizedList(new ArrayList<>());
        final Object o = new Object();
        Thread adder = new Thread(() -> {
            synchronized (o) {
                for (int i = 0; i < 10; i++) {
                    increment(list);
                    if (match(list)) {
                        o.notify();
                        try {
                            o.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                System.out.println("add finished");
                o.notify();
            }
        });
        Thread monitor = new Thread(() -> {
            synchronized (o) {
                if (match(list)) {
                    System.out.println("5 elements added " + list.size());
                    o.notify();
                    try {
                        o.wait();
                        System.out.println("monitor finished");
                        o.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            }
        });
        adder.start();
        monitor.start();
    }

    /**
     * 只要是5的倍數,就回圈列印
     */
    private static void useNotifyAndWaitLoop() {
        List<Object> list = Collections.synchronizedList(new ArrayList<>());
        final Object o = new Object();
        Thread adder = new Thread(() -> {
            synchronized (o) {
                for (; ; ) {
                    increment(list);
                    if (match(list)) {
                        o.notify();
                        try {
                            o.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        });
        Thread monitor = new Thread(() -> {
            synchronized (o) {
                while (true) {
                    if (match(list)) {
                        System.out.println("filled 5 elements");
                    }
                    o.notify();
                    try {
                        o.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        adder.start();
        monitor.start();
    }

    private static void increment(List<Object> list) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        list.add(new Object());
        System.out.println("list add the ele, size is " + list.size());
    }

    private static boolean match(List<Object> list) {
        return list.size() % 5 == 0;
    }
}

生產者消費者問題

寫一個固定容量的同步容器,擁有putget方法,以及getCount方法,能夠支援 2 個生產者執行緒以及 10 個消費者執行緒的阻塞呼叫。

方法 1. 使用wait/notifyAll

方法 2. ReentrantLockCondition,本質就是等待佇列

package git.snippets.juc;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// 寫一個固定容量的同步容器,擁有put和get方法,以及getCount方法,能夠支援2個生產者執行緒以及10個消費者執行緒的阻塞呼叫。
public class ProducerAndConsumer {
    public static void main(String[] args) {
        // MyContainerByCondition container = new MyContainerByCondition(100);
        MyContainerByNotifyAndWait container = new MyContainerByNotifyAndWait(100);
        for (int i = 0; i < 25; i++) {
            new Thread(container::get).start();
        }
        for (int i = 0; i < 20; i++) {
            new Thread(() -> container.put(new Object())).start();
        }
    }
}

// 使用ReentrantLock的Condition
class MyContainerByCondition {
    static ReentrantLock lock = new ReentrantLock();
    final int MAX;
    private final LinkedList<Object> list = new LinkedList<>();
    Condition consumer = lock.newCondition();
    Condition producer = lock.newCondition();

    public MyContainerByCondition(int limit) {
        this.MAX = limit;
    }

    public void put(Object object) {
        lock.lock();
        try {
            while (getCount() == MAX) {
                System.out.println("container is full");
                try {
                    producer.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(object);
            consumer.signalAll();
            System.out.println("contain add a object, current size " + getCount());

        } finally {
            lock.unlock();
        }

    }

    public Object get() {
        lock.lock();
        try {
            while (getCount() == 0) {
                try {
                    System.out.println("container is empty");
                    consumer.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Object object = list.removeFirst();
            producer.signalAll();

            System.out.println("contain get a object, current size " + getCount());
            return object;
        } finally {
            lock.unlock();
        }

    }

    public synchronized int getCount() {
        return list.size();
    }
}

// 使用synchronized的wait和notifyAll
class MyContainerByNotifyAndWait {
    LinkedList<Object> list = null;
    final int limit;

    MyContainerByNotifyAndWait(int limit) {
        this.limit = limit;
        list = new LinkedList<>();
    }

    synchronized int getCount() {
        return list.size();
    }

    // index 從0開始計數
    synchronized Object get() {
        while (list.size() == 0) {
            System.out.println("container is empty");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Object o = list.removeFirst();

        System.out.println("get a data");
        this.notifyAll();
        return o;
    }

    synchronized void put(Object data) {
        while (list.size() > limit) {
            System.out.println("container is full , do not add any more");
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.add(data);

        System.out.println("add a data");
        this.notifyAll();
    }
}

說明

本文涉及到的所有程式碼和圖例

圖例

程式碼

更多內容見:Java 多執行緒

參考資料

實戰Java高並行程式設計(第2版)

深入淺出Java多執行緒

多執行緒與高並行-馬士兵

Java並行程式設計實戰

Java中的共用鎖和排他鎖(以讀寫鎖ReentrantReadWriteLock為例)

【並行程式設計】面試官:有沒有比讀寫鎖更快的鎖?

圖解Java多執行緒設計模式