在開發場景下,對於寫操作我們為了保證原子性所以需要上鎖,但是對於讀操作,由於其不改變資料,只是單純對資料進行讀取,但是每次都上一把互斥鎖,阻塞所有請求。這個明顯不符合讀多寫少的場景。所以將鎖分為兩把:讀鎖和寫鎖。
讀寫鎖的底層實現依舊依賴於AQS。具體底層實現,請檢視上一篇文章的介紹
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;//最重要的一個變數
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&//嘗試獲取鎖
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//如果獲取鎖失敗,新增到佇列中,由於ReentrantLock是獨佔鎖所以節點必須是EXCLUSIVE型別
selfInterrupt();//新增中斷標識位
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//新建節點
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;//獲取到尾指標
if (pred != null) {//尾指標不等於空,將當前節點替換為尾指標
node.prev = pred;
if (compareAndSetTail(pred, node)) {//採用尾插法,充分利用時間區域性性和空間區域性性。尾插的節點一般不容易被取消。
pred.next = node;
return node;
}
}
enq(node);//cas失敗後執行入隊操作,繼續嘗試
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;//獲取尾指標
if (t == null) { //代表當前佇列沒有節點
if (compareAndSetHead(new Node()))//將當前節點置為頭結點
tail = head;
} else {//當前佇列有節點
node.prev = t;//
if (compareAndSetTail(t, node)) {//將當前節點置為尾結點
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//找到當前節點的前驅節點
if (p == head && tryAcquire(arg)) {//前驅節點等於頭節點嘗試cas搶鎖。
setHead(node);//搶鎖成功將當前節點設定為頭節點
p.next = null; // help GC 當頭結點置空
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&//當佇列中有節點在等待,判斷是否應該阻塞
parkAndCheckInterrupt())//阻塞等待,檢查中斷標識位
interrupted = true;//將中斷標識位置為true
}
} finally {
if (failed)//
cancelAcquire(node);//取消當前節點
}
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)//當前節點為空直接返回
return;
node.thread = null;//要取消了將當前節點的執行緒置為空
// Skip cancelled predecessors
Node pred = node.prev;//獲取到當前節點的前驅節點
while (pred.waitStatus > 0)//如果當前節點的前驅節點的狀態大於0,代表是取消狀態,一直找到不是取消狀態的節點
node.prev = pred = pred.prev;
Node predNext = pred.next;//將當前要取消的節點斷鏈
node.waitStatus = Node.CANCELLED;//將當前節點的等待狀態置為CANCELLED
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {//如果當前節點是尾結點,將尾結點替換為淺語節點
compareAndSetNext(pred, predNext, null);//將當前節點的下一個節點置為空,因為當前節點是最後一個節點沒有next指標
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&//前驅節點不等於頭結點
((ws = pred.waitStatus) == Node.SIGNAL ||//前驅節點的狀態不等於SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驅節點的狀態小於0,並且cas將前驅節點的等待置為SIGNAL
pred.thread != null) {//前驅節點的執行緒補位空
Node next = node.next;//獲取當前節點的next指標
if (next != null && next.waitStatus <= 0)//如果next指標不等於空並且等待狀態小於等於0,標識節點有效
compareAndSetNext(pred, predNext, next);//將前驅節點的next指標指向下一個有效節點
} else {
unparkSuccessor(node);//喚醒後續節點 條件:1.前驅節點是頭結點 2.當前節點不是signal,在ReentransLock中基本不會出現,在讀寫鎖時就會出現
}
node.next = node; // help GC 將參照指向自身
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//獲取當前節點狀態
if (ws < 0)//如果節點為負數也即不是取消節點
compareAndSetWaitStatus(node, ws, 0);//cas將當前節點置為0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//獲取到下一個節點
if (s == null || s.waitStatus > 0) {//下一個節點等於空或者下一個節點是取消節點
s = null;//將s置為空
for (Node t = tail; t != null && t != node; t = t.prev)//從尾結點遍歷找到一個不是取消狀態的節點
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//如果s不等於空
LockSupport.unpark(s.thread);//喚醒當前節點s
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//獲取上一個節點的等待狀態
if (ws == Node.SIGNAL)//如果狀態為SIGNAL,代表後續節點有節點可以喚醒,可以安心阻塞去
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {//如果當前狀態大於0,代表節點為CANCELLED狀態
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;//從尾節點開始遍歷,找到下一個狀態不是CANCELLED的節點。將取消節點斷鏈移除
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//這裡需要注意ws>0時,已經找到了一個不是取消狀態的前驅節點。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//將找到的不是CANCELLED節點的前驅節點,將其等待狀態置為SIGNAL
}
return false;
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)//當前節點為空直接返回
return;
node.thread = null;//要取消了將當前節點的執行緒置為空
// Skip cancelled predecessors
Node pred = node.prev;//獲取到當前節點的前驅節點
while (pred.waitStatus > 0)//如果當前節點的前驅節點的狀態大於0,代表是取消狀態,一直找到不是取消狀態的節點
node.prev = pred = pred.prev;
Node predNext = pred.next;//將當前要取消的節點斷鏈
node.waitStatus = Node.CANCELLED;//將當前節點的等待狀態置為CANCELLED
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {//如果當前節點是尾結點,將尾結點替換為淺語節點
compareAndSetNext(pred, predNext, null);//將當前節點的下一個節點置為空,因為當前節點是最後一個節點沒有next指標
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&//前驅節點不等於頭結點
((ws = pred.waitStatus) == Node.SIGNAL ||//前驅節點的狀態不等於SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驅節點的狀態小於0,並且cas將前驅節點的等待置為SIGNAL
pred.thread != null) {//前驅節點的執行緒補位空
Node next = node.next;//獲取當前節點的next指標
if (next != null && next.waitStatus <= 0)//如果next指標不等於空並且等待狀態小於等於0,標識節點有效
compareAndSetNext(pred, predNext, next);//將前驅節點的next指標指向下一個有效節點
} else {
unparkSuccessor(node);//喚醒後續節點 條件:1.前驅節點是頭結點 2.當前節點不是signal,在ReentransLock中基本不會出現,在讀寫鎖時就會出現
}
node.next = node; // help GC 將參照指向自身
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//獲取當前節點狀態
if (ws < 0)//如果節點為負數也即不是取消節點
compareAndSetWaitStatus(node, ws, 0);//cas將當前節點置為0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//獲取到下一個節點
if (s == null || s.waitStatus > 0) {//下一個節點等於空或者下一個節點是取消節點
s = null;//將s置為空
for (Node t = tail; t != null && t != node; t = t.prev)//從尾結點遍歷找到一個不是取消狀態的節點
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//如果s不等於空
LockSupport.unpark(s.thread);//喚醒當前節點s
}
public final boolean release(int arg) {
if (tryRelease(arg)) {//子類實現如何釋放鎖
Node h = head;//獲取到頭結點
if (h != null && h.waitStatus != 0)//獲取到頭結點,如果頭結點不為空,等待狀態不為0,喚醒後續節點
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//獲取節點的等待狀態
if (ws < 0)//如果等待狀態小於0,標識節點屬於有效節點
compareAndSetWaitStatus(node, ws, 0);//將當前節點的等待狀態置為0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//獲取到下一個節點
if (s == null || s.waitStatus > 0) {//如果節點是空,或者是取消狀態的節點,就找到一個非取消狀態的節點,將取消狀態的節點斷鏈後由垃圾回收器進行回收
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//節點不用空
LockSupport.unpark(s.thread);//喚醒當前等待的有效節點S
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//由子類實現
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//將共用節點也即讀執行緒入隊並返回
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//找到節點的前驅節點
if (p == head) {//如果前驅節點等於頭結點
int r = tryAcquireShared(arg);//嘗試獲取共用鎖數量
if (r >= 0) {//如果鎖的數量大於0,表示還有多餘的共用鎖。這裡等於0也需要進一步判斷。由於如果當執行到這裡時,有另外的執行緒釋放了共用鎖,如果不進行判斷,將會導致釋放鎖的執行緒沒辦法喚醒其他執行緒。所以這裡會偽喚醒一個節點,喚醒的節點後續如果沒有鎖釋放,依舊阻塞在當前parkAndCheckInterrupt方法中
setHeadAndPropagate(node, r);//將當前節點的等待狀態設定為Propagate。
p.next = null; // help GC
if (interrupted)//判斷是會否中斷過
selfInterrupt();//設定中斷標識位
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&//判斷是否應該阻塞等待
parkAndCheckInterrupt方法中())//阻塞並檢查中斷標識
interrupted = true;//重置中斷標識位
}
} finally {
if (failed)//如果失敗
cancelAcquire(node);//取消節點
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//將當前節點置為頭結點
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 //可獲取的共用鎖也即讀鎖的數量,對於ReentrantReadWriteLock而言,永遠都是1,所以會繼續喚醒下一個讀執行緒
|| h == null //如果舊的頭結點為空
|| h.waitStatus < 0 ||//頭結點的等待狀態不為0
(h = head) == null || h.waitStatus < 0) {//舊頭節點不為空並且等待狀態小於0也即是有效節點
Node s = node.next;//獲取到node的下一個節點
if (s == null || s.isShared())//如果node的下一個節點為空或者是共用節點
doReleaseShared();//喚醒下一個執行緒
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//子類實現釋放鎖
doReleaseShared();//喚醒後續執行緒
return true;//釋放成功
}
return false;//釋放是吧
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;//獲取到當前頭結點
if (h != null && h != tail) {//如果頭結點不為空並且不等於尾結點
int ws = h.waitStatus;//獲取當前節點的等待狀態
if (ws == Node.SIGNAL) {//如果狀態為SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//cas將SIGNAL狀態置為0。SIGNAL標識後續有執行緒需要喚醒
continue; // loop to recheck cases
unparkSuccessor(h);//喚醒後續執行緒
}
else if (ws == 0 &&//如果當前狀態為0。表示有執行緒將其置為0
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//cas將0狀態置為PROPAGATE。在多個共用鎖同時釋放時,方便繼續進行讀傳播,喚醒後續節點
continue; // loop on failed CAS
}
if (h == head)//如果頭結點沒有改變,證明沒有必要繼續迴圈等待了,直接退出吧,如果頭結點放生變化,可能有其他執行緒釋放了鎖。
break;
}
}
總結:AQS提供了統一的模板,對於如何入隊出隊以及執行緒的喚醒都由AQS提供預設的實現,只需要子類實現自己上鎖和解鎖的邏輯。
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class RWDictionary {
private final Map<String, String> m = new TreeMap<String, String>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public String get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
r.lock();
try {
return (String[])m.keySet().toArray();
}
finally { r.unlock(); }
}
public String put(String key, String value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }//高16位元代表共用鎖也即讀鎖
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//低16位元代表獨佔鎖也即寫鎖
}
//讀寫鎖中公平鎖的實現:如果後續佇列中有等待的節點,那麼進行排隊。但是這樣的效能並不高,可能造成寫飢餓。
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
//如果佇列中有節點在排隊,那麼就進行等待
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
//非公平鎖的實現相對來說效能較高,不阻塞,可以直接搶鎖,防止寫飢餓。同時有寫執行緒排隊,那麼後續的所有讀操作都需要進行排隊
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge //非公平鎖寫鎖永遠不阻塞,防止寫飢餓
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
//如果佇列中有任務並且不是共用節點也即不是讀操作,那麼返回true,進行阻塞
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&//頭結點不等於空
(s = h.next) != null &&//頭結點的下一個節點補位空
!s.isShared() &&//並且當前節點不是共用節點,也即讀執行緒
s.thread != null;//執行緒不為空
}
public void lock() {
sync.acquire(1);
}
//同樣呼叫acquire的方法,請參考上文AQS的acquire方法的實現。同樣都是獲取失敗進行排隊
//子類獲取鎖的過程
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();//獲取當前執行緒
int c = getState();//獲取當前state狀態值
int w = exclusiveCount(c);//獲取讀鎖的數量,這裡doug lea一直都是使用二進位制的風格
if (c != 0) {//代表持有鎖
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())//如果寫鎖為0,或則當前執行緒不是寫鎖的持有執行緒
return false;//返回false
if (w + exclusiveCount(acquires) > MAX_COUNT)//當前鎖重入次數大於最大值,丟擲異常
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);//直接設定狀態,代表鎖重入次數加1
return true;//返回獲取鎖成功
}
if (writerShouldBlock() ||//寫鎖是否應該阻塞
!compareAndSetState(c, c + acquires))//cas嘗試獲取鎖
return false;
setExclusiveOwnerThread(current);//cas搶鎖成功,直接將寫鎖的持有執行緒設定為當前執行緒
return true;//返回true代表獲取鎖成功
}
//writerShouldBlock的實現請參考上面的公平鎖和非公平鎖的實現
public void unlock() {
sync.release(1);
}
//這裡是AQS的模板方法
public final boolean release(int arg) {
if (tryRelease(arg)) {//子類實現如何釋放鎖
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//如果不是獨佔鎖也即寫鎖,直接拋異常
throw new IllegalMonitorStateException();
int nextc = getState() - releases;//將鎖的數量減少
boolean free = exclusiveCount(nextc) == 0;//如果獨佔鎖也即寫鎖數量為0,標識已經釋放。大於1是因為鎖重入的問題
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
public void lock() {
sync.acquireShared(1);
}
//acquireShared方法請看上面AQS的實現
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();//獲取當前執行緒
int c = getState();//獲取當前state狀態
if (exclusiveCount(c) != 0 &&//如果獨佔鎖也即寫鎖的數量不等於0
getExclusiveOwnerThread() != current)//獨佔鎖的擁有執行緒不是當前執行緒
return -1;//直接返回-1,獲取失敗
int r = sharedCount(c);//獲取共用鎖的數量
if (!readerShouldBlock() &&//讀鎖是否應該被阻塞
r < MAX_COUNT &&//讀鎖數量是否小於最大數量
compareAndSetState(c, c + SHARED_UNIT)) {//cas將讀鎖+1
if (r == 0) {//當前共用鎖也即讀鎖數量為0
firstReader = current;//標記為第一個讀執行緒
firstReaderHoldCount = 1;//將第一個讀執行緒持有鎖數量置為1,主要為了判斷讀鎖的重入
} else if (firstReader == current) {//第一個執行緒讀等於當前執行緒
firstReaderHoldCount++;//第一個讀執行緒持有鎖數量
} else {
HoldCounter rh = cachedHoldCounter;//先讀取到最後一個執行緒持有共用鎖的數量
if (rh == null || rh.tid != getThreadId(current))//如果為空或者最後一個執行緒不是當前執行緒
cachedHoldCounter = rh = readHolds.get();//從快取中獲取當前執行緒持有讀鎖的數量
else if (rh.count == 0)//如果讀鎖持有數量為0
readHolds.set(rh);//將值設定並儲存
rh.count++;//持有執行緒數+1
}
return 1;//返回1標識獲取鎖成功
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;//
for (;;) {
int c = getState();//獲取當前狀態值
if (exclusiveCount(c) != 0) {//如果持有寫鎖的執行緒數補位0
if (getExclusiveOwnerThread() != current)//持有寫鎖的執行緒不是當前執行緒
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {//讀鎖是否應該被阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)//如果讀執行緒的數量等於最大數量,丟擲異常
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {//cas嘗試搶鎖
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
public void unlock() {
sync.releaseShared(1);
}
//詳情請檢視上面AQS的實現
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {//如果當前執行緒是第一個讀執行緒
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)//當前第一個讀執行緒持有的讀鎖的數量是1
firstReader = null;//直接置位空
else
firstReaderHoldCount--;//否則將持有讀鎖的數量減1
} else {
HoldCounter rh = cachedHoldCounter;//先獲取最後一個讀執行緒的持有資訊
if (rh == null || rh.tid != getThreadId(current))//如果rh為空或者最後一個讀執行緒不是當前執行緒
rh = readHolds.get();//從快取中獲取當前執行緒的持有鎖的資訊
int count = rh.count;//獲取到數量
if (count <= 1) {//如果持有讀鎖的數量小於等於1
readHolds.remove();//直接清空快取
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;//減去持有讀鎖的數量
}
for (;;) {
int c = getState();//獲取當前狀態
int nextc = c - SHARED_UNIT;//將讀鎖的數量減1
if (compareAndSetState(c, nextc))//cas原子性賦值
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;//如果為0表示當前執行緒已經釋放了所有讀鎖。
}
}
本文章只是JUC 中AQS的一部分,後續的文章會對基於AQS鎖實現的子類進行拓展講解,以上文章內容基於個人以及結合別人文章的理解,如果有問題或者不當之處歡迎大家留言交流。由於為了保證觀看流暢性,其中一部分原始碼有重複的地方。請見諒