參考b站狂神說
JUC簡稱java util concurrent,是java中有關執行緒的三個包。
這裏我們給出API文件
鏈接:https://pan.baidu.com/s/1k9s_96Ip8tnNug0NoTadmw
提取碼:eexh
進程:執行中的應用程式***.exe
的集合(QQ.exe、IDEA.exe)
從上圖可以看出,執行緒數大於進程數。其實:一個進程往往包含多個執行緒,最少包含一個執行緒!
java預設有2個執行緒!main和GC(垃圾回收)
執行緒:進程中的一個執行流程。比如說我們的IDEA是一個進程,那我他的自動儲存機制 機製就算是一個執行緒。
面試題:
native void start0();
一個原生的start0方法。也就是說java沒有開啓執行緒的權利,因爲java執行在虛擬機器上,無法操作硬體,只是呼叫了一個底層是C++的start0()方法。public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0();
一皇兩後是併發,兩皇一後是並行
併發:多個執行緒操作同一個資源
並行:多個執行緒可以同時執行
併發程式設計的本質:充分利用CPU的資源
1、執行緒有幾個狀態?
6個,
我們可以在Thread類下看到一個State的列舉型別,其中包含了現成的6個狀態。
public enum State {
// 新生
NEW,
//執行
RUNNABLE,
// 阻塞
BLOCKED,
// 等待
WAITING,
//超市等待
TIMED_WAITING,
// 終止
TERMINATED;
}
wait/sleep的區別
來自不同的類
wait=>Object
sleep=>Thread
關於鎖的釋放
wait會釋放鎖
sleep抱着鎖睡覺,不會釋放鎖
使用的範圍不同
wait必須在同步程式碼塊中
sleep可以再任何地方用
// 模擬賣票
public class Synchronized {
public static void main(String[] args) {
// 併發操作,多執行緒操作同一個資源,吧資源丟入執行緒
Ticket ticket = new Ticket();
new Thread(()-> {
for(int i=0;i<30;i++){
ticket.sale();
}
},"A").start();
new Thread(()-> {
for(int i=0;i<30;i++){
ticket.sale();
}
},"B").start();
new Thread(()-> {
for(int i=0;i<30;i++){
ticket.sale();
}
},"C").start();
}
}
class Ticket{
// 總票數
private int num = 30;
// 賣票方法
//public void sale(){
public synchronized void sale(){
if(num>0){
System.out.println(Thread.currentThread().getName()+"賣出第"+(num--)+"張票,剩餘了"+num+"票");
}
}
}
我們可以看到,synchronized確實解決了我們 多執行緒操作同一個資源的問題。
他是我們java.util.concurrent.locks下的一個介面
他有三個實現類,我們一般使用可重入鎖
那什麼是可重入鎖呢???
如何使用?
三部曲
我們在new ReentrantLock的時候,有兩個建構函式
我們一般使用非公平鎖,執行緒執行的先後順序由cpu決定,我們不加幹預。
因爲如果使用公平鎖的話,前一個執行緒執行時間是3h,而下一個執行緒執行時間爲3s,則會造成資源分配問題。
還是剛剛的買票系統,我們換成Lock方式
package com.xionger.demo01;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class lockTest02 {
public static void main(String[] args) {
// 併發操作,多執行緒操作同一個資源,吧資源丟入執行緒
Ticket2 ticket = new Ticket2();
new Thread(() -> {
for (int i = 0; i < 30; i++) {
ticket.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 30; i++) {
ticket.sale();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 30; i++) {
ticket.sale();
}
}, "C").start();
}
}
class Ticket2{
// 總票數
private int num = 30;
Lock lock = new ReentrantLock();
// 賣票方法
public void sale(){
//加鎖
lock.lock();
try {
// 業務程式碼
if(num>0){
System.out.println(Thread.currentThread().getName()+"賣出第"+(num--)+"張票,剩餘了"+num+"票");
}
}catch (Exception e){
e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
兩個執行緒A、B同時操作數據data,只有當數據滿足A的條件時,纔會執行A執行緒呼叫的方法,否則就會等待,當A執行完畢後,需要通知B,我已經執行完了,你可以執行了。
public class Test01 {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
// 判斷 等待,業務,通知
class Data{
private int num = 0;
// +1
public synchronized void increment() throws InterruptedException {
// 等待
if (num != 0) this.wait();
num++;
System.out.println(Thread.currentThread().getName() + "==>" + num);
// 通知其他執行緒+1完畢
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
// 等待
if (num == 0) this.wait();
num--;
System.out.println(Thread.currentThread().getName() + "==>" + num);
// 通知其他執行緒-1完畢
this.notifyAll();
}
}
但是這種用if作爲判斷語句的做法有一個問題!!!,那就是虛假喚醒問題
問題:假如存在ABCD四個執行緒!!!
當我們new了4個執行緒的時候,執行結果會出現3或者2甚至更大或者更小。
解答:程式在用if判斷的話,喚醒後執行緒會從wait之後的程式碼開始執行,不會重新判斷if條件,直接繼續執行if程式碼塊之後的程式碼。
假如B也是+1執行緒。當A執行緒+1之後,會執行notifyAll方法,這就很有可能將wait中的B執行緒喚醒,再次執行+1操作。
其實這裏官方已經給出了我們解答方式。
在java.lang.Object中的wait方法
爲了防止虛假喚醒問題,我們需要將wait放入while回圈中
// 等待
while (num == 0)
this.wait();
這時就不會出現虛假喚醒問題了
其實就是將對應的方法替換就可以了
同步監視器condition
具體的程式碼實現
public class Test02 {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(()->{
for (int i=0;i<10;i++){
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
// 判斷 等待,業務,通知
class Data2{
private int num = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// +1
public void increment() throws InterruptedException {
lock.lock();
try {
// 業務程式碼
// 等待
while (num != 0) condition.await();
num++;
System.out.println(Thread.currentThread().getName() + "==>" + num);
// 通知其他執行緒+1完畢
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// -1
public void decrement() throws InterruptedException {
lock.lock();
try {
// 業務程式碼
// 等待
while (num == 0) condition.await();
num--;
System.out.println(Thread.currentThread().getName() + "==>" + num);
// 通知其他執行緒+1完畢
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
執行結果如果所示,但是我們發現JUC的鎖和synchronized的鎖,實現的效果是一樣的,既然這樣,那我們爲什麼還要多此一舉用更爲麻煩的lock鎖呢?
其實,lock鎖可以實現有序的abcd。
public class Test03 {
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(()->{
for (int i = 0;i<10;i++){
data3.printA();
}
},"A").start();
new Thread(()->{
for (int i = 0;i<10;i++){
data3.printB();
}
},"B").start();
new Thread(()->{
for (int i = 0;i<10;i++){
data3.printC();
}
},"C").start();
}
}
// 判斷 等待,業務,通知
// 當num==1時執行printA(),當=2B,3C
class Data3{
private Lock lock = new ReentrantLock(); //鎖
private Condition condition1 = lock.newCondition(); //同步監視器1,監視A
private Condition condition2 = lock.newCondition(); //同步監視器2,監視B
private Condition condition3 = lock.newCondition(); //同步監視器3,監視C
private int num = 1;
public void printA(){
try {
lock.lock();
while (num != 1) condition1.await(); // 等待
System.out.println(Thread.currentThread().getName()+"=>AAAAA");
num = 2;
condition2.signal(); // 指定喚醒condition2
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB(){
try {
lock.lock();
while (num != 2) condition2.await(); // 等待
System.out.println(Thread.currentThread().getName()+"=>BBBBB");
num = 3;
condition3.signal(); // 指定喚醒condition3
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC(){
try {
lock.lock();
while (num != 3) condition3.await(); // 等待
System.out.println(Thread.currentThread().getName()+"=>CCCCCCCC");
num = 1;
condition1.signal(); // 指定喚醒condition1
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
我們可以看出,無論跑多少次,總是按順序執行。
其實就是多new一些condition接聽各自的行爲。
用途: 生產線要按序執行
其實就是八個關於鎖經典面試題
public class Test01 {
public static void main(String[] args) {
Phone phone = new Phone();
//執行緒A呼叫發簡訊方法
new Thread(()->{
phone.message();
},"A").start();
// 睡眠1s
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//執行緒B呼叫打電話方法
new Thread(()->{
phone.call();
},"B").start();
}
}
class Phone{
public synchronized void message(){
// 睡眠4s
// try {
// TimeUnit.SECONDS.sleep(4);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println("發簡訊");
}
public synchronized void call(){
System.out.println("打電話");
}
}
1、程式執行的結果是什麼?爲什麼會這樣?
2、如果讓A執行緒內部也睡1秒(message方法註釋),結果又會是什麼樣?
執行結果總是先發簡訊,然後再打電話!
因爲:有鎖的存在!!!首先我們要明白鎖的是誰?鎖的是方法的呼叫者,也就是(phone.message())程式碼中的phone,由於這兩個方法用的是同一個物件 phone,也就是說擁有同一把鎖,那麼誰先拿到這個鎖就是誰先執行,還有一點就是sleep是抱着鎖睡覺的,就算A執行緒內部也睡了一秒,那他也是抱着鎖睡覺的,不會輪到B執行緒執行。
3、假如call方法是一個普通方法,那執行結果又會是怎麼樣的?
這裏的執行結果就跟cpu的排程有關了,如果A執行緒鎖的時間過長,那麼B就會先執行。
爲什麼呢?因爲B執行緒呼叫的call是一個普通方法,不收鎖的限制。
4、假如有兩個Phone物件呢?結果又是什麼
public static void main(String[] args) {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
//執行緒A呼叫發簡訊方法
new Thread(()->{
phone1.message();
},"A").start();
// 睡眠1s
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//執行緒B呼叫打電話方法
new Thread(()->{
phone2.call();
},"B").start();
}
不言而喻,當然是先發簡訊再打電話了
因爲這時有兩把鎖,互不幹 不乾擾,而B執行緒有延遲,當然是A執行緒先執行
5、假如message和call是兩個靜態方法,只有一個phone物件,執行結果又是什麼呢?
class Phone{
public static synchronized void message(){
System.out.println("發簡訊");
}
public static synchronized void call(){
System.out.println("打電話");
}
}
由於被static修飾,所以在類一載入鎖就有了。並且鎖的是Class(Phone.class),Class全域性唯一,這兩個方法都被static修飾了,所以用的是同一把鎖。結果自然就是先發簡訊,再打電話了。
6、假如有兩個Phone物件,message和call是兩個靜態方法,執行結果又是什麼呢
和問題5答案一樣,雖然這裏有兩個Phone物件,但是它們的Class類別範本是同一個(Phone.class),所以用的是同一把鎖。結果還是先發簡訊,再打電話 。
7、一個靜態方法,一個非靜態方法。一個物件。執行結果?
public class Test04 {
public static void main(String[] args) {
Phone4 phone = new Phone4();
// 靜態方法,鎖的是Class
new Thread(()->{
phone.message();
},"A").start();
// 睡眠1s
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 非靜態方法,鎖的是phone物件
new Thread(()->{
phone.call();
},"B").start();
}
}
class Phone4{
public static synchronized void message(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("發簡訊");
}
public synchronized void call(){
System.out.println("打電話");
}
}
不難看出,這裏會有兩把鎖,A執行緒鎖的是Class,B執行緒鎖的是phone物件,所以這裏的結果就跟CPU的排程有關了,先打電話再發簡訊。
8、一個靜態方法,一個非靜態方法。兩個物件。執行結果?
跟問題7一樣,雖然有兩個物件,但是message方法鎖的還是Class,call方法鎖的還是phone物件,所以結果跟CPU的排程有關,先打電話,再發簡訊。
小結:其實就是看鎖的是物件還是Class,是否是同一把鎖!如果是new出來的,則鎖的是物件,如果是static靜態方法,則鎖的是Class。
public class ListTest {
public static void main(String[] args) {
/*
0、List<String> list = new ArrayList<>(); 出現併發修改異常java.util.ConcurrentModificationException
解決辦法:
1、List<String> list = new Vector<>(); 用synchronized解決;底層的add方法被synchronized修飾
2、List<String> list = Collections.synchronizedList(new ArrayList<>()); 返回由指定列表支援的同步(執行緒安全)列表
3、List<String> list = new CopyOnWriteArrayList<>(); 用juc的lock鎖解決,底層的add方法用的lock鎖
CopyOnWrite寫入時賦值 COW 計算機程式設計領域的一種優化策略,多個執行緒呼叫下,執行寫入的時候避免覆蓋,造成數據問題。讀寫分離
*/
//List<String> list = new ArrayList<>();
//List<String> list = new Vector<>();
//List<String> list = Collections.synchronizedList(new ArrayList<>());
List<String> list = new CopyOnWriteArrayList<>();
for(int i = 1 ; i <= 10 ; i++){
new Thread(()->{
list.add("aaa");
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
public class SetTest {
public static void main(String[] args) {
/*
同上,set也會出現併發異常
Set<String> set = new HashSet<>(); java.util.ConcurrentModificationException
解決辦法:
1、Set<String> set = Collections.synchronizedSet(new HashSet<>());
2、Set<String> set = new CopyOnWriteArraySet();
*/
//Set<String> set = new HashSet<>();
//Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet();
for(int i = 1 ; i <= 30 ; i++){
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
順便問一下?HashSet的底層是什麼?
public class MapTest {
public static void main(String[] args) {
// 1、公司中一般不會用hashMap,因爲她執行緒不安全 java.util.ConcurrentModificationException
// 2、new HashMap();預設new HashMap(16,0.75);
//Map<String,String> map = new HashMap<>(); java.util.ConcurrentModificationException
//Map<String,String> map = new Hashtable<>(); synchronized
Map<String,String> map = new ConcurrentHashMap<>(); //juc
for(int i = 0 ; i<30 ; i++){
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
- Hashtable的任何操作都會把整個表鎖住,是阻塞的。好處是總能獲取最實時的更新,比如說執行緒A呼叫putAll寫入大量數據,期間執行緒B呼叫get,執行緒B就會被阻塞,直到執行緒A完成putAll,因此執行緒B肯定能獲取到執行緒A寫入的完整數據。壞處是所有呼叫都要排隊,效率較低。
- ConcurrentHashMap 是設計爲非阻塞的。在更新時會區域性鎖住某部分數據,但不會把整個表都鎖住。同步讀取操作則是完全非阻塞的。好處是在保證合理的同步前提下,效率很高。壞處是嚴格來說讀取操作不能保證反映最近的更新。例如執行緒A呼叫putAll寫入大量數據,期間執行緒B呼叫get,則只能get到目前爲止已經順利插入的部分數據。
- 應該根據具體的應用場景選擇合適的HashMap。
1、可以有返回值
2、可以拋出異常
3、方法不同run()/call()
我們可以看出Thread的建構函式中並沒有關於Callable的參數,也就意味意味着,不能直接new 一個Callable來啓動執行緒。
但是官方文件給出java.lang.Runnable的實現類java.util.concurrent.FutureTask可以包裝Callable物件
所以:
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread myThread = new MyThread();
FutureTask futureTask = new FutureTask(myThread); //適配類,本質就是一個Runnable
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();
String o = (String)futureTask.get(); //獲取callable的返回結果,可能會造成阻塞,一般放在最後。
System.out.println(o);
}
}
class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("call()");
return "熊二吃屁"; //加入之前有耗時操作,則會造成阻塞
}
}
執行結果
兩個執行緒爲什麼只輸出了一個call()?
通過原始碼可以看出,FutureTask內部維護了一個state變數,在每次執行run方法的時候都會進行判斷。而我們的B執行緒執行run方法的時候,state已經不是new的狀態了,直接return。
數量減的計數器
public class CDLC {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5); // new一個大小爲5的計數器
for (int i = 0 ; i < 5 ; i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"Go out");
countDownLatch.countDown(); // 計數器數量-1
},String.valueOf(i)).start();
}
countDownLatch.await(); // 等待數量歸零,然後再向下執行
System.out.println("hello");
}
}
數量加的計數器
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{ //七個執行緒執行完才能 纔能該執行緒
System.out.println("龍珠召喚成功");
});
for (int i = 1 ; i < 8 ; i++){
int temp = i;
new Thread(()->{
System.out.println("召喚"+temp+"號龍珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
可以限定可通行的執行緒數量,然後只有acquire可用的執行緒才能 纔能執行,當然這個acquire的數量不能超過可通行的執行緒數量。只有當release之後,纔會重新分配acquire。
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3); //一共開闢了3個車位
for (int i = 0 ; i < 6 ; i++){
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"搶到車位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"離開車位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release(); //釋放當前車位
}
},String.valueOf(i)).start();
}
}
}
就像搶車位一樣,只有當有車離開時,其他車纔可以搶到車位。
java.util.concurrent.locks
Interface ReadWriteLock
public class ReadWriteLockTest {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 5; i++) {
int temp = i;
new Thread(()->{
myCache.put(temp+"",temp);
},String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"寫入"+key);
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"寫入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//取
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"讀取"+key);
map.get(key);
System.out.println(Thread.currentThread().getName()+"讀取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
上了讀寫鎖之後就會寫完再讀
使用途徑:多執行緒併發處理、執行緒池
大家族
方法 | 拋出異常 | 有返回值,不拋出異常 | 阻塞 等待 | 超時等待 |
---|---|---|---|---|
新增 | add() | offer() | put() | offer(, ,) |
移出 | remove() | poll() | take() | poll(,) |
判斷隊首 | element() | peek() |
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
//test1(); //報異常
//test2(); //返回值
//test3(); //一直等待
test4(); //超時等待
}
public static void test1() throws InterruptedException {
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.add("a")); //true
System.out.println(blockingQueue.add("b")); //true
System.out.println(blockingQueue.add("c")); //true
//System.out.println(blockingQueue.add("d")); //Exception in thread "main" java.lang.IllegalStateException: Queue full
System.out.println("===============");
System.out.println(blockingQueue.element());
System.out.println(blockingQueue.remove()); //a
System.out.println(blockingQueue.remove()); //b
System.out.println(blockingQueue.remove()); //c
blockingQueue.remove(); // 報異常Exception in thread "main" java.util.NoSuchElementException
}
public static void test2() {
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.offer("a")); //true
System.out.println(blockingQueue.offer("b")); //true
System.out.println(blockingQueue.offer("c")); //true
System.out.println(blockingQueue.offer("d")); //false
System.out.println("===============");
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll()); //a
System.out.println(blockingQueue.poll()); //b
System.out.println(blockingQueue.poll()); //c
System.out.println(blockingQueue.poll()); //null
}
public static void test3() throws InterruptedException {
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
//blockingQueue.put("d"); //程式一直等待
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//System.out.println(blockingQueue.take()); //程式一直等待,(阻塞)
}
public static void test4() throws InterruptedException {
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.offer("a")); //true
System.out.println(blockingQueue.offer("b")); //true
System.out.println(blockingQueue.offer("c")); //true
System.out.println(blockingQueue.offer("d",2, TimeUnit.SECONDS)); //等待2秒
System.out.println("===============");
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll()); //a
System.out.println(blockingQueue.poll()); //b
System.out.println(blockingQueue.poll()); //c
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS)); //等待2s,飯後返回null
}
}
沒有容量,不儲存元素,進去一個元素,必須等這個元素出來,才能 纔能往裏面放下一個元素。
// 同步佇列
public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<String> strings = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put 1");
strings.put("1");
System.out.println(Thread.currentThread().getName()+"put 2");
strings.put("2");
System.out.println(Thread.currentThread().getName()+"put 3");
strings.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"get"+strings.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"get"+strings.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"get"+strings.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
通過執行結果看出,每次只有前一個執行緒get之後,下一個執行緒纔會put。
在上面的文章中,我們使用執行緒的時候就去建立一個執行緒,這樣實現起來非常簡便,但是就會有一個問題:
如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因爲頻繁建立執行緒和銷燬執行緒需要時間。
那麼有沒有一種辦法使得執行緒可以複用,就是執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務?
在Java中可以通過執行緒池來達到這樣的效果。接下來我們就來詳細講解一下Java的執行緒池
執行緒池的好處:
3大方法、7大參數、4中拒絕策略
public class ThreadPoolTest {
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newSingleThreadExecutor(); //單個執行緒的執行緒池
// ExecutorService threadPool = Executors.newFixedThreadPool(5); //開闢5個固定大小的執行緒池
ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸縮的執行緒池
try {
for (int i = 0; i <10 ; i++) {
// 使用了執行緒池之後,使用執行緒池來建立執行緒
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 執行緒池用完,程式結束,關閉執行緒池
threadPool.shutdown();
}
}
}
執行結果:
首先要明白是哪7大參數,那就是java.util.concurrent包下的ThreadPoolExecutor的建構函式。
如何理解各個參數的含義呢?
把7個參數想象成去銀辦理業務的場景。辦理的視窗一般不是全部開放,只會開放一兩個,這裏對應的就是7個參數裏面的核心執行緒數;而視窗總數則象徵最大執行緒數;如果長時間銀行不來人,就會關閉銀行,對應的就是執行緒開啓時間;單位就不用說 了;辦理業務要排隊,必須等上一個人處理完下一個人才能 纔能處理,等待區對應的就是阻塞佇列;執行緒工廠是用來建立執行緒的,一般不用修改;如果銀行人爆滿,後來的人就會被拒絕辦理,對應的就是最後一個參數。
第七個參數對應四個實現類,也就是4種拒絕策略
手寫執行緒池(自定義執行緒池)
/*
1、new ThreadPoolExecutor.AbortPolicy() //報異常java.util.concurrent.RejectedExecutionException:
2、new ThreadPoolExecutor.CallerRunsPolicy() //從哪來回哪去,這裏就會讓main執行緒執行
3、new ThreadPoolExecutor.DiscardOldestPolicy() //嘗試讓最先執行完的執行緒去執行,但不會影響其他執行緒的執行,不會拋出異常
4、new ThreadPoolExecutor.DiscardPolicy() //佇列滿了,丟掉任務,不會拋出異常
*/
public class ThreadPoolTest02 {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
2,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
for (int i = 0; i < 9; i++) {
threadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+"===>ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
最大執行緒該如何定義????
這裏有兩種原則:
CPU密集型(CPU-bound)
- CPU密集型也叫計算密集型,指的是系統的硬碟、記憶體效能相對CPU要好很多,此時,系統運作大部分的狀況是CPU Loading 100%,CPU要讀/寫I/O(硬碟/記憶體),I/O在很短的時間就可以完成,而CPU還有許多運算要處理,CPU Loading很高。
- 在多重程式系統中,大部份時間用來做計算、邏輯判斷等CPU動作的程式稱之CPU bound。例如一個計算圓周率至小數點一千位以下的程式,在執行的過程當中絕大部份時間用在三角函數和開根號的計算,便是屬於CPU bound的程式。
- CPU bound的程式一般而言CPU佔用率相當高。這可能是因爲任務本身不太需要存取I/O裝置,也可能是因爲程式是多執行緒實現因此遮蔽掉了等待I/O的時間。
爲了使CPU的效率達到最高,你的計算機有多少邏輯處理器的那就將最大執行緒數設定爲幾。
Runtime.getRuntime().availableProcessors() //獲取計算機邏輯處理器的大小
IO密集型(I/O bound)
- IO密集型指的是系統的CPU效能相對硬碟、記憶體要好很多,此時,系統運作,大部分的狀況是CPU在等I/O (硬碟/記憶體) 的讀/寫操作,此時CPU Loading並不高。
- I/O bound的程式一般在達到效能極限時,CPU佔用率仍然較低。這可能是因爲任務本身需要大量I/O操作,而pipeline做得不是很好,沒有充分利用處理器能力。
這裏就要根據軟體的具體情況來定了。看其中有多少個IO十分佔用資源的任務。
I/O密集型適合讀寫,比如數據庫的讀寫操作,CPU密集型適合運算。
阿裡java開發手冊中明確指出,不要用Executors去建立執行緒池。所以還是自己設定7大參數,不要使用上面的3大方法。
阿裡巴巴java開發手冊
鏈接:https://pan.baidu.com/s/1m03HRJchaZWDvcSjPpGpyw
提取碼:w5pr
從JDK1.7開始,Java提供ForkJoin框架用於並行執行任務,它的思想就是將一個大任務分割成若幹小任務,最終彙總每個小任務的結果得到這個大任務的結果。
1、ForkJoinPool
既然任務是被逐漸的細化的,那就需要把這些任務存在一個池子裏面,這個池子就是ForkJoinPool,它與其它的ExecutorService區別主要在於它使用「工作竊取「,那什麼是工作竊取呢?
一個大任務會被劃分成無數個小任務,這些任務被分配到不同的佇列,這些佇列有些幹活乾的塊,有些幹得慢。於是幹得快的,一看自己沒任務需要執行了,就去隔壁的佇列裏面拿去任務執行。
2、ForkJoinTask
ForkJoinTask就是ForkJoinPool裏面的每一個任務。他主要有兩個子類:RecursiveAction和RecursiveTask。然後通過fork()方法去分配任務執行任務,通過join()方法彙總任務結果,
(1)RecursiveAction 一個遞回無結果的ForkJoinTask(沒有返回值)
(2)RecursiveTask 一個遞回有結果的ForkJoinTask(有返回值)
ForkJoinPool由ForkJoinTask陣列和ForkJoinWorkerThread陣列組成,ForkJoinTask陣列負責存放程式提交給ForkJoinPool的任務,而ForkJoinWorkerThread陣列負責執行這些任務。
連加操作,如果需要連加的數太大,則分任務執行。
有返回值
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> result = pool.submit(new MyRecuesiveTask(0,100));
System.out.println(result.get());
}
private static class MyRecuesiveTask extends RecursiveTask<Integer>{
private final int start;
private final int end;
public MyRecuesiveTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start < 10){
return IntStream.rangeClosed(start,end).sum();
}else{
int mid = (start + end) / 2;
MyRecuesiveTask task1 = new MyRecuesiveTask(start,mid);
MyRecuesiveTask task2 = new MyRecuesiveTask(mid+1,end);
task1.fork();
task2.fork();
return task1.join() + task2.join();
}
}
}
}
無返回值
public class ForkJoinDemo02 {
private final static AtomicInteger sum = new AtomicInteger();
public static void main(String[] args) throws InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
pool.submit(new MyRecuesiveAction(0,100));
pool.awaitTermination(100, TimeUnit.MILLISECONDS);
System.out.println(sum);
}
private static class MyRecuesiveAction extends RecursiveAction {
private final int start;
private final int end;
public MyRecuesiveAction(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start < 10){
sum.addAndGet(IntStream.rangeClosed(start,end).sum());
}else{
int mid = (start + end) / 2;
MyRecuesiveAction task1 = new MyRecuesiveAction(start,mid);
MyRecuesiveAction task2 = new MyRecuesiveAction(mid+1,end);
task1.fork();
task2.fork();
}
}
}
}
我們都知道,java的用戶端和伺服器之間的通訊採用ajax非同步通訊,其實執行緒之間的執行也可以使用非同步方式,當前一個執行緒阻塞的時候,下一個執行緒不會等待,而是繼續執行。
程式碼:
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"completableFuture執行");
});
System.out.println("11111111111");
completableFuture.get(); // 獲取阻塞結果
}
}
說JMM就必須談談對Volatile的理解
Volatile是java虛擬機器提供的輕量級的同步機制 機製
什麼是JMM?
JMM即爲JAVA 記憶體模型(java memory model)。因爲在不同的硬體生產商和不同的操作系統下,記憶體的存取邏輯有一定的差異,結果就是當你的程式碼在某個系統環境下執行良好,並且執行緒安全,但是換了個系統就出現各種問題。Java記憶體模型,就是爲了遮蔽系統和硬體的差異,讓一套程式碼在不同平臺下能到達相同的存取結果。JMM從java 5開始的JSR-133發佈後,已經成熟和完善起來。
JVM在設計時候考慮到,如果JAVA執行緒每次讀取和寫入變數都直接操作主記憶體,對效能影響比較大,所以每條執行緒擁有各自的工作記憶體,工作記憶體中的變數是主記憶體中的一份拷貝,執行緒對變數的讀取和寫入,直接在工作記憶體中操作,而不能直接去操作主記憶體中的變數。但是這樣就會出現一個問題,當一個執行緒修改了自己工作記憶體中變數,對其他執行緒是不可見的,會導致執行緒不安全的問題。因此JMM制定了一套標準來保證開發者在編寫多執行緒程式的時候,能夠控制什麼時候記憶體會被同步給其他執行緒。
記憶體互動操作有8種,虛擬機器實現必須保證每一個操作都是原子的,不可在分的(對於double和long型別的變數來說,load、store、read和write操作在某些平臺上允許例外)
lock (鎖定):作用於主記憶體的變數,把一個變數標識爲執行緒獨佔狀態
unlock (解鎖):作用於主記憶體的變數,它把一個處於鎖定狀態的變數釋放出來,釋放後的變數纔可以被其他執行緒鎖定
read (讀取):作用於主記憶體變數,它把一個變數的值從主記憶體傳輸到執行緒的工作記憶體中,以便隨後的load動作使用
load (載入):作用於工作記憶體的變數,它把read操作從主記憶體中變數放入工作記憶體中
use (使用):作用於工作記憶體中的變數,它把工作記憶體中的變數傳輸給執行引擎,每當虛擬機器遇到一個需要使用到變數的值,就會使用到這個指令
assign (賦值):作用於工作記憶體中的變數,它把一個從執行引擎中接受到的值放入工作記憶體的變數副本中
store (儲存):作用於主記憶體中的變數,它把一個從工作記憶體中一個變數的值傳送到主記憶體中,以便後續的write使用
write (寫入):作用於主記憶體中的變數,它把store操作從工作記憶體中得到的變數的值放入主記憶體的變數中
JMM對這八種指令的使用,制定瞭如下規則:
保證可見性
每個工作執行緒都有自己的工作記憶體,所以當某個執行緒修改完某個變數之後,在其他的執行緒中,未必能觀察到該變數已經被修改。volatile關鍵字要求被修改之後的變數要求立即更新到主記憶體,每次使用前從主記憶體處進行讀取。因此volatile可以保證可見性。除了volatile以外,synchronized和final也能實現可見性。synchronized保證unlock之前必須先把變數重新整理回主記憶體。final修飾的欄位在構造器中一旦完成初始化,並且構造器沒有this逸出,那麼其他執行緒就能看到final欄位的值。
public class Test01 {
// 如果不加volatile程式將會陷入死回圈,因爲執行緒A對於執行緒main來說,是不可見的。
// 加上volatile則可以保證可見性
private volatile static int num = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
while (num == 0){
}
},"A").start();
TimeUnit.SECONDS.sleep(2);
num = 1;
System.out.println(num);
}
}
不保證原子性
例如上面八項操作,在操作系統裏面是不可分割的單元。被synchronized關鍵字或其他鎖包裹起來的操作也可以認爲是原子的。從一個執行緒觀察另外一個執行緒的時候,看到的都是一個個原子性的操作。
// 不保證原子性
public class Test02 {
private volatile static int num = 0; // 加上volatile不能保證程式具有原子性
public static void add(){
num++;
}
public static void main(String[] args) {
// 理論上結果應該爲2萬
for (int i = 0 ; i < 20 ; i++){
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){ // 保證上面的執行緒可以執行完成,只剩下main和gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+num);
}
}
加上synchronized或者lock可以讓程式具有原子性。
那還有沒有其他的方法讓程式具有原子性???
java.util.concurrent.atomic包下具有可以支援原子性操作的類
// 不保證原子性
public class Test02 {
private static AtomicInteger num = new AtomicInteger(); // 院子類的Integer
public static void add(){
num.getAndIncrement(); // AtomicInteger的+1操作
}
public static void main(String[] args) {
// 理論上結果應該爲2萬
for (int i = 0 ; i < 20 ; i++){
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){ // 保證上面的執行緒可以執行完成,只剩下main和gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+num);
}
}
禁止指令重排
什麼是指令重排?
指令重排是指JVM在編譯Java程式碼的時候,或者CPU在執行JVM位元組碼的時候,對現有的指令順序進行重新排序。
指令重排的目的是爲了在不改變程式執行結果的前提下,優化程式的執行效率。需要注意的是,這裏所說的不改變執行結果,指的是不改變單執行緒下的程式執行結果。
然而,指令重排是一把雙刃劍,雖然優化了程式的執行效率,但是在某些情況下,會影響到多執行緒的執行結果。
例子
假設xyab預設都爲0
程式執行結果 x=0;y=0
但是經過指令重排後(對於單個執行緒來說經過並不受影響)
結果可能變爲x=1,y=2;
那禁止指令重排的意思就是:不允許這些語句的順序發生改變。
在一個變數被volatile修飾後,JVM會爲我們做兩件事:
在每個volatile寫操作前插入StoreStore屏障,在寫操作後插入StoreLoad屏障。
在每個volatile讀操作前插入LoadLoad屏障,在讀操作後插入LoadStore屏障。
這樣就可以禁止指令重排。
什麼是CAS
cas是compareandswap的簡稱,從字面上理解就是比較並更新,簡單來說:從某一記憶體上取值V,和預期值A進行比較,如果記憶體值V和預期值A的結果相等,那麼我們就把新值B更新到記憶體,如果不相等,那麼就重複上述操作直到成功爲止。
JAVA1.5開始引入了CAS,主要程式碼都放在JUC的atomic包下,如下圖:
CAS機制 機製當中使用了3個基本運算元:記憶體地址V,舊的預期值A,要修改的新值B。
更新一個變數的時候,只有當變數的預期值A和記憶體地址V當中的實際值相同時,纔會將記憶體地址V對應的值修改爲B。
不斷比較的過程,我們可以稱它爲自旋。
CAS操作的就是樂觀鎖,每次不加鎖而是假設沒有衝突而去完成某項操作,如果因爲衝突失敗就重試,直到成功爲止。
它可以解決多執行緒併發安全的問題,以前我們對一些多執行緒操作的程式碼都是使用synchronize關鍵字,來保證執行緒安全的問題;現在我們將cas放入到多執行緒環境裡我們看一下它是怎麼解決的,我們假設有A、B兩個執行緒同時執行一個int值value自增的程式碼,並且同時獲取了當前的value,我們還要假設執行緒B比A快了那麼0.00000001s,所以B先執行,執行緒B執行了cas操作之後,發現當前值和預期值相符,就執行了自增操作,此時這個value = value + 1;然後A開始執行,A也執行了cas操作,但是此時value的值和它當時取到的值已經不一樣了,所以此次操作失敗,重新取值然後比較成功,然後將value值更新,這樣兩個執行緒進入,value值自增了兩次,符合我們的預期。
就像我們上面的2萬程式碼的例子,使用cas就可保證多執行緒下併發安全。
併發1(上):獲取出數據的初始值是A,後續計劃實施CAS樂觀鎖,期望數據仍是A的時候,修改才能 纔能成功
↓
併發2:將數據修改成B
↓
併發3:將數據修改回A
↓
併發1(下):CAS樂觀鎖,檢測發現初始值還是A,進行數據修改
併發1在修改數據時,雖然還是A,但已經不是初始條件的A了,中間發生了A變B,B又變A的變化,此A已經非彼A,數據卻成功修改,可能導致錯誤,這就是CAS引發的所謂的ABA問題。
優化CAS
ABA問題導致的原因,是CAS過程中只簡單進行了「值」的校驗,再有些情況下,「值」相同不會引入錯誤的業務邏輯(例如庫存),有些情況下,「值」雖然相同,卻已經不是原來的數據了。
我們可以給CAS加入一個類似於版本號的東西,必須當值和版本號都滿足的情況下,纔可以執行swap操作。
自己寫一個自旋鎖
// 自旋鎖
public class SpinlockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
// 加鎖
public void mylock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"==>mylock");
while (!atomicReference.compareAndSet(null,thread)){
}
}
// 解鎖
public void myUnlock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"==>myUnlock");
atomicReference.compareAndSet(thread,null);
}
}
public class testMylock {
public static void main(String[] args) throws InterruptedException {
SpinlockDemo lock = new SpinlockDemo();
new Thread(()->{
lock.mylock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.myUnlock();
}
},"A").start();
TimeUnit.SECONDS.sleep(3);
new Thread(()->{
lock.mylock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.myUnlock();
}
},"B").start();
}
}
什麼是死鎖?
所謂死鎖,是指多個進程在執行過程中因爭奪資源而造成的一種僵局,當進程處於這種僵持狀態時,若無外力作用,它們都將無法再向前推進。
如何排查?
jps -l
可以檢視當前執行中的執行緒以及其埠號
jstack 埠號
可以檢視當前埠的情況