眾所周知,java 是沒有協程執行緒的,在我們如此熟知的jdk 1.8時代,大佬們想出來的辦法就是非同步io,甚至用並行的stream流來實現,高並行也好,縮短事件處理時間也好;大家都在想著自己認為更好的實現方式;
在來說說吧,我為什麼會在今天研究這個破b玩意兒呢,
這事情還的從一個月前的版本維護說起,
目前公司遊戲運營的算中規中矩吧,日新增和日活躍使用者基本保持在1w,2.5w樣子;
大概1-2週會有一次版本更新,需要停服維護的,
我想大部分做遊戲的同僚可能都知道,遊戲架構裡面包含一個登入服這麼一個環節,用於對賬號管理以及和sdk平臺做登入二次驗證;
我們的問題也就出在了這sdk二次登入驗證環境;
從這個截圖中不難看出,我在向sdk伺服器進行驗證的時候http請求耗時,一個請求多長達400ms,按照這個邏輯,一個執行緒一秒鐘也只能是2個登入;
然後面對停服維護階段,玩家瘋狂的嘗試登入,導致登入伺服器直接積壓了30萬個登入請求等待處理;
在尋求方案的時候,看到了http請求池化方案,目前已經大執行緒池(這裡是本人自定義執行緒池)和http池化(基於 Apache CloseableHttpClient)處理方案 因為平臺是jdk11的
在尋求方案同時發現了jdk19開放的預覽版新功能虛擬執行緒;翻閱了一些資料,就像這虛擬執行緒能不能為我帶來更好效能體驗,讓現有的系統,吞吐量更上一層樓;
一下測試程式碼用的是jdk20測試
第一步我們需要先建立虛擬執行緒,才能去理解什麼是虛擬執行緒
1 public static void main(String[] args) throws Exception { 2 3 Thread.startVirtualThread(() -> { 4 System.out.println(Thread.currentThread().toString()); 5 }); 6 7 Thread.sleep(3000); 8 }
這就正確的啟動了一個虛擬執行緒;從執行緒明明輸出看著是不是有點眼熟,是不是跟stream的並行流很相似;
接下來我們看看虛擬執行緒的執行是怎麼回事,
1 public static void main(String[] args) throws Exception { 2 3 Thread.startVirtualThread(() -> { 4 try { 5 Thread.sleep(5000); 6 } catch (InterruptedException e) { 7 throw new RuntimeException(e); 8 } 9 System.out.println(Thread.currentThread().toString()); 10 }); 11 12 Thread.startVirtualThread(() -> { 13 try { 14 Thread.sleep(5000); 15 } catch (InterruptedException e) { 16 throw new RuntimeException(e); 17 } 18 System.out.println(Thread.currentThread().toString()); 19 }); 20 Thread.startVirtualThread(() -> { 21 try { 22 Thread.sleep(5000); 23 } catch (InterruptedException e) { 24 throw new RuntimeException(e); 25 } 26 System.out.println(Thread.currentThread().toString()); 27 }); 28 Thread.startVirtualThread(() -> { 29 try { 30 Thread.sleep(5000); 31 } catch (InterruptedException e) { 32 throw new RuntimeException(e); 33 } 34 System.out.println(Thread.currentThread().toString()); 35 }); 36 Thread.startVirtualThread(() -> { 37 try { 38 Thread.sleep(5000); 39 } catch (InterruptedException e) { 40 throw new RuntimeException(e); 41 } 42 System.out.println(Thread.currentThread().toString()); 43 }); 44 Thread.sleep(3000); 45 }
我們多new幾個虛擬執行緒來看看監控
看到了吧,實際上你new的虛擬執行緒,其實是被當成了一個任務丟到了執行緒池裡面在執行;
在翻閱了現有的程式碼邏輯還不能定義這個底部執行緒池,只能使用預設的;
當然目前是預覽版,不確定之後會不會可以自定義實現,stream流一樣,可以定義它並行數量;
測試用例1
1 @Test 2 public void r() { 3 t1(); 4 t2(); 5 } 6 7 public void t1() { 8 AtomicInteger atomicInteger = new AtomicInteger(100); 9 try (var executor = Executors.newFixedThreadPool(10)) { 10 long nanoTime = System.nanoTime(); 11 for (int i = 0; i < 100; i++) { 12 executor.execute(() -> { 13 try { 14 Thread.sleep(50); 15 } catch (InterruptedException e) { 16 throw new RuntimeException(e); 17 } 18 atomicInteger.decrementAndGet(); 19 }); 20 } 21 while (atomicInteger.get() > 0) {} 22 System.out.println("平臺執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f)); 23 } 24 } 25 26 public void t2() { 27 AtomicInteger atomicInteger = new AtomicInteger(100); 28 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { 29 long nanoTime = System.nanoTime(); 30 for (int i = 0; i < 100; i++) { 31 executor.execute(() -> { 32 try { 33 Thread.sleep(50); 34 } catch (InterruptedException e) { 35 throw new RuntimeException(e); 36 } 37 atomicInteger.decrementAndGet(); 38 }); 39 } 40 while (atomicInteger.get() > 0) {} 41 System.out.println("虛擬執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f)); 42 } 43 }
通過這段測試程式碼對比,總任務耗時,顯而易見效能;
測試用例2
1 public void t2p() { 2 Runnable runnable = () -> { 3 long g = 0; 4 for (int i = 0; i < 10000; i++) { 5 for (int j = 0; j < 10000; j++) { 6 for (int k = 0; k < 100; k++) { 7 g++; 8 } 9 } 10 } 11 }; 12 AtomicInteger atomicInteger = new AtomicInteger(100); 13 try (var executor = Executors.newFixedThreadPool(10)) { 14 long nanoTime = System.nanoTime(); 15 for (int i = 0; i < 100; i++) { 16 executor.execute(() -> { 17 runnable.run(); 18 atomicInteger.decrementAndGet(); 19 }); 20 } 21 while (atomicInteger.get() > 0) {} 22 System.out.println("平臺執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f)); 23 } 24 } 25 26 public void t2v() { 27 Runnable runnable = () -> { 28 long g = 0; 29 for (int i = 0; i < 10000; i++) { 30 for (int j = 0; j < 10000; j++) { 31 for (int k = 0; k < 100; k++) { 32 g++; 33 } 34 } 35 } 36 }; 37 AtomicInteger atomicInteger = new AtomicInteger(100); 38 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { 39 long nanoTime = System.nanoTime(); 40 for (int i = 0; i < 100; i++) { 41 executor.execute(() -> { 42 runnable.run(); 43 atomicInteger.decrementAndGet(); 44 }); 45 } 46 while (atomicInteger.get() > 0) {} 47 System.out.println("虛擬執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f)); 48 } 49 }
通過測試用例2不難看出,虛擬執行緒已經不佔優勢;
這是為什麼呢?
平臺執行緒我就不過多描述因為大家都知道,網上的描述也特別多;
虛擬執行緒,其實我們更多可以可以考慮他只是一個任務,非同步的任務;
區別在於,平臺執行緒受制於cpu,如果你執行任務很耗時或者比如網路io等掛起等待,那麼這個cpu也會一直掛起等待無法處理其他事情;
虛擬執行緒是非同步任務凌駕於平臺執行緒之上,也就是說,當你的虛擬執行緒等待掛起的時候,平臺執行緒就去執行其他任務(其他虛擬執行緒)去了
我們通過上面測試用例可以這樣理解,
用例1,通常我們的RPC服務或者SDK跟我開通SDK二次驗證大部分時間處於等待掛起業務,這時候虛擬執行緒的作用就會非常大,他可以發起大量的驗證請求,等待回答;我們通常定義的IO密集型應用;
用例2,屬於計算型的,它會一直佔用cpu時間片,不會騰出cpu去執行其他事件;我們通常說cpu密集型應用不太適用虛擬執行緒;
目前虛擬執行緒的執行依賴於底層執行緒池,我們無法自主控制它,所以不是很建議使用
關於虛擬執行緒的描述或者定義我就不在過多的去闡述,
我只說一下它執行的邏輯吧,
1,在不同時間段一個虛擬執行緒可以由不同的平臺執行緒排程,也可以由一個平臺執行緒排程,平臺執行緒=系統執行緒=cpu
2,在不同時間段一個平臺執行緒在可以排程不同的虛擬執行緒,也可以反覆排程一個虛擬執行緒
3,在同一時間段,一個平臺執行緒只能呼叫一個虛擬執行緒,一個虛擬執行緒只能由一個平臺執行緒排程
換言之,其實虛擬執行緒可以看成一個task,你可以new很多的task,至於他什麼時候被執行,就看你的工人(cpu)什麼時候有空,
1 package code.threading; 2 3 import org.junit.Test; 4 5 import java.util.ArrayList; 6 import java.util.List; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.atomic.AtomicBoolean; 9 import java.util.concurrent.atomic.AtomicInteger; 10 11 /** 12 * 執行緒測試 13 * 14 * @author: Troy.Chen(無心道, 15388152619) 15 * @version: 2023-05-29 21:31 16 **/ 17 public class ThreadCode { 18 19 public static void main(String[] args) throws Exception { 20 21 } 22 23 @Test 24 public void s() throws Exception { 25 26 Runnable runnable = () -> { 27 long nanoTime = System.nanoTime(); 28 long g = 0; 29 for (int i = 0; i < 10000; i++) { 30 for (int j = 0; j < 10000; j++) { 31 for (int k = 0; k < 100; k++) { 32 g++; 33 } 34 } 35 } 36 Thread thread = Thread.currentThread(); 37 System.out.println(g + " - " + thread.isVirtual() + " - " + thread.threadId() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f)); 38 }; 39 40 List<VirtualThread> ts = new ArrayList<>(); 41 ts.add(new VirtualThread(runnable)); 42 ts.add(new VirtualThread(runnable)); 43 ts.add(new VirtualThread(runnable)); 44 ts.add(new VirtualThread(runnable)); 45 ts.add(new VirtualThread(runnable)); 46 ts.add(new VirtualThread(runnable)); 47 ts.add(new VirtualThread(runnable)); 48 ts.add(new VirtualThread(runnable)); 49 ts.add(new VirtualThread(runnable)); 50 ts.add(new VirtualThread(runnable)); 51 ts.add(new VirtualThread(runnable)); 52 ts.add(new VirtualThread(runnable)); 53 ts.add(new VirtualThread(runnable)); 54 ts.add(new VirtualThread(runnable)); 55 ts.add(new VirtualThread(runnable)); 56 ts.add(new VirtualThread(runnable)); 57 ts.add(new VirtualThread(runnable)); 58 ts.add(new VirtualThread(runnable)); 59 ts.add(new VirtualThread(runnable)); 60 ts.add(new VirtualThread(runnable)); 61 for (VirtualThread t : ts) { 62 t.shutdown(); 63 } 64 for (VirtualThread t : ts) { 65 t.join(); 66 } 67 } 68 69 public static class VirtualThread implements Runnable { 70 71 /*虛擬執行緒構建器*/ 72 static final Thread.Builder.OfVirtual ofVirtual = Thread.ofVirtual().name("v-", 1); 73 74 AtomicBoolean shutdown = new AtomicBoolean(); 75 Thread _thread; 76 Runnable runnable; 77 78 public VirtualThread(Runnable runnable) { 79 this.runnable = runnable; 80 _thread = ofVirtual.start(this); 81 } 82 83 @Override public void run() { 84 do { 85 try { 86 try { 87 this.runnable.run(); 88 } catch (Throwable e) { 89 e.printStackTrace(); 90 } 91 } catch (Throwable throwable) {} 92 } while (!shutdown.get()); 93 System.out.println("虛擬執行緒退出 " + _thread.isVirtual() + " - " + _thread.threadId() + " - " + _thread.getName()); 94 } 95 96 public void shutdown() { 97 shutdown.lazySet(true); 98 } 99 100 public void join() throws InterruptedException { 101 _thread.join(); 102 } 103 } 104 105 @Test 106 public void r() { 107 t2p(); 108 t2v(); 109 } 110 111 public void t1p() { 112 AtomicInteger atomicInteger = new AtomicInteger(100); 113 try (var executor = Executors.newFixedThreadPool(10)) { 114 long nanoTime = System.nanoTime(); 115 for (int i = 0; i < 100; i++) { 116 executor.execute(() -> { 117 try { 118 Thread.sleep(50); 119 } catch (InterruptedException e) { 120 throw new RuntimeException(e); 121 } 122 atomicInteger.decrementAndGet(); 123 }); 124 } 125 while (atomicInteger.get() > 0) {} 126 System.out.println("平臺執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f)); 127 } 128 } 129 130 public void t1v() { 131 AtomicInteger atomicInteger = new AtomicInteger(100); 132 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { 133 long nanoTime = System.nanoTime(); 134 for (int i = 0; i < 100; i++) { 135 executor.execute(() -> { 136 try { 137 Thread.sleep(50); 138 } catch (InterruptedException e) { 139 throw new RuntimeException(e); 140 } 141 atomicInteger.decrementAndGet(); 142 }); 143 } 144 while (atomicInteger.get() > 0) {} 145 System.out.println("虛擬執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f)); 146 } 147 } 148 149 public void t2p() { 150 Runnable runnable = () -> { 151 long g = 0; 152 for (int i = 0; i < 10000; i++) { 153 for (int j = 0; j < 10000; j++) { 154 for (int k = 0; k < 100; k++) { 155 g++; 156 } 157 } 158 } 159 }; 160 AtomicInteger atomicInteger = new AtomicInteger(100); 161 try (var executor = Executors.newFixedThreadPool(10)) { 162 long nanoTime = System.nanoTime(); 163 for (int i = 0; i < 100; i++) { 164 executor.execute(() -> { 165 runnable.run(); 166 atomicInteger.decrementAndGet(); 167 }); 168 } 169 while (atomicInteger.get() > 0) {} 170 System.out.println("平臺執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f)); 171 } 172 } 173 174 public void t2v() { 175 Runnable runnable = () -> { 176 long g = 0; 177 for (int i = 0; i < 10000; i++) { 178 for (int j = 0; j < 10000; j++) { 179 for (int k = 0; k < 100; k++) { 180 g++; 181 } 182 } 183 } 184 }; 185 AtomicInteger atomicInteger = new AtomicInteger(100); 186 try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { 187 long nanoTime = System.nanoTime(); 188 for (int i = 0; i < 100; i++) { 189 executor.execute(() -> { 190 runnable.run(); 191 atomicInteger.decrementAndGet(); 192 }); 193 } 194 while (atomicInteger.get() > 0) {} 195 System.out.println("虛擬執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f)); 196 } 197 } 198 }
附加一段全部測試程式碼
跪求保留標示符 /** * @author: Troy.Chen(失足程式設計師, 15388152619) * @version: 2021-07-20 10:55 **/ C#版本程式碼 vs2010及以上工具可以 java 開發工具是netbeans 和 idea 版本,只有專案匯入如果出現異常,請根據自己的工具調整 提供免費倉儲。 最新的程式碼地址:↓↓↓ https://gitee.com/wuxindao 覺得我還可以,打賞一下吧,你的肯定是我努力的最大動力