AQS原始碼分析之中斷與超時獲取鎖

2020-09-25 11:00:29

AQS原始碼分析之中斷與超時獲取鎖

中斷基礎

  • Thread.interrupt():物件方法,置中斷標記為true。
  • Thread.currentThread().isInterrupted():物件方法,返回執行緒當前的中斷標記狀態,不會清除中斷標誌位。
  • Thread.interrupted():靜態方法,返回執行緒當前的中斷標記狀態同時清除中斷標誌位(置為false)。
package com.morris.concurrent.lock.reentrantlock.trace;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class InterruptDemo {
    public static void main(String[] args) throws InterruptedException {

        Thread t1 = new Thread(() -> {
            LockSupport.park();
            System.out.println("t1:" + Thread.interrupted()); // true
            System.out.println("t1:" + Thread.currentThread().isInterrupted()); // false
        });
        t1.start();

        TimeUnit.SECONDS.sleep(1);
        t1.interrupt();
        System.out.println("main:" + t1.isInterrupted()); // true
    }
}

中斷方法lockInterruptibly()的原始碼分析

package com.morris.concurrent.lock.reentrantlock.trace;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 演示獨佔鎖的中斷
 */
public class InterruptThreadDemo {

    public static void main(String[] args) throws InterruptedException {

        ReentrantLock reentrantLock = new ReentrantLock();
        new Thread(() -> {
            reentrantLock.lock();
            try {
                System.out.println("t1");
                TimeUnit.SECONDS.sleep(30);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        }, "t1").start();

        TimeUnit.SECONDS.sleep(1); // 等待t1啟動

        Thread t2 = new Thread(() -> {
            try {
                reentrantLock.lockInterruptibly();
                try {
                    System.out.println("t2 get lock");
                } finally {
                    reentrantLock.unlock();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2");
        t2.start();

        TimeUnit.SECONDS.sleep(1); // 等待t2啟動

        t2.interrupt(); // 中斷t2,會丟擲InterruptedException異常
    }

}

java.util.concurrent.locks.ReentrantLock#lockInterruptibly

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireInterruptibly

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted()) // 還沒獲取鎖之前就中斷了就直接丟擲異常
        throw new InterruptedException();
    if (!tryAcquire(arg)) // 嘗試獲取鎖,與不可中斷的邏輯一致,區分公平與非公平
        doAcquireInterruptibly(arg);
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireInterruptibly

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE); // 將當前執行緒封裝成Node節點加入到同步佇列尾部
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException(); // 被中斷了直接丟擲異常
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

不可中斷方法lock()對中斷的處理

package com.morris.concurrent.lock.reentrantlock.trace;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 演示獨佔鎖中不可中斷方法lock()對中斷的處理
 */
public class InterruptThreadDemo2 {

    public static void main(String[] args) throws InterruptedException {

        ReentrantLock reentrantLock = new ReentrantLock();
        new Thread(() -> {
            reentrantLock.lock();
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        }, "t1").start();

        TimeUnit.SECONDS.sleep(1); // 等待t1啟動

        Thread t2 = new Thread(() -> {
            reentrantLock.lock();
            try {
                System.out.println("t2: " + Thread.currentThread().isInterrupted()); // true
            }finally {
                reentrantLock.unlock();
            }
        }, "t2");
        t2.start();

        TimeUnit.SECONDS.sleep(1); // 等待t2啟動

        t2.interrupt(); // 中斷t2
        System.out.println("main: " + t2.isInterrupted()); // false
    }

}

從執行結果可以發現,當對t2進行中斷處理後,t2的中斷標記位本應該為true,但是由於此時正在獲取鎖(不可中斷),所以AQS將這個中斷標記位先置為false,等待獲取到鎖後,重新將這個中斷標記位還原為true。

下面從原始碼進行分析:

直接定位到java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; // 這裡只是記錄了一下中斷標記,如果是第一個等待的執行緒又會嘗試獲取一次鎖,沒獲取到繼續休眠,當這個執行緒獲得鎖後,會將中斷標記狀態返回
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上面的方法執行完後返回到下面的方法。

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt(); // 執行緒被中斷了會進入到這裡
}

static void selfInterrupt() {
    Thread.currentThread().interrupt(); // 自己把自己中斷,置中斷標記位為true
}

超時獲取鎖原始碼分析

超時獲取鎖的使用如下:

package com.morris.concurrent.lock.reentrantlock.trace;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 演示獨佔鎖的超時
 */
public class TimeoutThreadDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);

        ReentrantLock reentrantLock = new ReentrantLock();
        new Thread(()->{
            reentrantLock.lock();
            try {
                countDownLatch.countDown();
                System.out.println("t1");
                TimeUnit.SECONDS.sleep(30);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        }, "t1").start();

        countDownLatch.await();

        new Thread(()->{
            try {
                if(reentrantLock.tryLock(3, TimeUnit.SECONDS)) {
                    System.out.println("t2 get lock");
                    reentrantLock.unlock();
                } else {
                    System.out.println("t2 get lock timeout");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2").start();
    }

}

java.util.concurrent.locks.ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit)

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireNanos

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted()) // 還沒獲取鎖之前就中斷了就直接丟擲異常
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireNanos

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout; // 超時時間點
    final Node node = addWaiter(Node.EXCLUSIVE); // 將當前執行緒封裝成Node節點加入到同步佇列尾部
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime(); // 剩餘等待時間
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold) // 超時時間小於1000納秒則直接自旋,不會休眠
                LockSupport.parkNanos(this, nanosTimeout); // 帶超時時間的休眠
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}