推薦學習:
業務中有時候我們會用 Redis 處理一些高並行的業務場景,例如,秒殺業務,對於庫存的操作。。。
先來分析下,並行場景下會發生什麼問題
並行問題主要發生在資料的修改上,對於使用者端修改資料,一般分成下面兩個步驟:
1、使用者端先把資料讀取到本地,在本地進行修改;
2、使用者端修改完資料後,再寫回Redis。
我們把這個流程叫做讀取-修改-寫回
操作(Read-Modify-Write
,簡稱為 RMW 操作)。如果使用者端並行進行 RMW 操作的時候,就需要保證 讀取-修改-寫回
是一個原子操作,進行命令操作的時候,其他使用者端不能對當前的資料進行操作。
錯誤的栗子:
統計一個頁面的存取次數,每次重新整理頁面存取次數+1,這裡使用 Redis 來記錄存取次數。
如果每次的讀取-修改-寫回
操作不是一個原子操作,那麼就可能存在下圖的問題,使用者端2在使用者端1操作的中途,也獲取 Redis 的值,也對值進行+1,操作,這樣就導致最終資料的錯誤。
對於上面的這種情況,一般會有兩種方式解決:
1、使用 Redis 實現一把分散式鎖,通過鎖來保護每次只有一個執行緒來操作臨界資源;
2、實現操作命令的原子性。
讀取-修改-寫回
是一個原子性的命令,那麼這個命令在操作過程中就不有別的執行緒同時讀取運算元據,這樣就能避免上面栗子出現的問題。下面從原子性和鎖兩個方面,具體分析下,對並行存取問題的處理
為了實現並行控制要求的臨界區程式碼互斥執行,如果使用 Redis 中命令的原子性,可以有下面兩種處理方式:
1、藉助於 Redis 中的原子性的單命令;
2、把多個操作寫到一個Lua指令碼中,以原子性方式執行單個Lua指令碼。
在探討 Redis 原子性的時候,先來探討下 Redis 中使用到的程式設計模型
Redis 中使用到了 Reactor 模型,Reactor 是非阻塞 I/O 模型,這裡來看下 Unix 中的 I/O 模型。
作業系統上的 I/O 是使用者空間和核心空間的資料互動,因此 I/O 操作通常包含以下兩個步驟:
1、等待網路資料到達網路卡(讀就緒)/等待網路卡可寫(寫就緒) –> 讀取/寫入到核心緩衝區;
2、從核心緩衝區複製資料 –> 使用者空間(讀)/從使用者空間複製資料 -> 核心緩衝區(寫);
Unix 中有五種基本的 I/O 模型
而判定一個 I/O 模型是同步還是非同步,主要看第二步:資料在使用者和核心空間之間複製的時候是不是會阻塞當前程序,如果會,則是同步 I/O,否則,就是非同步 I/O。
這裡主要分下下面三種 I/O 模型
當使用者程式執行 read ,執行緒會被阻塞,一直等到核心資料準備好,並把資料從核心緩衝區拷貝到應用程式的緩衝區中,當拷貝過程完成,read 才會返回。
阻塞等待的是「核心資料準備好」和「資料從核心態拷貝到使用者態」這兩個過程。
非阻塞的 read 請求在資料未準備好的情況下立即返回,可以繼續往下執行,此時應用程式不斷輪詢核心,直到資料準備好,核心將資料拷貝到應用程式緩衝區,read 呼叫才可以獲取到結果。
這裡最後一次 read 呼叫,獲取資料的過程,是一個同步的過程,是需要等待的過程。這裡的同步指的是核心態的資料拷貝到使用者程式的快取區這個過程。
發起非同步 I/O,就立即返回,核心自動將資料從核心空間拷貝到使用者空間,這個拷貝過程同樣是非同步的,核心自動完成的,和前面的同步操作不一樣,應用程式並不需要主動發起拷貝動作。
舉個你去飯堂吃飯的例子,你好比應用程式,飯堂好比作業系統。
阻塞 I/O 好比,你去飯堂吃飯,但是飯堂的菜還沒做好,然後你就一直在那裡等啊等,等了好長一段時間終於等到飯堂阿姨把菜端了出來(資料準備的過程),但是你還得繼續等阿姨把菜(核心空間)打到你的飯盒裡(使用者空間),經歷完這兩個過程,你才可以離開。
非阻塞 I/O 好比,你去了飯堂,問阿姨菜做好了沒有,阿姨告訴你沒,你就離開了,過幾十分鐘,你又來飯堂問阿姨,阿姨說做好了,於是阿姨幫你把菜打到你的飯盒裡,這個過程你是得等待的。
非同步 I/O 好比,你讓飯堂阿姨將菜做好並把菜打到飯盒裡後,把飯盒送到你面前,整個過程你都不需要任何等待。
在 web 服務中,處理 web 請求通常有兩種體系結構,分別為:thread-based architecture
(基於執行緒的架構)、event-driven architecture
(事件驅動模型)
thread-based architecture(基於執行緒的架構):這種比較容易理解,就是多執行緒並行模式,伺服器端在處理請求的時候,一個請求分配一個獨立的執行緒來處理。
因為每個請求分配一個獨立的執行緒,所以單個執行緒的阻塞不會影響到其他的執行緒,能夠提高程式的響應速度。
不足的是,連線和執行緒之間始終保持一對一的關係,如果是一直處於 Keep-Alive 狀態的長連線將會導致大量工作執行緒在空閒狀態下等待,例如,檔案系統存取,網路等。此外,成百上千的連線還可能會導致並行執行緒浪費大量記憶體的堆疊空間。
事件驅動的體系結構由事件生產者和事件消費者組,是一種鬆耦合、分散式的驅動架構,生產者收集到某應用產生的事件後實時對事件採取必要的處理後路由至下游系統,無需等待系統響應,下游的事件消費者組收到是事件訊息,非同步的處理。
事件驅動架構具有以下優勢:
降低事件生產者和訂閱者的耦合性。事件生產者只需關注事件的發生,無需關注事件如何處理以及被分發給哪些訂閱者。任何一個環節出現故障,不會影響其他業務正常執行。
事件驅動架構適用於非同步場景,即便是需求高峰期,收集各種來源的事件後保留在事件匯流排中,然後逐步分發傳遞事件,不會造成系統擁塞或資源過剩的情況。
事件驅動架構中路由和過濾能力支援劃分服務,便於擴充套件和路由分發。
Reactor 模式和 Proactor 模式都是 event-driven architecture
(事件驅動模型)的實現方式,這裡具體分析下
Reactor 模式,是指通過一個或多個輸入同時傳遞給服務處理器的服務請求的事件驅動處理模式。
在處理⽹絡 IO 的連線事件、讀事件、寫事件。Reactor 中引入了三類角色
Reactor 模型又分為 3 類:
建立連線(Acceptor)、監聽accept、read、write事件(Reactor)、處理事件(Handler)都只用一個單執行緒;
與單執行緒模式不同的是,新增了一個工作者執行緒池,並將非 I/O
操作從 Reactor 執行緒中移出轉交給工作者執行緒池(Thread Pool)來執行。
建立連線(Acceptor)和 監聽accept、read、write事件(Reactor),複用一個執行緒。
工作執行緒池:處理事件(Handler),由一個工作執行緒池來執行業務邏輯,包括資料就緒後,使用者態的資料讀寫。
對於多個CPU的機器,為充分利用系統資源,將 Reactor 拆分為兩部分:mainReactor 和 subReactor。
mainReactor:負責監聽server socket
,用來處理網路新連線的建立,將建立的socketChannel指定註冊給subReactor,通常一個執行緒就可以處理;
subReactor:監聽accept、read、write
事件(Reactor
),包括等待資料就緒時,核心態的資料讀寫,通常使用多執行緒。
工作執行緒:處理事件(Handler)可以和 subReactor 共同使用同一個執行緒,也可以做成執行緒池,類似上面多執行緒 Reactor 模式下的工作執行緒池的處理方式。
reactor 流程與 Reactor 模式類似
不同點就是
在每次感知到有事件發生(比如可讀就緒事件)後,就需要應用程序主動呼叫 read 方法來完成資料的讀取,也就是要應用程序主動將 socket 接收快取中的資料讀到應用程序記憶體中,這個過程是同步的,讀取完資料後應用程序才能處理資料。
在發起非同步讀寫請求時,需要傳入資料緩衝區的地址(用來存放結果資料)等資訊,這樣系統核心才可以自動幫我們把資料的讀寫工作完成,這裡的讀寫工作全程由作業系統來做,並不需要像 Reactor 那樣還需要應用程序主動發起 read/write
來讀寫資料,作業系統完成讀寫工作後,就會通知應用程序直接處理資料。
因此,Reactor 可以理解為「來了事件作業系統通知應用程序,讓應用程序來處理」,而 Proactor 可以理解為「來了事件作業系統來處理,處理完再通知應用程序」。
舉個實際生活中的例子,Reactor 模式就是快遞員在樓下,給你打電話告訴你快遞到你家小區了,你需要自己下樓來拿快遞。而在 Proactor 模式下,快遞員直接將快遞送到你家門口,然後通知你。
Redis 中使用是單執行緒,可能處於以下幾方面的考慮
1、Redis 是純記憶體的操作,執行速度是非常快的,因此這部分操作通常不會是效能瓶頸,效能瓶頸在於網路 I/O;
2、避免過多的上下文切換開銷,單執行緒則可以規避程序內頻繁的執行緒切換開銷;
3、避免同步機制的開銷,多執行緒必然會面臨對於共用資源的存取,這時候通常的做法就是加鎖,雖然是多執行緒,這時候就會變成序列的存取。也就是多執行緒程式設計模式會面臨的共用資源的並行存取控制問題;
4、簡單可維護,多執行緒也會引入同步原語來保護共用資源的並行存取,程式碼的可維護性和易讀性將會下降。
Redis 在 v6.0 版本之前,Redis 的核心網路模型一直是一個典型的單 Reactor 模型:利用 epoll/select/kqueue
等多路複用技術,在單執行緒的事件迴圈中不斷去處理事件(使用者端請求),最後回寫響應資料到使用者端:
這裡來看下 Redis 如何使用單執行緒處理任務
Redis 的網路框架實現了 Reactor 模型,並且自行開發實現了一個事件驅動框架。
事件驅動框架的邏輯簡單點講就是
來看下 Redis 中事件驅動框架實現的幾個主要函數
// 執行事件捕獲,分發和處理迴圈 void aeMain(aeEventLoop *eventLoop); // 用來註冊監聽的事件和事件對應的處理常式。只有對事件和處理常式進行了註冊,才能在事件發生時呼叫相應的函數進行處理。 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); // aeProcessEvents 函數實現的主要功能,包括捕獲事件、判斷事件型別和呼叫具體的事件處理常式,從而實現事件的處理 int aeProcessEvents(aeEventLoop *eventLoop, int flags);
使用 aeMain 作為主迴圈來對事件進行持續監聽和捕獲,其中會呼叫 aeProcessEvents 函數,實現事件捕獲、判斷事件型別和呼叫具體的事件處理常式,從而實現事件的處理。
// https://github.com/redis/redis/blob/5.0/src/ae.c#L496 void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } } // https://github.com/redis/redis/blob/5.0/src/ae.c#L358 int aeProcessEvents(aeEventLoop *eventLoop, int flags) { ... if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { ... //呼叫aeApiPoll函數捕獲事件 numevents = aeApiPoll(eventLoop, tvp); ... } ... }
可以看到 aeProcessEvents 中對於 IO 事件的捕獲是通過呼叫 aeApiPoll 來完成的。
aeApiPoll 是 I/O 多路複用 API,是基於 epoll_wait/select/kevent
等系統呼叫的封裝,監聽等待讀寫事件觸發,然後處理,它是事件迴圈(Event Loop)中的核心函數,是事件驅動得以執行的基礎。
Redis 是依賴於作業系統底層提供的 IO 多路複用機制,來實現事件捕獲,檢查是否有新的連線、讀寫事件發生。為了適配不同的作業系統,Redis 對不同作業系統實現的網路 IO 多路複用函數,都進行了統一的封裝。
// https://github.com/redis/redis/blob/5.0/src/ae.c#L49 #ifdef HAVE_EVPORT #include "ae_evport.c" // Solaris #else #ifdef HAVE_EPOLL #include "ae_epoll.c" // Linux #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" // MacOS #else #include "ae_select.c" // Windows #endif #endif #endif
ae_epoll.c:對應 Linux 上的 IO 複用函數 epoll;
ae_evport.c:對應 Solaris 上的 IO 複用函數 evport;
ae_kqueue.c:對應 macOS 或 FreeBSD 上的 IO 複用函數 kqueue;
ae_select.c:對應 Linux(或 Windows)的 IO 複用函數 select。
監聽 socket 的讀事件,當有使用者端連線請求過來,使用函數 acceptTcpHandler 和使用者端建立連線
當 Redis 啟動後,伺服器程式的 main 函數會呼叫 initSever 函數來進行初始化,而在初始化的過程中,aeCreateFileEvent 就會被 initServer 函數呼叫,用於註冊要監聽的事件,以及相應的事件處理常式。
// https://github.com/redis/redis/blob/5.0/src/server.c#L2036 void initServer(void) { ... // 建立一個事件處理程式以接受 TCP 和 Unix 中的新連線 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } ... }
可以看到 initServer 中會根據啟用的 IP 埠個數,為每個 IP 埠上的網路事件,呼叫 aeCreateFileEvent,建立對 AE_READABLE 事件的監聽,並且註冊 AE_READABLE 事件的處理 handler,也就是 acceptTcpHandler 函數。
然後看下 acceptTcpHandler 的實現
// https://github.com/redis/redis/blob/5.0/src/networking.c#L734 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata); while(max--) { // 用於accept使用者端的連線,其返回值是使用者端對應的socket cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); // 會呼叫acceptCommonHandler對連線以及使用者端進行初始化 acceptCommonHandler(cfd,0,cip); } } // https://github.com/redis/redis/blob/5.0/src/networking.c#L664 static void acceptCommonHandler(int fd, int flags, char *ip) { client *c; // 分配並初始化新使用者端 if ((c = createClient(fd)) == NULL) { serverLog(LL_WARNING, "Error registering fd event for the new client: %s (fd=%d)", strerror(errno),fd); close(fd); /* May be already closed, just ignore errors */ return; } // 判斷當前連線的使用者端是否超過最大值,如果超過的話,會拒絕這次連線。否則,更新使用者端連線數的計數 if (listLength(server.clients) > server.maxclients) { char *err = "-ERR max number of clients reached\r\n"; /* That's a best effort error message, don't check write errors */ if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ } server.stat_rejected_conn++; freeClient(c); return; } ... } // 使用多路複用,需要記錄每個使用者端的狀態,client 之前通過連結串列儲存 typedef struct client { int fd; // 欄位是使用者端通訊端檔案描述符 sds querybuf; // 儲存使用者端發來命令請求的輸入緩衝區。以Redis通訊協定的方式儲存 int argc; // 當前命令的引數數量 robj **argv; // 當前命令的引數 redisDb *db; // 當前選擇的資料庫指標 int flags; list *reply; // 儲存命令回覆的連結串列。因為靜態緩衝區大小固定,主要儲存固定長度的命令回覆,當處理一些返回大量回復的命令,則會將命令回覆以連結串列的形式連線起來。 // ... many other fields ... char buf[PROTO_REPLY_CHUNK_BYTES]; } client; client *createClient(int fd) { client *c = zmalloc(sizeof(client)); // 如果fd為-1,表示建立的是一個無網路連線的偽使用者端,用於執行lua指令碼的時候。 // 如果fd不等於-1,表示建立一個有網路連線的使用者端 if (fd != -1) { // 設定fd為非阻塞模式 anetNonBlock(NULL,fd); // 禁止使用 Nagle 演演算法,client向核心遞交的每個封包都會立即傳送給server出去,TCP_NODELAY anetEnableTcpNoDelay(NULL,fd); // 如果開啟了tcpkeepalive,則設定 SO_KEEPALIVE if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); // 建立一個檔案事件狀態el,且監聽讀事件,開始接受命令的輸入 if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } ... // 初始化client 中的引數 return c; }
1、acceptTcpHandler 主要用於處理和使用者端連線的建立;
2、其中會呼叫函數 anetTcpAccept 用於 accept 使用者端的連線,其返回值是使用者端對應的 socket;
3、然後呼叫 acceptCommonHandler 對連線以及使用者端進行初始化;
4、初始化使用者端的時候,同時使用 aeCreateFileEvent 用來註冊監聽的事件和事件對應的處理常式,將 readQueryFromClient 命令讀取處理器繫結到新連線對應的檔案描述符上;
5、伺服器會監聽該檔案描述符的讀事件,當用戶端傳送了命令,觸發了 AE_READABLE 事件,那麼就會呼叫回撥函數 readQueryFromClient() 來從檔案描述符 fd 中讀發來的命令,並儲存在輸入緩衝區中 querybuf。
readQueryFromClient 是請求處理的起點,解析並執行使用者端的請求命令。
// https://github.com/redis/redis/blob/5.0/src/networking.c#L1522 // 讀取client的輸入緩衝區的內容 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*) privdata; int nread, readlen; size_t qblen; UNUSED(el); UNUSED(mask); ... // 輸入緩衝區的長度 qblen = sdslen(c->querybuf); // 更新緩衝區的峰值 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; // 擴充套件緩衝區的大小 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 呼叫read從描述符為fd的使用者端socket中讀取資料 nread = read(fd, c->querybuf+qblen, readlen); ... // 處理讀取的內容 processInputBufferAndReplicate(c); } // https://github.com/redis/redis/blob/5.0/src/networking.c#L1507 void processInputBufferAndReplicate(client *c) { // 當前使用者端不屬於主從複製中的Master // 直接呼叫 processInputBuffer,對使用者端輸入緩衝區中的命令和引數進行解析 if (!(c->flags & CLIENT_MASTER)) { processInputBuffer(c); // 使用者端屬於主從複製中的Master // 呼叫processInputBuffer函數,解析使用者端命令, // 呼叫replicationFeedSlavesFromMasterStream 函數,將主節點接收到的命令同步給從節點 } else { size_t prev_offset = c->reploff; processInputBuffer(c); size_t applied = c->reploff - prev_offset; if (applied) { replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied); sdsrange(c->pending_querybuf,applied,-1); } } } // https://github.com/redis/redis/blob/5.0/src/networking.c#L1428 void processInputBuffer(client *c) { server.current_client = c; /* Keep processing while there is something in the input buffer */ // 持續讀取緩衝區的內容 while(c->qb_pos < sdslen(c->querybuf)) { ... /* Multibulk processing could see a <= 0 length. */ // 如果引數為0,則重置client if (c->argc == 0) { resetClient(c); } else { /* Only reset the client when the command was executed. */ // 執行命令成功後重置client if (processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; } // 命令處於阻塞狀態中的使用者端,不需要進行重置 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) resetClient(c); } /* freeMemoryIfNeeded may flush slave output buffers. This may * result into a slave, that may be the active client, to be * freed. */ if (server.current_client == NULL) break; } } /* Trim to pos */ if (server.current_client != NULL && c->qb_pos) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } server.current_client = NULL; }
1、readQueryFromClient(),從檔案描述符 fd 中讀出資料到輸入緩衝區 querybuf 中;
2、使用 processInputBuffer 函數完成對命令的解析,在其中使用 processInlineBuffer 或者 processMultibulkBuffer 根據 Redis 協定解析命令;
3、完成對一個命令的解析,就使用 processCommand 對命令就行執行;
4、命令執行完成,最後呼叫 addReply 函數族的一系列函數將響應資料寫入到對應 client 的寫出緩衝區:client->buf 或者 client->reply ,client->buf 是首選的寫出緩衝區,固定大小 16KB,一般來說可以緩衝足夠多的響應資料,但是如果使用者端在時間視窗內需要響應的資料非常大,那麼則會自動切換到 client->reply 連結串列上去,使用連結串列理論上能夠儲存無限大的資料(受限於機器的實體記憶體),最後把 client 新增進一個 LIFO 佇列 clients_pending_write;
在 Redis 事件驅動框架每次迴圈進入事件處理常式前,來處理監聽到的已觸發事件或是到時的時間事件之前,都會呼叫 beforeSleep 函數,進行一些任務處理,這其中就包括了呼叫 handleClientsWithPendingWrites 函數,它會將 Redis sever
使用者端緩衝區中的資料寫回使用者端。
// https://github.com/redis/redis/blob/5.0/src/server.c#L1380 void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); ... // 將 Redis sever 使用者端緩衝區中的資料寫回使用者端 handleClientsWithPendingWrites(); ... } // https://github.com/redis/redis/blob/5.0/src/networking.c#L1082 int handleClientsWithPendingWrites(void) { listIter li; listNode *ln; // 遍歷 clients_pending_write 佇列,呼叫 writeToClient 把 client 的寫出緩衝區裡的資料回寫到使用者端 int processed = listLength(server.clients_pending_write); listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; listDelNode(server.clients_pending_write,ln); ... // 呼叫 writeToClient 函數,將使用者端輸出緩衝區中的資料寫回 if (writeToClient(c->fd,c,0) == C_ERR) continue; // 如果輸出緩衝區的資料還沒有寫完,此時,handleClientsWithPendingWrites 函數就 // 會呼叫 aeCreateFileEvent 函數,建立可寫事件,並設定回撥函數 sendReplyToClien if (clientHasPendingReplies(c)) { int ae_flags = AE_WRITABLE; if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_ALWAYS) { ae_flags |= AE_BARRIER; } // 將檔案描述符fd和AE_WRITABLE事件關聯起來,當用戶端可寫時,就會觸發事件,呼叫sendReplyToClient()函數,執行寫事件 if (aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) { freeClientAsync(c); } } } return processed; } // https://github.com/redis/redis/blob/5.0/src/networking.c#L1072 // 寫事件處理程式,只是傳送回復給client void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(el); UNUSED(mask); writeToClient(fd,privdata,1); } // https://github.com/redis/redis/blob/5.0/src/networking.c#L979 // 將輸出緩衝區的資料寫給client,如果client被釋放則返回C_ERR,沒被釋放則返回C_OK int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; size_t objlen; clientReplyBlock *o; // 如果指定的client的回覆緩衝區中還有資料,則返回真,表示可以寫socket while(clientHasPendingReplies(c)) { // 固定緩衝區傳送未完成 if (c->bufpos > 0) { // 將緩衝區的資料寫到fd中 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); ... // 如果傳送的資料等於buf的偏移量,表示傳送完成 if ((int)c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; } // 固定緩衝區傳送完成,傳送回復連結串列的內容 } else { // 回覆連結串列的第一條回覆物件,和物件值的長度和所佔的記憶體 o = listNodeValue(listFirst(c->reply)); objlen = o->used; if (objlen == 0) { c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); continue; } // 將當前節點的值寫到fd中 nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; ... } ... } ... // 如果指定的client的回覆緩衝區中已經沒有資料,傳送完成 if (!clientHasPendingReplies(c)) { c->sentlen = 0; // 刪除當前client的可讀事件的監聽 if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ // 如果指定了寫入按成之後立即關閉的標誌,則釋放client if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { freeClient(c); return C_ERR; } } return C_OK; }
1、beforeSleep 函數呼叫的 handleClientsWithPendingWrites 函數,會遍歷 clients_pending_write(待寫回資料的使用者端) 佇列,呼叫 writeToClient 把 client 的寫出緩衝區裡的資料回寫到使用者端,然後呼叫 writeToClient 函數,將使用者端輸出緩衝區中的資料傳送給使用者端;
2、如果輸出緩衝區的資料還沒有寫完,此時,handleClientsWithPendingWrites 函數就會呼叫 aeCreateFileEvent 函數,註冊 sendReplyToClient 到該連線的寫就緒事件,等待將後續將資料寫回給使用者端。
上面的執行流程總結下來就是
1、Redis Server
啟動後,主執行緒會啟動一個時間迴圈(Event Loop),持續監聽事件;
2、client 到 server 的新連線,會呼叫 acceptTcpHandler 函數,之後會註冊讀事件 readQueryFromClient 函數,client 發給 server 的資料,都會在這個函數處理,這個函數會解析 client 的資料,找到對應的 cmd 函數執行;
3、cmd 邏輯執行完成後,server 需要寫回資料給 client,呼叫 addReply 函數族的一系列函數將響應資料寫入到對應 client 的寫出緩衝區:client->buf
或者 client->reply
,client->buf
是首選的寫出緩衝區,固定大小 16KB,一般來說可以緩衝足夠多的響應資料,但是如果使用者端在時間視窗內需要響應的資料非常大,那麼則會自動切換到 client->reply
連結串列上去,使用連結串列理論上能夠儲存無限大的資料(受限於機器的實體記憶體),最後把 client 新增進一個 LIFO 佇列 clients_pending_write
;
4、在 Redis 事件驅動框架每次迴圈進入事件處理常式前,來處理監聽到的已觸發事件或是到時的時間事件之前,都會呼叫 beforeSleep 函數,進行一些任務處理,這其中就包括了呼叫 handleClientsWithPendingWrites 函數,它會將 Redis sever 使用者端緩衝區中的資料寫回使用者端;
在 Redis6.0 的版本中,引入了多執行緒來處理 IO 任務,多執行緒的引入,充分利用了當前伺服器多核特性,使用多核執行多執行緒,讓多執行緒幫助加速資料讀取、命令解析以及資料寫回的速度,提升 Redis 整體效能。
Redis6.0 之前的版本用的是單執行緒 Reactor 模式,所有的操作都在一個執行緒中完成,6.0 之後的版本使用了主從 Reactor 模式。
由一個 mainReactor 執行緒接收連線,然後傳送給多個 subReactor 執行緒處理,subReactor 負責處理具體的業務。
來看下 Redis 多IO執行緒的具體實現過程
使用 initThreadedIO 函數來初始化多 IO 執行緒。
// https://github.com/redis/redis/blob/6.2/src/networking.c#L3573 void initThreadedIO(void) { server.io_threads_active = 0; /* We start with threads not active. */ /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ // 如果使用者只設定了一個 I/O 執行緒,不需要建立新執行緒了,直接在主執行緒中處理 if (server.io_threads_num == 1) return; if (server.io_threads_num > IO_THREADS_MAX_NUM) { serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " "The maximum number is %d.", IO_THREADS_MAX_NUM); exit(1); } /* Spawn and initialize the I/O threads. */ // 初始化執行緒 for (int i = 0; i < server.io_threads_num; i++) { /* Things we do for all the threads including the main thread. */ io_threads_list[i] = listCreate(); // 編號為0是主執行緒 if (i == 0) continue; /* Thread 0 is the main thread. */ /* Things we do only for the additional threads. */ pthread_t tid; // 初始化io_threads_mutex陣列 pthread_mutex_init(&io_threads_mutex[i],NULL); // 初始化io_threads_pending陣列 setIOPendingCount(i, 0); // 主執行緒在啟動 I/O 執行緒的時候會預設先鎖住它,直到有 I/O 任務才喚醒它。 pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ // 呼叫pthread_create函數建立IO執行緒,執行緒執行函數為IOThreadMain if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } io_threads[i] = tid; } }
可以看到在 initThreadedIO 中完成了對下面四個陣列的初始化工作
io_threads_list 陣列:儲存了每個 IO 執行緒要處理的使用者端,將陣列每個元素初始化為一個 List 型別的列表;
io_threads_pending 陣列:儲存等待每個 IO 執行緒處理的使用者端個數;
io_threads_mutex 陣列:儲存執行緒互斥鎖;
io_threads 陣列:儲存每個 IO 執行緒的描述符。
Redis server
在和一個使用者端建立連線後,就開始了監聽使用者端的可讀事件,處理可讀事件的回撥函數就是 readQueryFromClient
// https://github.com/redis/redis/blob/6.2/src/networking.c#L2219 void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn); int nread, readlen; size_t qblen; /* Check if we want to read from the client later when exiting from * the event loop. This is the case if threaded I/O is enabled. */ // 判斷是否從使用者端延遲讀取資料 if (postponeClientRead(c)) return; ... } // https://github.com/redis/redis/blob/6.2/src/networking.c#L3746 int postponeClientRead(client *c) { // 當多執行緒 I/O 模式開啟、主執行緒沒有在處理阻塞任務時,將 client 加入非同步佇列。 if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) { // 給使用者端的flag新增CLIENT_PENDING_READ標記,表示推遲該使用者端的讀操作 c->flags |= CLIENT_PENDING_READ; // 將可獲得加入clients_pending_write列表 listAddNodeHead(server.clients_pending_read,c); return 1; } else { return 0; } }
使用 clients_pending_read 儲存了需要進行延遲讀操作的使用者端之後,這些使用者端又是如何分配給多 IO 執行緒執行的呢?
handleClientsWithPendingWritesUsingThreads 函數:該函數主要負責將 clients_pending_write 列表中的使用者端分配給 IO 執行緒進行處理。
看下如何實現
// https://github.com/redis/redis/blob/6.2/src/networking.c#L3766 int handleClientsWithPendingReadsUsingThreads(void) { // 當多執行緒 I/O 模式開啟,才能執行下面的流程 if (!server.io_threads_active || !server.io_threads_do_reads) return 0; int processed = listLength(server.clients_pending_read); if (processed == 0) return 0; // 遍歷待讀取的 client 佇列 clients_pending_read, // 根據IO執行緒的數量,讓clients_pending_read中使用者端數量對IO執行緒進行取模運算 // 取模的結果就是使用者端分配給對應IO執行緒的編號 listIter li; listNode *ln; listRewind(server.clients_pending_read,&li); int item_id = 0; while((ln = listNext(&li))) { client *c = listNodeValue(ln); int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id++; } // 設定當前 I/O 操作為讀取操作,給每個 I/O 執行緒的計數器設定分配的任務數量, // 讓 I/O 執行緒可以開始工作:唯讀取和解析命令,不執行 io_threads_op = IO_THREADS_OP_READ; for (int j = 1; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); setIOPendingCount(j, count); } // 主執行緒自己也會去執行讀取使用者端請求命令的任務,以達到最大限度利用 CPU。 listRewind(io_threads_list[0],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); readQueryFromClient(c->conn); } listEmpty(io_threads_list[0]); // 忙輪詢,等待所有 IO 執行緒完成待讀使用者端的處理 while(1) { unsigned long pending = 0; for (int j = 1; j < server.io_threads_num; j++) pending += getIOPendingCount(j); if (pending == 0) break; } // 遍歷待讀取的 client 佇列,清除 CLIENT_PENDING_READ標記, // 然後解析並執行所有 client 的命令。 while(listLength(server.clients_pending_read)) { ln = listFirst(server.clients_pending_read); client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_READ; listDelNode(server.clients_pending_read,ln); serverAssert(!(c->flags & CLIENT_BLOCKED)); // client 的第一條命令已經被解析好了,直接嘗試執行。 if (processPendingCommandsAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid * processing the client later. So we just go * to the next. */ continue; } // 解析並執行 client 命令 processInputBuffer(c); // 命令執行完成之後,如果 client 中有響應資料需要回寫到使用者端,則將 client 加入到待寫出佇列 clients_pending_write if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c)) clientInstallWriteHandler(c); } /* Update processed count on server */ server.stat_io_reads_processed += processed; return processed; }
1、當用戶端傳送命令請求之後,會觸發 Redis 主執行緒的事件迴圈,命令處理器 readQueryFromClient 被回撥,多執行緒模式下,則會把 client 加入到 clients_pending_read 任務佇列中去,後面主執行緒再分配到 I/O 執行緒去讀取使用者端請求命令;
2、主執行緒會根據 clients_pending_read 中使用者端數量對IO執行緒進行取模運算,取模的結果就是使用者端分配給對應IO執行緒的編號;
3、忙輪詢,等待所有的執行緒完成讀取使用者端命令的操作,這一步用到了多執行緒的請求;
4、遍歷 clients_pending_read,執行所有 client 的命令,這裡就是在主執行緒中執行的,命令的執行是單執行緒的操作。
完成命令的讀取、解析以及執行之後,使用者端命令的響應資料已經存入 client->buf 或者 client->reply 中。
主迴圈在捕獲 IO 事件的時候,beforeSleep 函數會被呼叫,進而呼叫 handleClientsWithPendingWritesUsingThreads ,寫回響應資料給使用者端。
// https://github.com/redis/redis/blob/6.2/src/networking.c#L3662 int handleClientsWithPendingWritesUsingThreads(void) { int processed = listLength(server.clients_pending_write); if (processed == 0) return 0; /* Return ASAP if there are no clients. */ // 如果使用者設定的 I/O 執行緒數等於 1 或者當前 clients_pending_write 佇列中待寫出的 client // 數量不足 I/O 執行緒數的兩倍,則不用多執行緒的邏輯,讓所有 I/O 執行緒進入休眠, // 直接在主執行緒把所有 client 的相應資料回寫到使用者端。 if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) { return handleClientsWithPendingWrites(); } // 喚醒正在休眠的 I/O 執行緒(如果有的話)。 if (!server.io_threads_active) startThreadedIO(); /* Distribute the clients across N different lists. */ // 和上面的handleClientsWithPendingReadsUsingThreads中的操作一樣分配使用者端給IO執行緒 listIter li; listNode *ln; listRewind(server.clients_pending_write,&li); int item_id = 0; while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; /* Remove clients from the list of pending writes since * they are going to be closed ASAP. */ if (c->flags & CLIENT_CLOSE_ASAP) { listDelNode(server.clients_pending_write, ln); continue; } int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id++; } // 設定當前 I/O 操作為寫出操作,給每個 I/O 執行緒的計數器設定分配的任務數量, // 讓 I/O 執行緒可以開始工作,把寫出緩衝區(client->buf 或 c->reply)中的響應資料回寫到使用者端。 // 可以看到寫回操作也是多執行緒執行的 io_threads_op = IO_THREADS_OP_WRITE; for (int j = 1; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); setIOPendingCount(j, count); } // 主執行緒自己也會去執行讀取使用者端請求命令的任務,以達到最大限度利用 CPU。 listRewind(io_threads_list[0],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); writeToClient(c,0); } listEmpty(io_threads_list[0]); /* Wait for all the other threads to end their work. */ // 等待所有的執行緒完成對應的工作 while(1) { unsigned long pending = 0; for (int j = 1; j < server.io_threads_num; j++) pending += getIOPendingCount(j); if (pending == 0) break; } // 最後再遍歷一次 clients_pending_write 佇列,檢查是否還有 client 的寫出緩衝區中有殘留資料, // 如果有,那就為 client 註冊一個命令回覆器 sendReplyToClient,等待使用者端寫就緒再繼續把資料回寫。 listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); // 檢查 client 的寫出緩衝區是否還有遺留資料。 if (clientHasPendingReplies(c) && connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) { freeClientAsync(c); } } listEmpty(server.clients_pending_write); /* Update processed count on server */ server.stat_io_writes_processed += processed; return processed; }
1、也是會將 client 分配給所有的 IO 執行緒;
2、忙輪詢,等待所有的執行緒將快取中的資料寫回給使用者端,這裡寫回操作使用的多執行緒;
3、最後再遍歷 clients_pending_write,為那些還殘留有響應資料的 client 註冊命令回覆處理器 sendReplyToClient,等待使用者端可寫之後在事件迴圈中繼續回寫殘餘的響應資料。
通過上面的分析可以得出結論,Redis 多IO執行緒中多執行緒的應用
1、解析使用者端的命令的時候用到了多執行緒,但是對於使用者端命令的執行,使用的還是單執行緒;
2、給使用者端回覆資料的時候,使用到了多執行緒。
來總結下 Redis 中多執行緒的執行過程
1、Redis Server 啟動後,主執行緒會啟動一個時間迴圈(Event Loop),持續監聽事件;
2、client 到 server 的新連線,會呼叫 acceptTcpHandler 函數,之後會註冊讀事件 readQueryFromClient 函數,client 發給 server 的資料,都會在這個函數處理;
3、使用者端傳送給伺服器端的資料,不會類似 6.0 之前的版本使用 socket 直接去讀,而是會將 client 放入到 clients_pending_read 中,裡面儲存了需要進行延遲讀操作的使用者端;
4、處理 clients_pending_read 的函數 handleClientsWithPendingReadsUsingThreads,在每次事件迴圈的時候都會呼叫;
5、命令執行完成以後,回覆的內容還是會被寫入到 client 的快取區中,這些 client 和6.0之前的版本處理方式一樣,也是會被放入到 clients_pending_write(待寫回資料的使用者端);
6、6.0 對於clients_pending_write 的處理使用到了多執行緒;
通過上面的分析,我們知道,Redis 的主執行緒是單執行緒執行的,所有 Redis 中的單命令,都是原子性的。
所以對於一些場景的操作儘量去使用 Redis 中單命令去完成,就能保證命令執行的原子性。
比如對於上面的讀取-修改-寫回
操作可以使用 Redis 中的原子計數器, INCRBY(自增)、DECRBR(自減)、INCR(加1) 和 DECR(減1) 等命令。
這些命令可以直接幫助我們處理並行控制
127.0.0.1:6379> incr test-1 (integer) 1 127.0.0.1:6379> incr test-1 (integer) 2 127.0.0.1:6379> incr test-1 (integer) 3
分析下原始碼,看看這個命令是如何實現的
// https://github.com/redis/redis/blob/6.2/src/t_string.c#L617 void incrCommand(client *c) { incrDecrCommand(c,1); } void decrCommand(client *c) { incrDecrCommand(c,-1); } void incrbyCommand(client *c) { long long incr; if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != C_OK) return; incrDecrCommand(c,incr); } void decrbyCommand(client *c) { long long incr; if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != C_OK) return; incrDecrCommand(c,-incr); }
可以看到 INCRBY(自增)、DECRBR(自減)、INCR(加1) 和 DECR(減1)這幾個命令最終都是呼叫的 incrDecrCommand
// https://github.com/redis/redis/blob/6.2/src/t_string.c#L579 void incrDecrCommand(client *c, long long incr) { long long value, oldvalue; robj *o, *new; // 查詢有沒有對應的鍵值 o = lookupKeyWrite(c->db,c->argv[1]); // 判斷型別,如果value物件不是字串型別,直接返回 if (checkType(c,o,OBJ_STRING)) return; // 將字串型別的value轉換為longlong型別儲存在value中 if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return; // 備份舊的value oldvalue = value; // 判斷 incr 的值是否超過longlong型別所能表示的範圍 // 長度的範圍,十進位制 64 位有符號整數 if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) || (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) { addReplyError(c,"increment or decrement would overflow"); return; } // 計算新的 value值 value += incr; if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT && (value < 0 || value >= OBJ_SHARED_INTEGERS) && value >= LONG_MIN && value <= LONG_MAX) { new = o; o->ptr = (void*)((long)value); } else { new = createStringObjectFromLongLongForValue(value); // 如果之前的 value 物件存在 if (o) { // 重寫為 new 的值 dbOverwrite(c->db,c->argv[1],new); } else { // 如果之前沒有對應的 value,新設定 value 的值 dbAdd(c->db,c->argv[1],new); } } // 進行通知 signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id); server.dirty++; addReply(c,shared.colon); addReply(c,new); addReply(c,shared.crlf); }
1、Redis 中的命令執行都是單執行緒的,所以單命令的執行都是原子性的;
2、雖然 Redis6.0 版本引入了多執行緒,但是僅是在接收使用者端的命令和回覆使用者端的資料用到了多執行緒,實際命令的執行還是單執行緒在處理;
推薦學習:
以上就是詳細解析Redis中命令的原子性的詳細內容,更多請關注TW511.COM其它相關文章!