作者:Grey
原文地址:
StampedLock
其實是對讀寫鎖的一種改進,它支援在讀同時進行一個寫操作,也就是說,它的效能將會比讀寫鎖更快。
更通俗的講就是在讀鎖沒有釋放的時候是可以獲取到一個寫鎖,獲取到寫鎖之後,讀鎖阻塞,這一點和讀寫鎖一致,唯一的區別在於讀寫鎖不支援在沒有釋放讀鎖的時候獲取寫鎖。
StampedLock
有三種模式:
悲觀讀:允許多個執行緒獲取悲觀讀鎖。
寫鎖:寫鎖和悲觀讀是互斥的。
樂觀讀:無鎖機制,類似於資料庫中的樂觀鎖,它支援在不釋放樂觀讀的時候是可以獲取到一個寫鎖。
參考: 有沒有比讀寫鎖更快的鎖?
範例程式碼:
悲觀讀 + 寫鎖:
package git.snippets.juc;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.StampedLock;
import java.util.logging.Logger;
// 悲觀讀 + 寫鎖
public class StampedLockPessimistic {
private static final Logger log = Logger.getLogger(StampedLockPessimistic.class.getName());
private static final StampedLock lock = new StampedLock();
//快取中儲存的資料
private static final Map<String, String> mapCache = new HashMap<>();
//模擬資料庫儲存的資料
private static final Map<String, String> mapDb = new HashMap<>();
static {
mapDb.put("zhangsan", "你好,我是張三");
mapDb.put("sili", "你好,我是李四");
}
private static void getInfo(String name) {
//獲取悲觀讀
long stamp = lock.readLock();
log.info("執行緒名:" + Thread.currentThread().getName() + " 獲取了悲觀讀鎖" + " 使用者名稱:" + name);
try {
if ("zhangsan".equals(name)) {
log.info("執行緒名:" + Thread.currentThread().getName() + " 休眠中" + " 使用者名稱:" + name);
Thread.sleep(3000);
log.info("執行緒名:" + Thread.currentThread().getName() + " 休眠結束" + " 使用者名稱:" + name);
}
String info = mapCache.get(name);
if (null != info) {
log.info("在快取中獲取到了資料");
return;
}
} catch (InterruptedException e) {
log.info("執行緒名:" + Thread.currentThread().getName() + " 釋放了悲觀讀鎖");
e.printStackTrace();
} finally {
//釋放悲觀讀
lock.unlock(stamp);
}
//獲取寫鎖
stamp = lock.writeLock();
log.info("執行緒名:" + Thread.currentThread().getName() + " 獲取了寫鎖" + " 使用者名稱:" + name);
try {
//判斷一下快取中是否被插入了資料
String info = mapCache.get(name);
if (null != info) {
log.info("獲取到了寫鎖,再次確認在快取中獲取到了資料");
return;
}
//這裡是往資料庫獲取資料
String infoByDb = mapDb.get(name);
//將資料插入快取
mapCache.put(name, infoByDb);
log.info("快取中沒有資料,在資料庫獲取到了資料");
} finally {
//釋放寫鎖
log.info("執行緒名:" + Thread.currentThread().getName() + " 釋放了寫鎖" + " 使用者名稱:" + name);
lock.unlock(stamp);
}
}
public static void main(String[] args) {
//執行緒1
Thread t1 = new Thread(() -> {
getInfo("zhangsan");
});
//執行緒2
Thread t2 = new Thread(() -> {
getInfo("lisi");
});
//執行緒啟動
t1.start();
t2.start();
//執行緒同步
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
樂觀讀:
package git.snippets.juc;
import java.util.concurrent.locks.StampedLock;
import java.util.logging.Logger;
// 樂觀寫
public class StampedLockOptimistic {
private static final Logger log = Logger.getLogger(StampedLockOptimistic.class.getName());
private static final StampedLock lock = new StampedLock();
private static int num1 = 1;
private static int num2 = 1;
/**
* 修改成員變數的值,+1
*
* @return
*/
private static int sum() {
log.info("求和方法被執行了");
//獲取樂觀讀
long stamp = lock.tryOptimisticRead();
int cnum1 = num1;
int cnum2 = num2;
log.info("獲取到的成員變數值,cnum1:" + cnum1 + " cnum2:" + cnum2);
try {
//休眠3秒,目的是為了讓其他執行緒修改掉成員變數的值。
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//判斷在執行期間是否存在寫操作 true:不存在 false:存在
if (!lock.validate(stamp)) {
log.info("存在寫操作!");
//存在寫鎖
//升級悲觀讀鎖
stamp = lock.readLock();
try {
log.info("升級悲觀讀鎖");
cnum1 = num1;
cnum2 = num2;
log.info("重新獲取了成員變數的值=========== cnum1=" + cnum1 + " cnum2=" + cnum2);
} finally {
//釋放悲觀讀鎖
lock.unlock(stamp);
}
}
return cnum1 + cnum2;
}
//使用寫鎖修改成員變數的值
private static void updateNum() {
long stamp = lock.writeLock();
try {
num1 = 2;
num2 = 2;
} finally {
lock.unlock(stamp);
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
int sum = sum();
log.info("求和結果:" + sum);
});
t1.start();
//休眠1秒,目的為了讓執行緒t1能執行到獲取成員變數之後
Thread.sleep(1000);
updateNum();
t1.join();
log.info("執行完畢");
}
}
看名字就能看出來StampedLock
不支援重入鎖。
它適用於讀多寫少的情況,如果不是這種情況,請慎用,效能可能還不如synchronized
。
StampedLock
的悲觀讀鎖、寫鎖不支援條件變數。
千萬不能中斷阻塞的悲觀讀鎖或寫鎖,如果呼叫阻塞執行緒的interrupt()
,會導致cpu飆升,如果希望StampedLock
支援中斷操作,請使用readLockInterruptibly
(悲觀讀鎖)與writeLockInterruptibly
(寫鎖)。
類似門閂的概念,可以替代join
,但是比join
靈活,因為一個執行緒裡面可以多次countDown
,但是join
一定要等執行緒完成才能執行。
其底層原理是:呼叫await()
方法的執行緒會利用AQS
排隊,一旦數位減為0,則會將AQS
中排隊的執行緒依次喚醒。
程式碼如下:
package git.snippets.juc;
import java.util.concurrent.CountDownLatch;
/**
* CountDownLatch可以用Join替代
*/
public class CountDownLatchAndJoin {
public static void main(String[] args) {
useCountDownLatch();
useJoin();
}
public static void useCountDownLatch() {
// use countdownlatch
long start = System.currentTimeMillis();
Thread[] threads = new Thread[100000];
CountDownLatch latch = new CountDownLatch(threads.length);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
int result = 0;
for (int i1 = 0; i1 < 1000; i1++) {
result += i1;
}
// System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result);
latch.countDown();
});
}
for (Thread thread : threads) {
thread.start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("end latch down, time is " + (end - start));
}
public static void useJoin() {
long start = System.currentTimeMillis();
// use join
Thread[] threads = new Thread[100000];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
int result = 0;
for (int i1 = 0; i1 < 1000; i1++) {
result += i1;
}
// System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result);
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println("end join, time is " + (end - start));
}
}
類似柵欄,類比:滿了20個乘客就發車 這樣的場景。
比如:一個程式可能收集如下來源的資料:
資料庫
網路
檔案
程式可以並行執行,用執行緒操作1,2,3,然後操作完畢後再合併, 然後執行後續的邏輯操作,就可以使用CyclicBarrier
程式碼如下:
package git.snippets.juc;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrier範例:滿員發車
*
* @author <a href="mailto:[email protected]">Grey</a>
* @since 1.8
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(20, () -> {
System.out.println("滿了20,發車");
});
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
表示號誌,有如下兩個操作:
s.acquire()
號誌減1
s.release()
號誌加1
到 0 以後,就不能執行了,這個可以用於限流。
底層原理是:如果沒有執行緒許可可用,則執行緒阻塞,並通過 AQS 來排隊,可以通過release()
方法來釋放許可,當某個執行緒釋放了某個許可後,會從 AQS 中正在排隊的第一個執行緒依次開始喚醒,直到沒有空閒許可。
Semaphore 使用範例:有N個執行緒來存取,我需要限制同時執行的只有號誌大小的執行緒數。
程式碼如下:
package git.snippets.juc;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Semaphore用於限流
*/
public class SemaphoreUsage {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(1);
new Thread(() -> {
try {
semaphore.acquire();
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread 1 executed");
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
new Thread(() -> {
try {
semaphore.acquire();
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread 2 executed");
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
Semaphore
可以有公平和非公平的方式進行設定。
Semaphore
和CountDownLatch
的區別?
Semaphore 是號誌,可以做限流,限制 n 個執行緒並行,釋放一個執行緒後就又能進來一個新的執行緒。
CountDownLatch 是閉鎖,帶有阻塞的功能,必須等到 n 個執行緒都執行完後,被阻塞的執行緒才能繼續往下執行。
採用令牌桶演演算法,用於限流
範例程式碼如下
package git.snippets.juc;
import com.google.common.util.concurrent.RateLimiter;
import java.util.List;
import java.util.concurrent.Executor;
/**
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/21
* @since
*/
public class RateLimiterUsage {
//每秒只發出2個令牌
static final RateLimiter rateLimiter = RateLimiter.create(2.0);
static void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // 也許需要等待
executor.execute(task);
}
}
}
注:上述程式碼需要引入 Guava 包。
遺傳演演算法,可以用這個結婚的場景模擬: 假設婚禮的賓客有 5 個人,加上新郎和新娘,一共 7 個人。 我們可以把這 7 個人看成 7 個執行緒,有如下步驟要執行。
到達婚禮現場
吃飯
離開
擁抱(只有新郎和新娘執行緒可以執行)
每個階段執行完畢後才能執行下一個階段,其中擁抱階段只有新郎新娘這兩個執行緒才能執行。
以上需求,我們可以通過 Phaser 來實現,具體程式碼和註釋如下:
package git.snippets.juc;
import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class PhaserUsage {
static final Random R = new Random();
static WeddingPhaser phaser = new WeddingPhaser();
static void millSleep() {
try {
TimeUnit.MILLISECONDS.sleep(R.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 賓客的人數
final int guestNum = 5;
// 新郎和新娘
final int mainNum = 2;
phaser.bulkRegister(mainNum + guestNum);
for (int i = 0; i < guestNum; i++) {
new Thread(new Person("賓客" + i)).start();
}
new Thread(new Person("新娘")).start();
new Thread(new Person("新郎")).start();
}
static class WeddingPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("所有人到齊");
return false;
case 1:
System.out.println("所有人吃飯");
return false;
case 2:
System.out.println("所有人離開");
return false;
case 3:
System.out.println("新郎新娘擁抱");
return true;
default:
return true;
}
}
}
static class Person implements Runnable {
String name;
Person(String name) {
this.name = name;
}
@Override
public void run() {
// 先到達婚禮現場
arrive();
// 吃飯
eat();
// 離開
leave();
// 擁抱,只保留新郎和新娘兩個執行緒可以執行
hug();
}
private void arrive() {
millSleep();
System.out.println("name:" + name + " 到來");
phaser.arriveAndAwaitAdvance();
}
private void eat() {
millSleep();
System.out.println("name:" + name + " 吃飯");
phaser.arriveAndAwaitAdvance();
}
private void leave() {
millSleep();
System.out.println("name:" + name + " 離開");
phaser.arriveAndAwaitAdvance();
}
private void hug() {
if ("新娘".equals(name) || "新郎".equals(name)) {
millSleep();
System.out.println("新娘新郎擁抱");
phaser.arriveAndAwaitAdvance();
} else {
phaser.arriveAndDeregister();
}
}
}
}
用於執行緒之間交換資料,exchange()
方法是阻塞的,所以要兩個exchange
行為都執行到才會觸發交換。
package git.snippets.juc;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
/**
* Exchanger用於兩個執行緒之間交換變數
*/
public class ExchangerUsage {
static Exchanger<String> semaphore = new Exchanger<>();
public static void main(String[] args) {
new Thread(() -> {
String s = "T1";
try {
s = semaphore.exchange(s);
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread 1(T1) executed, Result is " + s);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
String s = "T2";
try {
s = semaphore.exchange(s);
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread 2(T2) executed, Result is " + s);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
其他鎖的底層用的是AQS
原先讓執行緒等待需要wait/await
,現在僅需要LockSupport.park()
原先叫醒執行緒需要notify/notifyAll
,現在僅需要LockSupport.unpark()
, LockSupport.unpark()
還可以叫醒指定執行緒,
範例程式碼:
package git.snippets.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* 阻塞指定執行緒,喚醒指定執行緒
*/
public class LockSupportUsage {
public static void main(String[] args) {
Thread t = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
if (i == 5) {
LockSupport.park();
}
if (i == 8) {
LockSupport.park();
}
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
// unpark可以先於park呼叫
//LockSupport.unpark(t);
try {
TimeUnit.SECONDS.sleep(8);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(t);
System.out.println("after 8 seconds");
}
}
實現一個容器,提供兩個方法
// 向容器中增加一個元素
void add(T t);
// 返回容器大小
int size();
有兩個執行緒,執行緒1新增10個元素到容器中,執行緒2實現監控元素的個數,當個數到5個時,執行緒2給出提示並結束
方法 1. 使用wait + notify
實現
方法 2. 使用CountDownLatch
實現
方法 3. 使用LockSupport
實現
程式碼如下:
package git.snippets.juc;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
// 實現一個容器,提供兩個方法,add,size,有兩個執行緒,
// 執行緒1新增10個元素到容器中,
// 執行緒2實現監控元素的個數,
// 當個數到5個時,執行緒2給出提示並結束
public class MonitorContainer {
public static void main(String[] args) {
useLockSupport();
// useCountDownLatch();
// useNotifyAndWait();
}
/**
* 使用LockSupport
*/
private static void useLockSupport() {
System.out.println("use LockSupport...");
Thread adder;
List<Object> list = Collections.synchronizedList(new ArrayList<>());
Thread finalMonitor = new Thread(() -> {
LockSupport.park();
if (match(list)) {
System.out.println("filled 5 elements size is " + list.size());
LockSupport.unpark(null);
}
});
adder = new Thread(() -> {
for (int i = 0; i < 10; i++) {
increment(list);
if (match(list)) {
LockSupport.unpark(finalMonitor);
}
}
});
adder.start();
finalMonitor.start();
}
/**
* 使用CountDownLatch
*/
private static void useCountDownLatch() {
System.out.println("use CountDownLatch...");
List<Object> list = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(5);
Thread adder = new Thread(() -> {
for (int i = 0; i < 10; i++) {
increment(list);
if (i <= 4) {
latch.countDown();
}
}
});
Thread monitor = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (match(list)) {
System.out.println("filled 5 elements");
}
});
adder.start();
monitor.start();
}
/**
* notify + wait 實現
*/
private static void useNotifyAndWait() {
System.out.println("use notify and wait...");
List<Object> list = Collections.synchronizedList(new ArrayList<>());
final Object o = new Object();
Thread adder = new Thread(() -> {
synchronized (o) {
for (int i = 0; i < 10; i++) {
increment(list);
if (match(list)) {
o.notify();
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("add finished");
o.notify();
}
});
Thread monitor = new Thread(() -> {
synchronized (o) {
if (match(list)) {
System.out.println("5 elements added " + list.size());
o.notify();
try {
o.wait();
System.out.println("monitor finished");
o.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
adder.start();
monitor.start();
}
/**
* 只要是5的倍數,就回圈列印
*/
private static void useNotifyAndWaitLoop() {
List<Object> list = Collections.synchronizedList(new ArrayList<>());
final Object o = new Object();
Thread adder = new Thread(() -> {
synchronized (o) {
for (; ; ) {
increment(list);
if (match(list)) {
o.notify();
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
Thread monitor = new Thread(() -> {
synchronized (o) {
while (true) {
if (match(list)) {
System.out.println("filled 5 elements");
}
o.notify();
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
adder.start();
monitor.start();
}
private static void increment(List<Object> list) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
list.add(new Object());
System.out.println("list add the ele, size is " + list.size());
}
private static boolean match(List<Object> list) {
return list.size() % 5 == 0;
}
}
寫一個固定容量的同步容器,擁有put
和get
方法,以及getCount
方法,能夠支援 2 個生產者執行緒以及 10 個消費者執行緒的阻塞呼叫。
方法 1. 使用wait/notifyAll
方法 2. ReentrantLock
的Condition
,本質就是等待佇列
package git.snippets.juc;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
// 寫一個固定容量的同步容器,擁有put和get方法,以及getCount方法,能夠支援2個生產者執行緒以及10個消費者執行緒的阻塞呼叫。
public class ProducerAndConsumer {
public static void main(String[] args) {
// MyContainerByCondition container = new MyContainerByCondition(100);
MyContainerByNotifyAndWait container = new MyContainerByNotifyAndWait(100);
for (int i = 0; i < 25; i++) {
new Thread(container::get).start();
}
for (int i = 0; i < 20; i++) {
new Thread(() -> container.put(new Object())).start();
}
}
}
// 使用ReentrantLock的Condition
class MyContainerByCondition {
static ReentrantLock lock = new ReentrantLock();
final int MAX;
private final LinkedList<Object> list = new LinkedList<>();
Condition consumer = lock.newCondition();
Condition producer = lock.newCondition();
public MyContainerByCondition(int limit) {
this.MAX = limit;
}
public void put(Object object) {
lock.lock();
try {
while (getCount() == MAX) {
System.out.println("container is full");
try {
producer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(object);
consumer.signalAll();
System.out.println("contain add a object, current size " + getCount());
} finally {
lock.unlock();
}
}
public Object get() {
lock.lock();
try {
while (getCount() == 0) {
try {
System.out.println("container is empty");
consumer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object object = list.removeFirst();
producer.signalAll();
System.out.println("contain get a object, current size " + getCount());
return object;
} finally {
lock.unlock();
}
}
public synchronized int getCount() {
return list.size();
}
}
// 使用synchronized的wait和notifyAll
class MyContainerByNotifyAndWait {
LinkedList<Object> list = null;
final int limit;
MyContainerByNotifyAndWait(int limit) {
this.limit = limit;
list = new LinkedList<>();
}
synchronized int getCount() {
return list.size();
}
// index 從0開始計數
synchronized Object get() {
while (list.size() == 0) {
System.out.println("container is empty");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object o = list.removeFirst();
System.out.println("get a data");
this.notifyAll();
return o;
}
synchronized void put(Object data) {
while (list.size() > limit) {
System.out.println("container is full , do not add any more");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(data);
System.out.println("add a data");
this.notifyAll();
}
}
本文涉及到的所有程式碼和圖例
更多內容見:Java 多執行緒
本文來自部落格園,作者:Grey Zeng,轉載請註明原文連結:https://www.cnblogs.com/greyzeng/p/16684446.html