通俗來講,鎖要保證的就是原子性,就是一個程式碼塊不允許多執行緒同時執行,就是鎖。從生活的角度上來說,就比如你要去上廁所,當你在上廁所期間,你會把門鎖上,其他人只能排隊。不允許多個人同時上廁所。
java語言是執行在jvm之上,jvm是由C++實現的。java本身沒有對應的底層鎖實現,它將鎖的問題拋給了C++。C++將鎖的實現拋給了組合語言,組合語言將問題拋給作業系統。所以最後還是由作業系統提供的cmpxchg指令。通過 lock cmpxchg指令實現了cpu對於單個變數的原子操作。lock cmpxchg涉及到快取一致性協定(MESI)與匯流排鎖。(個人能力有限,自行了解吧)
執行緒不停執行某一個程式碼塊,直到滿足條件或者操作重試次數,也就是我們所說的CAS。
boolean state = true/false;標識有鎖與無鎖。但是一旦有鎖衝入的情況,我們就需要引入一個新的變數去儲存鎖衝入的次數。所以JUC中使用int state,預設值為0代表無鎖狀態。上增的數值代表所衝入的次數。
通過cas實現多執行緒搶鎖
1.自旋,繼續搶鎖直到搶成功
2.阻塞,執行緒阻塞直到有執行緒喚醒
3.自旋+阻塞,自旋搶鎖一定次數,如果失敗,執行緒阻塞。
1.優點:節省執行緒上線文切換的時間。適用於執行步驟少且快的場景,節省cpu資源
2.缺點:佔用cpu資源,消耗cpu效能
注意點:當cpu個數增加且執行緒數增加,可能導致自旋鎖的優點退化成缺點。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;//最重要的一個變數
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&//嘗試獲取鎖
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//如果獲取鎖失敗,新增到佇列中,由於ReentrantLock是獨佔鎖所以節點必須是EXCLUSIVE型別
selfInterrupt();//新增中斷標識位
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//新建節點
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;//獲取到尾指標
if (pred != null) {//尾指標不等於空,將當前節點替換為尾指標
node.prev = pred;
if (compareAndSetTail(pred, node)) {//採用尾插法,充分利用時間區域性性和空間區域性性。尾插的節點一般不容易被取消。
pred.next = node;
return node;
}
}
enq(node);//cas失敗後執行入隊操作,繼續嘗試
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;//獲取尾指標
if (t == null) { //代表當前佇列沒有節點
if (compareAndSetHead(new Node()))//將當前節點置為頭結點
tail = head;
} else {//當前佇列有節點
node.prev = t;//
if (compareAndSetTail(t, node)) {//將當前節點置為尾結點
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//找到當前節點的前驅節點
if (p == head && tryAcquire(arg)) {//前驅節點等於頭節點嘗試cas搶鎖。
setHead(node);//搶鎖成功將當前節點設定為頭節點
p.next = null; // help GC 當頭結點置空
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&//當佇列中有節點在等待,判斷是否應該阻塞
parkAndCheckInterrupt())//阻塞等待,檢查中斷標識位
interrupted = true;//將中斷標識位置為true
}
} finally {
if (failed)//
cancelAcquire(node);//取消當前節點
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//獲取上一個節點的等待狀態
if (ws == Node.SIGNAL)//如果狀態為SIGNAL,代表後續節點有節點可以喚醒,可以安心阻塞去
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {//如果當前狀態大於0,代表節點為CANCELLED狀態
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;//從尾節點開始遍歷,找到下一個狀態不是CANCELLED的節點。將取消節點斷鏈移除
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//這裡需要注意ws>0時,已經找到了一個不是取消狀態的前驅節點。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//將找到的不是CANCELLED節點的前驅節點,將其等待狀態置為SIGNAL
}
return false;
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)//當前節點為空直接返回
return;
node.thread = null;//要取消了將當前節點的執行緒置為空
// Skip cancelled predecessors
Node pred = node.prev;//獲取到當前節點的前驅節點
while (pred.waitStatus > 0)//如果當前節點的前驅節點的狀態大於0,代表是取消狀態,一直找到不是取消狀態的節點
node.prev = pred = pred.prev;
Node predNext = pred.next;//將當前要取消的節點斷鏈
node.waitStatus = Node.CANCELLED;//將當前節點的等待狀態置為CANCELLED
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {//如果當前節點是尾結點,將尾結點替換為淺語節點
compareAndSetNext(pred, predNext, null);//將當前節點的下一個節點置為空,因為當前節點是最後一個節點沒有next指標
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&//前驅節點不等於頭結點
((ws = pred.waitStatus) == Node.SIGNAL ||//前驅節點的狀態不等於SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驅節點的狀態小於0,並且cas將前驅節點的等待置為SIGNAL
pred.thread != null) {//前驅節點的執行緒補位空
Node next = node.next;//獲取當前節點的next指標
if (next != null && next.waitStatus <= 0)//如果next指標不等於空並且等待狀態小於等於0,標識節點有效
compareAndSetNext(pred, predNext, next);//將前驅節點的next指標指向下一個有效節點
} else {
unparkSuccessor(node);//喚醒後續節點 條件:1.前驅節點是頭結點 2.當前節點不是signal,在ReentransLock中基本不會出現,在讀寫鎖時就會出現
}
node.next = node; // help GC 將參照指向自身
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//獲取當前節點狀態
if (ws < 0)//如果節點為負數也即不是取消節點
compareAndSetWaitStatus(node, ws, 0);//cas將當前節點置為0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//獲取到下一個節點
if (s == null || s.waitStatus > 0) {//下一個節點等於空或者下一個節點是取消節點
s = null;//將s置為空
for (Node t = tail; t != null && t != node; t = t.prev)//從尾結點遍歷找到一個不是取消狀態的節點
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//如果s不等於空
LockSupport.unpark(s.thread);//喚醒當前節點s
}
總結:AQS提供了統一的模板,對於如何入隊出隊以及執行緒的喚醒都由AQS提供預設的實現,只需要子類實現自己上鎖和解鎖的邏輯。
public class ReentrantLock extends Thread {
private static ReentrantLock lock=new ReentrantLock(true); //引數為true表示為公平鎖,請對比輸出結果
public void run() {
for(int i=0; i<100; i++) {
lock.lock();
try{
System.out.println(Thread.currentThread().getName()+"獲得鎖");
}finally{
lock.unlock();
}
}
}
public static void main(String[] args) {
ReentrantLock rl=new ReentrantLock();
Thread th1=new Thread(rl);
Thread th2=new Thread(rl);
th1.start();
th2.start();
}
}
public void lock() {
sync.lock();
}
在AQS中對於具體的lock方法,並不做具體的實現,由子類自由拓展。ReentrantLock中lock方法中的實現分為公平鎖和非公平鎖的實現。所以lock方法有兩個實現。
final void lock() {
acquire(1);//獲取鎖
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&//嘗試獲取鎖
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//如果獲取鎖失敗,新增到佇列中,由於ReentrantLock是獨佔鎖所以節點必須是EXCLUSIVE型別
selfInterrupt();//新增中斷標識位
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();//獲取到當前執行緒
int c = getState();//獲取當前的同步狀態值
if (c == 0) {//代表沒有執行緒佔有鎖
if (!hasQueuedPredecessors() &&//是否有前驅節點
compareAndSetState(0, acquires)) {//如果沒有前驅節點,cas將當前狀態值置為acquires,也就是1,成功代表獲取到鎖
setExclusiveOwnerThread(current);//標識當前屬於互斥狀態執行緒的擁有者是當前執行緒
return true;//true,代表獲取鎖成功
}
}
else if (current == getExclusiveOwnerThread()) {//進入這裡代表state不為0,有其他執行緒獲得鎖
int nextc = c + acquires;//鎖衝入,將衝入次數加1
if (nextc < 0)//衝入次數不能少於0,少於0是非法值,丟擲異常
throw new Error("Maximum lock count exceeded");
setState(nextc);//設定state狀態值
return true;//true,代表獲取鎖成功
}
return false;//返回false,代表獲取鎖失敗
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // 獲取佇列的尾指標
Node h = head;// 獲取佇列的頭指標
Node s;
return h != t &&//如果頭結點和尾結點不是同一個節點
((s = h.next) == null || s.thread != Thread.currentThread());//頭結點的下一個節點為空或者當前頭結點的執行緒不等於當前執行緒。
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//新建節點
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;//獲取到尾指標
if (pred != null) {//尾指標不等於空,將當前節點替換為尾指標
node.prev = pred;
if (compareAndSetTail(pred, node)) {//採用尾插法,充分利用時間區域性性和空間區域性性。尾插的節點一般不容易被取消。
pred.next = node;
return node;
}
}
enq(node);//cas失敗後執行入隊操作,繼續嘗試
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;//獲取尾指標
if (t == null) { //代表當前佇列沒有節點
if (compareAndSetHead(new Node()))//將當前節點置為頭結點
tail = head;
} else {//當前佇列有節點
node.prev = t;//
if (compareAndSetTail(t, node)) {//將當前節點置為尾結點
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//找到當前節點的前驅節點
if (p == head && tryAcquire(arg)) {//前驅節點等於頭節點嘗試cas搶鎖。
setHead(node);//搶鎖成功將當前節點設定為頭節點
p.next = null; // help GC 當頭結點置空
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&//當佇列中有節點在等待,判斷是否應該阻塞
parkAndCheckInterrupt())//阻塞等待,檢查中斷標識位
interrupted = true;//將中斷標識位置為true
}
} finally {
if (failed)//
cancelAcquire(node);//取消當前節點
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//獲取上一個節點的等待狀態
if (ws == Node.SIGNAL)//如果狀態為SIGNAL,代表後續節點有節點可以喚醒,可以安心阻塞去
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {//如果當前狀態大於0,代表節點為CANCELLED狀態
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;//從尾節點開始遍歷,找到下一個狀態不是CANCELLED的節點。將取消節點斷鏈移除
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//這裡需要注意ws>0時,已經找到了一個不是取消狀態的前驅節點。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//將找到的不是CANCELLED節點的前驅節點,將其等待狀態置為SIGNAL
}
return false;
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)//當前節點為空直接返回
return;
node.thread = null;//要取消了將當前節點的執行緒置為空
// Skip cancelled predecessors
Node pred = node.prev;//獲取到當前節點的前驅節點
while (pred.waitStatus > 0)//如果當前節點的前驅節點的狀態大於0,代表是取消狀態,一直找到不是取消狀態的節點
node.prev = pred = pred.prev;
Node predNext = pred.next;//將當前要取消的節點斷鏈
node.waitStatus = Node.CANCELLED;//將當前節點的等待狀態置為CANCELLED
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {//如果當前節點是尾結點,將尾結點替換為淺語節點
compareAndSetNext(pred, predNext, null);//將當前節點的下一個節點置為空,因為當前節點是最後一個節點沒有next指標
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&//前驅節點不等於頭結點
((ws = pred.waitStatus) == Node.SIGNAL ||//前驅節點的狀態不等於SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驅節點的狀態小於0,並且cas將前驅節點的等待置為SIGNAL
pred.thread != null) {//前驅節點的執行緒補位空
Node next = node.next;//獲取當前節點的next指標
if (next != null && next.waitStatus <= 0)//如果next指標不等於空並且等待狀態小於等於0,標識節點有效
compareAndSetNext(pred, predNext, next);//將前驅節點的next指標指向下一個有效節點
} else {
unparkSuccessor(node);//喚醒後續節點 條件:1.前驅節點是頭結點 2.當前節點不是signal,在ReentransLock中基本不會出現,在讀寫鎖時就會出現
}
node.next = node; // help GC 將參照指向自身
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//獲取當前節點狀態
if (ws < 0)//如果節點為負數也即不是取消節點
compareAndSetWaitStatus(node, ws, 0);//cas將當前節點置為0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//獲取到下一個節點
if (s == null || s.waitStatus > 0) {//下一個節點等於空或者下一個節點是取消節點
s = null;//將s置為空
for (Node t = tail; t != null && t != node; t = t.prev)//從尾結點遍歷找到一個不是取消狀態的節點
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//如果s不等於空
LockSupport.unpark(s.thread);//喚醒當前節點s
}
final void lock() {
if (compareAndSetState(0, 1))//上來直接搶鎖。
setExclusiveOwnerThread(Thread.currentThread());//搶鎖成功,將當前互斥鎖的擁有執行緒設定為當前執行緒
else
acquire(1);//cas失敗去acquire
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&//嘗試獲取鎖
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//如果獲取鎖失敗,新增到佇列中,由於ReentrantLock是獨佔鎖所以節點必須是EXCLUSIVE型別
selfInterrupt();//新增中斷標識位
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();//獲取到當前執行緒
int c = getState();//獲取當前狀態
if (c == 0) {//代表沒有鎖
if (compareAndSetState(0, acquires)) {//cas自旋嘗試獲得鎖
setExclusiveOwnerThread(current);//當前獨佔鎖的擁有執行緒設定為當前執行緒
return true;//返回
}
}
else if (current == getExclusiveOwnerThread()) {//如果當前執行緒之前已經獲取到鎖
int nextc = c + acquires;//鎖重入
if (nextc < 0) // overflow 鎖重入不能為0
throw new Error("Maximum lock count exceeded");
setState(nextc);//設定最新的state狀態
return true;
}
return false;
}
acquireQueued方法和addWaiter方法與公平鎖的實現一致
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {//嘗試釋放鎖
Node h = head;
if (h != null && h.waitStatus != 0)//如果頭結點不為空並且頭結點的等待狀態不等於0
unparkSuccessor(h);//喚醒頭結點
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//當前的狀態減去釋放鎖的數量
if (Thread.currentThread() != getExclusiveOwnerThread())//如果當前執行緒不是獨佔鎖的執行緒,沒鎖還要釋放,不就丟擲異常了嗎
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//狀態為0
free = true;
setExclusiveOwnerThread(null);//將當前互斥鎖的擁有執行緒設定為空
}
setState(c);//設定狀態位
return free;
}
本文章只是JUC 中AQS的一部分,後續的文章會對基於AQS鎖實現的子類進行拓展講解,以上文章內容基於個人以及結合別人文章的理解,如果有問題或者不當之處歡迎大家留言交流。由於為了保證觀看流暢性,其中一部分原始碼有重複的地方。請見諒