作者:Grey
原文地址:
比較與交換的意思
舉個例子,記憶體有個值是 3,如果用 Java 通過多執行緒去存取這個數,每個執行緒都要把這個值 +1。
之前是需要加鎖,即synchronized
關鍵字來控制。但是 JUC 的包出現後,有了 CAS 操作,可以不需要加鎖來處理,流程是:
第一個執行緒:把 3 拿過來,執行緒本地區域做計算加 1,然後把 4 寫回去。
第二個執行緒:也把 3 這個數拿過來,執行緒本地區域做計算加 1 後,在回寫回去的時候,會做一次比較,如果原來的值還是 3,那麼說明這個值之前沒有被打擾過,就可以把 4 寫回去,如果這個值變了,假設變為了 4,那麼說明這個值已經被其他執行緒修改過了,那麼第二個執行緒需要重新執行一次,即把最新的 4 拿過來繼續計算,回寫回去的時候,繼續做比較,如果記憶體中的值依然是 4,說明沒有其他執行緒處理過,第二個執行緒就可以把 5 回寫回去了。
流程圖如下
CAS 會出現一個 ABA 的問題,即在一個執行緒回寫值的時候,其他執行緒其實動過那個原始值,只不過其他執行緒操作後這個值依然是原始值。
如何來解決 ABA 問題呢?
我們可以通過版本號或者時間戳來控制,比如資料原始的版本是 1.0,處理後,我們把這個資料的版本改成變成 2.0 版本, 時間戳來控制也一樣。
以 Java 為例,AtomicStampedReference
這個類,它內部不僅維護了物件值,還維護了一個時間戳。當AtomicStampedReference
對應的數值被修改時,除了更新資料本身外,還必須要更新時間戳。當AtomicStampedReference
設定物件值時,物件值以及時間戳都必須滿足期望值,寫入才會成功。因此,即使物件值被反覆讀寫,寫回原值,只要時間戳發生變化,就能防止不恰當的寫入。
程式碼範例
package git.snippets.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2022/9/10
* @since
*/
public class ABATest {
public static void main(String[] args) throws InterruptedException {
abaCorrect();
}
private static void abaCorrect() throws InterruptedException {
AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(10, 0);
Thread threadA = new Thread(() -> {
try {
int[] stamp = new int[1];
Integer value = ref.get(stamp); //同時獲取時間戳和資料,防止獲取到資料和版本不是一致的
System.out.println(String.format("%s 啟動,當前值是:%s,版本:%s", Thread.currentThread().getName(), ref.getReference(), stamp[0]));
TimeUnit.MILLISECONDS.sleep(1000);
int newValue = value + 1;
boolean writeOk = ref.compareAndSet(value, newValue, stamp[0], stamp[0] + 1);
System.out.println(String.format("%s:%s,%s", Thread.currentThread().getName(), "10->11", writeOk ? stamp[0] + 1 : stamp[0]));
stamp = new int[1];
value = ref.get(stamp); //同時獲取時間戳和資料,防止獲取到資料和版本不是一致的
newValue = value - 1;
writeOk = ref.compareAndSet(value, newValue, stamp[0], stamp[0] + 1);
System.out.println(String.format("%s:%s,%s", Thread.currentThread().getName(), "10->11->10", writeOk ? stamp[0] + 1 : stamp[0]));
} catch (InterruptedException e) {
}
}, "執行緒A");
Thread threadB = new Thread(() -> {
try {
int[] stamp = new int[1];
Integer value = ref.get(stamp); //同時獲取時間戳和資料,防止獲取到資料和版本不是一致的
System.out.println(String.format("%s 啟動,當前值是:%s,版本:%s", Thread.currentThread().getName(), ref.getReference(), stamp[0]));
TimeUnit.MILLISECONDS.sleep(2000);
int newValue = value + 2;
boolean writeOk = ref.compareAndSet(value, newValue, stamp[0], stamp[0] + 1);
System.out.println(String.format("%s: index是預期的10:%s,新值是:%s,版本:%s", Thread.currentThread().getName(), writeOk, ref.getReference(), writeOk ? stamp[0] + 1 : stamp[0]));
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "執行緒B");
threadA.start();
threadB.start();
threadA.join();
threadB.join();
}
}
CAS 的底層呼叫了組合的 LOCK_IF_MP 方法:
lock cmpxchg
雖然cmpxchg
指令不是原子的,但是加了lock
指令後,則cmpxhg
被上鎖,不允許被打斷。 在單核 CPU 中,無須加lock
,在多核 CPU 中,必須加lock
,可以參考 stackoverflow 上的這個回答: is-x86-cmpxchg-atomic-if-so-why-does-it-need-lock
使用 CAS 好處
jdk 早期是重量級別鎖 ,通過0x80
中斷 進行使用者態和核心態轉換,所以效率比較低,有了 CAS 操作,大大提升了效率。
過程如下:
synchronized 程式碼段多數時間是一個執行緒在執行,誰先來,這個就偏向誰,用當前執行緒標記一下。
偏向鎖復原,然後競爭,每個執行緒在自己執行緒棧中存一個LR(lock record)鎖記錄
偏向鎖和輕量級鎖都是使用者空間完成的,重量級鎖需要向作業系統申請。
兩個執行緒爭搶的方式將lock record的指標,指標指向哪個執行緒的LR,哪個執行緒就拿到鎖,另外的執行緒用 CAS 的方式繼續競爭
JVM 的 ObjectMonitor 去作業系統申請。
如果發生異常,synchronized
會自動釋放鎖,
範例程式碼如下:
package git.snippets.juc;
import java.util.concurrent.TimeUnit;
public class ExceptionCauseUnLock {
/*volatile */ boolean stop = false;
public static void main(String[] args) {
ExceptionCauseUnLock t = new ExceptionCauseUnLock();
new Thread(t::m, "t1").start();
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (t.stop) {
int m = 1 / 0;
}
}
synchronized void m() {
while (!stop) {
stop = true;
}
}
}
其中
int m = 1 / 0;
會丟擲異常,鎖會自動釋放。
synchronized
是可重入鎖, 可重入次數必須記錄,因為解鎖需要對應可重入次數的記錄。
偏向鎖:記錄線上程棧中,每重入一次,LR 加 1,備份原來的markword
輕量級鎖:類似偏向鎖
重量級鎖:記錄在ObjectMonitor
的一個欄位中
自旋鎖什麼時候升級為重量級鎖?
有執行緒超過十次自旋
-XX:PreBlockSpin(jdk1.6之前)
自旋的執行緒超過CPU核數一半
jdk1.6 以後,JVM自己控制
為什麼有偏向鎖啟動和偏向鎖未啟動?
未啟動:普通物件001 已啟動:匿名偏向101
為什麼有自旋鎖還需要重量級鎖?
因為自旋會佔用 CPU 時間,消耗 CPU 資源,如果自旋的執行緒多,CPU 資源會被消耗,所以會升級成重量級鎖(佇列)例如:ObjectMonitor
裡面的WaitSet
,重量級鎖會把執行緒都丟到WaitSet
中凍結, 不需要消耗 CPU 資源
偏向鎖是否一定比自旋鎖效率高?
明確知道多執行緒的情況下,不一定。 因為偏向鎖在多執行緒情況下,會涉及到鎖復原,這個時候直接使用自旋鎖,JVM 啟動過程,會有很多執行緒競爭,比如啟動的時候,肯定是多執行緒的,所以預設情況,啟動時候不開啟偏向鎖,過一段時間再開啟,JVM 有一個引數可以設定:BiasedLockingStartupDelay
預設是4s
package git.snippets.juc;
/**
* synchronized鎖定物件
*
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/15
* @since
*/
public class SynchronizedObject implements Runnable {
static SynchronizedObject instance = new SynchronizedObject();
final Object object = new Object();
static volatile int i = 0;
@Override
public void run() {
for (int j = 0; j < 1000000; j++) {
// 任何執行緒要執行下面的程式碼,必須先拿到object的鎖
synchronized (object) {
i++;
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
鎖定靜態方法相當於鎖定當前類
package git.snippets.juc;
/**
* synchronized鎖定靜態方法,相當於鎖定當前類
*
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/15
* @since
*/
public class SynchronizedStatic implements Runnable {
static SynchronizedStatic instance = new SynchronizedStatic();
static volatile int i = 0;
@Override
public void run() {
increase();
}
// 相當於synchronized(SynchronizedStatic.class)
synchronized static void increase() {
for (int j = 0; j < 1000000; j++) {
i++;
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
鎖定非靜態方法相當於鎖定該物件的範例或
synchronized(this)
package git.snippets.juc;
/**
* synchronized鎖定方法
*
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/15
* @since
*/
public class SynchronizedMethod implements Runnable {
static SynchronizedMethod instance = new SynchronizedMethod();
static volatile int i = 0;
@Override
public void run() {
increase();
}
void increase() {
for (int j = 0; j < 1000000; j++) {
synchronized (this) {
i++;
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
package git.snippets.juc;
import java.util.concurrent.TimeUnit;
/**
* 模擬髒讀
*
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/15
* @since
*/
public class DirtyRead {
String name;
double balance;
public static void main(String[] args) {
DirtyRead a = new DirtyRead();
Thread thread = new Thread(() -> a.set("zhangsan", 100.0));
thread.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(a.getBalance("zhangsan"));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(a.getBalance("zhangsan"));
}
public synchronized void set(String name, double balance) {
this.name = name;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.balance = balance;
}
// 如果get方法不加synchronized關鍵字,就會出現髒讀情況
public /*synchronized*/ double getBalance(String name) {
return this.balance;
}
}
其中的getBalance
方法,如果不加synchronized
,就會產生髒讀的問題。
一個同步方法可以呼叫另外一個同步方法,
一個執行緒已經擁有某個物件的鎖,再次申請的時候仍然會得到該物件的鎖(可重入鎖)
子類synchronized,如果呼叫父類別的synchronize方法:super.method(),如果不可重入,直接就會死鎖。
package git.snippets.juc;
import java.io.IOException;
/**
* 一個同步方法可以呼叫另外一個同步方法,一個執行緒已經擁有某個物件的鎖,再次申請的時候仍然會得到該物件的鎖.
*
* @author <a href="mailto:[email protected]">Grey</a>
* @since
*/
public class SynchronizedReentry implements Runnable {
public static void main(String[] args) throws IOException {
SynchronizedReentry myRun = new SynchronizedReentry();
Thread thread = new Thread(myRun, "t1");
Thread thread2 = new Thread(myRun, "t2");
thread.start();
thread2.start();
System.in.read();
}
synchronized void m1(String content) {
System.out.println(this);
System.out.println("m1 get content is " + content);
m2(content);
}
synchronized void m2(String content) {
System.out.println(this);
System.out.println("m2 get content is " + content);
}
@Override
public void run() {
m1(Thread.currentThread().getName());
}
}
程式在執行過程中,如果出現異常,預設情況鎖會被釋放 ,所以,在並行處理的過程中,有異常要多加小心,不然可能會發生不一致的情況。比如,在一個 web app 處理過程中,多個Servlet
執行緒共同存取同一個資源,這時如果例外處理不合適,在第一個執行緒中丟擲異常,其他執行緒就會進入同步程式碼區,有可能會存取到異常產生時的資料。因此要非常小心的處理同步業務邏輯中的異常。
範例程式碼
package git.snippets.juc;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* 程式在執行過程中,如果出現異常,預設情況鎖會被釋放
* 所以,在並行處理的過程中,有異常要多加小心,不然可能會發生不一致的情況。
* 比如,在一個web app處理過程中,多個servlet執行緒共同存取同一個資源,這時如果例外處理不合適,
* 在第一個執行緒中丟擲異常,其他執行緒就會進入同步程式碼區,有可能會存取到異常產生時的資料。
* 因此要非常小心的處理同步業務邏輯中的異常
*/
public class SynchronizedException implements Runnable {
int count = 0;
public static void main(String[] args) throws IOException {
SynchronizedException myRun = new SynchronizedException();
Thread thread = new Thread(myRun, "t1");
Thread thread2 = new Thread(myRun, "t2");
thread.start();
thread2.start();
System.in.read();
}
@Override
public void run() {
synchronized (this) {
while (true) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("current thread is " + Thread.currentThread().getName() + " count is " + count);
if (count == 5) {
count++;
int m = 1 / 0;
}
count++;
}
}
}
synchronized void m1(String content) {
System.out.println(this);
System.out.println("m1 get content is " + content);
m2(content);
}
synchronized void m2(String content) {
System.out.println(this);
System.out.println("m2 get content is " + content);
}
}
在早期的 JDK 使用的是作業系統級別的重量級鎖
後來的改進鎖升級的概念:
synchronized (Object)
markword 記錄這個執行緒ID (使用偏向鎖)
如果執行緒爭用:升級為 自旋鎖
10次自旋以後,升級為重量級鎖 - OS
所以,如果
執行時間短(加鎖程式碼),執行緒數少,用自旋。
執行時間長,執行緒數多,用系統鎖。
注:synchronized
不能鎖定String常數,Integer,Long等基礎型別
程式碼範例如下
package git.snippets.juc;
/**
* synchronized不能鎖定String常數,Integer,Long等基礎型別
* <p>
* 不要以字串常數作為鎖定物件
* 在下面的例子中,m1和m2其實鎖定的是同一個物件
* 這種情況還會發生比較詭異的現象,比如你用到了一個類庫,在該類庫中程式碼鎖定了字串「Hello」,
* 但是你讀不到原始碼,所以你在自己的程式碼中也鎖定了"Hello",這時候就有可能發生非常詭異的死鎖阻塞,
* 因為你的程式和你用到的類庫不經意間使用了同一把鎖
*
* @author <a href="mailto:[email protected]">Grey</a>
* @since
*/
public class SynchronizedBasicType implements Runnable {
public static Integer i = 0;
static SynchronizedBasicType instance = new SynchronizedBasicType();
static final String lock = "this is a lock";
static final String lock1 = "this is a lock";
public static void main(String[] args) throws InterruptedException {
m();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
public static void m() throws InterruptedException {
Thread m1 = new Thread(new Runnable() {
@Override
public void run() {
/*synchronized (this)*/
synchronized (lock) {
System.out.println("locked ...");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
System.out.println("unlocked ...");
}
}
});
m1.start();
Thread.sleep(1000);
Thread m2 = new Thread(new Runnable() {
@Override
public void run() {
/*synchronized (this)*/
synchronized (lock1) {
System.out.println("locked lock1 ...");
System.out.println("unlocked lock1 ...");
}
}
});
m2.start();
m1.join();
m2.join();
}
@Override
public void run() {
for (int j = 0; j < 10000000; j++) {
synchronized (i) {
i++;
}
}
}
}
鎖定某物件 o,如果 o 的屬性發生改變,不影響鎖的使用; 但是如果 o 指向另外一個物件,則鎖定的物件發生改變, 會影響鎖的使用,所以應該避免將鎖定物件的參照變成另外的物件。
package git.snippets.juc;
import java.util.concurrent.TimeUnit;
/**
* 鎖定某物件o,如果o的屬性發生改變,不影響鎖的使用
* 但是如果o變成另外一個物件,則鎖定的物件發生改變
* 應該避免將鎖定物件的參照變成另外的物件
*/
public class SyncSameObject {
Object object = new Object();
public static void main(String[] args) {
SyncSameObject t = new SyncSameObject();
new Thread(t::m).start();
Thread t2 = new Thread(t::m, "t2");
//鎖物件發生改變,所以t2執行緒得以執行,如果註釋掉這句話,執行緒2將永遠得不到執行機會
t.object = new Object();
t2.start();
}
void m() {
synchronized (object) {
while (true) {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("current thread is " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
以上程式碼,如果不執行t.object=new Object()
這句,m2 執行緒將永遠得不到執行。
兩個或兩個以上的執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去,這就是死鎖現象
死鎖產生的原因主要有如下幾點
系統的資源競爭
程式在執行過程中申請和釋放資源的順序不當
死鎖產生的必要條件
互斥條件:程序要求對所分配的資源(如印表機)進行排他性控制,即在一段時間內某資源僅為一個程序所佔有。此時若有其他程序請求該資源,則請求程序只能等待。
不剝奪條件:程序所獲得的資源在未使用完畢之前,不能被其他程序強行奪走,即只能由獲得該資源的程序自己來釋放(只能是主動釋放)。
請求和保持條件:程序已經保持了至少一個資源,但又提出了新的資源請求,而該資源已被其他程序佔有,此時請求程序被阻塞,但對自己已獲得的資源保持不放。
迴圈等待條件:存在一種程序資源的迴圈等待鏈,鏈中每一個程序已獲得的資源同時被鏈中下一個程序所請求。
模擬死鎖程式碼
/**
* 模擬死鎖
*/
public class DeadLock implements Runnable {
int flag = 1;
static Object o1 = new Object();
static Object o2 = new Object();
public static void main(String[] args) {
DeadLock lock = new DeadLock();
DeadLock lock2 = new DeadLock();
lock.flag = 1;
lock2.flag = 0;
Thread t1 = new Thread(lock);
Thread t2 = new Thread(lock2);
t1.start();
t2.start();
}
@Override
public void run() {
System.out.println("flag = " + flag);
if (flag == 1) {
synchronized (o2) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o1) {
System.out.println("1");
}
}
}
if (flag == 0) {
synchronized (o1) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o2) {
System.out.println("0");
}
}
}
}
}
如何避免死鎖?
1、讓程式每次至多隻能獲得一個鎖。當然,在多執行緒環境下,這種情況通常並不現實。
2、設計時考慮清楚鎖的順序,儘量減少嵌在的加鎖互動數量。
3、增加時限,比如使用Lock
類中的tryLock
方法去嘗試獲取鎖,這個方法可以指定一個超時時限,在等待超過該時限之後便會返回一個失敗資訊。
保持執行緒之間的可見性(不保證操作的原子性),依賴 MESI 協定
防止指令重排序,CPU的load fence
和store fence
原語支援
CPU 原來執行指令一步一步執行,現在是流水線執行,編譯以後可能會產生指令的重排序,這樣可以提高效能
關於volatile
不保證原子性的程式碼範例
package git.snippets.juc;
/**
* Volatile保持執行緒之間的可見性(不保證操作的原子性)
*
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/19
* @since
*/
public class VolatileNOTAtomic {
volatile static Data data;
public static void main(String[] args) {
Thread writer = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
data = new Data(i, i);
}
});
Thread reader = new Thread(() -> {
while (data == null) {
}
int a = data.a;
int b = data.b;
if (a != b) {
// 會出現這種情況是因為new Data(i,i)非原子操作,會產生中間狀態的物件,導致a和b的值會不一致
System.out.printf("a = %s, b=%s%n", a, b);
}
});
writer.start();
reader.start();
try {
writer.join();
reader.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
public static class Data {
int a;
int b;
Data(int a, int b) {
this.a = a;
this.b = b;
}
}
}
volatile
並不能保證多個執行緒共同修改running
變數時所帶來的不一致問題,也就是說volatile
不能替代synchronized
,
範例程式碼如下:
package git.snippets.juc;
import java.util.ArrayList;
import java.util.List;
/**
* volatile並不能保證多個執行緒共同修改變數時所帶來的不一致問題,也就是說volatile不能替代synchronized
*
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/19
* @since
*/
public class VolatileCanNotReplaceSynchronized {
volatile int count = 0;
int count2 = 0;
public static void main(String[] args) {
VolatileCanNotReplaceSynchronized t = new VolatileCanNotReplaceSynchronized();
List<Thread> threads = new ArrayList<>();
List<Thread> threads2 = new ArrayList<>();
for (int i = 0; i < 20; i++) {
threads.add(new Thread(t::m));
threads2.add(new Thread(t::m2));
}
threads.forEach(item -> item.start());
threads2.forEach(item -> item.start());
threads.forEach(item -> {
try {
item.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threads2.forEach(item -> {
try {
item.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(t.count);
System.out.println(t.count2);
}
void m() {
for (int i = 0; i < 1000; i++) {
count++;
}
}
synchronized void m2() {
for (int i = 0; i < 1000; i++) {
count2++;
}
}
}
什麼是 DCL,請參考設計模式學習筆記中的單例模式說明。
在New物件的時候,編譯完實際上是分了三步
物件申請記憶體,成員變數會被賦初始值
成員變數設為真實值
成員變數賦給物件
指令重排序可能會導致2和3進行指令重排,導致下一個執行緒拿到一個半初始化的物件,導致單例被破壞。所以 DCL 必須加volitile
此外,被volatile
關鍵字修飾的物件作為類變數或範例變數時,其物件中攜帶的類變數和範例變數也相當於被volatile
關鍵字修飾了
範例程式碼如下
package git.snippets.juc;
import java.util.concurrent.TimeUnit;
/**
* 被volatile關鍵字修飾的物件作為類變數或範例變數時,其物件中攜帶的類變數和範例變數也相當於被volatile關鍵字修飾了
*
* @author <a href="mailto:[email protected]">Grey</a>
* @since 1.8
*/
public class VolatileRef {
volatile M tag = new M();
public static void main(String[] args) {
VolatileRef t = new VolatileRef();
new Thread(t::m, "t1").start();
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
t.tag.n.x.stop = new Boolean(true);
}
void m() {
while (!tag.n.x.stop) {
}
}
}
class M {
N n = new N();
}
class N {
X x = new X();
}
class X {
public Boolean stop = new Boolean(false);
}
本文涉及到的所有程式碼和圖例
更多內容見:Java 多執行緒
本文來自部落格園,作者:Grey Zeng,轉載請註明原文連結:https://www.cnblogs.com/greyzeng/p/16678272.html