之前通過看書、看視訊和部落格拼湊了一個webserver,然後有一段時間沒有繼續整這個專案
現在在去看之前的程式碼,真的是相當之簡陋,而且程式碼設計得很混亂,我認為沒有必要繼續在屎堆上修改了,於是開始閱讀別人的較為規範的開源實現
目的是嘗試理解一個可用級別的webserver需要具備哪些特性,以及在具體實現過程中要掌握的設計方法
下面是閱讀原始碼時的記錄,個人理解,僅供參考
或者按之前的理解,也可以叫做"I/O處理單元"
這部分在原來的程式碼中,我是混在一起寫的,這樣不好,很混亂,可維護性差
在TinyWeb中,這部分被分為eventListen和eventLoop兩部分
該函數應該使用socketAPI來建立一系列負責監聽的檔案描述符
流程大概是:建立socket檔案描述符->使用setsockopt設定通訊端(優雅關閉)->建立一個address結構體用於存放socket的地址->bind繫結埠->監聽所建立的檔案描述符->用epoll建立核心事件表->將監聽的fd加入epoll物件中
上述流程是Linux下經典的網路程式設計,用程式碼寫出來大概就是:
void WebServer::eventListen(){
m_listenfd = socket(PF_INET, SOCK_STREAM, 0);//建立一個通訊端用於監聽
assert(m_listenfd >= 0);
//優雅關閉連線(就是在關閉通訊端前等待一會)
if (0 == m_OPT_LINGER){//"Yoda 表示式"
struct linger tmp = {0, 1};
//設定 SO_LINGER 選項,用於控制關閉通訊端時的行為,包括處理未傳送完的資料。
setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));
}
else if (1 == m_OPT_LINGER){
struct linger tmp = {1, 1};
setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));
}
int ret = 0;//檢查這兩個函數是否成功執行
struct sockaddr_in address;//建立一個address結構體
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons(m_port);
int flag = 1;
setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
ret = bind(m_listenfd, (struct sockaddr *)&address, sizeof(address));//繫結埠
assert(ret >= 0);
ret = listen(m_listenfd, 5);//監聽
assert(ret >= 0);
utils.init(TIMESLOT);//初始化定時器,在標頭檔案中定義,最小超時時間為5秒
//epoll建立核心事件表
epoll_event events[MAX_EVENT_NUMBER];
m_epollfd = epoll_create(5);//建立epoll物件
assert(m_epollfd != -1);
//將監聽的檔案描述符新增到epoll物件中
utils.addfd(m_epollfd, m_listenfd, false, m_LISTENTrigmode);
http_conn::m_epollfd = m_epollfd;//賦值
}
在之前版本的程式碼中,監聽部分就到此結束了,然而一個比較完備的webserver還需要一些額外的東西。
上述程式碼中,我們還在該函數內初始化了一個定時器,這主要是為了避免在使用管道處理各種訊號時發生競態現象
因此在該函數中,我們還需要建立一個管道用於處理訊號
使用socketpair建立一個無名管道(因為是在同一臺機器上進行訊號通訊,由作業系統核心維護)m_pipefd
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_pipefd);
assert(ret != -1);
對m_pipefd進行一些設定
utils.setnonblocking(m_pipefd[1]);//將m_pipefd[1](寫端)設定為非阻塞模式,以確保在讀寫操作時不會被阻塞。
utils.addfd(m_epollfd, m_pipefd[0], false, 0);//將m_pipefd[0](讀端)新增到m_epollfd所代表的epoll事件監聽集合中,用於監聽該檔案描述符上的讀事件。
utils.addsig(SIGPIPE, SIG_IGN);//忽略SIGPIPE訊號,這樣當向一個已關閉的socket傳送資料時,不會產生SIGPIPE訊號導致程序異常終止。
//註冊訊號處理常式utils.sig_handler來處理SIGALRM和SIGTERM訊號。
utils.addsig(SIGALRM, utils.sig_handler, false);
utils.addsig(SIGTERM, utils.sig_handler, false);
Utils是在lst_timer.h中宣告的一個工具類;
addsig是Utils的成員函數,用於設定訊號函數;
然後,設定一個alarm,當時間達到TIMESLOT就觸發告警,最後將經由管道接收的m_pipefd和m_epollfd賦值給靜態成員變數
alarm(TIMESLOT);//5秒後,程序將收到 SIGALRM
//工具類,訊號和描述符基礎操作
//通過管道,將接收到的m_pipefd和m_epollfd賦值給靜態成員變數
Utils::u_pipefd = m_pipefd;
Utils::u_epollfd = m_epollfd;
自此,"事件監聽"函數完成了
回顧一下,如果要使用同步 I/O(以 epoll_wait 為例)實現 Reactor 模式,那麼工作流程是:
- 主執行緒往 epoll 核心事件表中註冊 socket 上的讀就緒事件。
- 主執行緒呼叫 epoll_wait 等待 socket 上有資料可讀。
- 當 socket 上有資料可讀時, epoll_wait 通知主執行緒。主執行緒則將 socket 可讀事件放入請求佇列。
- 睡眠在請求佇列上的某個工作執行緒被喚醒,它從 socket 讀取資料,並處理客戶請求,然後往 epoll
核心事件表中註冊該 socket 上的寫就緒事件。- 當主執行緒呼叫 epoll_wait 等待 socket 可寫。
- 當 socket 可寫時,epoll_wait 通知主執行緒。主執行緒將 socket 可寫事件放入請求佇列。
- 睡眠在請求佇列上的某個工作執行緒被喚醒,它往 socket 上寫入伺服器處理客戶請求的結果。
在事件監聽函數中,已經實現了第一點。我們建立了通訊端,為其設定地址並繫結,然後監聽它,把它扔到epoll物件中,由此新增進核心事件表。
此後,由建立了一個無名管道用於接收某個東西傳過來的訊號,管道的寫端設定為非阻塞,讀端也被仍到epoll物件
管道接收的是誰的訊號呢?從"工作流程"中可以推出,應該是epoll_wait。
這裡的管道是在等待epoll_wait發出的訊號,即事件觸發。這部分則由"事件迴圈"函數負責
這下輪到epoll_wait出場了!
在之前程式碼的討論中,我有提到"Reactor元件",該元件是Reactor模式的核心
那麼事件迴圈函數就是Reactor元件。
迴圈嘛,顧名思義,這玩意主要就是由一個while迴圈構成(大概率是死迴圈),不斷迴圈檢測有無事件發生
void WebServer::eventLoop(){
bool timeout = false;
bool stop_server = false;
while (!stop_server){
}
}
在while迴圈中,主角是epoll_wait。epoll_wait()會等待事件的發生,一旦事件發生,會將該事件的相關資訊儲存到events陣列(標頭檔案中宣告並定義)中,主執行緒會遍歷該陣列並處理所有發生的事件。
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
epoll_wait()的返回值是可用於 I/O 操作的檔案描述符(通訊端)的數量。(非阻塞呼叫)
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if (number < 0 && errno != EINTR){
LOG_ERROR("%s", "epoll failure");
break;
}
拿到當前觸發的事件後,遍歷這些事件,一件一件將其處理掉。這些儲存在事件陣列event中的所謂的"事件",其實就是一些fd,因此先從event中取出這些fd。
for (int i = 0; i < number; i++){
int sockfd = events[i].data.fd;//這裡把它們命名為sockfd
}
還記得事件監聽函數中建立的socket(m_listenfd)嗎?該socket用於監聽指定的網路埠(就是你係結的那個)
如果監聽socket觸發了可讀事件,那麼我們需要在事件迴圈中呼叫相應的回撥函數進行處理。
需要處理的事情挺多的,大概有:處理新的客戶連線、對方異常斷開或錯誤異常、處理訊號(管道讀事件)、可讀事件、可寫事件,一個個看
如果我們從event拿到的是一個和m_listenfd相等的玩意,那就表明有新的客戶連線。
if (sockfd == m_listenfd){//處理新到的客戶連線
bool flag = dealclinetdata();
if (false == flag)
continue;
}
...
此時需要呼叫使用者端處理常式bool WebServer::dealclinetdata()
這裡為了考慮到伺服器的功能性,提供m_LISTENTrigmode來設定處理使用者端連線的模式(一般採用非阻塞即可,在之前版本程式碼中預設非阻塞)
bool WebServer::dealclinetdata(){
struct sockaddr_in client_address;//建立一個sockaddr_in結構體
socklen_t client_addrlength = sizeof(client_address);//用於儲存使用者端地址長度
if (0 == m_LISTENTrigmode){//非阻塞模式監聽
...
}
else{//阻塞模式監聽
...
return false;
}
return true;
}
非阻塞模式下,當新的連線產生時伺服器會使用accept函數建立一個新的連線socket(connfd),這個新的socket會與使用者端的socket建立起通訊連線。
然後,通過呼叫 void WebServer::timer(int connfd, struct sockaddr_in client_address)
來設定連線的定時器。
int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0){
LOG_ERROR("%s:errno is:%d", "accept error", errno);
return false;
}
if (http_conn::m_user_count >= MAX_FD){//目前連線滿了
utils.show_error(connfd, "Internal server busy");
LOG_ERROR("%s", "Internal server busy");
return false;
}
timer(connfd, client_address);
來看一下定時器又幹了什麼,定時器函數接收一個與使用者端連線socket(即connfd)和使用者端地址資訊client_address。
簡單來說:呼叫定時器函數,將socket(connfd)的連線狀態和資訊加入定時器,進行管理。
詳細的關於定時器分析見:定時器。本小節還是以梳理處理新使用者端的連線流程為主
總而言之,dealclinetdata()對新連線的使用者端做的處理就是:
1、幫它創了個新的fd;(conndfd)
2、然後把這個fd加入定時器;
3、該定時器為fd建立了http_conn物件用於管理當前新建連線上使用者端的所有操作;
我們從events陣列中獲取一個fd,並獲取該fd的events也就是當前fd發生的事件,如果事件資訊中包含EPOLLRDHUP | EPOLLHUP | EPOLLERR其中之一,那麼意味著這個fd出現了異常斷開或錯誤異常。此時要通過fd獲取到該連線的定時器,並將其刪除
連線就不用刪了,應為可能已經斷開不存在了
else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
//伺服器端關閉連線,移除對應的定時器
util_timer *timer = users_timer[sockfd].timer;
deal_timer(timer, sockfd);
}
這裡的刪除操作呼叫的是WebServer::deal_timer(util_timer *timer, int sockfd)
,其實現如下:
void WebServer::deal_timer(util_timer *timer, int sockfd){
timer->cb_func(&users_timer[sockfd]);
if (timer){
utils.m_timer_lst.del_timer(timer);
}
LOG_INFO("close fd %d", users_timer[sockfd].sockfd);
}
該函數實現很簡單,就是直接去觸發定時器的回撥函數,並且呼叫定時器中的del_timer將定時器刪除
補充一下各個訊號的意思:
EPOLLRDHUP
:表示通訊端連線被對方關閉。它是對等方(graceful shutdown)關閉連線的一種方式。EPOLLHUP
:表示發生了掛起事件。它指示通訊端上發生了一些異常情況,例如連線重置或對端關閉連線。EPOLLERR
:表示發生了錯誤事件。它指示通訊端上發生了一些錯誤條件,例如連線錯誤或非阻塞操作產生的錯誤。
在事件監聽中,我們建立了一個無名管道用來獲取epoll_wait監聽到的資訊
m_pipefd[0]
表示管道的讀取端檔案描述符,當sockfd
等於m_pipefd[0]
時,表示有資料可供從管道中讀取。
同時,我們檢查當前fd的事件標誌,EPOLLIN表示讀事件。兩者結合表示當前管道的讀取端存在可讀資料,於是呼叫dealwithsignal來處理讀事件。
else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN)){
bool flag = dealwithsignal(timeout, stop_server);
if (false == flag)
LOG_ERROR("%s", "dealclientdata failure");
}
以下是dealwithsignal的定義
bool WebServer::dealwithsignal(bool &timeout, bool &stop_server){
int ret = 0;
int sig;
char signals[1024];
ret = recv(m_pipefd[0], signals, sizeof(signals), 0);
if (ret == -1){
return false;
}
else if (ret == 0){
return false;
}
else{
for (int i = 0; i < ret; ++i){
switch (signals[i]){
case SIGALRM:
{
timeout = true;
break;
}
case SIGTERM:
{
stop_server = true;
break;
}
}
}
}
return true;
}
該函數從管道中接收讀取到的資料,當沒收到或者收到資料為0時均返回false
當正常接收到資料後,遍歷,獲取資料中的signals,根據不同值進行不同處理
該事件只是負責處理讀資料過程中出現上述訊號時的情況,如果讀資料時還沒讀完是不會出現上述超時或者停止事件迴圈的訊號的
下面要介紹的才是真正用於讀取資料的事件
與上面情況一樣,EPOLLIN表示讀事件發生,只不過我們這次處理的是從讀事件中拿到的資料
else if (events[i].events & EPOLLIN){
dealwithread(sockfd);
}
呼叫的處理常式是WebServer::dealwithread(int sockfd)
void WebServer::dealwithread(int sockfd){
util_timer *timer = users_timer[sockfd].timer;
//reactor
if (1 == m_actormodel){//為什麼這樣就代表了reactor
if (timer){//如果當前fd繫結了定時器,呼叫一下adjust_timer將其調整到正確位置
adjust_timer(timer);
}
//若監測到讀事件,將該事件放入請求佇列
m_pool->append(users + sockfd, 0);
while (true){//為什麼用while
if (1 == users[sockfd].improv){
if (1 == users[sockfd].timer_flag){
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else{
//proactor
if (users[sockfd].read_once()){
LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
//若監測到讀事件,將該事件放入請求佇列
m_pool->append_p(users + sockfd);
if (timer){
adjust_timer(timer);
}
}
else{
deal_timer(timer, sockfd);
}
}
}
在Reactor模式中,首先會檢測到有讀事件發生,即有使用者端傳送資料到達。如果使用的是m_actormodel
為1(即Reactor模式),那麼會進行以下操作:
先檢查與該sockfd
關聯的定時器timer
,並調整定時器(adjust_timer(timer)
)。然後將該讀事件放入執行緒池的請求佇列中,執行緒池中的run()函數對事件進行處理。然後會進入;一個死迴圈等待請求處理完成,在該回圈中會不斷檢查users[sockfd]
的狀態標誌,如果請求處理完成(users[sockfd].improv == 1
,該標誌由執行緒池中的run函數修改),則退出迴圈,繼續處理其他事件。
當events訊號為EPOLLOUT代表發生寫事件
else if (events[i].events & EPOLLOUT){
dealwithwrite(sockfd);
}
此時呼叫WebServer::dealwithwrite(int sockfd)
函數
void WebServer::dealwithwrite(int sockfd){
util_timer *timer = users_timer[sockfd].timer;
//reactor
if (1 == m_actormodel){//同疑問,為什麼這就是reactor了
if (timer){//如果當前fd繫結了定時器,呼叫一下adjust_timer將其調整到正確位置
adjust_timer(timer);
}
m_pool->append(users + sockfd, 1);//將該fd加入到執行緒池的正確位置
while (true){//沒理解這個迴圈的作用
if (1 == users[sockfd].improv){
if (1 == users[sockfd].timer_flag){
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else{
//proactor
if (users[sockfd].write()){
LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
if (timer){
adjust_timer(timer);
}
}
else{
deal_timer(timer, sockfd);
}
}
}
與讀事件的處理邏輯類似
void WebServer::eventLoop()
{
bool timeout = false;
bool stop_server = false;
while (!stop_server)
{
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if (number < 0 && errno != EINTR)
{
LOG_ERROR("%s", "epoll failure");
break;
}
for (int i = 0; i < number; i++)
{
int sockfd = events[i].data.fd;
//處理新到的客戶連線
if (sockfd == m_listenfd)
{
bool flag = dealclinetdata();
if (false == flag)
continue;
}
else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))
{
//伺服器端關閉連線,移除對應的定時器
util_timer *timer = users_timer[sockfd].timer;
deal_timer(timer, sockfd);
}
//處理訊號
else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN))
{
bool flag = dealwithsignal(timeout, stop_server);
if (false == flag)
LOG_ERROR("%s", "dealclientdata failure");
}
//處理客戶連線上接收到的資料
else if (events[i].events & EPOLLIN)
{
dealwithread(sockfd);
}
else if (events[i].events & EPOLLOUT)
{
dealwithwrite(sockfd);
}
}
if (timeout)
{
utils.timer_handler();
LOG_INFO("%s", "timer tick");
timeout = false;
}
}
}
以上是一個webserver中的事件迴圈函數,用於處理監聽到的各類事件,下面是負責處理管道讀事件和寫事件的函數
void WebServer::dealwithread(int sockfd)
{
util_timer *timer = users_timer[sockfd].timer;
//reactor
if (1 == m_actormodel)
{
if (timer)
{
adjust_timer(timer);
}
//若監測到讀事件,將該事件放入請求佇列
m_pool->append(users + sockfd, 0);
while (true)
{
if (1 == users[sockfd].improv)
{
if (1 == users[sockfd].timer_flag)
{
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else
{
//proactor
if (users[sockfd].read_once())
{
LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
//若監測到讀事件,將該事件放入請求佇列
m_pool->append_p(users + sockfd);
if (timer)
{
adjust_timer(timer);
}
}
else
{
deal_timer(timer, sockfd);
}
}
}
void WebServer::dealwithwrite(int sockfd)
{
util_timer *timer = users_timer[sockfd].timer;
//reactor
if (1 == m_actormodel)
{
if (timer)
{
adjust_timer(timer);
}
m_pool->append(users + sockfd, 1);
while (true)
{
if (1 == users[sockfd].improv)
{
if (1 == users[sockfd].timer_flag)
{
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
else
{
//proactor
if (users[sockfd].write())
{
LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
if (timer)
{
adjust_timer(timer);
}
}
else
{
deal_timer(timer, sockfd);
}
}
}
請結合所給的程式碼,詳細說明dealwithread和dealwithwrite函數分別都是怎麼實現reactor模式的,為什麼?
最後,這裡還需要不斷對定時器進行重新定時,以防止其觸發超時訊號導致連線被殺掉(?理解存疑)
if (timeout){
utils.timer_handler();
LOG_INFO("%s", "timer tick");
timeout = false;
}
timer_handler()函數的定義如下
//定時處理任務,重新定時以不斷觸發SIGALRM訊號
void Utils::timer_handler(){
m_timer_lst.tick();//詳見補充說明:定時器連結串列實現
alarm(m_TIMESLOT);
}
至此,事件迴圈結束
(總結一下所有流程,然後概括reactor究竟是怎麼實現的,與程式碼怎麼對應上的)
因為在事件迴圈中要處理事件,而處理時要使用定時器對每個連線fd進行管理,所以都用到了定時器,單獨抽出來細說
定時器的作用是管理連線和超時處理,其接收一個與使用者端連線的socket(即connfd)和使用者端地址資訊client_address作為引數,然後進行users陣列的初始化操作
void WebServer::timer(int connfd, struct sockaddr_in client_address){
users[connfd].init(connfd, client_address, m_root, m_CONNTrigmode, m_close_log, m_user, m_passWord, m_databaseName);
...
}
users是在標頭檔案webser.h中宣告的一個陣列,該陣列中的資料型別是http_conn*
,也就是指向一個http_conn的指標,每個http_conn物件代表一個使用者端連線。
http_conn
類封裝了處理使用者端 HTTP 請求的功能和操作(在http_conn.h中宣告)。它包含了處理請求報文、解析請求、生成響應等一系列與 HTTP 協定相關的操作。先不展開說明。
現在我們為connfd初始化好了一個 http_conn
物件,這玩意就儲存在 users
陣列的對應索引下。使用傳入的connfd作為索引從該陣列中取一個http_conn物件(?↓),該物件就代表著新建立的連線中進行的一系列HTTP協定下的操作。
然後,對該物件進行初始化(呼叫http_conn
類的初始函數)
ps:看到這裡的時候,我其實有一個問題:誰最早建立了http_conn物件並把它加到users陣列中?是定時器↓
connfd是我們通過事件迴圈中epoll_wait函數監聽捕獲訊號後建立的socket,其代表著有新的使用者端連線到webserver,因此此時會通過dealclinetdata()去處理該socket,處理的方式就是呼叫定時器函數並初始化定時器。(因為要用定時器管理連線)
在
timer()
函數中,通過users
陣列的索引connfd
建立一個http_conn
物件,該物件負責儲存connfd與webserver互動過程中的所有操作。
梳理清楚之後繼續
定時器中還建立一個新的util_timer
物件,併為其設定相關屬性。
將使用者端的地址資訊儲存在 users_timer
陣列(webser.h)中與 connfd
對應的位置。
將當前連線的檔案描述符 connfd
儲存在 users_timer
陣列中與 connfd
對應的位置。
...
//初始化client_data資料
//建立定時器,設定回撥函數和超時時間,繫結使用者資料,將定時器新增到連結串列中
users_timer[connfd].address = client_address;
users_timer[connfd].sockfd = connfd;
util_timer *timer = new util_timer;
...
users_timer
陣列的作用是為每個連線儲存相關的定時器資訊和使用者端地址資訊。這樣可以方便地通過檔案描述符查詢對應的定時器或者使用者端地址資訊,並進行相關的操作,例如處理定時事件或關閉連線。
繼續
前面我們建立了一個新的util_timer
物件,timer,現在將users_timer[connfd]
的地址賦值給timer
的user_data
成員變數,users_timer是一個陣列,裡面儲存著每個連線的計時器資訊。
將回撥函數cb_func
(該函數在lst_timer.cpp中定義)賦值給timer
的cb_func
成員變數。這裡的回撥函數指的是在定時器到期時要執行的函數(詳見:定時器連結串列的實現)。
...
timer->user_data = &users_timer[connfd];
timer->cb_func = cb_func;
time_t cur = time(NULL);//獲取當前時間
timer->expire = cur + 3 * TIMESLOT;
users_timer[connfd].timer = timer;
utils.m_timer_lst.add_timer(timer);
}
然後就是獲取當前時間並根據TIMESLOT引數計算定時器的超時時間,將建立的定時器物件timer
賦值給對應連線(connfd)的計時器資訊(users_timer[connfd].timer
)。
當上述一切操作處理完後,新建立的連線fd得到了它的定時器資訊,但是我們要管理這些定時器物件,如何做?用連結串列唄
在定時器函數的最後一行程式碼中,將定時器物件新增到了定時器連結串列m_timer_lst
中,該連結串列的定義位於lst_timer.cpp(詳見:定時器連結串列的實現)
至此,定時器函數說明完畢。其建立一個了定時器物件,併為該物件設定相關引數,然後將定時器新增到定時器連結串列中,以便在事件迴圈中進行定時器的管理和觸發。
從上面的梳理也能看到,Web伺服器中的定時器通常需要同時管理多個定時任務,為了提供高效的插入、刪除和排序操作,節省記憶體空間,並具有靈活性和可延伸性,我們需要通過連結串列來實現定時器
定時器連結串列與定時器函數的實現程式碼是分開的,定時器連結串列管理類sort_timer_lst是以一個類的形式在lst_timer.h中宣告
class sort_timer_lst{
public:
sort_timer_lst();
~sort_timer_lst();
void add_timer(util_timer *timer);//用於向定時器連結串列中新增定時器
void adjust_timer(util_timer *timer);//用於調整定時器的位置,當定時器的到期時間延後時需要呼叫該函數
void del_timer(util_timer *timer);//用於從連結串列中刪除定時器
void tick();//用於處理到期的定時器
private:
void add_timer(util_timer *timer, util_timer *lst_head);//輔助函數,用於插入定時器到指定節點之後
util_timer *head;//分別指向連結串列的頭部和尾部
util_timer *tail;
};
嚴格來說,sort_timer_lst也不是真正"定時器連結串列",這個類只是去使用了定時器連結串列,並提供一系列配套的成員函數用以管理定時器連結串列。
下面將分別介紹。
在定時器函數timer()中,我們建立的是定時器(util_timer
)物件,這些就是我們需要管理的節點,即連結串列節點。
以下是"節點類"util_timer的定義,該類真正給出了"定時器連結串列"的定義,由此可知定時器連結串列被定義為一個雙向連結串列
class util_timer
{
public:
util_timer() : prev(NULL), next(NULL) {}
public:
time_t expire;
void (* cb_func)(client_data *);
client_data* user_data;
util_timer* prev;
util_timer* next;
};
從util_timer提供的建構函式看,這個類充當的是一個雙向連結串列的節點,其與普通的連結串列有有點不同,該節點中還儲存了定時器超時時間expire以及使用者端的相關資料user_data,並且還提供一個指標指向回撥函數。
expire是一個時間變數,其資料型別為time_t
平時刷題時定義的ListNode通常有一個val用來存放節點值,這裡的user_data就是節點值。client_data
是一個結構體,其中存放了使用者端的相關資料
struct client_data{
sockaddr_in address;//儲存使用者端的地址資訊
int sockfd;//表示使用者端的通訊端描述符
util_timer *timer;//關聯使用者端和定時器
};
因此,user_data變數儲存了使用者端連線的地址資訊和fd,以及與使用者端繫結的定時器(從定時器函數的程式碼中可知,該定時器由add_timer函數新增到client_data結構體中)
然後來看回撥函數。
所謂回撥函數(Callback Function)是指將某種可以作為引數傳遞給另一個函數的函數。這種函數可以作為引數傳遞給另一個函數,當特定的事件發生後,呼叫傳入的"引數函數"進行特定的操作。(常用於非同步程式設計、事件處理)
class util_timer{
...
public:
...
void (* cb_func)(client_data *);
...
};
在util_timer中,有一個用於指向回撥函數cb_func的指標,該回撥函數的定義位於lst_timer.cpp
cb_func用於處理定時器到期時的操作。回撥函數的引數是一個指向client_data
結構體的指標。
void cb_func(client_data *user_data){
//使用epoll_ctl函數從 epoll 範例中刪除檔案描述符(socket)。
epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
assert(user_data);//確保user_data指標不為空。
close(user_data->sockfd);//關閉之前處理的檔案描述符(socket),釋放資源
http_conn::m_user_count--;//將http_conn類中的靜態成員變數m_user_count減少1
}
說白了就是,定時器連結串列中管理的某個連線fd的定時器超時後,該定時器物件自身會呼叫一個回撥函數,通過回撥函數釋放當前連線的fd所佔用的資源。
看完了定時器連結串列節點的定義,現在來看看負責實際管理連結串列的一些成員函數。
由這些功能函數構建的定時器容器為帶頭尾結點的升序雙向連結串列。sort_timer_lst為每個連線建立一個定時器,將其新增到連結串列中,並按照超時時間升序排列。執行定時任務時,將到期的定時器從連結串列中刪除。
class sort_timer_lst{
public:
sort_timer_lst();
~sort_timer_lst();
void add_timer(util_timer *timer);//用於向定時器連結串列中新增定時器
void adjust_timer(util_timer *timer);//用於調整定時器的位置,當定時器的到期時間延後時需要呼叫該函數
void del_timer(util_timer *timer);//用於從連結串列中刪除定時器
void tick();//用於處理到期的定時器
private:
void add_timer(util_timer *timer, util_timer *lst_head);//輔助函數,用於插入定時器到指定節點之後
...
};
add_timer
函數就是用於構造連結串列的函數,該函數將目標定時器新增到連結串列中,新增時按照升序新增。
如果連結串列為空,則直接將定時器作為首節點插入。如果定時器的到期時間小於連結串列頭部定時器的到期時間,將定時器作為新的首節點插入。否則,呼叫輔助函數add_timer(timer, head)
插入定時器。
void sort_timer_lst::add_timer(util_timer* timer){
if (!timer) return;
if (!head){//當前連結串列為空,頭節點和尾節點是同一個
head = tail = timer;
return;
}
if (timer->expire < head->expire){//如果當前節點的超時時間小於頭節點,令其為新的頭節點
timer->next = head;
head->prev = timer;
head = timer;
return;
}
add_timer(timer, head);
}
adjust_timer
函數用於調整定時器的位置,當定時器的到期時間延後時需要呼叫該函數。
首先找到定時器的下一個節點tmp
,
如果tmp
為空或者定時器的到期時間小於tmp
的到期時間,說明定時器無需調整位置,直接返回。
如果待插入的定時器timer節點的過期時間比連結串列中其他節點的過期時間都要小,那麼該節點要稱為連結串列的頭部節點,因此要修改頭部指標並重新呼叫add_timer
函數插入定時器。
否則,修改定時器的前後指標,並呼叫add_timer(timer, timer->next)
插入定時器。
void sort_timer_lst::adjust_timer(util_timer* timer){
if (!timer) return;
util_timer* tmp = timer->next;
if (!tmp || (timer->expire < tmp->expire)){//定時器無需調整位置
return;
}
if (timer == head){//當前節點timer如果超時時間和頭節點一樣,那就要把它插到頭節點後面
head = head->next;
head->prev = NULL;
timer->next = NULL;
add_timer(timer, head);
}
else{//先刪除當前節點timer,然後再使用add_timer將其插入到正確的位置
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
add_timer(timer, timer->next);
}
}
解釋一下else的情況,else意味著當前節點的超時時間變大了,沒有變小,因此不會將其插到head後面
什麼意思呢?簡單來說,如果滿足else,那麼現在這個timer不是新的timer,而是一個之前就存在於連結串列中的timer
這個timer隨著時間的推移,其超時時間expire
值已經發生了變化(肯定變大了),所以要更新這個timer的位置(往連結串列尾部移動)
在調整節點位置之前,我們必須從連結串列中將其刪除。這是因為節點的expire
值已經改變,如果我們不將其刪除,它可能會位於錯誤的位置。
刪完之後,使用add_timer再將其插入正確的位置(根據expire
值找到正確的位置)
總結起來,
adjust_timer
函數的目的是重新調整定時器連結串列中節點的位置,以使連結串列仍然保持按照時間順序排序。為了達到這個目的,我們需要先將要調整的節點從連結串列中刪除,然後根據其新的expire
值重新插入到正確的位置。
del_timer
函數用於從連結串列中刪除定時器。首先判斷定時器是否是連結串列中唯一一個節點,如果是,則直接刪除並將頭尾指標置空。否則,根據定時器是否是頭部或尾部節點進行不同的處理,然後修改前後節點的指標,最後刪除定時器物件。
void sort_timer_lst::del_timer(util_timer *timer){
if (!timer) return;
if ((timer == head) && (timer == tail)){//如果連結串列中只有當前一個定時器
delete timer;//刪除後將頭尾指標置空
head = NULL;
tail = NULL;
return;
}
//當前節點位於連結串列頭尾處時,分別處理
if (timer == head){//如果當前定時器是頭節點,將head指標指向下一個節點
head = head->next;
head->prev = NULL;//其prev再指向空便完成刪除
delete timer;//刪除當前節點
return;
}
if (timer == tail){//如果當前定時器位於連結串列尾部,將tail指標指向 前一個節點
tail = tail->prev;
tail->next = NULL;//next指標指向空
delete timer;//刪除當前節點
return;
}//在其他位置就直接刪除就行
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}
tick
函數用於處理到期的定時器。
void sort_timer_lst::tick(){
if (!head) return;//如果連結串列中沒有定時器,那麼就直接返回
time_t cur = time(NULL);//獲取當前系統時間
util_timer* tmp = head;//將一個用於遍歷的指標指向head
while (tmp){
if (cur < tmp->expire){//此時還沒有超時
break;//所以退出迴圈
}//如果超時了,呼叫回撥函數把連線物件刪除了,即刪除當前tmp指向的節點
tmp->cb_func(tmp->user_data);
head = tmp->next;//刪除tmp
if (head){//如果刪的是頭節點的話,還要改一下prev,因為刪除時,頭節點已經變為原頭節點的下一個節點了
head->prev = NULL;
}
delete tmp;
tmp = head;//更新tmp指標
}
}
解釋一下cur < tmp->expire
,也就是定時器設定時間的機制
整個流程是這樣的:當有使用者端連線時,我們為其建立一個socket,該fd同時會被一個定時器繫結,整個定時器在初始化時獲取的時間是通過系統函數time得到的系統時間,然後因為我們人為的設定了一個超時時間引數TIMESLOT(15秒),因此定時器物件中的超時時間expire便是:當前系統時間+15秒。在檢查定時器時間時,我們也是通過time獲取系統時間cur,顯然cur會逐漸接近超時時間expire,當cur < tmp->expire
時,定時器還沒有超時,大於等於就超時了,此時啟動對該定時器的刪除工作。
主要的成員函數介紹完了,回顧一下:add_timer
函數構造雙向連結串列、adjust_timer
函數調整定時器在連結串列中的位置、del_timer
函數刪除某個定時器、tick
函數處理超時定時器。
接下來要介紹一下輔助函數add_timer(util_timer *timer, util_timer *lst_head)
,用於插入定時器到指定節點之後
void sort_timer_lst::add_timer(util_timer *timer, util_timer *lst_head){
util_timer *prev = lst_head;//雙指標,第一個指向head
util_timer *tmp = prev->next;
while (tmp){//遍歷連結串列
if (timer->expire < tmp->expire){//若當前遍歷定時器的超時時間晚於輸入定時器的超時時間
//那麼輸入的定時器timer就應該在這裡插入
prev->next = timer;//將timer插入到兩個指標之間
timer->next = tmp;
tmp->prev = timer;
timer->prev = prev;
break;
}//不滿足插入條件即timer的過期時間大於或等於tmp的過期時間,就移動雙指標
prev = tmp;
tmp = tmp->next;
}//遍歷結束,還沒有找到插入位置,就把節點插到連結串列尾部
if (!tmp){//將timer插入到prev和連結串列末尾之間
prev->next = timer;
timer->prev = prev;
timer->next = NULL;
tail = timer;
}
}
至此,定時器連結串列的實現說明完畢
回顧一下流程:不論是什麼處理任務(使用者端產生新連線或者別的也好),只要呼叫了定時器函數,該定時器函數就會與輸入的一個fd進行繫結。在初始化時,定時器會呼叫當前的系統時間,並在此基礎上加上超時時間,形成當前定時器的超時時間。
之後就不斷呼叫系統時間與超時時間比較,同時在連結串列中也不斷比較定時器節點之間的時間間隔,由此移動定時器連結串列。
當系統時間也到達超時時間後,定時器超時,使用成員函數將其從定時器連結串列中刪除。
主函數main.cpp呼叫了log_write(),該函數的定義位於webserver.cpp中
void WebServer::log_write(){
if (0 == m_close_log){
//初始化紀錄檔
if (1 == m_log_write)
Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 800);
else
Log::get_instance()->init("./ServerLog", m_close_log, 2000, 800000, 0);
}
}
可以看到這裡面主要是呼叫Log類中的方法來實現功能,那麼Log類應該就是要分析的紀錄檔系統了
不出意外,紀錄檔系統有兩個檔案:log.cpp和log.h,先看看宣告
class Log{
public:
//C++11以後,使用區域性變數懶漢不用加鎖
static Log *get_instance(){
static Log instance;
return &instance;
}
static void *flush_log_thread(void *args){
Log::get_instance()->async_write_log();
}
//可選擇的引數有紀錄檔檔案、紀錄檔緩衝區大小、最大行數以及最長紀錄檔條佇列
bool init(const char *file_name, int close_log, int log_buf_size = 8192, int split_lines = 5000000, int max_queue_size = 0);
void write_log(int level, const char *format, ...);
void flush(void);
private:
Log();
virtual ~Log();
void *async_write_log(){
string single_log;
//從阻塞佇列中取出一個紀錄檔string,寫入檔案
while (m_log_queue->pop(single_log))
{
m_mutex.lock();
fputs(single_log.c_str(), m_fp);
m_mutex.unlock();
}
}
private:
char dir_name[128]; //路徑名
char log_name[128]; //log檔名
int m_split_lines; //紀錄檔最大行數
int m_log_buf_size; //紀錄檔緩衝區大小
long long m_count; //紀錄檔行數記錄
int m_today; //因為按天分類,記錄當前時間是那一天
FILE *m_fp; //開啟log的檔案指標
char *m_buf;
block_queue<string> *m_log_queue; //阻塞佇列
bool m_is_async; //是否同步標誌位
locker m_mutex;
int m_close_log; //關閉紀錄檔
};
有點複雜啊我靠
其實仔細看的話,主要的函數就只有:初始化函數init、寫紀錄檔函數write_log以及flush函數,除此之外還有一個flush_log_thread函數(有點怪,應該是負責一步任務的)
先從初始化函數開始吧,在這之前,還需要了解一下單例模式
什麼是單例模式?單例模式(Singleton Pattern)是一種建立和使用物件的設計模式。它確保類只有一個範例,並提供全域性存取點以便其他物件可以使用該範例。
單例模式通常用於需要全域性存取點且只能有一個範例的情況,例如紀錄檔記錄器、資料庫連線池等。其主要特點是:
GetInstance
或Instance
),該函數負責返回類的唯一範例。單例模式的實現通常遵循以下步驟:
為什麼一定要使用靜態成員變數?
主要是為了保證資料的一致性和共用性
靜態成員變數是屬於類而不是範例的。這意味著無論建立多少個類的範例,靜態成員變數只有一份拷貝。這樣就可以確保所有範例都共用同一個變數。
在某些情況下,需要在不同的範例之間共用資料。例如,在一個多執行緒的環境中,如果多個範例需要存取和修改同一個資料,將資料定義為靜態成員變數可以避免資料不一致的問題。
下面給一個例子來說明單例模式的程式碼實現
在餓漢式中,範例在類載入時就被建立,並且在整個程式生命週期記憶體在。這種方式確保了執行緒安全,但可能會增加程式啟動時間和記憶體消耗。
class Singleton {//餓漢式
private:
static Singleton* instance;
Singleton() {} // 私有建構函式
public:
static Singleton* getInstance() {
return instance;
}
};
Singleton* Singleton::instance = new Singleton(); // 在靜態成員變數初始化時建立範例
int main() {
Singleton* singletonObj = Singleton::getInstance();
return 0;
}
在懶漢式中,範例在首次呼叫 getInstance()
方法時才會被建立。這種方式延遲了範例的建立,節省了記憶體資源。然而,懶漢式在多執行緒環境下需要進行額外的執行緒安全處理,以避免多個執行緒同時建立範例的問題。
class Singleton {
private:
static Singleton* instance;
Singleton() {} // 私有建構函式
public:
static Singleton* getInstance() {
if (instance == nullptr) {
instance = new Singleton();
}
return instance;
}
};
Singleton* Singleton::instance = nullptr; // 初始化為nullptr
int main() {
Singleton* singletonObj = Singleton::getInstance();
return 0;
}
單例模式是為了滿足一些需要保證資料一致性的開發場景而設計的,簡單來說就是通過一些處理讓某一個功能類只能產生一個範例,且外部不能建立該類的範例。
為了實現上述目的,我們需要將類別建構函式私有化,同時建立一個靜態成員變數來儲存類的唯一範例。因為類的範例終究還是要提供給外界使用的,所以我們還要需要定義一個公共的靜態成員函數,負責返回單例類的唯一範例
然後,在單例類的程式碼實現中,有餓漢式和懶漢式兩種方式。餓漢式就是單例類在載入時其範例就會被建立,懶漢式則是需要首次呼叫公共靜態成員函數(請求返回唯一範例)時才會建立。
有了上面的前置知識,現在可以來研究紀錄檔類是如何被設計為一個單例類的了
在Log類的宣告中(Log.h),其構造和解構函式被宣告為私有,以防止外部直接建立Log類的物件。
class Log{
private:
Log();
virtual ~Log();
};
按照單例模式的流程,現在我們需要建立一個靜態成員變數來儲存唯一範例並提供一個公共的靜態成員函數供外界獲取唯一範例
在該紀錄檔類中,使用公共的靜態成員函數get_instance()
來完成上述兩步
class Log{
public:
static Log *get_instance(){
static Log instance;
return &instance;
}
};
在get_instance()
中,建立一個靜態的Log類指標變數instance
,並將其初始化為一個Log類的唯一範例,呼叫該函數即可返回唯一範例instance
為什麼這裡不用將static Log instance;(靜態成員變數)宣告為私有的?
因為在C++11之後,對於區域性靜態變數的初始化具備執行緒安全性。
將其定義為區域性靜態變數即可,在作用域(包含它的函數或程式碼塊)之外該變數是不可見的。
ps:紀錄檔類這裡使用的是懶漢式
除了單例模式需要特別說明一下外,紀錄檔類Log本質上還是一個類,該怎麼使用還是怎麼使用就行
在建立唯一範例的時候也需要呼叫初始化函數bool Log::init
bool Log::init(const char *file_name, int close_log, int log_buf_size, int split_lines, int max_queue_size){
//file_name表示紀錄檔檔案的路徑和名稱,close_log表示是否關閉紀錄檔,log_buf_size表示紀錄檔緩衝區的大小,split_lines表示紀錄檔檔案達到的最大行數時進行切割,max_queue_size表示非同步模式下阻塞佇列的長度。
}
初始化時,先判斷是否要以非同步模式執行,一般來說肯定是非同步啟動的
...
//如果設定了max_queue_size,則設定為非同步
if (max_queue_size >= 1){
m_is_async = true;
m_log_queue = new block_queue<string>(max_queue_size);
pthread_t tid;
//flush_log_thread為回撥函數,這裡表示建立執行緒非同步寫紀錄檔
pthread_create(&tid, NULL, flush_log_thread, NULL);
}
...
如果非同步啟動,那麼將m_is_async
標誌設定為true
,然後建立一個阻塞佇列m_log_queue
,指定該佇列的最大長度為max_queue_size
。
然後用pthread_create
起一個新執行緒,該執行緒的目的是從佇列中獲取紀錄檔條目,並以非同步方式將它們寫入紀錄檔檔案。
flush_log_thread
(詳見)作為回撥函數傳入新執行緒中(注意不是pthread_create),該函數負責從佇列中獲取紀錄檔條目並將其寫入紀錄檔檔案。
然後是一些引數的設定
m_close_log = close_log;//將close_log引數的值賦給成員變數m_close_log。該變數指示是否關閉紀錄檔功能。
m_log_buf_size = log_buf_size;//確定了紀錄檔緩衝區的大小,即緩衝區中可以儲存的最大字元數
m_buf = new char[m_log_buf_size];//動態分配了一個大小為m_log_buf_size的字元陣列(緩衝區)。指向這個分配記憶體的指標儲存在成員變數m_buf中,代表紀錄檔緩衝區。
memset(m_buf, '\0', m_log_buf_size);//使用空字元('\0')對紀錄檔緩衝區進行初始化。確保緩衝區最初為空,準備儲存紀錄檔訊息。
m_split_lines = split_lines;//指定每個紀錄檔檔案中的最大行數,在超過此限制後會建立一個新的紀錄檔檔案。
上述程式碼設定了Log類的各種設定引數,如紀錄檔緩衝區大小、每個紀錄檔檔案的最大行數以及是否關閉紀錄檔功能。
因為我們是要生成紀錄檔嘛,紀錄檔最重要的資訊就是時間,因此我們在初始化時需要把當前系統時間儲存到一個結構體struct tm中,以便後續生成時間戳的時候使用。
time_t t = time(NULL);//獲取當前時間的秒數
struct tm* sys_tm = localtime(&t);//使用localtime()函數將時間轉換為本地時間
struct tm my_tm = *sys_tm;//通過解除參照sys_tm指標,將其中儲存的struct tm結構體的內容複製到另一個名為my_tm的結構體中。這樣可以在後續程式碼中使用my_tm來存取年、月、日等日期和時間資訊。
然後就是要真正開始寫紀錄檔檔案,定義了一個const char*
型別的指標p
,並通過呼叫strrchr(file_name, '/')
函數來查詢file_name
字串中最後一個出現的斜槓字元('/')的位置。如果找不到斜槓字元,p
將被賦值為NULL
。
const char *p = strrchr(file_name, '/');
char log_full_name[256] = {0};//定義一個大小為256的字元陣列log_full_name,並初始化為全零。
然後,使用條件語句檢查p
是否為NULL
。如果p
是NULL
,表示file_name
字串中沒有斜槓字元,即該字串只包含檔名而不包含路徑資訊。在這種情況下,使用snprintf
函數將日期和檔名格式化為新的字串,並儲存在log_full_name
中。
如果p
不為NULL
,表示file_name
字串中存在斜槓字元,即該字串包含路徑資訊。在這種情況下,使用strcpy
函數將p + 1
處開始的子字串(即去除斜槓字元)複製到log_name
字元陣列中。同時,使用strncpy
函數將從file_name
的開頭到p - file_name + 1
個字元(包括斜槓字元)的子字串複製到dir_name
字元陣列中。最後,使用snprintf
函數將路徑、日期和檔名格式化為新的字串,並儲存在log_full_name
中。
if (p == NULL){
snprintf(log_full_name, 255, "%d_%02d_%02d_%s", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, file_name);
}
else{
strcpy(log_name, p + 1);
strncpy(dir_name, file_name, p - file_name + 1);
snprintf(log_full_name, 255, "%s%d_%02d_%02d_%s", dir_name, my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, log_name);
}
接下來,將當前日期的日部分(my_tm.tm_mday
)賦值給成員變數m_today
。
最後,使用fopen(log_full_name, "a")
函數以追加模式("a")開啟log_full_name
指定的紀錄檔檔案。如果檔案開啟失敗(返回NULL
),則返回false
。
m_today = my_tm.tm_mday;
m_fp = fopen(log_full_name, "a");
if (m_fp == NULL) return false;
至此,紀錄檔類初始化完成
我們確定類該範例的執行模式(非同步),然後為該範例起了一個新執行緒,在該執行緒中維護一個阻塞佇列,該佇列採用生產者-消費者模式設計,使用迴圈陣列實現。裡面儲存的是字串型別的紀錄檔資料(例如:string single_log;
)
然後我們還會獲取系統時間並對紀錄檔緩衝區的大小等引數進行設定,最後開啟一個紀錄檔檔案准備記錄紀錄檔資訊。
該函數沒有定義,只有在標頭檔案中的一個宣告,void *
表示返回型別為無型別指標。(使用了C++中的多執行緒程式設計和指標語法)
static void* flush_log_thread(void *args){
Log::get_instance()->async_write_log();
}
flush_log_thread
去呼叫了一個紀錄檔類範例中的私有方法async_write_log()來非同步地寫入紀錄檔,該函數的定義如下:
void* async_write_log(){
string single_log;
//從阻塞佇列中取出一個紀錄檔string,寫入檔案
while (m_log_queue->pop(single_log))
{
m_mutex.lock();
fputs(single_log.c_str(), m_fp);
m_mutex.unlock();
}
}
在async_write_log()中,通過 m_log_queue->pop(single_log)
從阻塞佇列(詳見)中取出一個紀錄檔字串 single_log
。
使用 fputs(single_log.c_str(), m_fp)
將紀錄檔字串寫入檔案。注意,這裡還使用了互斥鎖 m_mutex
來保護對檔案指標 m_fp
的存取。
fputs
函數是C和C++標準庫中的一個函數,用於將字串寫入檔案。
block_queue顧名思義其實現了一個阻塞佇列,以迴圈陣列的方式
該阻塞佇列中的元素是通過void *async_write_log()
先來看該佇列的初始化部分
#ifndef BLOCK_QUEUE_H
#define BLOCK_QUEUE_H
#include <iostream>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#include "../lock/locker.h"
using namespace std;
template <class T>
class block_queue{
public:
block_queue(int max_size = 1000){
if (max_size <= 0){
exit(-1);
}
m_max_size = max_size;
m_array = new T[max_size];
m_size = 0;
m_front = -1;
m_back = -1;
}
...
private:
locker m_mutex;
cond m_cond;
T *m_array;
int m_size;
int m_max_size;
int m_front;
int m_back;
};
#endif
該阻塞佇列以類的形式存在,在該類別建構函式中,通過new T[max_size]
建立了一個大小為max_size
的陣列 m_array
,用於儲存元素。因為要實現的是一個「佇列」,所以該陣列要通過頭尾指標更新來管理資料存放的位置
在說明為何使用頭尾指標更新的策略之前需要先了解佇列的基本概念
首先,佇列是一種先進先出(FIFO)的資料結構,其中元素按照插入的順序進行存取和移除。因此佇列有兩個關鍵操作:入隊(enqueue)將元素新增到佇列的尾部,出隊(dequeue)將佇列的頭部元素移除並返回。
在程式碼實現中就是push和pop
佇列通常使用頭指標(front)和尾指標(rear)來管理元素的位置。這兩個指標用於確定佇列的起始點和結束點,從而允許我們在佇列的兩端進行插入和刪除操作。
初始化一個空佇列時,頭指標和尾指標都指向同一個位置(例如,初始值為0)。當我們執行入隊操作時,尾指標會遞增,並將新元素放在尾指標所指向的位置。而在出隊操作時,頭指標會遞增,並移動到下一個元素所在的位置。
頭尾指標更新的一般過程:
1、初始化佇列時,頭指標和尾指標均指向同一個位置。
2、執行入隊操作時,尾指標遞增,並將新元素放在尾指標所指向的位置。
3、執行出隊操作時,頭指標遞增,並移動到下一個元素所在的位置。注意,在出隊操作之前,我們需要檢查佇列是否為空。
ok回到程式碼
前面我們說到,為了實現阻塞佇列,程式碼中使用了迴圈陣列,通過頭指標和尾指標來管理資料的插入操作
佇列的頭指標m_front
和尾指標m_back
都是通過取模運算 (m_back + 1) % m_max_size
來實現迴圈的。(後面會有解釋)
這意味著當佇列的最後一個位置被佔用時,下一個元素會從陣列的起始位置重新開始存放。這樣就形成了迴圈的效果。
該阻塞佇列實現了執行緒安全的生產者-消費者模型。這種模型是多執行緒程式設計中常見的一種設計模式,用於解決生產者執行緒和消費者執行緒之間的資料同步和通訊問題。
簡單來說,這個類實現的所謂"阻塞佇列"中維護著一個資料結構,外部可以將資料輸入該資料結構也可以從中取出資料。
在多執行緒的背景下,
呼叫push()
函數將紀錄檔字串新增到阻塞佇列中的執行緒就是生產者執行緒;【在這裡就是void Log::write_log
】
通過呼叫pop()
函數從阻塞佇列中取出紀錄檔字串並寫入檔案的執行緒就是消費者執行緒;【在這裡就是void *async_write_log()
】
前面說過,這兩個函數分別實現阻塞佇列的入隊和出隊操作。
push(const T &item)
: 入隊操作。
//往佇列新增元素,需要將所有使用佇列的執行緒先喚醒
//當有元素push進佇列,相當於生產者生產了一個元素
//若當前沒有執行緒等待條件變數,則喚醒無意義
bool push(const T &item){
m_mutex.lock();//鎖
if (m_size >= m_max_size){//當前佇列大小超過上限
m_cond.broadcast();//broadcast()是對pthread_cond_broadcast的一個封裝
m_mutex.unlock();//解鎖
return false;
}
m_back = (m_back + 1) % m_max_size;//計算元素push到佇列之後的位置
m_array[m_back] = item;//將該元素放到指定位置
m_size++;//佇列長度增加
m_cond.broadcast();//喚醒所有等待在條件變數 m_cond 上的執行緒,當呼叫 m_cond.broadcast() 時,所有正等待在 m_cond 上的執行緒都將被喚醒,並且它們將重新競爭獲取相關的資源或執行特定的操作。這種廣播機制確保沒有執行緒會永久地阻塞在條件變數上,因為即使其中一個執行緒通過訊號或其他方式喚醒,其他執行緒仍然可以繼續執行。
m_mutex.unlock();
return true;
}
首先獲取互斥鎖m_mutex
,然後檢查佇列是否已滿。
如果佇列已滿,就喚醒所有等待條件變數m_cond
的執行緒,並返回false表示入隊失敗。
如果佇列未滿,則將元素插入隊尾,並更新佇列的大小。接著喚醒所有等待條件變數的執行緒,並釋放互斥鎖,最後返回true表示入隊成功。
m_back = (m_back + 1) % m_max_size;
的作用是將m_back
後移一位,並且通過取模運算確保m_back
在有效索引範圍內迴圈更新,從而實現佇列的新增操作。【作用於隊尾】舉個例子:
假設當前佇列的長度為
m_max_size
,則m_back
的範圍是從 0 到m_max_size-1
。當插入一個新的元素時,我們需要將m_back
後移一位來指向新的隊尾。如果
m_back
已經指向了佇列的最後一個位置(即m_back == m_max_size - 1
),則(m_back + 1) % m_max_size
的結果就是 0,即將m_back
更新為 0,重新回到陣列的開頭。這種迴圈更新索引的方式使得整個陣列成為一個環形結構,實現了迴圈佇列的特性。
pop(T &item)
: 出隊操作。
//pop時,如果當前佇列沒有元素,將會等待條件變數
bool pop(T &item){
m_mutex.lock();
while (m_size <= 0){
if (!m_cond.wait(m_mutex.get())){//如果沒有其他執行緒push進元素,那就沒東西可pop,返回true,解鎖
m_mutex.unlock();
return false;
}
}//有東西可以彈出,計算隊頭要移動的位置
m_front = (m_front + 1) % m_max_size;
item = m_array[m_front];//提供給使用者一個介面,讓他們能夠獲得從佇列中彈出的元素(如果想獲取的話)
m_size--;
m_mutex.unlock();
return true;
}
首先獲取互斥鎖m_mutex
,然後檢查佇列是否為空。
如果佇列為空,就進入迴圈等待條件變數m_cond
,直到有新元素被加入佇列。
m_cond.wait(m_mutex.get())
是一個條件變數(Condition Variable)的等待操作,條件變數通常與互斥鎖一起使用,以實現執行緒間的同步。
m_cond
是一個條件變數物件,m_mutex.get()
獲取互斥鎖物件。
在阻塞佇列中,當佇列為空時,呼叫pop
函數會進入等待狀態,直到有元素可供彈出或者超時。
如果等待失敗,即沒有其他執行緒通過 push
操作插入新的元素,那麼 !m_cond.wait(m_mutex.get())
返回 true
,即等待失敗。在這種情況下,函數將立即返回並返回 false
,表示彈出操作未成功。
一旦有新元素加入佇列,或者超時時間達到,就從隊頭取出一個元素並更新佇列的大小。最後釋放互斥鎖並返回true表示出隊成功。
與push函數中的類似,
m_front = (m_front + 1) % m_max_size;
的作用是將m_front
後移一位,並且通過取模運算確保m_front
在有效索引範圍內迴圈更新,從而實現佇列的彈出操作。【作用於隊頭】
這裡有一個問題,如果觀察的話會發現,在pop函數中,我們只是獲取了佇列頭部的元素賦值給item,並沒有直接刪除m_array中對應位置的元素,那這也能算pop嗎?
是的,因為這裡使用的是迴圈陣列,在下一次push操作時,新的元素將會覆蓋掉之前m_front所指向的位置,相當於間接刪除了該元素。
迴圈陣列的索引m_front和m_back被用來追蹤佇列的頭部和尾部。當呼叫pop函數時,我們通過更新m_front索引和減小佇列大小(m_size)來模擬彈出元素的操作。這樣做的好處是避免了頻繁地移動陣列中的元素,從而提高了效能。
上面介紹的是實現阻塞佇列的核心函數,除此之外,還需要提供一些方便的功能函數來輔助使用者完成某些功能
例如:full()函數可以判斷佇列是否滿了、empty()判斷佇列是否為空、front()可以直接返回隊首元素(其實就是直接返回m_front處的元素),同理還有back()等
初始化完畢後肯定要開始寫紀錄檔,void Log::write_log
負責這部分的工作
同時,由於該函數與要寫入紀錄檔資訊,因此要往阻塞佇列中push資料,所以該類就是阻塞佇列的消費者(準確的說是呼叫紀錄檔寫入函數的某個執行緒是消費者)
void Log::write_log(int level, const char *format, ...){
//level表示紀錄檔級別,format是一個格式化字串,類似於printf函數的格式化字串,用於指定紀錄檔訊息的內容。
}
紀錄檔最重要的是時間,首先我們獲取時間
void Log::write_log(int level, const char *format, ...){
struct timeval now = {0, 0};
gettimeofday(&now, NULL);//使用 gettimeofday 函數獲取當前的時間,並將其儲存在 now 變數中
time_t t = now.tv_sec;//將now結構體中的tv_sec欄位(表示從Unix紀元以來的秒數)賦值給time_t型別的變數t。
struct tm *sys_tm = localtime(&t);
struct tm my_tm = *sys_tm;
...
}
然後要根據傳入的紀錄檔級別level
的不同,將相應的紀錄檔級別字串複製到字元陣列s
中。
...
char s[16] = {0};
switch (level)
{
case 0:
strcpy(s, "[debug]:");
break;
case 1:
strcpy(s, "[info]:");
break;
case 2:
strcpy(s, "[warn]:");
break;
case 3:
strcpy(s, "[erro]:");
break;
default:
strcpy(s, "[info]:");
break;
}
...
下面開始寫入一個log,先獲取鎖,然後m_count++(紀錄檔數量加一)
...
m_mutex.lock();
m_count++;
...
建立紀錄檔前,檢查是否需要建立,如果當前日期已經進入新的一天或者當前紀錄檔檔案達到最大行數限制,則需要建立新的紀錄檔檔案
if (m_today != my_tm.tm_mday || m_count % m_split_lines == 0){//新一天||最大行數
char new_log[256] = {0};//建立一個新的紀錄檔檔名緩衝區。
fflush(m_fp);//重新整理檔案流,主要作用是確保緩衝區中的資料被立即寫入到檔案,而不是等待緩衝區滿或者檔案關閉時才進行寫入。
fclose(m_fp);//關閉當前的紀錄檔檔案//m_fp是一個用於開啟log的檔案指標,定義在標頭檔案中
char tail[16] = {0};//建立一個字尾字串緩衝區,用於表示日期部分的檔名字尾。
//使用日期資訊將字尾字串格式化為 年_月_日_ 的形式。
snprintf(tail, 16, "%d_%02d_%02d_", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday);
if (m_today != my_tm.tm_mday){//檢查當前日期是否與上次寫入紀錄檔的日期不同。
//將目錄名、日期字尾和紀錄檔檔名合併成一個完整的新紀錄檔檔名。
snprintf(new_log, 255, "%s%s%s", dir_name, tail, log_name);
m_today = my_tm.tm_mday;//如果是新的一天,則需要重置計數器和建立新的檔名。
m_count = 0;
}
else{//如果不是新的一天,根據計數器值建立帶有序號的新紀錄檔檔名。
snprintf(new_log, 255, "%s%s%s.%lld", dir_name, tail, log_name, m_count / m_split_lines);
}
m_fp = fopen(new_log, "a");//開啟新的紀錄檔檔案以進行追加寫入操作。
}
m_mutex.unlock();//釋放互斥鎖
在寫入每條紀錄檔時檢查是否需要建立新的紀錄檔檔案。如果已經進入新的一天或者當前紀錄檔檔案的行數達到最大限制,就會建立一個新的紀錄檔檔案,並更新相關的計數器和日期資訊。
然後將紀錄檔的時間、級別和具體內容格式化為字串,並儲存在log_str
變數中。
...
va_list valst;//大概就是提供了一種處理可變引數的機制,允許函數在執行時根據傳入的引數數量和型別來進行相應的操作。
va_start(valst, format);
string log_str;
m_mutex.lock();//拿鎖
//寫入的具體時間內容格式
int n = snprintf(m_buf, 48, "%d-%02d-%02d %02d:%02d:%02d.%06ld %s ",
my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday,
my_tm.tm_hour, my_tm.tm_min, my_tm.tm_sec, now.tv_usec, s);
//將可變參數列中的引數和格式字串 format 進行格式化
int m = vsnprintf(m_buf + n, m_log_buf_size - n - 1, format, valst);
m_buf[n + m] = '\n';
m_buf[n + m + 1] = '\0';
log_str = m_buf;
m_mutex.unlock();
...
至此,我們將格式化後的紀錄檔字串資訊儲存到了log_str
變數中
以下將對紀錄檔變數log_str
進行使用
if (m_is_async && !m_log_queue->full()){
m_log_queue->push(log_str);
}
else{
m_mutex.lock();
fputs(log_str.c_str(), m_fp);//log_str的內容使用fputs()函數寫入到檔案指標m_fp所代表的檔案中。
m_mutex.unlock();
}
va_end(valst);
}
如果紀錄檔模組是非同步寫入模式,並且紀錄檔佇列(m_log_queue
)沒有滿,則將log_str
推播到紀錄檔佇列中(通過呼叫m_log_queue->push(log_str)
)。這意味著紀錄檔內容將被放入佇列中以供後續處理。
如果紀錄檔模組不是非同步模式,或者紀錄檔佇列已滿,則直接將紀錄檔內容寫入檔案。
無論是將紀錄檔內容推播到佇列還是直接寫入檔案,最終都可以將紀錄檔內容記錄下來。
還記得之前的阻塞佇列嗎?如果是非同步寫入模式,log_str
就已經被push到阻塞佇列中了,此時,呼叫void Log::write_log
的執行緒就在充當一個生產者。
到這裡,算是把紀錄檔類的核心流程走完了。紀錄檔類維護著一個阻塞佇列,與該佇列進行資料互動時遵循生產者-消費者模式,生產者也就是這裡的紀錄檔寫入函數,會將標準化後的紀錄檔字串作為元素push到佇列中,等到被外界執行緒pop獲取。
ps:疑問,到底是誰最後向阻塞佇列請求紀錄檔資料?
flush
函數用於強制重新整理寫入流緩衝區,確保所有的紀錄檔內容都被寫入到檔案中
void Log::flush(void){
m_mutex.lock();
//強制重新整理寫入流緩衝區
fflush(m_fp);
m_mutex.unlock();
}
該函數首先獲取互斥鎖 m_mutex
,以確保在執行重新整理操作時不會與其他執行緒產生競爭條件。然後呼叫 fflush(m_fp)
函數,該函數用於將流(在此處為紀錄檔檔案 m_fp
)的緩衝區內容立即寫入到檔案,並清空緩衝區。最後釋放互斥鎖,完成重新整理操作。
通過呼叫 flush
函數,可以確保在需要立即將紀錄檔內容寫入磁碟的情況下,不必等待緩衝區滿或檔案關閉時才進行寫入,從而避免丟失重要的紀錄檔資訊。
基於之前手寫時的思路來看就行,大部分思路是一致的(詳見)
該專案中,執行緒池檔案存放在TinyWeb/threadpool中。使用模板來實現了執行緒池,在threadpool.h中實現了該模板類
寫法上基本上與之前的一致,但是理解上有不同。之前因為沒有在一個完整專案的角度來理解執行緒池,多少會有一些偏差,因此這裡重新對執行緒池進行一個梳理,加深在 Reactor 模式下對其的理解。
與上一版程式碼一致,這裡的執行緒池類也被宣告為一個模板類
template <typename T>
class threadpool{
public:
/*thread_number是執行緒池中執行緒的數量,max_requests是請求佇列中最多允許的、等待處理的請求的數量*/
threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
~threadpool();
bool append(T *request, int state);
bool append_p(T *request);
private:
/*工作執行緒執行的函數,它不斷從工作佇列中取出任務並執行之*/
static void *worker(void *arg);
void run();
private:
int m_thread_number; //執行緒池中的執行緒數
int m_max_requests; //請求佇列中允許的最大請求數
pthread_t *m_threads; //描述執行緒池的陣列,其大小為m_thread_number
std::list<T *> m_workqueue; //請求佇列
locker m_queuelocker; //保護請求佇列的互斥鎖
sem m_queuestat; //是否有任務需要處理
connection_pool *m_connPool; //資料庫
int m_actor_model; //模型切換
};
template <typename T>//通過參數列進行初始化
threadpool<T>::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool){
if (thread_number <= 0 || max_requests <= 0)//異常判斷,執行緒數和最大請求數小於0,報錯
throw std::exception();
m_threads = new pthread_t[m_thread_number];//建立執行緒池陣列
if (!m_threads)
throw std::exception();
for (int i = 0; i < thread_number; ++i){
printf("建立第 %d 個執行緒\n", i);
if (pthread_create(m_threads + i, NULL, worker, this) != 0){
delete[] m_threads;
throw std::exception();
}//在呼叫pthread_detach()函數之後,執行緒將進入「分離」狀態,這意味著它不能再被其他執行緒或主執行緒等待和加入。
if (pthread_detach(m_threads[i])){
delete[] m_threads;
throw std::exception();
}
}
}
總體來說,該建構函式建立了一組指定數量的執行緒,並將它們設定為可分離狀態。這些執行緒將用於處理任務佇列中的請求,實現了執行緒池的基本功能。
其他函數就先不看了,直接看入口函數worker
worker函數會線上程池初始化時被建構函式呼叫(作為引數輸入pthread_create,具體見)
void *threadpool<T>::worker(void *arg){
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}
而該函數呼叫了run函數(這個run函數明顯比之前寫的要處理更多的事情)
template <typename T>
void threadpool<T>::run(){
while (true){
m_queuestat.wait();//阻塞等待捕獲sem號誌
m_queuelocker.lock();//拿到號誌之後上鎖
if (m_workqueue.empty()){//若佇列為空就解鎖
m_queuelocker.unlock();
continue;
}
T *request = m_workqueue.front();//取出佇列頭部的請求
m_workqueue.pop_front();
m_queuelocker.unlock();
if (!request)//沒有東西就繼續迴圈
continue;
}
首先,先阻塞等待,拿鎖。然後判斷佇列裡面有沒有東西,有就取出來。判斷一下取出來的大小裡面有沒有請求,沒有就繼續迴圈
從程式碼上看,執行緒池中維護的佇列m_workqueue
中,接收一個模板元素作為輸入,該元素便是request
結合webserver.cpp
中對執行緒池的使用來看,request應該是http_conn類的範例化物件。
也就是說,執行緒池中維護著工作佇列m_workqueue
,該佇列中的元素則為http_conn物件
好了,如果獲取到http_conn物件,接下來要對其進行相應的處理了
...
if (1 == m_actor_model){
if (0 == request->m_state){//讀取http
if (request->read_once()){
request->improv = 1;
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}else{
request->improv = 1;
request->timer_flag = 1;
}
}
else{//寫
if (request->write()){
request->improv = 1;
}
else{
request->improv = 1;
request->timer_flag = 1;
}
}
}else{
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
}
}
#endif
這裡需要確定使用的模式,來進行對應的處理
在 1 == m_actor_model
的條件下,使用了主從模式,往後面看就會知道,這裡所謂的request(其實就是http_conn物件)中的process()函數使用了process_read()函數,而後者採用主從狀態機模式進行設計實現,能夠根據請求的狀態進行相應的處理操作。
獲取http_conn物件(request)中的成員屬性m_state,從http_conn.h中可知
int m_state; //讀為0, 寫為1
首先看一下寫的時候的操作
...
if (request->read_once()){
request->improv = 1;
connectionRAII mysqlcon(&request->mysql, m_connPool);//資料庫部分再說
request->process();
}else{
request->improv = 1;
request->timer_flag = 1;
}
...
呼叫http_conn物件中的bool http_conn::read_once()
函數,該函數用於迴圈讀取客戶資料,直到無資料可讀或對方關閉連線。此外,該函數還分別支援非阻塞ET工作模式和LT工作模式(詳見)
讀取完資料之後,read_once()返回true,然後將http_conn中的improv屬性置為1
然後建立一個connectionRAII物件mysqlcon(這裡後面介紹連線池會說:跳轉)獲取連線池中的一個連線資源,將當前http_conn物件的資訊給到這個連線,以便查詢資料庫中的相關資訊(登入資訊)
最後就是呼叫http_conn自身的處理常式process()來處理接受到的資料;(詳見)
如果讀取資料後沒有返回true,那麼說明讀取過程出現了錯誤,此時也要將improv置為1,同時還要把timer_flag也置為1。
然後到寫的操作
...
else{
if (request->write()){
request->improv = 1;
}
else{
request->improv = 1;
request->timer_flag = 1;
}
}
...
讀操作就比較簡單,直接呼叫http_conn物件中的write()函數即可
bool http_conn::write(){
int temp = 0;
if (bytes_to_send == 0){
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);
init();
return true;
}
...
}
該函數首先定義了一個臨時變數temp
,用於儲存每次寫操作傳送的位元組數。
接下來,它檢查是否需要傳送的位元組數為零。如果是,則呼叫modfd()
函數修改檔案描述符m_sockfd
在m_epollfd
中的事件,將其設定為監聽讀事件(EPOLLIN
),並呼叫init()
函數重置HTTP連線的狀態。然後返回true
表示寫操作完成。
...
while (1){
temp = writev(m_sockfd, m_iv, m_iv_count);
if (temp < 0){
if (errno == EAGAIN){//表示寫緩衝區已滿,無法繼續傳送資料
//修改檔案描述符`m_sockfd`在`m_epollfd`中的事件,將其設定為監聽寫事件(`EPOLLOUT`)
modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode);
return true;
}//表示寫操作發生錯誤
unmap();//呼叫`unmap()`函數取消對映檔案,並返回`false`表示寫操作失敗。
return false;
}
...
如果需要傳送的位元組數不為零,則進入一個迴圈。
在迴圈中,呼叫writev()
函數將寫緩衝區中的資料傳送到通訊端m_sockfd
。writev()
函數可以一次性傳送多個緩衝區的資料。如果傳送成功,writev()
函數返回傳送的位元組數。如果傳送失敗,會根據錯誤型別進行相應的處理。
...
bytes_have_send += temp;
bytes_to_send -= temp;//如果已傳送的位元組數bytes_have_send大於等於m_iv[0].iov_len
if (bytes_have_send >= m_iv[0].iov_len){//,表示當前的寫緩衝區資料已經傳送完畢。
m_iv[0].iov_len = 0;//將m_iv[0].iov_len設為0,表示不再傳送寫緩衝區的資料
//檔案地址加上已傳送位元組數與m_write_idx的差值,表示下一次傳送的資料是檔案內容的剩餘部分
m_iv[1].iov_base = m_file_address + (bytes_have_send - m_write_idx);
m_iv[1].iov_len = bytes_to_send;//更新m_iv[1].iov_len為剩餘待傳送的位元組數
}
else{//已傳送的位元組數bytes_have_send小於m_iv[0].iov_len,表示當前的寫緩衝區資料還未完全傳送。
m_iv[0].iov_base = m_write_buf + bytes_have_send;//更新為寫緩衝區中剩餘資料的起始地址
m_iv[0].iov_len = m_iv[0].iov_len - bytes_have_send;//更新為剩餘待傳送的位元組數。
}
...
在成功傳送資料後,更新已傳送的位元組數bytes_have_send
和剩餘待傳送的位元組數bytes_to_send
。然後根據當前的傳送情況,更新m_iv
陣列中的資料。
...
if (bytes_to_send <= 0){//檢查剩餘待傳送的位元組數是否小於等於0。滿足則表示所有資料都已傳送完畢。
unmap();//取消對映檔案//↓修改檔案描述符m_sockfd在m_epollfd中的事件,將其設定為監聽讀事件(EPOLLIN)。
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);
if (m_linger){//設定了長連線
init();//呼叫init()函數重置HTTP連線的狀態,並返回true表示寫操作完成。
return true;
}//未設定長連線,則直接返回false表示寫操作完成。
else return false;
}
}
}
回到run函數主體,如果 0 == m_actor_model
,則表示執行緒池採用同步模式工作
在同步模式中,執行緒池中的執行緒按順序依次處理請求,每個執行緒處理完一個請求後再處理下一個請求。
當有新的請求到達時,執行緒池中的執行緒會依次處理這些請求,直到所有請求都得到處理。
...
else{
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
}
}
#endif
1 == m_actor_model
時是主從模式?TBD
該函數使用recv函數從socketfd接收資料
bool http_conn::read_once(){//先判斷一下當前要接收的資料是否已經超出緩衝區大小
if (m_read_idx >= READ_BUFFER_SIZE){
return false;//讀取失敗
}
int bytes_read = 0;
提供兩種讀取模式:邊緣觸發和水平觸發,兩種都是常用的事件觸發機制,用於處理非阻塞I/O事件。分別來看
...
//LT讀取資料
if (0 == m_TRIGMode){
bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0);
m_read_idx += bytes_read;
if (bytes_read <= 0) return false;
return true;
}
...
水平觸發模式(Level Triggered Mode)下,只呼叫一次recv
函數來讀取資料。這是因為在LT模式下,當通訊端可讀時,會一直觸發可讀事件(由事件迴圈實現),直到讀取緩衝區中的資料為空。(沒有讀取完資料,函數會返回true
,表示已經讀取了一部分資料。下次可讀事件到達時,會再次呼叫read_once
函數來繼續讀取剩餘的資料,直到沒有更多資料可讀為止。)
如果在LT模式下沒有讀取完資料,那麼在下一次可讀事件到達時,會再次呼叫read_once
函數來讀取剩餘的資料。這樣可以確保在每個可讀事件中儘可能地讀取更多的資料。
水平觸發模式的優點:簡單可靠。缺點是:頻繁的事件通知增大開銷,無法及時處理事件導致阻塞。
水平觸發模式相對較簡單,只要應用程式處理完整個事件,系統就會持續通知,不需要過於細緻的處理邏輯。且該模式持續通知直到事件處理完成,確保事件不會被丟失,應用程式有足夠的時間處理事件。
再來看邊沿觸發模式(Edge Triggered Mode)
...
else{//ET讀資料
while (true){
bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0);
if (bytes_read == -1){
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
return false;
}
else if (bytes_read == 0) return false;
m_read_idx += bytes_read;
}
return true;
}
}
該模式下,使用recv函數從對應檔案描述符(即m_sockfd)上讀取資料,一次性讀取儘可能多的資料,並將讀取到的資料儲存到m_read_buf中,返回值表示是否成功讀取資料。(recv函數的輸入引數介紹:詳見)
在ET模式下,當通訊端的接收緩衝區狀態發生變化時,作業系統只觸發一次可讀事件。也就是說,只有當接收緩衝區由空變為非空時,才會觸發一次可讀事件。通過迴圈呼叫recv
函數,直到返回值為-1(表示沒有更多資料可讀)或返回值為0(表示對方關閉連線)為止。
如果recv
函數返回-1且錯誤碼為EAGAIN或EWOULDBLOCK,表示當前沒有更多資料可讀,此時退出迴圈。否則,繼續讀取資料。
ET模式相比LT模式更加高效,因為它只在接收緩衝區狀態發生變化時觸發一次事件,減少了事件的觸發次數。
缺點就是更復雜,需要消耗更多的資源
該函數用於處理獲取到的資料
void http_conn::process(){
HTTP_CODE read_ret = process_read();
if (read_ret == NO_REQUEST){
modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode);
return;
}
bool write_ret = process_write(read_ret);
if (!write_ret) close_conn();
modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode);
}
可以看到,這裡實際上就是呼叫了process_read()
和process_write()
兩個函數來處理資料
工作流程如下:
process_read()
函數進行讀取和解析請求(詳見):
process_read()
函數負責從通訊端中讀取資料,並解析HTTP請求。NO_REQUEST
:表示沒有完整的HTTP請求,需要繼續等待資料到達。GET_REQUEST
:表示成功解析出一個完整的GET請求。BAD_REQUEST
:表示解析請求出現錯誤,請求格式不正確。read_ret
的值進行處理:
read_ret
為NO_REQUEST
,表示沒有完整的HTTP請求,將通訊端的事件設定為可讀,並返回等待下一次可讀事件的到達。read_ret
為其他值,表示成功解析出一個完整的HTTP請求或出現錯誤,需要進行下一步的處理。process_write()
函數進行響應處理():
process_write()
函數負責根據read_ret
的值生成HTTP響應,並將響應資料寫入通訊端。write_ret
表示寫入通訊端的結果,為true
表示寫入成功,為false
表示寫入失敗。write_ret
的值進行處理:
write_ret
為false
,表示寫入通訊端失敗,需要關閉連線。write_ret
為true
,表示寫入通訊端成功,將通訊端的事件設定為可寫,並等待下一次可寫事件的到達。雖然該函數也被process()呼叫,但是其沒有使用主從狀態機模式去設計,該函數的作用是根據傳入的HTTP_CODE
引數生成HTTP響應,並將生成的響應內容新增到寫緩衝區中。
process_write()
函數根據傳入的HTTP_CODE
引數,針對不同的狀態碼生成不同的響應內容。它通過呼叫一系列輔助函數(如add_status_line()
、add_headers()
和add_content()
)將響應的狀態行、響應頭和響應體新增到寫緩衝區中。
(不貼程式碼了,有點長)
所謂的"主從狀態機"其實就是指http_conn::HTTP_CODE http_conn::process_read()遵循的設計模式,該函數根據主從狀態機模式進行設計,用於處理不同狀態。詳見
因為之前有詳細寫過這部分的介紹,這裡就概括一下就行
主狀態機:http_conn::process_read()
函數是主狀態機。它負責解析HTTP請求的不同部分,並根據當前狀態執行相應的操作。主狀態機在迴圈中不斷解析一行資料,並根據解析的結果進行狀態切換和處理。主狀態機的狀態包括CHECK_STATE_REQUESTLINE
、CHECK_STATE_HEADER
和CHECK_STATE_CONTENT
。
從狀態機:parse_line()
函數是從狀態機。它在主狀態機中被呼叫,用於解析一行資料的狀態。從狀態機的任務是根據當前解析的資料判斷是否解析完成一行,並返回相應的狀態。從狀態機的狀態包括LINE_OK
、LINE_BAD
和LINE_OPEN
。
主狀態機和從狀態機的互動:主狀態機在迴圈中不斷呼叫從狀態機的parse_line()
函數來解析一行資料的狀態。如果從狀態機返回的狀態為LINE_OK
,表示成功解析一行資料,主狀態機根據當前狀態進行相應的處理。如果從狀態機返回的狀態不是LINE_OK
,則繼續迴圈解析下一行資料。
主狀態機根據從狀態機的返回結果進行不同的處理,包括解析請求行、解析請求頭、解析請求資料等。根據不同的解析結果,主狀態機會返回不同的HTTP_CODE
,用於後續的處理和生成HTTP響應。
一旦從狀態機解析完整個HTTP請求,主狀態機就會呼叫do_request()
函數來處理具體的請求資訊。該函數根據解析的請求資訊生成HTTP響應。它會根據請求型別和URL構建實際的檔案路徑,並進行相應的處理。例如,如果是CGI請求,它會處理登入和註冊等操作;如果是靜態檔案請求,它會檢查檔案的許可權和型別,並將檔案對映到記憶體中。
總體而言,這個主從狀態機的作用是實現了對HTTP請求的解析和處理,以及生成相應的HTTP響應。它通過合理的狀態切換和處理邏輯,使得Web伺服器能夠正確地響應使用者端的請求,並處理各種錯誤情況。
總結:
- 主狀態機是
http_conn::process_read()
函數,負責解析HTTP請求的不同部分。- 從狀態機是
parse_line()
函數,用於解析一行資料的狀態。- 主狀態機通過迴圈呼叫從狀態機來解析資料,並根據解析結果進行狀態切換和處理。
- 主狀態機和從狀態機的互動通過從狀態機返回的狀態來完成。
簡單來說就是:執行緒池是一種為伺服器引入並行性的多執行緒技術
詳細介紹:見
線上程池的run函數中,不管是主從模式還是同步模式,都有以下一段程式碼
connectionRAII mysqlcon(&request->mysql, m_connPool);
幹嘛的?現在來看
所謂的"池"實際上就是一組資源的集合,任何資源如果有需要都可以以池的形式組織,比如執行緒池
簡單來說,池是資源的容器,本質上是對資源的複用。連線池也不例外
連線池中的資源為一組資料庫連線,由程式動態地對池中的連線進行使用,釋放。
資料庫存取的流程一般是:當系統需要存取資料庫時,先系統建立資料庫連線,完成資料庫操作,然後系統斷開資料庫連線。
按照上面的流程,如果要頻繁地存取資料庫,那就得不斷建立和斷開資料庫連線,這個過程很耗時且存在資料安全隱患。
所以,在程式初始化時我們就提前建立一些資料庫連線,將它們用池管理起來,等用的時候再給程式,這樣既能保證較快的資料庫存取速度,又能確保資料安全。
上程式碼
主函數main.cpp
中通過呼叫webserver物件的sql_pool()
函數來建立一個連線池
server.sql_pool();
void WebServer::sql_pool()
首先建立一個連線池物件,然後進行初始化操作
void WebServer::sql_pool(){
//初始化資料庫連線池
m_connPool = connection_pool::GetInstance();
m_connPool->init("localhost", m_user, m_passWord, m_databaseName, 3306, m_sql_num, m_close_log);
//初始化資料庫讀取表
users->initmysql_result(m_connPool);
}
看到這個"GetInstance()"想到什麼?沒錯,單例模式
這個資料庫連線池的設計也使用到了單例模式,前面紀錄檔處理部分的時候我們見識過單例模式了其實
connection_pool *connection_pool::GetInstance(){
static connection_pool connPool;
return &connPool;
}//GetInstance()被呼叫之後返回一個唯一的連線池範例
並且這裡很明顯使用的也是懶漢式,GetInstance()被呼叫才會建立或返回唯一範例
(區分懶漢還是餓漢,最好就是看該類在標頭檔案中的定義)
//構造初始化
void connection_pool::init(string url, string User, string PassWord, string DBName, int Port, int MaxConn, int close_log){
m_url = url;
m_Port = Port;
m_User = User;
m_PassWord = PassWord;
m_DatabaseName = DBName;
m_close_log = close_log;
...
傳入的引數賦值給連線池的成員變數,包括主機地址(m_url
)、埠號(m_Port
)、使用者名稱(m_User
)、密碼(m_PassWord
)、資料庫名(m_DatabaseName
)和紀錄檔開關(m_close_log
)。
然後使用迴圈建立指定數量(MaxConn
)的資料庫連線物件,並將其新增到連線池的connList
列表中。
...
for (int i = 0; i < MaxConn; i++){
MYSQL *con = NULL;
con = mysql_init(con);
if (con == NULL){
LOG_ERROR("MySQL Error");
exit(1);
}
con = mysql_real_connect(con, url.c_str(), User.c_str(), PassWord.c_str(), DBName.c_str(), Port, NULL, 0);
if (con == NULL){
LOG_ERROR("MySQL Error");
exit(1);
}
connList.push_back(con);
++m_FreeConn;//每建立一個連線物件,空閒連線數(m_FreeConn)加1
}//號誌reserve初始化,將號誌的初始值設定為空閒連線數(m_FreeConn),用於控制連線的獲取。
reserve = sem(m_FreeConn);
m_MaxConn = m_FreeConn;//將最大連線數(m_MaxConn)設定為當前空閒連線數(m_FreeConn)。
}
這裡首先使用mysql_init()
(MySQL C API 提供的函數)初始化一個 MYSQL 結構體物件。
con
是一個 MYSQL 結構體指標,傳遞給mysql_init()
函數後,函數會初始化con
指向的MYSQL結構體物件,使其成為一個有效的、用於表示資料庫連線的物件。該物件可進行後續的資料庫操作,如連線資料庫、執行 SQL 查詢等。
然後,我們需要使用mysql_real_connect()
給con
提供用於連線資料庫的資訊。如果連線成功,mysql_real_connect()
函數返回一個非空的MYSQL結構體指標,表示連線成功的連線物件。如果連線失敗,返回 NULL。
在給定的程式碼中,連線成功後,將連線物件新增到連線池的connList
列表(list<MYSQL *> connList;
)中,並增加空閒連線數。
注意,在使用完連線物件後,需要通過
mysql_close()
函數關閉連線,並將連線物件從連線池中移除。這部分邏輯在ReleaseConnection()
和DestroyPool()
(詳見)函數中實現。
初始化完資料庫連線,並將其加入連線池後,當然得用這個連線去存取資料庫啦
void WebServer::sql_pool(){
//初始化資料庫連線池
...
//初始化資料庫讀取表
users->initmysql_result(m_connPool);
}
這裡呼叫的是http_conn物件users中的initmysql_result()函數來與資料庫進行互動
在該函數中,建立connectionRAII
物件mysqlcon
,並傳遞&mysql
和connPool
作為引數。這樣,mysqlcon
物件的建構函式會獲取一個資料庫連線並將其賦值給mysql
。
map<string, string> users;
void http_conn::initmysql_result(connection_pool *connPool){
//先從連線池中取一個連線
MYSQL *mysql = NULL;
connectionRAII mysqlcon(&mysql, connPool);
...
connectionRAII
是一個自定義的類,用於管理資料庫連線的生命週期。它的建構函式接受兩個引數:一個MYSQL**
型別的指標con
和一個connection_pool*
型別的指標connPool
。
簡單來說,我們通過connectionRAII類來獲取連線池中儲存的"連線"資源,然後供後續使用。
注意,這裡建立的connectionRAII類負責管理一個連線的整個使用週期,包括其取用到銷燬
從連線池拿到連線後,開始檢索資料庫
...
//在user表中檢索username,passwd資料,瀏覽器端輸入
if (mysql_query(mysql, "SELECT username,passwd FROM user")){
LOG_ERROR("SELECT error:%s\n", mysql_error(mysql));
}
首先查詢的是使用者從瀏覽器輸入的賬戶名和密碼
然後查詢剩餘的資訊並返回
...
//從表中檢索完整的結果集
MYSQL_RES *result = mysql_store_result(mysql);
//返回結果集中的列數
int num_fields = mysql_num_fields(result);
//返回所有欄位結構的陣列
MYSQL_FIELD *fields = mysql_fetch_fields(result);
//從結果集中獲取下一行,將對應的使用者名稱和密碼,存入map中
while (MYSQL_ROW row = mysql_fetch_row(result))
{
string temp1(row[0]);
string temp2(row[1]);
users[temp1] = temp2;
}
}
從mysql中可以得到所有資料的返回值,此時選取row[0]和row[1]對應著使用者名稱和密碼,將其儲存到users(http_conn物件)中相應的成員屬性中。
OK,現在我們完成了以下流程:
使用者在瀏覽器輸入使用者名稱、密碼進行註冊->使用者名稱密碼入庫,註冊完成->使用者輸入使用者名稱密碼登入->使用輸入資料在資料庫查詢
實際上到這裡,連線池類的工作就已經完成了
字面意思,連線池類使用單例模式進行設計,建立了一個並維護一個連線池。
但是在初始化之後,維護連線池的工作更多的是由connectionRAII類來完成,該類呼叫連線池類提供的唯一範例,對外部提供了獲取和使用連線池中連線的方法。
讀完程式碼之後可以發現,實際上真正的連線池類connection_pool
好像並沒有被"直接"使用,就連連線池的管理都是"外包"出去的
這個就是RAII機制
RAII(Resource Acquisition Is Initialization)是一種資源獲取即初始化的程式設計技術,用於管理資源的生命週期。它是一種 C++ 的程式設計正規化,通過在物件的建構函式中獲取資源,並在物件的解構函式中釋放資源,以確保資源在物件的生命週期內得到正確的管理和釋放。
RAII的基本原則是:在物件的建構函式中獲取資源,並在解構函式中釋放資源。通過這種方式,無論是正常執行還是異常情況下的退出,都可以保證資源的正確釋放,避免資源洩漏。
對應到連線池的設計中就是:
連線池是儲存"連線"這種資源的一個容器,而我們在建立一個連線池物件後,不直接通過物件提供的函數來使用池中的連線。
我們定義一個類connectionRAII,
該類的建構函式中呼叫連線池類提供的GetConnection()函數來獲取連線;
該類的解構函式中呼叫連線池類提供的ReleaseConnection()函數來獲取連線;
這樣,當你想要獲取一個連線時,你會去建立一個connectionRAII物件並輸入一些必要的引數,當物件建立完成時你就已經得到了一個連線,這時候你可以開始用了。
然後等不用的時候,只需要將connectionRAII物件解構掉即可,過程中不用關係連線池背後做的一些記憶體回收的操作,避免產生錯誤
簡單來說,RAII就是給某些資源類再次進行了封裝,讓使用者專注於資源的使用邏輯而不需要考慮資源的管理細節,降低資源洩漏的概率
在 Web 伺服器中,執行緒池用於管理並行處理使用者端請求的執行緒。
通常情況下,執行緒池中的執行緒需要保持活動狀態,以便隨時處理新的請求。如果使用 RAII 來管理執行緒池中的執行緒,那麼在執行緒物件的解構函式中釋放執行緒資源將導致執行緒被終止,從而無法繼續處理新的請求。
為了保持執行緒池的正常工作和執行緒的重用,一般不使用 RAII 來管理執行緒池中的執行緒。相反,通常會使用其他手段來管理執行緒的生命週期,例如使用條件變數或標誌位來控制執行緒的啟動和停止,或者使用特定的執行緒池管理類來管理執行緒的建立、啟動和銷燬。
這就是為什麼執行緒池不用RAII的原因
該函數被連線池的解構函式呼叫,用於銷燬資料庫連線池,釋放記憶體
void connection_pool::DestroyPool(){
lock.lock();
if (connList.size() > 0){
list<MYSQL *>::iterator it;
for (it = connList.begin(); it != connList.end(); ++it){
MYSQL *con = *it;
mysql_close(con);
}
m_CurConn = 0;
m_FreeConn = 0;
connList.clear();
}
lock.unlock();
}
還是經典的執行緒安全操作,拿鎖
然後遍歷整個connList
,取出每一個之前建立的連線,使用mysql_close()
逐一關閉
最後將連線計數變數清空。然後刪除connList
,解鎖。
前面也提到了,這個類是用於管理連線池中"連線"的取用的。
該類的宣告如下:
class connectionRAII{//也位於連線池的標頭檔案中
public:
connectionRAII(MYSQL **con, connection_pool *connPool);
~connectionRAII();
private:
MYSQL *conRAII;
connection_pool *poolRAII;
};
connectionRAII類中的connectionRAII函數使用傳入的資料庫連線物件來接收連線池中的連線,具體來說是使用GetConnection()從connPool中獲取一個連線
connectionRAII::connectionRAII(MYSQL **SQL, connection_pool *connPool){
*SQL = connPool->GetConnection();
conRAII = *SQL;
poolRAII = connPool;
}
GetConnection()是連線池中的一個功能函數。當有請求時,從資料庫連線池中返回一個可用連線並更新使用和空閒連線數
MYSQL *connection_pool::GetConnection()
{
MYSQL *con = NULL;
if (0 == connList.size()) return NULL;
reserve.wait();
lock.lock();
con = connList.front();
connList.pop_front();
--m_FreeConn;
++m_CurConn;
lock.unlock();
return con;
}
在GetConnection()中,首先檢查connList
是否為空,如果為空則返回NULL。
不為空則改變reserve狀態為請求,然後拿鎖,從connList
的首部獲取一個連線並從池中pop掉
完成後解鎖
能夠從池子拿連線那肯定可以釋放連線,因為connectionRAII是一個類,所以釋放連線的操作是在該類解構時進行的
connectionRAII::~connectionRAII(){
poolRAII->ReleaseConnection(conRAII);
}
解構函式中會呼叫連線池物件poolRAII中的ReleaseConnection函數對連線進行釋放
bool connection_pool::ReleaseConnection(MYSQL *con){
if (NULL == con) return false;
lock.lock();
connList.push_back(con);
++m_FreeConn;
--m_CurConn;
lock.unlock();
reserve.post();
return true;
}
還是先檢查獲取到的連線物件指標con是否為空,然後就是經典的執行緒安全操作,在拿到鎖之後,程式把被釋放的連線從新加入connList
的尾部,此時該連線被視為空閒連線。
然後解鎖,將reserve號誌的狀態改為post(增加號誌)
操作完成返回true
至此,connectionRAII類通過呼叫連線池範例中的成員函數,對外提供了連線池中連線的取用與管理。