package com.morris.concurrent.lock.reentrantlock.trace;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class InterruptDemo {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
LockSupport.park();
System.out.println("t1:" + Thread.interrupted()); // true
System.out.println("t1:" + Thread.currentThread().isInterrupted()); // false
});
t1.start();
TimeUnit.SECONDS.sleep(1);
t1.interrupt();
System.out.println("main:" + t1.isInterrupted()); // true
}
}
package com.morris.concurrent.lock.reentrantlock.trace;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* 演示獨佔鎖的中斷
*/
public class InterruptThreadDemo {
public static void main(String[] args) throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("t1");
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}, "t1").start();
TimeUnit.SECONDS.sleep(1); // 等待t1啟動
Thread t2 = new Thread(() -> {
try {
reentrantLock.lockInterruptibly();
try {
System.out.println("t2 get lock");
} finally {
reentrantLock.unlock();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
t2.start();
TimeUnit.SECONDS.sleep(1); // 等待t2啟動
t2.interrupt(); // 中斷t2,會丟擲InterruptedException異常
}
}
java.util.concurrent.locks.ReentrantLock#lockInterruptibly
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireInterruptibly
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) // 還沒獲取鎖之前就中斷了就直接丟擲異常
throw new InterruptedException();
if (!tryAcquire(arg)) // 嘗試獲取鎖,與不可中斷的邏輯一致,區分公平與非公平
doAcquireInterruptibly(arg);
}
java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireInterruptibly
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE); // 將當前執行緒封裝成Node節點加入到同步佇列尾部
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 被中斷了直接丟擲異常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
package com.morris.concurrent.lock.reentrantlock.trace;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* 演示獨佔鎖中不可中斷方法lock()對中斷的處理
*/
public class InterruptThreadDemo2 {
public static void main(String[] args) throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
new Thread(() -> {
reentrantLock.lock();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}, "t1").start();
TimeUnit.SECONDS.sleep(1); // 等待t1啟動
Thread t2 = new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("t2: " + Thread.currentThread().isInterrupted()); // true
}finally {
reentrantLock.unlock();
}
}, "t2");
t2.start();
TimeUnit.SECONDS.sleep(1); // 等待t2啟動
t2.interrupt(); // 中斷t2
System.out.println("main: " + t2.isInterrupted()); // false
}
}
從執行結果可以發現,當對t2進行中斷處理後,t2的中斷標記位本應該為true,但是由於此時正在獲取鎖(不可中斷),所以AQS將這個中斷標記位先置為false,等待獲取到鎖後,重新將這個中斷標記位還原為true。
下面從原始碼進行分析:
直接定位到java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 這裡只是記錄了一下中斷標記,如果是第一個等待的執行緒又會嘗試獲取一次鎖,沒獲取到繼續休眠,當這個執行緒獲得鎖後,會將中斷標記狀態返回
}
} finally {
if (failed)
cancelAcquire(node);
}
}
上面的方法執行完後返回到下面的方法。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); // 執行緒被中斷了會進入到這裡
}
static void selfInterrupt() {
Thread.currentThread().interrupt(); // 自己把自己中斷,置中斷標記位為true
}
超時獲取鎖的使用如下:
package com.morris.concurrent.lock.reentrantlock.trace;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* 演示獨佔鎖的超時
*/
public class TimeoutThreadDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
ReentrantLock reentrantLock = new ReentrantLock();
new Thread(()->{
reentrantLock.lock();
try {
countDownLatch.countDown();
System.out.println("t1");
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}, "t1").start();
countDownLatch.await();
new Thread(()->{
try {
if(reentrantLock.tryLock(3, TimeUnit.SECONDS)) {
System.out.println("t2 get lock");
reentrantLock.unlock();
} else {
System.out.println("t2 get lock timeout");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
}
java.util.concurrent.locks.ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit)
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted()) // 還沒獲取鎖之前就中斷了就直接丟擲異常
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireNanos
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout; // 超時時間點
final Node node = addWaiter(Node.EXCLUSIVE); // 將當前執行緒封裝成Node節點加入到同步佇列尾部
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime(); // 剩餘等待時間
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) // 超時時間小於1000納秒則直接自旋,不會休眠
LockSupport.parkNanos(this, nanosTimeout); // 帶超時時間的休眠
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}