作者:vivo 網際網路伺服器團隊- Ye Feng
本文介紹了協程的概念,並討論了 Tars Cpp 協程的實現原理和原始碼分析。
Tars 是 Linux 基金會的開源專案(https://github.com/TarsCloud),它是基於名字服務使用 Tars 協定的高效能 RPC 開發框架,配套一體化的運營管理平臺,並通過伸縮排程,實現運維半托管服務。Tars 集可延伸協定編解碼、高效能 RPC 通訊框架、名字路由與發現、釋出監控、紀錄檔統計、設定管理等於一體,通過它可以快速用微服務的方式構建自己的穩定可靠的分散式應用,並實現完整有效的服務治理。
Tars 目前支援 C++,Java,PHP,Nodejs,Go 語言,其中 TarsCpp 3.x 全面啟用對協程的支援,服務架構全面融合協程。本文基於TarsCpp-v3.0.0版本,討論了協程在TarsCpp服務架構的實現。
協程的概念最早出現在Melvin Conway在1963年的論文("Design of a separable transition-diagram compiler"),協程認為是「可以暫停和恢復執行」的函數。
協程可以看成一種特殊的函數,相比於函數,協程最大的特點就是支援掛起(yield)和恢復(resume)的能力。如上圖所示:函數不能主動中斷執行流;而協程支援主動掛起,中斷執行流,並在一定時機恢復執行。
協程的作用:
降低並行編碼的複雜度,尤其是非同步程式設計(callback hell)。
協程在使用者態中實現排程,避免了陷入核心,上下文切換開銷小。
我們可以簡單的認為協程是使用者態的執行緒。協程和執行緒主要異同:
相同點:都可以實現上下文切換(儲存和恢復執行流)
不同點:執行緒的上下文切換在核心實現,切換的時機由核心排程器控制。協程的上下文切換在使用者態實現,切換的時機由呼叫方自身控制。
程序、執行緒和協程的比較:
按控制傳遞(Control-transfer)機制分為:對稱(Symmetric)協程和非對稱(Asymmetric)協程。
對稱協程:協程之間相互獨立,排程權(CPU)可以在任意協程之間轉移。協程只有一種控制傳遞操作(yield)。對稱協程一般需要排程器支援,通過排程演演算法選擇下一個目標協程。
非對稱協程:協程之間存在呼叫關係,協程讓出的排程權只能返回給呼叫者。協程有兩種控制操作:恢復(resume)和掛起(yield)。
下圖演示了對稱協程的排程權轉移流程,協程只有一個操作yield,表示讓出CPU,返回給排程器。
下圖演示了非對稱協程的排程權轉移流程。協程可以有兩個操作,即resume和yield。resume表示轉移CPU給被呼叫者,yield表示被呼叫者返回CPU給呼叫者。
根據協程是否有獨立的棧空間,協程分為有棧協程(stackful)和無棧協程(stackless)兩種。
有棧協程:每個協程有獨立的棧空間,儲存獨立的上下文(執行棧、暫存器等),協程的喚醒和掛起就是拷貝和切換上下文。優點:協程排程可以巢狀,在記憶體中的任意位置、任意時刻進行。侷限:協程數目增大,記憶體開銷增大。
無棧協程:單個執行緒內所有協程都共用同一個棧空間(共用棧),協程的切換就是簡單的函數呼叫和返回,無棧協程通常是基於狀態機或閉包來實現。優點:減小記憶體開銷。侷限:協程排程產生的區域性變數都在共用棧上, 一旦新的協程執行後共用棧中的資料就會被覆蓋, 先前協程的區域性變數也就不再有效, 進而無法實現引數傳遞、巢狀呼叫等高階協程互動。
Golang 中的 goroutine、Lua 中的協程都是有棧協程;ES6的 await/async、Python 的 Generator、C++20 中的 cooroutine 都是無棧協程。
實現協程的核心有兩點:
實現使用者態的上下文切換。
實現協程的排程。
Tars 協程的由下面幾個類實現:
TC_CoroutineInfo 協程資訊類:實現協程的上下文切換。每個協程對應一個 TC_CoroutineInfo 物件,上下文切換基於boost.context實現。
TC_CoroutineScheduler 協程排程器類:實現了協程的管理和排程。
TC_Coroutine 協程類:繼承於執行緒類(TC_Thread),方便業務快速使用協程。
Tars 協程有幾個特點:
有棧協程。每個協程都分配了獨立的棧空間。
對稱協程。協程之間相互獨立,由排程器負責排程。
基於 epoll 實現協程排程,和網路IO無縫結合。
協程可以看成一種特殊的函數,和普通函數不同,協程函數有掛起(yield)和恢復(resume)的能力,即可以中斷自己的執行流,並且在合適的時候恢復執行流,這也稱為上下文切換的能力。
協程執行的過程,依賴兩個關鍵要素:協程棧和暫存器,協程的上下文環境其實就是暫存器和棧的狀態。實現上下文切換的核心就是實現儲存並恢復當前執行環境的暫存器狀態的能力。
實現使用者態上下文切換一般有以下方式:
Tars 協程是基於 boost.context 實現,boost.context 提供了兩個介面(make_fcontext, jump_fcontext)實現協程的上下文切換。
程式碼1:
/**
* @biref 執行環境上下文
*/
typedef void* fcontext_t;
/**
* @biref 事件引數包裝
*/
struct transfer_t {
fcontext_t fctx; // 來源的執行上下文。來源的上下文指的是從什麼位置跳轉過來的
void* data; // 介面傳入的自定義的指標
};
/**
* @biref 初始化執行環境上下文
* @param sp 棧空間地址
* @param size 棧空間的大小
* @param fn 入口函數
* @return 返回初始化完成後的執行環境上下文
*/
extern "C" fcontext_t make_fcontext(void * stack, std::size_t stack_size, void (* fn)( transfer_t));
/**
* @biref 跳轉到目標上下文
* @param to 目標上下文
* @param vp 目標上下文的附加引數,會設定為transfer_t裡的data成員
* @return 跳轉來源
*/
extern "C" transfer_t jump_fcontext(fcontext_t const to, void * vp);
(1)make_fcontext 建立協程
接受三個引數,stack 是為協程分配的棧底,stack_size 是棧的大小,fn 是協程的入口函數
返回初始化完成後的執行環境上下文
(2)jump_fcontext 切換協程
接受兩個引數,目標上下文地址和引數指標
返回一個上下文,指向當前上下文從哪個上下文跳轉過來
make_fcontext 和 jump_fcontext 通過組合程式碼實現,具體的組合程式碼可以參考:
https://github.com/TarsCloud/TarsCpp/blob/v3.0.0/util/src/asm/jump_x86_64_sysv_elf_gas.S
https://github.com/TarsCloud/TarsCpp/blob/v3.0.0/util/src/asm/make_x86_64_sysv_elf_gas.S
boost context 是通過 fcontext_t結構體來儲存協程狀態。相對於其它組合實現的協程庫,boost的context和stack是一起的,棧底指標就是context,切換context就是切換stack。
TC_CoroutineInfo 協程資訊類,包裝了 boost.context 提供的介面,表示一個 TARS 協程。
其中,TC_CoroutineInfo::registerFunc 定義了協程的建立。
程式碼2:
void TC_CoroutineInfo::registerFunc(const std::function<void ()>& callback)
{
_callback = callback;
_init_func.coroFunc = TC_CoroutineInfo::corotineProc;
_init_func.args = this;
fcontext_t ctx = make_fcontext(_stack_ctx.sp, _stack_ctx.size,
TC_CoroutineInfo::corotineEntry); // 建立協程
transfer_t tf = jump_fcontext(ctx, this); // context 切換
//實際的ctx
this->setCtx(tf.fctx);
}
void TC_CoroutineInfo::corotineEntry(transfer_t tf)
{
TC_CoroutineInfo * coro = static_cast< TC_CoroutineInfo * >(tf.data); // this
auto func = coro->_init_func.coroFunc;
void* args = coro->_init_func.args;
transfer_t t = jump_fcontext(tf.fctx, NULL);
//拿到自己的協程堆疊, 當前協程結束以後, 好跳轉到main
coro->_scheduler->setMainCtx(t.fctx);
//再跳轉到具體函數
func(args, t);
}
TC_CoroutineInfo::switchCoro 定義了協程切換。
程式碼3:
void TC_CoroutineScheduler::switchCoro(TC_CoroutineInfo *to)
{
//跳轉到to協程
_currentCoro = to;
transfer_t t = jump_fcontext(to->getCtx(), NULL);
//並儲存協程堆疊
to->setCtx(t.fctx);
}
基於 boost.context 的 TC_CoroutineInfo 類實現了協程的上下文切換,協程的管理和排程,則是由 TC_CoroutineScheduler 協程排程器類來負責,分管理和排程兩個方面來說明 TC_CoroutineScheduler 排程類。
協程管理:目的是需要合理的資料結構來組織協程(TC_CoroutineInfo),方便排程的實現。
協程排程:目的是控制協程的啟動、休眠和喚醒,實現了 yield, sleep 等功能,本質就是實現協程的狀態機,完成協程的狀態切換。Tars 協程分為 5 個狀態:FREE, ACTIVE, AVAIL, INACTIVE, TIMEOUT
程式碼4:
/**
* 協程的狀態資訊
*/
enum CORO_STATUS
{
CORO_FREE = 0,
CORO_ACTIVE = 1,
CORO_AVAIL = 2,
CORO_INACTIVE = 3,
CORO_TIMEOUT = 4
};
TC_CoroutineScheduler 主要通過以下方法管理協程:
TC_CoroutineScheduler::create() 建立 TC_CoroutineScheduler 物件
TC_CoroutineScheduler::init() 初始化,分配協程棧記憶體
TC_CoroutineScheduler::run() 啟動排程
TC_CoroutineScheduler::terminate() 停止排程
TC_CoroutineScheduler::destroy() 資源銷燬,釋放協程棧記憶體
我們可以通過 TC_CoroutineScheduler::init() 看到資料結構的初始化過程。
程式碼5:
void TC_CoroutineScheduler::init()
{
... ....
createCoroutineInfo(_poolSize); // _all_coro = new TC_CoroutineInfo*[_poolSize+1];
TC_CoroutineInfo::CoroutineHeadInit(&_active);
TC_CoroutineInfo::CoroutineHeadInit(&_avail);
TC_CoroutineInfo::CoroutineHeadInit(&_inactive);
TC_CoroutineInfo::CoroutineHeadInit(&_timeout);
TC_CoroutineInfo::CoroutineHeadInit(&_free);
int iSucc = 0;
for(size_t i = 0; i < _currentSize; ++i)
{
//iId=0不使用, 給mainCoro使用!!!!
uint32_t iId = generateId();
stack_context s_ctx = stack_traits::allocate(_stackSize); // 分配協程棧記憶體
TC_CoroutineInfo *coro = new TC_CoroutineInfo(this, iId, s_ctx);
_all_coro[iId] = coro;
TC_CoroutineInfo::CoroutineAddTail(coro, &_free);
++iSucc;
}
_currentSize = iSucc;
_mainCoro.setUid(0);
_mainCoro.setStatus(TC_CoroutineInfo::CORO_FREE);
_currentCoro = &_mainCoro;
}
通過下面的 TC_CoroutineScheduler 排程類資料結構圖,可以更清楚的看到協程的組織方式:
Tars排程類資料結構
使用協程之前,需要在協程陣列(_all_coro),建立指定數量的協程物件,併為每個協程分配協程棧記憶體。
通過連結串列的方式管理協程,每個狀態都有一個連結串列。協程狀態切換,對應協程在不同狀態連結串列的轉移。
Tars 排程是基於epoll實現,在 epoll 迴圈裡檢查是否有需要執行的協程, 有則執行之, 沒有則等待在epoll物件上, 直到有喚醒或者超時。使用 epoll 實現的好處是可以和網路IO無縫粘合, 當有資料傳送/接收時, 喚醒epoll物件, 從而完成協程的切換。
Tars 協程排程的核心邏輯是:TC_CoroutineScheduler::run()
程式碼6:
void TC_CoroutineScheduler::run()
{
... ...
while(!_epoller->isTerminate())
{
if(_activeCoroQueue.empty() && TC_CoroutineInfo::CoroutineHeadEmpty(&_avail) && TC_CoroutineInfo::CoroutineHeadEmpty(&_active))
{
_epoller->done(1000); // epoll_wait(..., 1000ms) 先處理epoll的網路事件
}
//喚醒需要啟用的協程
wakeup();
//喚醒sleep的協程
wakeupbytimeout();
//喚醒yield的協程
wakeupbyself();
int iLoop = 100;
//執行active協程, 每次執行100個, 避免佔滿cpu
while(iLoop > 0 && !TC_CoroutineInfo::CoroutineHeadEmpty(&_active))
{
TC_CoroutineInfo *coro = _active._next;
switchCoro(coro);
--iLoop;
}
//執行available協程, 每次執行1個
if(!TC_CoroutineInfo::CoroutineHeadEmpty(&_avail))
{
TC_CoroutineInfo *coro = _avail._next;
switchCoro(coro);
}
}
... ...
}
下圖可以更清楚得看到協程排程和狀態轉移的過程。
TC_CoroutineScheduler 提供了下面四種方法實現協程的排程:
(1) TC_CoroutineScheduler::go(): 啟動協程。
(2)TC_CoroutineScheduler::yield(): 當前協程放棄繼續執行。並提供了兩種方式,支援不同的喚醒策略。
yield(true): 會自動喚醒(等到下次協程排程, 都會再啟用當前執行緒)
yield(false): 不再自動喚醒, 除非自己排程該協程(比如put到排程器中)
(3)TC_CoroutineScheduler::sleep(): 當前協程休眠iSleepTime時間(單位:毫秒),然後會被喚醒繼續執行。
(4)TC_CoroutineScheduler::put(): 放入需要喚醒的協程, 將協程放入到排程器中, 馬上會被排程器排程。
本文介紹了協程的概念,並討論了 Tars Cpp 協程的實現原理和原始碼分析。
TarsCpp 3.x全面啟用對協程的支援,本文的原始碼分析是基於TarsCpp-v3.0.0版本