//多執行緒操作資源類,牢記三步走:
//A : num + 1
//B : num - 1
/**
 * Demo driver: four threads share one {@code Data} instance. Threads "A" and
 * "D" call {@code increment()}, threads "B" and "C" call {@code decrement()},
 * each {@link #ROUNDS} times.
 */
public class A {
    /** Number of increment/decrement calls each worker thread performs. */
    private static final int ROUNDS = 10;

    /**
     * Creates the shared {@code Data} object and starts the four worker
     * threads in the order A, B, C, D. Returns immediately; the workers keep
     * running on their own threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Data data = new Data();
        startWorker("A", data, true);
        startWorker("B", data, false);
        startWorker("C", data, false);
        startWorker("D", data, true);
    }

    /**
     * Starts a thread that calls {@code data.increment()} or
     * {@code data.decrement()} {@link #ROUNDS} times.
     *
     * @param name      thread name (it shows up in the output printed by {@code Data})
     * @param data      the shared resource object
     * @param increment {@code true} to call {@code increment()}, {@code false}
     *                  to call {@code decrement()}
     */
    private static void startWorker(String name, Data data, boolean increment) {
        new Thread(() -> {
            for (int i = 0; i < ROUNDS; i++) {
                try {
                    if (increment) {
                        data.increment();
                    } else {
                        data.decrement();
                    }
                } catch (InterruptedException e) {
                    // Interruption is a request to stop: put the flag back (the
                    // throw cleared it) and end this worker instead of silently
                    // carrying on with the remaining rounds.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                }
            }
        }, name).start();
    }
}
//等待,業務,通知
/**
 * Shared resource guarded by its own monitor. The counter only ever moves
 * between 0 and 1: {@link #increment()} waits until it is 0, and
 * {@link #decrement()} waits until it is non-zero.
 */
class Data {
    private int number = 0;

    /**
     * Waits until the counter is 0, raises it by one, prints
     * {@code <thread name>< <value>} and wakes every waiting thread.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void increment() throws InterruptedException {
        // Re-test the condition after every wake-up: guards against spurious
        // wake-ups and against another thread having changed the counter first.
        while (number != 0) {
            wait();
        }
        number += 1;
        reportAndWakeAll();
    }

    /**
     * Waits until the counter is non-zero, lowers it by one, prints
     * {@code <thread name>< <value>} and wakes every waiting thread.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void decrement() throws InterruptedException {
        while (number == 0) {
            wait();
        }
        number -= 1;
        reportAndWakeAll();
    }

    /**
     * Prints the current thread's name with the new counter value, then
     * notifies all threads waiting on this object. Only called from the
     * synchronized methods above, so the monitor is already held.
     */
    private void reportAndWakeAll() {
        String caller = Thread.currentThread().getName();
        System.out.println(caller + "< " + number);
        notifyAll();
    }
}
//使用阻塞佇列實現生產者,消費者模型
//volatile/CAS/AtomicReference/BlockingQueue/執行緒互動
/**
 * Producer/consumer demo built on a bounded blocking queue. One thread
 * produces into a {@code MySource}, another consumes from it, and the main
 * thread stops production after five seconds.
 */
public class BlockingQueueCustomerProductor {
    /**
     * Starts producer thread "AAA" and consumer thread "BBB" on a shared
     * {@code MySource} backed by a queue of capacity 10, sleeps five seconds,
     * prints a notice and then calls {@code stop()} on the source.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted during the five-second sleep
     */
    public static void main(String[] args) throws InterruptedException {
        // Parameterized rather than raw, so the element type is checked at compile time.
        MySource mySource = new MySource(new ArrayBlockingQueue<String>(10));

        new Thread(() -> {
            try {
                mySource.myProd();
            } catch (InterruptedException e) {
                // Put the interrupt flag back; catching the exception cleared it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "AAA").start();

        new Thread(() -> {
            try {
                mySource.myConsumer();
            } catch (InterruptedException e) {
                // Put the interrupt flag back; catching the exception cleared it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "BBB").start();

        TimeUnit.SECONDS.sleep(5);
        System.out.println("5秒已經過了,開始結束吧");
        mySource.stop();
    }
}
//判斷,幹活,通知
/**
 * Couples a producer loop and a consumer loop through one blocking queue of
 * strings. The producer enqueues increasing sequence numbers until
 * {@link #stop()} is called. The consumer dequeues until a poll returns
 * nothing within two seconds.
 */
class MySource {
    /** Producer switch: starts enabled, cleared by {@link #stop()}. */
    private volatile boolean flag = true;
    /** Consumer switch: starts enabled, cleared by the consumer itself when a poll yields nothing. */
    private volatile boolean flag2 = true;
    /** Generates the sequence numbers that the producer enqueues. */
    private final AtomicInteger atomicInteger = new AtomicInteger();
    /** Queue shared by the producer and the consumer; supplied by the caller. */
    BlockingQueue<String> blockingQueue;

    /**
     * Stores the queue to use and prints its concrete class name.
     *
     * @param blockingQueue queue that carries the produced strings; must not be
     *                      {@code null} (the class-name lookup would throw
     *                      {@link NullPointerException})
     */
    public MySource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }

    /**
     * Producer loop. While production is enabled: takes the next sequence
     * number, tries for up to two seconds to enqueue it, prints whether that
     * succeeded, then sleeps one second. Prints a final notice once
     * {@link #stop()} has disabled production.
     *
     * @throws InterruptedException if the thread is interrupted while offering or sleeping
     */
    public void myProd() throws InterruptedException {
        while (flag) {
            String data = String.valueOf(atomicInteger.incrementAndGet());
            boolean accepted = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
            String outcome = accepted ? "插入成功: " : "插入失敗: ";
            System.out.println(Thread.currentThread().getName() + outcome + data);
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + "\t大老闆叫停生產: flag=false");
    }

    /**
     * Consumer loop. Polls the queue with a two-second timeout and prints each
     * element it receives. The first time a poll returns {@code null} or an
     * empty string, it disables consumption, prints a notice and returns.
     * Consumption is never re-enabled, so later calls on the same instance
     * return immediately.
     *
     * @throws InterruptedException if the thread is interrupted while polling
     */
    public void myConsumer() throws InterruptedException {
        while (flag2) {
            String result = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (result == null || result.isEmpty()) {
                flag2 = false;
                System.out.println(Thread.currentThread().getName() + "\t 超過兩秒鐘沒有渠道,消費退出");
            } else {
                System.out.println(Thread.currentThread().getName() + "消費佇列: " + result + " 成功");
            }
        }
    }

    /**
     * Disables production; the producer loop exits the next time it checks the
     * flag. The consumer is not signalled directly and ends through its own
     * poll timeout.
     */
    public void stop() {
        this.flag = false;
    }
}