人類壓榨CPU的腳步從未停止過。在實際的生產過程中,我們將CPU的任務分為兩大類:
為了提高IO密集型任務的CPU利用率,常常採用非同步加回撥的方案。我們去餐廳吃飯,點菜之後就可以回座位上刷手機了,這叫非同步;飯菜做好了,服務員把菜端過來,這叫回撥。
在軟體開發的過程中,非同步加回撥的方案將一件事拆成兩個過程,不符合人類的線性思維,增加了程式碼複雜度,提高了排查錯誤的難度。這就好比,我們下單後回座位等待,雖然有空幹別的事情,但是也不能離開餐廳,心裡要記得菜還沒上。
最簡單的方法是,下單之後在視窗等著,直到廚師做好了,我們才端走飯菜,這叫做同步阻塞。同步阻塞的方案簡單直接,程式設計師的心智負擔最輕,如下程式碼所示:
/**
* 顧客用餐
*
* @param customerOrder 顧客訂單
* @return
*/
public void customerDish(CustomerOrder customerOrder) {
// 顧客下單,生成訂單
RestaurantOrder restaurantOrder = submitOrder(customerOrder);
// 廚房接到訂單,開始做飯,耗時5分鐘
CustomerDish customerDish = cookCustomerDish(restaurantOrder);
// 顧客拿到飯菜,開始吃飯
customerEating(customerDish);
}
如果很多顧客來吃飯,都聚集在視窗等待,相當於將處理過程變為執行緒,放入執行緒池中執行,如下程式碼所示:
/**
* 顧客吃飯的執行緒
*/
class CustomerDishThread extends Thread {
private CustomerOrder customerOrder;
CustomerDishThread(CustomerOrder customerOrder) {
this.customerOrder = customerOrder;
}
@Override
public void run() {
// 顧客用餐
customerDish(customerOrder);
}
}
private static final ExecutorService THREAD_POOL = Executors.newCachedThreadPool();
/**
* 餐廳接待很多顧客
* @param customerOrderList
*/
public void serveManyCustomer(List<CustomerOrder> customerOrderList) {
for (CustomerOrder customerOrder : customerOrderList) {
THREAD_POOL.execute(new CustomerDishThread(customerOrder));
}
}
同步阻塞方案是低效的,浪費顧客的時間,視窗也擠不下太多人。如果把餐廳看作伺服器端,把顧客看成使用者端的請求,伺服器端能夠並行執行的執行緒數有限。當執行緒非常多的時候,作業系統頻繁排程執行緒,上下文切換是不小的開銷。有沒有辦法減少執行緒排程的開銷呢?
協程登場了。
協程(Coroutines)的完整定義是「共同作業式排程的使用者態執行緒」。首先,要理解執行緒排程的兩種方式:
共同作業式排程:當前執行緒完全佔用CPU時間,除非自己讓出時間片,直到執行結束,系統才執行下一個執行緒。可能出現一個執行緒一直佔有CPU,而其他執行緒等待。
搶佔式排程:作業系統決定下一個佔用CPU時間的是哪一個執行緒,定期的中斷當前正在執行的執行緒,任何一個執行緒都不能獨佔。不會因為一個執行緒而影響整個程序的執行。
另外,要理解使用者態和核心態的概念。
作業系統的核心是核心(kernel),它獨立於普通的應用程式,可以存取受保護的記憶體空間,也有存取底層硬體裝置的所有許可權。有些CPU 的指令是非常危險的,一旦用錯可能導致系統崩潰。如果所有的程式都可以任意使用這些指令,那麼系統崩潰的概率將大大增加。為了保證核心的安全,作業系統一般都禁止使用者程序直接操作核心。具體的實現方式是將虛擬記憶體空間劃分為兩部分,一部分為核心空間,另一部分為使用者空間。當程序執行在核心空間時就處於核心態,程序執行在使用者空間時則處於使用者態。
無論是程序還是執行緒,它們的上下文切換和"核心態、使用者態"沒有直接的關係。比如只要需要系統呼叫,即使不做任何切換,都需要進入核心態。舉個例子:一個執行緒呼叫函數在螢幕上列印 hello world,就已經進入了核心態了,因為列印字元的功能是由核心程式提供的。總的來說,應用程式通常執行在使用者態,遇到下列三種情況會切換到核心態:
執行緒的程式碼在使用者態執行,而排程是在核心態執行的。作業系統切換執行緒上下文的步驟如下所示:
協程不是作業系統的底層特性,系統感知不到它的存在。它執行線上程裡面,通過分時複用執行緒的方式執行,不會增加執行緒的數量。協程也有上下文切換,但是不會切換到核心態去,比執行緒切換的開銷要小很多。每個協程的體積比執行緒要小得多,一個執行緒可以容納數量相當可觀的協程。
在IO密集型的任務中有著大量的阻塞等待過程,協程採用共同作業式排程,在IO阻塞的時候讓出CPU,當IO就緒後再主動佔用CPU,犧牲任務執行的公平性換取吞吐量。
事物都有兩面性,協程也存在幾個弊端:
無論是執行緒還是協程,都只是作業系統層面的抽象概念,本質是函數執行的載體。可以簡單的認為協程是一個能夠被暫停以及被恢復執行的函數,在共同作業排程器的控制下執行,同一個時刻只能執行一個函數。
我們來看看下面的Java程式碼,程式碼中出現的註解 Coroutine 和 CoroutineSchedule ,只是為了更好的演示而編造出來,JDK並沒有這兩個註解。
public class CoroutineDemo {
static void functionA() {
System.out.println("A");
}
static void functionB() {
System.out.println("B");
}
static void functionC() {
System.out.println("C");
}
/**
* 普通的函數
*/
static void commonFunction() {
functionA();
functionB();
functionC();
}
/**
* @Coroutine 標識函數為協程
*/
@Coroutine
static void coroutineFunction() {
functionA();
functionB();
functionC();
}
/**
* @Coroutine 標識協程排程器,跟隨主執行緒一起啟動
*/
@CoroutineSchedule
void coroutineScheduleRule() {
//如果等待IO,暫停協程coroutineFunction,否則就恢復
if(waitIO()){
yieldFunction("coroutineFunction");
}else {
resumeFunction("coroutineFunction");
}
}
public static void main(String[] args) {
Thread commonThread = new Thread(() -> {
//執行普通函數
commonFunction();
});
commonThread.start();
Thread coroutineThread = new Thread(() -> {
//執行協程
coroutineFunction();
});
coroutineThread.start();
}
}
main方法啟動了兩個執行緒,普通函數 commonFunction 執行後,會依次列印出 A B C。協程 coroutineFunction 執行後,不確定列印什麼,因為協程排程器有規則:如果CPU繁忙就暫停協程。如果協程列印了 A 之後就被暫停了,當它被再次喚醒,可能會接著列印 B C,而不是列印 A 。因為協程記錄了函數執行的上下文資訊,知道自己上一次執行到了哪裡。這和作業系統排程執行緒是一樣的,暫停當前執行緒,儲存執行狀態後去排程其它執行緒,該執行緒再次被分配CPU後繼續執行,就像沒有被暫停過一樣。
我們嘗試一下用 C/C++ 實現一個簡單的協程。協程有兩個重要的部分:排程器和使用者態的上下文切換。Linux系統已經提供了操作使用者態上下文的介面,只需要實現排程器即可。glibc是一個C語言庫,封裝了系統最重要的系統服務,提供了最底層的API。glibc包含一個ucontext庫,支援使用者態的上下文切換。
首先看看ucontext提供的四個基本函數:
函數 | 作用 |
---|---|
int getcontext(ucontext_t *ucp) | 獲得當前上下文儲存的棧和入口執行點 |
int setcontext(const ucontext_t *ucp) | 設定當前上下文。初始化ucp結構體,將當前的上下文儲存到ucp中 |
void makecontext(ucontext_t ucp, void (func)(), int argc, ...) | 建立一個新的上下文。修改上下文ucp,給該上下文指定一個棧空間ucp->stack,設定後繼的上下文ucp->uc_link |
int swapcontext(ucontext_t *oucp, ucontext_t *ucp) | 切換上下文。儲存當前上下文到oucp,設定到ucp所指向的上下文,跳轉到ucp所指的地方 |
ucontext_t 是使用者態上下文資料,看看它的資料結構:
typedef struct ucontext {
// 後繼的上下文,表示當前程式執行之後下一個上下文
struct ucontext *uc_link;
sigset_t uc_sigmask;
// 上下文堆疊
stack_t uc_stack;
mcontext_t uc_mcontext;
} ucontext_t;
在下面的程式碼演示中,你將會進一步理解這4個函數的用法,程式碼的偵錯環境是Ubuntu 16、Visual Studio Code(包含 C/C++ 開發外掛):
#include <stdio.h>
#include <ucontext.h>
#include <unistd.h>
int i = 1 , max = 5;
int main() {
ucontext_t context;
puts("上菜了");
getcontext(&context);
if (i > max ) return 0;
puts("張三吃飯了");
i++;
setcontext(&context);
puts("李四吃飯了");
return 0;
}
李四吃上飯嗎?你大概能夠猜到程式碼不會執行到puts("李四吃飯了");
,以上程式碼的輸出結果是:
上菜了
張三吃飯了
張三吃飯了
張三吃飯了
張三吃飯了
張三吃飯了
getcontext(&context)
獲取了程式執行的上下文,setcontext(&context)
給當前程式設定上下文,程式立即重新執行。&context
記錄了已經執行的程式碼行,那麼再次執行的起始行是if (i > max ) return 0
,這樣永遠不會走到puts("李四吃飯了")
。
以下程式碼演示了 makecontext 和 swapcontext 函數的用法,以及設定上下文堆疊引數:
#include <ucontext.h>
#include <stdio.h>
void eating()
{
puts("李四吃飯了");
}
int main()
{
//指定棧空間
char stack[512*128];
ucontext_t child,main;
//獲取當前上下文
getcontext(&child);
//指定棧空間
child.uc_stack.ss_sp = stack;
//指定棧空間大小
child.uc_stack.ss_size = sizeof(stack);
child.uc_stack.ss_flags = 0;
//設定後繼上下文
child.uc_link = &main;
puts("上菜了");
//修改 child 上下文,指向eating函數
makecontext(&child,(void (*)(void))eating,0);
//切換到child上下文,儲存當前上下文到main
swapcontext(&main,&child);
puts("張三吃飯了");
return 0;
}
以上程式碼的輸出結果是:
上菜了
李四吃飯了
張三吃飯了
入口main方法是一個執行緒,函數swapcontext(&main,&child)
交換了上下文引數,將會執行函數eating()
,之後再執行child的後繼上下文main,回到了主執行緒main。從這段程式碼你能否想到如何實現一個協程排程器?
在真實的生產環境下,協程排程器是個執行在後臺的執行緒,自動化排程所有協程,排程規則也比較複雜。以下程式碼將實現一個無法自動化排程的排程器。
首先定義協程結構體:
//上下文堆疊
#define DEFAULT_STACK_SZIE (512*128)
//定義協程狀態
enum ThreadState{FREE,RUNNABLE,RUNNING,SUSPEND};
//定義協程結構體
typedef struct uthread_t
{
ucontext_t ctx;
Fun func;
void *arg;
enum ThreadState state;
char stack[DEFAULT_STACK_SZIE];
}uthread_t;
定義排程器結構體:
//最大協程數量
#define MAX_UTHREAD_SIZE 512
typedef struct schedule_t
{
ucontext_t main;
//正在執行的協程的ID,一個執行緒只能執行一個協程
int running_thread;
uthread_t *threads;
//協程數量
int uthread_count;
schedule_t():running_thread(-1), uthread_count(0) {
threads = new uthread_t[MAX_UTHREAD_SIZE];
for (int i = 0; i < MAX_UTHREAD_SIZE; i++) {
threads[i].state = FREE;
}
}
~schedule_t() {
delete [] threads;
}
}schedule_t;
定義協程排程方法:
// 建立協程
int uthread_create(schedule_t &schedule,Fun func,void *arg);
// 掛起協程
void uthread_yield(schedule_t &schedule);
// 恢復協程
void uthread_resume(schedule_t &schedule,int id);
實現協程排程方法:
// 建立協程
int uthread_create(schedule_t &schedule, Fun func, void *arg)
{
int id = 0;
for (id = 0; id < schedule.uthread_count; ++id)
{
if (schedule.threads[id].state == FREE)
{
break;
}
}
if (id == schedule.uthread_count)
{
schedule.uthread_count++;
}
uthread_t *t = &(schedule.threads[id]);
t->state = RUNNABLE;
t->func = func;
t->arg = arg;
getcontext(&(t->ctx));
t->ctx.uc_stack.ss_sp = t->stack;
t->ctx.uc_stack.ss_size = DEFAULT_STACK_SZIE;
t->ctx.uc_stack.ss_flags = 0;
t->ctx.uc_link = &(schedule.main);
schedule.running_thread = id;
//建立協程結構體
makecontext(&(t->ctx), (void (*)(void))(uthread_init), 1, &schedule);
//切換上下文,執行func函數
swapcontext(&(schedule.main), &(t->ctx));
return id;
}
//初始化一個協程,配合uthread_create使用
void uthread_init(schedule_t *ps)
{
int id = ps->running_thread;
if (id != -1)
{
uthread_t *t = &(ps->threads[id]);
t->func(t->arg);
t->state = FREE;
ps->running_thread = -1;
}
}
// 恢復執行協程
void uthread_resume(schedule_t &schedule, int id)
{
if (id < 0 || id >= schedule.uthread_count)
{
return;
}
uthread_t *t = &(schedule.threads[id]);
if (t->state == SUSPEND)
{
// 上下文切到t->ctx,即恢復執行協程
swapcontext(&(schedule.main), &(t->ctx));
}
}
// 掛起協程
void uthread_yield(schedule_t &schedule)
{
if (schedule.running_thread != -1)
{
uthread_t *t = &(schedule.threads[schedule.running_thread]);
t->state = SUSPEND;
schedule.running_thread = -1;
// 上下文切回主執行緒,相當於掛起協程
swapcontext(&(t->ctx), &(schedule.main));
}
}
測試排程方法:
void zhangsan(void * arg)
{
puts("張三吃飯了");
//掛起協程
uthread_yield(*(schedule_t *)arg);
puts("張三吃完了");
}
void lishi(void *arg)
{
puts("李四吃飯了");
//掛起協程
uthread_yield(*(schedule_t *)arg);
puts("李四吃完了");
}
int main()
{
//初始化排程器
schedule_t schedule;
//建立協程並掛起
int zhangsan_id = uthread_create(schedule,zhangsan,&schedule);
int lisi_id = uthread_create(schedule,lishi,&schedule);
//恢復協程
uthread_resume(schedule,zhangsan_id);
uthread_resume(schedule,lisi_id);
puts("餐廳營業中");
return 0;
}
以上程式的輸出結果:
張三吃飯了
李四吃飯了
張三吃完了
李四吃完了
餐廳營業中
目前許多語言已經支援協程,比如C#、Golang、Python、Lua、Ruby、C++ 20、Erlang,也有一些 C/C++ 開源的協程庫,比如Protothreads、libco。
是不是缺了一個年老色衰的Java?
目前還沒有JDK正式版本支援協程特性,如果想嘗試Java的協程,可以使用Open JDK 19的預覽特性或者 Alibaba JDK 最新版,以及第三方框架Quasar。
2018年1月,OpenJDK官方提出了協程專案Project Loom。2019年,Loom的首個EA版本問世,此時Java的協程類叫做Fiber。它將使用Fiber輕量級使用者模式執行緒,從JVM層面對多執行緒技術進行徹底的改變,使輕量級執行緒的並行也能夠適用於高吞吐量的業務場景。2019年10月,官方將Fiber重新實現為Thread的子類VirtualThread,相容Thread的所有操作。
2021年11月15日,OpenJDK官方宣佈 JDK 19中加入虛擬執行緒的特性 JEP 425: Virtual Threads (Preview)。
Virtual threads are lightweight threads that dramatically reduce the effort of writing, maintaining, and observing high-throughput concurrent applications. (虛擬執行緒是輕量級執行緒,可以顯著減少編寫、維護和觀察高吞吐量並行應用程式的工作量)
該特性屬於預覽版,距離穩定版本還需要一段時間。如要在 JDK 19上嘗試該功能,則必須通過--enable-preview
啟動,如下所示:
java --release 19 --enable-preview Main.java
簡單瞭解一下VirtualThread的相關API:
// 啟動一個簡單虛擬執行緒
Thread thread = Thread.ofVirtual().start(runnable);
// 採用ThreadFactory建立虛擬執行緒
ThreadFactory factory = Thread.ofVirtual().factory();
// 建立大量虛擬執行緒
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 10_000).forEach(i -> {
executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
return i;
});
});
}
想了解更多細節可以閱讀 https://openjdk.org/jeps/425
Quasar是一個開源的Java協程框架,基本原理是修改位元組碼,使方法掛起後可以儲存和恢復JVM棧幀,方法內部已執行到的位元組碼位置也通過增加狀態機的方式記錄,在下次恢復後可直接跳轉到中斷的位置。專案地址是 http://docs.paralleluniverse.co/quasar/ 。
我們測試一下使用執行緒和協程並行執行10000次的消耗,程式碼如下所示:
// 使用JDK的執行緒和執行緒池
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch=new CountDownLatch(10_000);
long start = System.currentTimeMillis();
ExecutoarService executor= Executors.newCachedThreadPool();
for (int i = 0; i < 10_000; i++) {
executor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
});
}
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("Thread use:"+(end-start)+" ms");
}
接下來使用Quasar框架的協程,maven依賴設定:
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-core</artifactId>
<version>0.7.10</version>
</dependency>
JVM啟動引數要設定--javaagent:C:\Users\Administrator\.m2\repository\co\paralleluniverse\quasar-core\0.7.10\quasar-core-0.7.10.jar
。
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch=new CountDownLatch(10_000);
long start = System.currentTimeMillis();
for (int i = 0; i < 10_000; i++) {
new Fiber<>(new SuspendableRunnable(){
@Override
public Integer run() throws SuspendExecution, InterruptedException {
Fiber.sleep(1000);
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("Fiber use:"+(end-start)+" ms");
}
以上程式碼執行結果可以看出協程效能高出一倍,其他方面的比對如記憶體消耗、GC等,請讀者自行研究。
阿里巴巴JVM團隊根據自身業務需要,在 Open JDK 的基礎上開發了Alibaba Dragonwell,該版本攜帶的Wisp2元件讓JVM支援了協程。阿里巴巴的核心電商應用已經在協程模型上經過兩個雙十一的考驗,效能和穩定性得到了驗證。
Wisp協程完全相容現有多執行緒的程式碼寫法,僅增加JVM引數來開啟協程。我們來嘗試一下,先通過地址 https://github.com/alibaba/dragonwell8/releases/tag/dragonwell-standard-8.12.13_jdk8u345-ga 下載dragonwell8,這個版本相當於Oracle JDK 1.8。在JVM啟動引數中增加-XX:+UseWisp2
,即開啟了協程。
以下程式碼演示了線上程中將2個阻塞佇列的資料交換100000次。
public class Wisp2Demo {
private static final ExecutorService THREAD_POOL = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
BlockingQueue<Byte> q1 = new LinkedBlockingQueue<>(), q2 = new LinkedBlockingQueue<>();
THREAD_POOL.submit(() -> loop(q2, q1));
Future<?> f = THREAD_POOL.submit(() -> loop(q1, q2));
q1.put((byte) 1);
System.out.println(f.get() + " ms");
}
private static long loop(BlockingQueue<Byte> in, BlockingQueue<Byte> out) throws Exception {
long start = System.currentTimeMillis();
for (int i = 0; i < 1_000_000; i++) out.put(in.take());
return System.currentTimeMillis() - start;
}
}
正常啟動JVM:
java Wisp2Demo
6778 ms
帶引數啟動JVM:
// UnlockExperimentalVMOptions 允許使用實驗性引數,保證UseWisp2生效
// ActiveProcessorCount 指定JVM可用的CPU數
java -XX:+UnlockExperimentalVMOptions -XX:+UseWisp2 -XX:ActiveProcessorCount=1 Wisp2Demo
690 ms
啟用協程之後觀察耗時情況,效能提升了近10倍。