推薦學習:《》
業務場景:golang與swoole都擁抱了協程,在同任務並行數量下,協程可比執行緒多幾倍。所以最近在查詢java時瞭解java本身是沒有協程的,但是某牛自行實現了協程,也就是本文的主角quasar(纖程)!不過沒看到誰公開一下手寫協程池的騷操作(誰會直接new它用?那是沒捱過社會的毒打呀~)
一個執行緒可以多個協程,一個程序也可以單獨擁有多個協程。
執行緒程序都是同步機制,而協程則是非同步。
協程能保留上一次呼叫時的狀態,每次過程重入時,就相當於進入上一次呼叫的狀態。
執行緒是搶佔式,而協程是非搶佔式的,所以需要使用者自己釋放使用權來切換到其他協程,因此同一時間其實只有一個協程擁有執行權,相當於單執行緒的能力。
協程並不是取代執行緒, 而且抽象於執行緒之上, 執行緒是被分割的CPU資源, 協程是組織好的程式碼流程, 協程需要執行緒來承載執行, 執行緒是協程的資源, 但協程不會直接使用執行緒, 協程直接利用的是執行器(Interceptor), 執行器可以關聯任意執行緒或執行緒池, 可以使當前執行緒, UI執行緒, 或新建新程.。
執行緒是協程的資源。協程通過Interceptor來間接使用執行緒這個資源。
廢話不多說,直接上程式碼:
匯入包:
<dependency> <groupId>co.paralleluniverse</groupId> <artifactId>quasar-core</artifactId> <version>0.7.9</version> <classifier>jdk8</classifier> </dependency>
WorkTools工具類:
package com.example.ai; import co.paralleluniverse.fibers.Fiber; import co.paralleluniverse.fibers.SuspendExecution; import co.paralleluniverse.strands.SuspendableRunnable; import java.util.concurrent.ArrayBlockingQueue; public class WorkTools { //協程池中預設協程的個數為5 private static int WORK_NUM = 5; //佇列預設任務為100 private static int TASK_COUNT = 100; //工做協程陣列 private Fiber[] workThreads; //等待佇列 private final ArrayBlockingQueue<SuspendableRunnable> taskQueue; //使用者在構造這個協程池時,但願啟動的協程數 private final int workerNum; //構造方法:建立具備預設協程個數的協程池 public WorkTools() { this(WORK_NUM,TASK_COUNT); } //建立協程池,workNum為協程池中工做協程的個數 public WorkTools(int workerNum, int taskCount) { if (workerNum <= 0) { workerNum = WORK_NUM; } if (taskCount <= 0) { taskCount = TASK_COUNT; } this.workerNum = workerNum; taskQueue = new ArrayBlockingQueue(taskCount); workThreads = new Fiber[workerNum]; for (int i = 0; i < workerNum; i++) { int finalI = i; workThreads[i] = new Fiber<>(new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { SuspendableRunnable runnable = null; while (true){ try{ //取任務,沒有則阻塞。 runnable = taskQueue.take(); }catch (Exception e){ System.out.println(e.getMessage()); } //存在任務則執行。 if(runnable != null){ runnable.run(); } runnable = null; } } }); //new一個工做協程 workThreads[i].start(); //啟動工做協程 } Runtime.getRuntime().availableProcessors(); } //執行任務,其實就是把任務加入任務佇列,何時執行由協程池管理器決定 public void execute(SuspendableRunnable task) { try { taskQueue.put(task); //put:阻塞介面的插入 } catch (Exception e) { // TODO: handle exception System.out.println("阻塞"); } } //銷燬協程池,該方法保證全部任務都完成的狀況下才銷燬全部協程,不然等待任務完成再銷燬 public void destory() { //工做協程中止工做,且置為null System.out.println("ready close thread..."); for (int i = 0; i < workerNum; i++) { workThreads[i] = null; //help gc } taskQueue.clear(); //清空等待佇列 } //覆蓋toString方法,返回協程資訊:工做協程個數和已完成任務個數 @Override public String toString() { return "WorkThread number:" + workerNum + " ==分割線== wait task number:" + taskQueue.size(); } }
測試程式碼:
package com.example.ai; import co.paralleluniverse.strands.SuspendableRunnable; import lombok.SneakyThrows; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.concurrent.CountDownLatch; @SpringBootApplication public class AiApplication { @SneakyThrows public static void main(String[] args) { //等待協程任務完畢後再結束主執行緒 CountDownLatch cdl = new CountDownLatch(50); //開啟5個協程,50個任務列隊。 WorkTools myThreadPool = new WorkTools(5, 50); for (int i = 0; i< 50; i++){ int finalI = i; myThreadPool.execute(new SuspendableRunnable() { @Override public void run() { System.out.println(finalI); try { //延遲1秒 Thread.sleep(1000); cdl.countDown(); } catch (InterruptedException e) { System.out.println("阻塞中"); } } }); } //阻塞 cdl.await(); } }
具體程式碼都有註釋了,自行了解。我也是以執行緒池寫法實現。
當前為解決問題:在協程阻塞過程中Fiber類會報阻塞警告,滿臉懵逼啊,看著很討厭。暫時沒有辦法處理,看各位大神誰有招下方評論提供給下思路。萬分感謝~
推薦學習:《》
以上就是範例介紹Java基於quasar實現協程池的詳細內容,更多請關注TW511.COM其它相關文章!