Java 多執行緒:鎖(二)

2022-09-11 09:00:12

Java 多執行緒:鎖(二)

作者:Grey

原文地址:

部落格園:Java 多執行緒:鎖(二)

CSDN:Java 多執行緒:鎖(二)

AtomicLong VS LongAddr VS Synchronized

需要實際測試一下。

範例程式碼見:

package git.snippets.juc;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
 * 對比AddByAdder, AddByAtomic, AddBySync幾個程式,在資料量比較大的情況下,AddByAdder的效率最高
 */
public class AddWays {
    public static void main(String[] args) {
        addBySync();
        addByAtomicLong();
        addByLongAdder();
    }

    // 使用AtomicLong
    public static void addByAtomicLong() {
        AtomicLong count = new AtomicLong(0);
        Thread[] all = new Thread[1000];
        AddWays t = new AddWays();
        for (int i = 0; i < all.length; i++) {
            all[i] = new Thread(() -> {
                for (int j = 0; j < 1000000; j++) {
                    count.incrementAndGet();
                }
            });
        }
        long start = System.currentTimeMillis();
        for (Thread thread : all) {
            thread.start();
        }
        for (Thread thread : all) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("result is " + count.get() + " time is " + (end - start) + "ms (by AtomicLong)");

    }

    // 使用LongAdder
    public static void addByLongAdder() {
        Thread[] all = new Thread[1000];
        LongAdder count = new LongAdder();
        for (int i = 0; i < all.length; i++) {
            all[i] = new Thread(() -> {
                for (int j = 0; j < 1000000; j++) {
                    count.increment();
                }
            });
        }
        long start = System.currentTimeMillis();
        for (Thread thread : all) {
            thread.start();
        }
        for (Thread thread : all) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("result is " + count + " time is " + (end - start) + "ms (by LongAdder)");

    }

    static long count = 0;

    public static void addBySync() {


        Thread[] all = new Thread[1000];
        Object o = new Object();
        for (int i = 0; i < all.length; i++) {
            all[i] = new Thread(() -> {
                for (int j = 0; j < 1000000; j++) {
                    synchronized (o) {
                        count++;
                    }
                }
            });
        }
        long start = System.currentTimeMillis();
        for (Thread thread : all) {
            thread.start();
        }
        for (Thread thread : all) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("result is " + count + " time is " + (end - start) + "ms (by synchronized)");

    }
}

Java SE 11 下,執行得到的執行結果是:

result is 1000000000 time is 10035ms (by synchronized)
result is 1000000000 time is 15818ms (by AtomicLong)
result is 1000000000 time is 963ms (by LongAdder)

可以看到,在巨量資料量的情況下,LongAdder 的效率最高。關於 LongAdder 的一些說明,參考如下兩篇部落格:

ReentrantLock

其中「ReentrantReadWriteLock」,「讀鎖的插隊策略」,"鎖的升降級" 部分參考瞭如下檔案中的內容

Java中的共用鎖和排他鎖(以讀寫鎖ReentrantReadWriteLock為例)

ReentrantLock vs sychronized

ReentrantLock是可重入鎖,可以替代sychronizedReentrantLocksychronized的區別在於:

ReentrantLock可以tryLock,嘗試若干時間片內獲取鎖。

程式碼如下:

package git.snippets.juc;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTryLock {
    ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        ReentrantLockTryLock t = new ReentrantLockTryLock();
        new Thread(t::m).start();
        try {
            SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 由於前一個執行緒先執行m1,鎖定this,所以只能等前一個執行緒執行完畢後才能執行下面執行緒的操作
        new Thread(t::m2).start();

    }

    void m() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                try {
                    SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i);
                if (i == 2) {
                    m2();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void m2() {
        boolean locked = false;
        try {
            // 在1s內嘗試獲取鎖
            locked = lock.tryLock(1, SECONDS);
            if (locked) {
                System.out.println("get lock");
                System.out.println("start m2");
            } else {
                System.out.println("not get m2");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (locked) {
                lock.unlock();
            }
        }

    }
}


ReentrantLock可以用lockInterruptibly,在lock的時候可以被打斷,一旦被打斷,可以作出響應,而sychronized一旦wait後,必須得讓別人notify,才能醒來。

程式碼如下:

package git.snippets.juc;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockInterrupt {
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("a thread started and sleep forever");
                SECONDS.sleep(Integer.MAX_VALUE);
                System.out.println("a thread stopped");
            } catch (InterruptedException e) {
                System.out.println("the thread has been interrupted");
            } finally {
                lock.unlock();
            }
        });
        t1.start();
        Thread t2 = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                System.out.println("if lock thread is interrupted, it will run");
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                System.out.println("interrupted");
            } finally {
                lock.unlock();
            }
        });
        t2.start();
        try {
            SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t2.interrupt();

    }
}

ReentrantLock可以設定公平與否,公平的概念是,每個執行緒來了以後會檢查等待佇列裡面會不會有等待的執行緒,如果有,則進入佇列等待。

程式碼如下

package git.snippets.juc;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockFair extends Thread {
    static ReentrantLock lock = new ReentrantLock(true/*false*/);

    public static void main(String[] args) {
        ReentrantLockFair tl = new ReentrantLockFair();
        Thread t1 = new Thread(tl);
        Thread t2 = new Thread(tl);
        t1.start();
        t2.start();
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            lock.lock();
            try {
                System.out.println("current thread :" + Thread.currentThread().getName() + " get the lock");
                SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }

        }
    }
}

注:不管是公平鎖還是非公平鎖,一旦沒有競爭到鎖,都會進行排隊,當鎖釋放時,都是喚醒排在最前面的執行緒,所以非公平鎖只是體現在了執行緒加鎖階段,而沒有體現線上程被喚醒階段。

synchronized鎖的是物件,鎖資訊儲存在物件頭中,ReentrantLock通過程式碼中int型別的state標識來標識鎖的狀態。

注:在使用 ReentrantLock 的時候一定要記得 unlock,因為如果使用 synchronized 遇到異常,JVM 會自動釋放鎖,但是用 ReentrantLock 必須手動釋放鎖,因此經常在finally 中進行鎖的釋放

程式碼如下:

package git.snippets.juc;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockAndSynchronized {
    ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        ReentrantLockAndSynchronized t = new ReentrantLockAndSynchronized();
        new Thread(t::m).start();
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 由於前一個執行緒先執行m1,鎖定this,所以只能等前一個執行緒執行完畢後才能執行下面執行緒的操作
        new Thread(t::m2).start();

    }

    void m() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i);
                if (i == 2) {
                    m2();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    void m2() {
        lock.lock();
        try {
            System.out.println("start m2");
            int i = 1 / 0;
        } finally {
            // 如果不加這句unlock,程式會一直卡在這裡
            lock.unlock();
        }

    }
}

package git.snippets.juc;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * 程式在執行過程中,如果出現異常,預設情況鎖會被釋放
 * 所以,在並行處理的過程中,有異常要多加小心,不然可能會發生不一致的情況。
 * 比如,在一個web app處理過程中,多個servlet執行緒共同存取同一個資源,這時如果例外處理不合適,
 * 在第一個執行緒中丟擲異常,其他執行緒就會進入同步程式碼區,有可能會存取到異常產生時的資料。
 * 因此要非常小心的處理同步業務邏輯中的異常
 */
public class SynchronizedException implements Runnable {
    int count = 0;

    public static void main(String[] args) throws IOException {
        SynchronizedException myRun = new SynchronizedException();
        Thread thread = new Thread(myRun, "t1");
        Thread thread2 = new Thread(myRun, "t2");
        thread.start();
        thread2.start();
        System.in.read();

    }

    @Override
    public void run() {
        synchronized (this) {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("current thread is " + Thread.currentThread().getName() + " count is " + count);
                if (count == 5) {
                    count++;
                    // 遇到異常,synchronized 會自動釋放鎖
                    int m = 1 / 0;
                }
                count++;
            }
        }
    }

    synchronized void m1(String content) {
        System.out.println(this);
        System.out.println("m1 get content is " + content);
        m2(content);
    }

    synchronized void m2(String content) {
        System.out.println(this);
        System.out.println("m2 get content is " + content);

    }
}

ReentrantReadWriteLock

在 ReentrantReadWriteLock 中包含讀鎖和寫鎖,其中讀鎖是可以多執行緒共用的,即共用鎖, 而寫鎖是排他鎖,在更改時候不允許其他執行緒操作。讀寫鎖其實是一把鎖,所以會有同一時刻不允許讀寫鎖共存的規定。之所以要細分讀鎖和寫鎖也是為了提高效率,將讀和寫分離,

範例:

package git.snippets.juc;


import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * ReentrantReadWriteLock讀寫鎖範例
 **/
public class ReentrantLockReadAndWrite {

    private static ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
    private static ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
    private static ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();

    public static void read() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "獲取讀鎖,開始執行");
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "釋放讀鎖");
        }
    }

    public static void write() {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "獲取寫鎖,開始執行");
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
            System.out.println(Thread.currentThread().getName() + "釋放寫鎖");
        }
    }

    public static void main(String[] args) {
        new Thread(() -> read(), "Thread1").start();
        new Thread(() -> read(), "Thread2").start();
        new Thread(() -> write(), "Thread3").start();
        new Thread(() -> write(), "Thread4").start();
    }
}

讀鎖的插隊策略

設想如下場景:

在非公平的ReentrantReadWriteLock鎖中,執行緒2和執行緒4正在同時讀取,執行緒3想要寫入,拿不到鎖(同一時刻是不允許讀寫鎖共存的),於是進入等待佇列, 執行緒5不在佇列裡,現在過來想要讀取,

策略1

如果允許讀插隊,就是說執行緒5讀先於執行緒3寫操作執行,因為讀鎖是共用鎖,不影響後面的執行緒3的寫操作,
這種策略可以提高一定的效率,卻可能導致像執行緒3這樣的執行緒一直在等待中,因為可能執行緒5讀操作之後又來了n個執行緒也進行讀操作,造成執行緒飢餓;

策略2

不允許插隊,即執行緒5的讀操作必須排線上程3的寫操作之後,放入佇列中,排線上程3之後,這樣能避免執行緒飢餓。
事實上 ReentrantReadWriteLock 在非公平情況下,讀鎖採用的就是策略2:不允許讀鎖插隊,避免執行緒飢餓。更加確切的說是:在非公平鎖情況下,允許寫鎖插隊,也允許讀鎖插隊,

但是讀鎖插隊的前提是佇列中的頭節點不能是想獲取寫鎖的執行緒。

以上是在非公平ReentrantReadWriteLock鎖中,

在公平鎖中,讀寫鎖都是是不允許插隊的,嚴格按照執行緒請求獲取鎖順序執行。

程式碼如下

package git.snippets.juc;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * ReentrantLock的讀鎖插隊策略
 */
public class ReentrantLockCut {
    private static final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
    private static final ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
    private static final ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();

    public static void read() {
        System.out.println(Thread.currentThread().getName() + "開始嘗試獲取讀鎖");
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "獲取讀鎖,開始執行");
            Thread.sleep(20);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "釋放讀鎖");
        }
    }

    public static void write() {
        System.out.println(Thread.currentThread().getName() + "開始嘗試獲取寫鎖");
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "獲取寫鎖,開始執行");
            Thread.sleep(40);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + "釋放寫鎖");
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        new Thread(ReentrantLockCut::write, "Thread1").start();
        new Thread(ReentrantLockCut::read, "Thread2").start();
        new Thread(ReentrantLockCut::read, "Thread3").start();
        new Thread(ReentrantLockCut::write, "Thread4").start();
        new Thread(ReentrantLockCut::read, "Thread5").start();
        new Thread(() -> {
            Thread[] threads = new Thread[1000];
            for (int i = 0; i < 1000; i++) {
                threads[i] = new Thread(ReentrantLockCut::read, "子執行緒建立的Thread" + i);
            }
            for (int i = 0; i < 1000; i++) {
                threads[i].start();
            }
        }).start();
    }

}

鎖的升降級

ReentrantReadWriteLock讀寫鎖中,只支援寫鎖降級為讀鎖,而不支援讀鎖升級為寫鎖,

之所以ReentrantReadWriteLock不支援鎖的升級(其它鎖可以支援),主要是避免死鎖,

例如兩個執行緒A和B都在讀, A升級要求B釋放讀鎖,B升級要求A釋放讀鎖,互相等待形成死迴圈。

如果能嚴格保證每次都只有一個執行緒升級那也是可以的。

程式碼如下

package git.snippets.juc;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 讀鎖無法升級為寫鎖
 * 寫鎖可以降級成讀鎖
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2021/4/21
 * @since
 */
public class ReentrantReadWriteLockUpAndDown {
    private static final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
    private static final ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
    private static final ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();

    public static void read() {
        System.out.println(Thread.currentThread().getName() + "開始嘗試獲取讀鎖");
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "獲取讀鎖,開始執行");
            Thread.sleep(20);
            System.out.println(Thread.currentThread().getName() + "嘗試升級讀鎖為寫鎖");
            //讀鎖升級為寫鎖(失敗)
            writeLock.lock();
            System.out.println(Thread.currentThread().getName() + "讀鎖升級為寫鎖成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "釋放讀鎖");
        }
    }

    public static void write() {
        System.out.println(Thread.currentThread().getName() + "開始嘗試獲取寫鎖");
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "獲取寫鎖,開始執行");
            Thread.sleep(40);
            System.out.println(Thread.currentThread().getName() + "嘗試降級寫鎖為讀鎖");
            //寫鎖降級為讀鎖(成功)
            readLock.lock();
            System.out.println(Thread.currentThread().getName() + "寫鎖降級為讀鎖成功");
            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + "釋放寫鎖");
            writeLock.unlock();
            readLock.unlock();
        }
    }

    public static void main(String[] args) {
        new Thread(ReentrantReadWriteLockUpAndDown::write, "Thread1").start();
        new Thread(ReentrantReadWriteLockUpAndDown::read, "Thread2").start();
    }
}

CAS,Synchronized,Lock的使用情景

對於資源競爭較少(執行緒衝突較輕)的情況,使用synchronized同步鎖進行執行緒阻塞和喚醒切換以及使用者態核心態間的切換操作額外浪費消耗cpu資源;而CAS基於硬體實現,不需要進入核心,不需要切換執行緒,操作自旋機率較少,因此可以獲得更高的效能。

對於資源競爭嚴重(執行緒衝突嚴重)的情況,CAS 自旋的概率會比較大,從而浪費更多的 CPU 資源,效率低於synchronized

注: synchronized在jdk1.6之後,已經改進優化。synchronized的底層實現主要依靠 Lock-Free 的佇列,基本思路是自旋後阻塞,競爭切換後繼續競爭鎖,稍微犧牲了公平性,但獲得了高吞吐量。線上程衝突較少的情況下,可以獲得和 CAS 類似的效能;而執行緒衝突嚴重的情況下,效能遠高於 CAS。

synchronized作為悲觀鎖,比較適合寫入操作比較頻繁的場景,如果出現大量的讀取操作,每次讀取的時候都會進行加鎖,這樣會增加大量的鎖的開銷,降低了系統的吞吐量。

在資源競爭不是很激烈的情況下,偶爾會有同步的情形下,synchronized是很合適的。原因在於,編譯程式通常會盡可能的進行優化synchronized,另外可讀性非常好,不管用沒用過5.0多執行緒包的程式設計師都能理解。預設是非公平鎖:後等待的執行緒可以先獲得鎖。

ReentrantLock比較適合讀取操作比較頻繁的場景,如果出現大量的寫入操作,資料發生衝突的可能性就會增大,為了保證資料的一致性,應用層需要不斷的重新獲取資料,這樣會增加大量的查詢操作,降低了系統的吞吐量。

Atomic和上面的類似,不激烈情況下,效能比synchronized略遜,而激烈的時候,也能維持常態。激烈的時候,Atomic的效能會優於ReentrantLock一倍左右。但是其有一個缺點,就是隻能同步一個值,一段程式碼中只能出現一個Atomic的變數,多於一個同步無效。因為他不能在多個Atomic之間同步。

說明

本文涉及到的所有程式碼和圖例

圖例

程式碼

更多內容見:Java 多執行緒

參考資料

實戰Java高並行程式設計(第2版)

深入淺出Java多執行緒

多執行緒與高並行-馬士兵

Java並行程式設計實戰

設計模式學習筆記

從LONGADDER看更高效的無鎖實現

Java 8 Performance Improvements: LongAdder vs AtomicLong

Java中的共用鎖和排他鎖(以讀寫鎖ReentrantReadWriteLock為例)

【並行程式設計】面試官:有沒有比讀寫鎖更快的鎖?

圖解Java多執行緒設計模式