java協程執行緒之虛擬執行緒

2023-07-19 15:00:11

前言

眾所周知,java 是沒有協程執行緒的,在我們如此熟知的jdk 1.8時代,大佬們想出來的辦法就是非同步io,甚至用並行的stream流來實現,高並行也好,縮短事件處理時間也好;大家都在想著自己認為更好的實現方式;

在來說說吧,我為什麼會在今天研究這個破b玩意兒呢,

這事情還的從一個月前的版本維護說起,

目前公司遊戲運營的算中規中矩吧,日新增和日活躍使用者基本保持在1w,2.5w樣子;

大概1-2週會有一次版本更新,需要停服維護的,

我想大部分做遊戲的同僚可能都知道,遊戲架構裡面包含一個登入服這麼一個環節,用於對賬號管理以及和sdk平臺做登入二次驗證;

我們的問題也就出在了這sdk二次登入驗證環境;

 從這個截圖中不難看出,我在向sdk伺服器進行驗證的時候http請求耗時,一個請求多長達400ms,按照這個邏輯,一個執行緒一秒鐘也只能是2個登入;

然後面對停服維護階段,玩家瘋狂的嘗試登入,導致登入伺服器直接積壓了30萬個登入請求等待處理;

在尋求方案的時候,看到了http請求池化方案,目前已經大執行緒池(這裡是本人自定義執行緒池)和http池化(基於 Apache CloseableHttpClient)處理方案 因為平臺是jdk11的

在尋求方案同時發現了jdk19開放的預覽版新功能虛擬執行緒;翻閱了一些資料,就像這虛擬執行緒能不能為我帶來更好效能體驗,讓現有的系統,吞吐量更上一層樓;

一下測試程式碼用的是jdk20測試

 構建虛擬執行緒

 第一步我們需要先建立虛擬執行緒,才能去理解什麼是虛擬執行緒

1     public static void main(String[] args) throws Exception {
2 
3         Thread.startVirtualThread(() -> {
4             System.out.println(Thread.currentThread().toString());
5         });
6 
7         Thread.sleep(3000);
8     }

 

 這就正確的啟動了一個虛擬執行緒;從執行緒明明輸出看著是不是有點眼熟,是不是跟stream的並行流很相似;

接下來我們看看虛擬執行緒的執行是怎麼回事,

 1    public static void main(String[] args) throws Exception {
 2 
 3         Thread.startVirtualThread(() -> {
 4             try {
 5                 Thread.sleep(5000);
 6             } catch (InterruptedException e) {
 7                 throw new RuntimeException(e);
 8             }
 9             System.out.println(Thread.currentThread().toString());
10         });
11 
12         Thread.startVirtualThread(() -> {
13             try {
14                 Thread.sleep(5000);
15             } catch (InterruptedException e) {
16                 throw new RuntimeException(e);
17             }
18             System.out.println(Thread.currentThread().toString());
19         });
20         Thread.startVirtualThread(() -> {
21             try {
22                 Thread.sleep(5000);
23             } catch (InterruptedException e) {
24                 throw new RuntimeException(e);
25             }
26             System.out.println(Thread.currentThread().toString());
27         });
28         Thread.startVirtualThread(() -> {
29             try {
30                 Thread.sleep(5000);
31             } catch (InterruptedException e) {
32                 throw new RuntimeException(e);
33             }
34             System.out.println(Thread.currentThread().toString());
35         });
36         Thread.startVirtualThread(() -> {
37             try {
38                 Thread.sleep(5000);
39             } catch (InterruptedException e) {
40                 throw new RuntimeException(e);
41             }
42             System.out.println(Thread.currentThread().toString());
43         });
44         Thread.sleep(3000);
45     }
View Code

我們多new幾個虛擬執行緒來看看監控

 看到了吧,實際上你new的虛擬執行緒,其實是被當成了一個任務丟到了執行緒池裡面在執行;

 在翻閱了現有的程式碼邏輯還不能定義這個底部執行緒池,只能使用預設的;

當然目前是預覽版,不確定之後會不會可以自定義實現,stream流一樣,可以定義它並行數量;

 

執行緒池對比

測試用例1 

 1     @Test
 2     public void r() {
 3         t1();
 4         t2();
 5     }
 6 
 7     public void t1() {
 8         AtomicInteger atomicInteger = new AtomicInteger(100);
 9         try (var executor = Executors.newFixedThreadPool(10)) {
10             long nanoTime = System.nanoTime();
11             for (int i = 0; i < 100; i++) {
12                 executor.execute(() -> {
13                     try {
14                         Thread.sleep(50);
15                     } catch (InterruptedException e) {
16                         throw new RuntimeException(e);
17                     }
18                     atomicInteger.decrementAndGet();
19                 });
20             }
21             while (atomicInteger.get() > 0) {}
22             System.out.println("平臺執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
23         }
24     }
25 
26     public void t2() {
27         AtomicInteger atomicInteger = new AtomicInteger(100);
28         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
29             long nanoTime = System.nanoTime();
30             for (int i = 0; i < 100; i++) {
31                 executor.execute(() -> {
32                     try {
33                         Thread.sleep(50);
34                     } catch (InterruptedException e) {
35                         throw new RuntimeException(e);
36                     }
37                     atomicInteger.decrementAndGet();
38                 });
39             }
40             while (atomicInteger.get() > 0) {}
41             System.out.println("虛擬執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
42         }
43     }
View Code

 通過這段測試程式碼對比,總任務耗時,顯而易見效能;

測試用例2

 1     public void t2p() {
 2         Runnable runnable = () -> {
 3             long g = 0;
 4             for (int i = 0; i < 10000; i++) {
 5                 for (int j = 0; j < 10000; j++) {
 6                     for (int k = 0; k < 100; k++) {
 7                         g++;
 8                     }
 9                 }
10             }
11         };
12         AtomicInteger atomicInteger = new AtomicInteger(100);
13         try (var executor = Executors.newFixedThreadPool(10)) {
14             long nanoTime = System.nanoTime();
15             for (int i = 0; i < 100; i++) {
16                 executor.execute(() -> {
17                     runnable.run();
18                     atomicInteger.decrementAndGet();
19                 });
20             }
21             while (atomicInteger.get() > 0) {}
22             System.out.println("平臺執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
23         }
24     }
25 
26     public void t2v() {
27         Runnable runnable = () -> {
28             long g = 0;
29             for (int i = 0; i < 10000; i++) {
30                 for (int j = 0; j < 10000; j++) {
31                     for (int k = 0; k < 100; k++) {
32                         g++;
33                     }
34                 }
35             }
36         };
37         AtomicInteger atomicInteger = new AtomicInteger(100);
38         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
39             long nanoTime = System.nanoTime();
40             for (int i = 0; i < 100; i++) {
41                 executor.execute(() -> {
42                     runnable.run();
43                     atomicInteger.decrementAndGet();
44                 });
45             }
46             while (atomicInteger.get() > 0) {}
47             System.out.println("虛擬執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
48         }
49     }

 通過測試用例2不難看出,虛擬執行緒已經不佔優勢;

這是為什麼呢?

總結

平臺執行緒我就不過多描述因為大家都知道,網上的描述也特別多;

虛擬執行緒,其實我們更多可以可以考慮他只是一個任務,非同步的任務;

區別在於,平臺執行緒受制於cpu,如果你執行任務很耗時或者比如網路io等掛起等待,那麼這個cpu也會一直掛起等待無法處理其他事情;

虛擬執行緒是非同步任務凌駕於平臺執行緒之上,也就是說,當你的虛擬執行緒等待掛起的時候,平臺執行緒就去執行其他任務(其他虛擬執行緒)去了

我們通過上面測試用例可以這樣理解,

 

用例1,通常我們的RPC服務或者SDK跟我開通SDK二次驗證大部分時間處於等待掛起業務,這時候虛擬執行緒的作用就會非常大,他可以發起大量的驗證請求,等待回答;我們通常定義的IO密集型應用;

 

用例2,屬於計算型的,它會一直佔用cpu時間片,不會騰出cpu去執行其他事件;我們通常說cpu密集型應用不太適用虛擬執行緒;

 

目前虛擬執行緒的執行依賴於底層執行緒池,我們無法自主控制它,所以不是很建議使用

關於虛擬執行緒的描述或者定義我就不在過多的去闡述,

我只說一下它執行的邏輯吧,

1,在不同時間段一個虛擬執行緒可以由不同的平臺執行緒排程,也可以由一個平臺執行緒排程,平臺執行緒=系統執行緒=cpu

2,在不同時間段一個平臺執行緒在可以排程不同的虛擬執行緒,也可以反覆排程一個虛擬執行緒

3,在同一時間段,一個平臺執行緒只能呼叫一個虛擬執行緒,一個虛擬執行緒只能由一個平臺執行緒排程

換言之,其實虛擬執行緒可以看成一個task,你可以new很多的task,至於他什麼時候被執行,就看你的工人(cpu)什麼時候有空,

 

  1 package code.threading;
  2 
  3 import org.junit.Test;
  4 
  5 import java.util.ArrayList;
  6 import java.util.List;
  7 import java.util.concurrent.Executors;
  8 import java.util.concurrent.atomic.AtomicBoolean;
  9 import java.util.concurrent.atomic.AtomicInteger;
 10 
 11 /**
 12  * 執行緒測試
 13  *
 14  * @author: Troy.Chen(無心道, 15388152619)
 15  * @version: 2023-05-29 21:31
 16  **/
 17 public class ThreadCode {
 18 
 19     public static void main(String[] args) throws Exception {
 20 
 21     }
 22 
 23     @Test
 24     public void s() throws Exception {
 25 
 26         Runnable runnable = () -> {
 27             long nanoTime = System.nanoTime();
 28             long g = 0;
 29             for (int i = 0; i < 10000; i++) {
 30                 for (int j = 0; j < 10000; j++) {
 31                     for (int k = 0; k < 100; k++) {
 32                         g++;
 33                     }
 34                 }
 35             }
 36             Thread thread = Thread.currentThread();
 37             System.out.println(g + " - " + thread.isVirtual() + " - " + thread.threadId() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
 38         };
 39 
 40         List<VirtualThread> ts = new ArrayList<>();
 41         ts.add(new VirtualThread(runnable));
 42         ts.add(new VirtualThread(runnable));
 43         ts.add(new VirtualThread(runnable));
 44         ts.add(new VirtualThread(runnable));
 45         ts.add(new VirtualThread(runnable));
 46         ts.add(new VirtualThread(runnable));
 47         ts.add(new VirtualThread(runnable));
 48         ts.add(new VirtualThread(runnable));
 49         ts.add(new VirtualThread(runnable));
 50         ts.add(new VirtualThread(runnable));
 51         ts.add(new VirtualThread(runnable));
 52         ts.add(new VirtualThread(runnable));
 53         ts.add(new VirtualThread(runnable));
 54         ts.add(new VirtualThread(runnable));
 55         ts.add(new VirtualThread(runnable));
 56         ts.add(new VirtualThread(runnable));
 57         ts.add(new VirtualThread(runnable));
 58         ts.add(new VirtualThread(runnable));
 59         ts.add(new VirtualThread(runnable));
 60         ts.add(new VirtualThread(runnable));
 61         for (VirtualThread t : ts) {
 62             t.shutdown();
 63         }
 64         for (VirtualThread t : ts) {
 65             t.join();
 66         }
 67     }
 68 
 69     public static class VirtualThread implements Runnable {
 70 
 71         /*虛擬執行緒構建器*/
 72         static final Thread.Builder.OfVirtual ofVirtual = Thread.ofVirtual().name("v-", 1);
 73 
 74         AtomicBoolean shutdown = new AtomicBoolean();
 75         Thread _thread;
 76         Runnable runnable;
 77 
 78         public VirtualThread(Runnable runnable) {
 79             this.runnable = runnable;
 80             _thread = ofVirtual.start(this);
 81         }
 82 
 83         @Override public void run() {
 84             do {
 85                 try {
 86                     try {
 87                         this.runnable.run();
 88                     } catch (Throwable e) {
 89                         e.printStackTrace();
 90                     }
 91                 } catch (Throwable throwable) {}
 92             } while (!shutdown.get());
 93             System.out.println("虛擬執行緒退出 " + _thread.isVirtual() + " - " + _thread.threadId() + " - " + _thread.getName());
 94         }
 95 
 96         public void shutdown() {
 97             shutdown.lazySet(true);
 98         }
 99 
100         public void join() throws InterruptedException {
101             _thread.join();
102         }
103     }
104 
105     @Test
106     public void r() {
107         t2p();
108         t2v();
109     }
110 
111     public void t1p() {
112         AtomicInteger atomicInteger = new AtomicInteger(100);
113         try (var executor = Executors.newFixedThreadPool(10)) {
114             long nanoTime = System.nanoTime();
115             for (int i = 0; i < 100; i++) {
116                 executor.execute(() -> {
117                     try {
118                         Thread.sleep(50);
119                     } catch (InterruptedException e) {
120                         throw new RuntimeException(e);
121                     }
122                     atomicInteger.decrementAndGet();
123                 });
124             }
125             while (atomicInteger.get() > 0) {}
126             System.out.println("平臺執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
127         }
128     }
129 
130     public void t1v() {
131         AtomicInteger atomicInteger = new AtomicInteger(100);
132         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
133             long nanoTime = System.nanoTime();
134             for (int i = 0; i < 100; i++) {
135                 executor.execute(() -> {
136                     try {
137                         Thread.sleep(50);
138                     } catch (InterruptedException e) {
139                         throw new RuntimeException(e);
140                     }
141                     atomicInteger.decrementAndGet();
142                 });
143             }
144             while (atomicInteger.get() > 0) {}
145             System.out.println("虛擬執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
146         }
147     }
148 
149     public void t2p() {
150         Runnable runnable = () -> {
151             long g = 0;
152             for (int i = 0; i < 10000; i++) {
153                 for (int j = 0; j < 10000; j++) {
154                     for (int k = 0; k < 100; k++) {
155                         g++;
156                     }
157                 }
158             }
159         };
160         AtomicInteger atomicInteger = new AtomicInteger(100);
161         try (var executor = Executors.newFixedThreadPool(10)) {
162             long nanoTime = System.nanoTime();
163             for (int i = 0; i < 100; i++) {
164                 executor.execute(() -> {
165                     runnable.run();
166                     atomicInteger.decrementAndGet();
167                 });
168             }
169             while (atomicInteger.get() > 0) {}
170             System.out.println("平臺執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
171         }
172     }
173 
174     public void t2v() {
175         Runnable runnable = () -> {
176             long g = 0;
177             for (int i = 0; i < 10000; i++) {
178                 for (int j = 0; j < 10000; j++) {
179                     for (int k = 0; k < 100; k++) {
180                         g++;
181                     }
182                 }
183             }
184         };
185         AtomicInteger atomicInteger = new AtomicInteger(100);
186         try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
187             long nanoTime = System.nanoTime();
188             for (int i = 0; i < 100; i++) {
189                 executor.execute(() -> {
190                     runnable.run();
191                     atomicInteger.decrementAndGet();
192                 });
193             }
194             while (atomicInteger.get() > 0) {}
195             System.out.println("虛擬執行緒 - " + atomicInteger.get() + " - " + ((System.nanoTime() - nanoTime) / 10000 / 100f));
196         }
197     }
198 }
View Code

附加一段全部測試程式碼