公平鎖和非公平鎖在原始碼層的兩點區別:
1、非公平上來直接搶鎖
2、當state=0時,非公平直接搶,公平鎖還會判斷佇列還有沒有前置節點
下面就讓我們跟蹤RL的lock()和unLock()原始碼來看看程式碼級別是怎麼實現的吧!
需要注意的是,本文跟蹤的是非公平鎖的加解鎖過程,公平鎖的實現大體一致,當原始碼中有與公平鎖的顯著差別時我會通過註釋給出解釋
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
List<Thread> list = new ArrayList<>();
ReentrantLock lock = new ReentrantLock();
for (int i = 0; i < 1000; i++) {
Thread thread = new Thread(()-> {
for (int j = 0; j < 1000; j++) {
// 解鎖
lock.lock();
count++;
// 釋放鎖
lock.unlock();
}
});
list.add(thread);
}
for (Thread thread : list) {
thread.start();
}
for (Thread thread : list) {
thread.join();
}
System.out.println("auto.count = " + count + "耗時:" + (System.currentTimeMillis() -start));
}
跟蹤lock.lock()發現其呼叫的是內部類Sync的lock()方法,該方法是一個抽象方法,具體實現由FairSync和NonfairSync實現,由於我們構造RL時呼叫的是無參建構函式,所以這裡會直接進入NonfairSync的lock()方法;具體實現程式碼和註釋如下:
/**
* java.util.concurrent.locks.ReentrantLock.NonfairSync#lock()
*/
final void lock() {
// 由於是非公平鎖所以這裡上來直接爭搶資源,嘗試通過CAS操作將state的值由0變成1
if (compareAndSetState(0, 1))
// 如果成功將state值變成1表示爭搶鎖成功,設定當前擁有獨佔存取權的執行緒。
setExclusiveOwnerThread(Thread.currentThread());
else
// 爭搶失敗再進入與公平鎖一樣的排隊邏輯
acquire(1);
}
tips:
1、上面的compareAndSetState方法也是由AQS提供的,裡面藉助Unsafe實現了對state的cas操作更新
2、setExclusiveOwnerThread也可以理解成由AQS提供(其實是AQS的父類別,不過不影響理解),給exclusiveOwnerThread變數賦值,exclusiveOwnerThread表示當前正在擁有鎖的執行緒
3、acquire方法同樣由AQS提供,其內部實現也是lock環節比較關鍵的程式碼,下面我會詳細解釋
acquire方法的原始碼如下:
/**
* java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire(int)
*/
public final void acquire(int arg) {
/**
* 1、嘗試獲取鎖;如果成功此方法結束,當前執行緒執行同步程式碼塊
* 2、如果獲取失敗,則構造Node節點並加入CLH佇列
* 3、然後繼續等待鎖
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果獲取鎖失敗,新增CLH佇列也失敗,那麼直接中斷當前執行緒
selfInterrupt();
}
tips:
1、tryAcquire方法是AQS的一個模板方法,RL下的公平和非公平鎖都有不同的實現,下面會詳解
2、addWaiter方法是AQS的一個預設實現方法,負責構造當前執行緒所在的Node,並將其設定到佇列的尾巴上
3、acquireQueued方法也是AQS的預設實現,旨在設定CLH佇列的head和阻塞當前執行緒
上面的三個方法下面也會一一介紹
/**
* java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire(int)
*/
final boolean nonfairTryAcquire(int acquires) {
// 當前執行緒
final Thread current = Thread.currentThread();
// 獲取當前state的值
int c = getState();
if (c == 0) {
/**
* 非公平鎖發現資源未被佔用時直接CAS嘗試搶佔資源;而公平鎖發現資源未被佔用時
* 先判斷佇列裡是否還有前置節點再等待,沒有才會去搶佔資源
*/
if (compareAndSetState(0, acquires)) {
// 如果成功將state值變成1表示爭搶鎖成功,設定當前擁有獨佔存取權的執行緒。
setExclusiveOwnerThread(current);
return true;
}
}
/**
* 如果state!=0表示有爭用,再判斷當前系統擁有獨佔許可權的執行緒是不是當前執行緒,
* 如果是,則需要支援執行緒重入,將state的值加1
*/
else if (current == getExclusiveOwnerThread()) {// 處理可重入的邏輯
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// state既不等於0也不需要重入則返回false;表示獲取鎖失敗,程式碼返回後繼續執行acquireQueued方法
return false;
}
/**
* java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node)
*/
private Node addWaiter(Node mode) {
// 構建Node物件
Node node = new Node(Thread.currentThread(), mode);
/**
* 將當前佇列的尾節點賦值給pred,通過命名和下面的程式碼其實可以發現就是想讓tail作為當前節點的前置節點;
* 但是為什麼不直接用tail而將其賦值給pred再用呢?我想應該是考慮並行環境下tail的參照有可能會被其他執行緒改變
*/
Node pred = tail;
if (pred != null) {
// 如果當前佇列的尾結點(tail)不為空,就將其作為當前Node節點的前置節點
node.prev = pred;
// 然後通過AQS自帶的cas方法將當前構建的Node節點插入到佇列的尾巴上
if (compareAndSetTail(pred, node)) {
// 如果成功了,前置節點也就是之前的tail節點的後繼節點就是當前節點,賦值
pred.next = node;
// 返回構建的Node節點,即當前佇列的tail節點
return node;
}
}
// 如果佇列的tail節點為空,或者cas設定tail節點失敗的話呼叫此方法;旨在重新設定佇列的tail節點
enq(node);
return node;
}
/**
* java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node, int)
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取tail節點的前置節點
final Node p = node.predecessor();
/**
* 如果前置節點就是頭節點表示當前tail節點就是第二個節點,就可以嘗試著去獲取鎖,
* 然後將tail節點設定成頭節點,返回執行緒中斷狀態為false;表示當前執行緒獲取到鎖
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
/**
* 既然tail已經獲取到鎖了,那麼前置節點就沒用了,這裡將前置節點的next設定為空,
* 是為了方便垃圾回收,因為如果不指定為空,前置節點的next就是當前的tail節點,
* 不會被回收
*/
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* 如果前置節點不為head,或者雖然前置節點是head但是獲取鎖失敗,那麼就
* 需要在這裡將執行緒阻塞,阻塞利用的是LockSupport.park(thread)來實現的
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 退出獲取鎖
cancelAcquire(node);
}
}
至此,RL非公平鎖加鎖的過程的原始碼跟蹤完畢,流程也不算複雜,下面簡單梳理一遍:
1、上來直接嘗試獲取鎖(修改state值),成功表示獲取成功
2、否則執行tryAcquire方法嘗試通過cas的方式獲取鎖,並處理可能存在的重入操作
3、獲取失敗則通過addWriter方法構建Node節點並加入CLH佇列的末尾
4、然後在acquireQueued裡再次獲取鎖,獲取失敗則阻塞當前執行緒;
下面簡單畫了一下lock()方法的呼叫泳道圖
1、呼叫父類別AQS的compareAndSetState通過cas的模式嘗試將state狀態改為1,修改成功則持有鎖,將當前執行緒設為ExclusiveOwnerThread
/**
* java.util.concurrent.locks.AbstractQueuedSynchronizer#release(int)
*/
public final boolean release(int arg) {
// 嘗試釋放鎖
if (tryRelease(arg)) {
// 釋放成功,判斷當前佇列頭節點是否為空,不為空並且等待狀態不等於0則喚醒當前佇列的頭節點
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* java.util.concurrent.locks.ReentrantLock.Sync#tryRelease(int)
* @param releases
* @return
*/
protected final boolean tryRelease(int releases) {
// state減1
int c = getState() - releases;
// 如果當前執行緒不是正在獲取到鎖的執行緒直接拋異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果state減1後等於0表示沒有重入,表示釋放鎖成功,將當前獲取鎖的執行緒置空
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 將最新的state狀態更新到AQS中
setState(c);
return free;
}
unlock()總結:
1、呼叫父類別AQS的release方法實際呼叫的是tryRelease這個模板方法由ReentrantLock本身實現
2、tryRelease方法嘗試將state減1,如果減完等於0表示解鎖成功,將ExclusiveOwner執行緒設為空;並且喚醒佇列的頭節點(unparkSuccessor)。
3、如果不等於0表示解鎖失敗,將state設為減1過後的值;也是為了可重入