本系列主要是為了對redis的網路模型進行學習,我會用golang實現一個reactor網路模型,並實現對redis協定的解析。
系列原始碼已經上傳github
https://github.com/HobbyBear/tinyredis/tree/chapter1
redis的網路模型是基於epoll實現的,所以這一節讓我們先基於epoll,實現一個最簡單的伺服器端使用者端通訊模型。在實現前,先來簡單的瞭解下epoll的原理。
為什麼不用golang的原生的netpoll網路框架呢,這是因為netpoll框架雖然底層也是基於epoll實現,但是它提供給開發人員使用網路io方式依然是同步阻塞模式,一個連線單獨的拿給一個協程去處理,為了更加真實的感受下redis的網路模型,我們不用netpoll框架,而是自己寫一個非阻塞的網路模型。
通常情況下伺服器端的處理使用者端請求的邏輯是使用者端每發起一個連線,伺服器端就單獨起一個執行緒去處理這個連線的請求,對於go應用程式而言,則是啟用一個協程去處理這個連線。 而採用epoll相關的api後,能夠讓我們在一個執行緒或者協程裡去處理多個連線的請求。
一個通訊端連線對應一個檔案描述符,當收到使用者端的連線請求時,可以將對應的檔案描述符加入到epoll範例關注的事件中去。
在golang裡,可以通過syscall.EpollCreate1 去建立一個epoll範例。
func EpollCreate1(flag int) (fd int, err error)
其返回結果的fd就代表epoll範例的fd,當收到使用者端的連線請求時,便可以將使用者端連線的fd,通過EpollCtl 加入到epoll範例感興趣的事件當中。
func EpollCtl(epfd int, op int, fd int, event *EpollEvent) (err error)
EpollCtl 方法引數的epfd則是EpollCreate1 返回的fd,EpollCtl的第二個引數則是代表使用者端連線的fd,通過我們在獲取到使用者端連線後,後續的行為便是檢視使用者端是否有資料傳送過來或者往使用者端傳送資料,這些在epoll api裡用event事件去表示,分別對應了讀event和寫event,這便是EpollCtl第三個引數所代表的含義。
將這些感興趣事件新增到epoll範例中後,就代表epoll範例後續會監聽這些連線的讀寫事件的到達,那麼讀寫事件到達後,使用者程式又是如何知道的呢,這就要提到epoll相關的另一個api,EpollWait。
func EpollWait(epfd int, events []EpollEvent, msec int) (n int, err error)
EpollWait的第二個引數是一個事件陣列,使用者應用程式呼叫EpollWait時傳入一個固定長度的事件陣列,然後EpollWait會將這個陣列儘可能填滿,這樣使用者程式便能知道有哪些事件型別到達了,EpollEvent型別如下所示:
type EpollEvent struct {
Events uint32
Fd int32
Pad int32
}
其中fd則代表這些事件所關聯的使用者端連線的fd,通過這個fd,我們便可以對對應連線進行讀寫操作了。
而Events是個列舉型別,比較常用的列舉以及含義如下:
型別 | 解釋 |
---|---|
EPOLLIN | 表示檔案描述符可讀。 |
EPOLLRDHUP | 表示 TCP 連線的遠端端點關閉或半關閉連線 |
EPOLLET | 表示使用邊緣觸發模式來監聽事件 |
EPOLLOUT | 表示檔案描述符可寫 |
EPOLLERR | 表示檔案描述符發生錯誤時發生,這個事件不通過EpollCtl新增也能觸發 |
EPOLLHUP | 與EPOLLRDHUP類似同樣表示連線關閉,在不支援EPOLLRDHUP的linux版本會觸發,這個事件不通過EpollCtl新增也能觸發 |
雖然epoll event還有其他型別,不過一般情況下監控這幾種型別就足夠了,golang的netpoll框架在新增連線的檔案描述符時事件時也只新增了這幾種型別。netpoll的部分原始碼如下:
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
瞭解完epoll的一些概念以後,現在來看下我們需要實現的網路框架模型是怎樣的。我們先實現一個最簡單的網路通訊框架,使用者端傳送來訊息,然後伺服器端列印收到的訊息。
如上圖所示,我們收到新的連線後,會呼叫epoll範例的EpollCtl方法將連線的可讀事件新增到epoll範例中,接著呼叫EpollWait方法等待使用者端再次傳送訊息時,讓連線變為可讀。
下面是程式的效果測試結果
啟動了兩個終端,其中右邊的終端連線上redis以後,傳送了1231,然後左邊的終端收到後將收到的訊息列印出來。
接著,我們來看看實際程式碼編寫邏輯。
我們定義一個Server的結構體來代表epoll的server。
Conn是對golang原生連線型別net.Conn的包裝,。
poll結構體是封裝了對epoll api的呼叫。
type Server struct {
Poll *poll
addr string
listener net.Listener
ConnMap sync.Map
}
type Conn struct {
s *Server
conn *net.TCPConn
nfd int
}
type poll struct {
EpollFd int
}
接著來看下如何啟動一個Server,NewServer是返回一個Server範例,Server 呼叫Run方法後,才算Server正式啟動了起來。
在Run 方法裡,構建監聽連線的listener,構建一個epoll範例,用於後續對事件的監聽,同時把監聽握手連線和處理連線可讀資料分成了兩個協程分別用accept方法,和handler方法執行。
func NewServ(addr string) *Server {
return &Server{addr: addr, ConnMap: sync.Map{}}
}
func (s *Server) Run() error {
listener, err := net.Listen("tcp", s.addr)
if err != nil {
return err
}
s.listener = listener
epollFD, err := syscall.EpollCreate1(0)
if err != nil {
return err
}
s.Poll = &poll{EpollFd: epollFD}
go s.accept()
go s.handler()
ch := make(chan int)
<-ch
return nil
}
accept 方法裡執行的邏輯就是將握手完成的連結從全連線佇列裡取出來,將其連線的檔案描述符和連線儲存到一個map裡, 然後將對應的檔案描述符通過epoll的epollCtl 系統呼叫監聽它的可讀事件,後續使用者端再使用這個連線傳送資料時,epoll就能監聽到了。
func (s *Server) accept() {
for {
acceptConn, err := s.listener.Accept()
if err != nil {
return
}
var nfd int
rawConn, err := acceptConn.(*net.TCPConn).SyscallConn()
if err != nil {
log.Error(err.Error())
continue
}
rawConn.Control(func(fd uintptr) {
nfd = int(fd)
})
// 設定為非阻塞狀態
err = syscall.SetNonblock(nfd, true)
if err != nil {
return
}
err = s.Poll.AddListen(nfd)
if err != nil {
log.Error(err.Error())
continue
}
c := &Conn{
conn: acceptConn.(*net.TCPConn),
nfd: nfd,
s: s,
}
s.ConnMap.Store(nfd, c)
}
}
handler裡的邏輯則是通過epoll Wait系統呼叫等待可讀事件產生,到達後,根據事件的檔案描述符找到對應連線,然後讀取對應連線的資料。
func (s *Server) handler() {
for {
events, err := s.Poll.WaitEvents()
if err != nil {
log.Error(err.Error())
continue
}
for _, e := range events {
connInf, ok := s.ConnMap.Load(int(e.FD))
if !ok {
continue
}
conn := connInf.(*Conn)
if IsClosedEvent(e.Type) {
conn.Close()
continue
}
if IsReadableEvent(e.Type) {
buf := make([]byte, 1024)
rd, err := conn.Read(buf)
if err != nil && err != syscall.EAGAIN {
conn.Close()
continue
}
fmt.Println("收到訊息", string(buf[:rd]))
}
}
}
}
主幹程式碼是比較容易理解的,但是用golang使用epoll 時有幾個點 需要注意下:
第一點是IsReadableEvent 的判斷方式,epoll的每個event 都有一個位掩碼,位掩碼是什麼意思呢?比如EPOLLIN 的值 是0x1,二進位制就是00000001,EPOLLHUP 的值是0x10,二進位制表示是00010000,那麼epoll wait系統呼叫的event要如何同時表示同一個檔案描述符同時擁有這兩個事件呢? epoll 的event會將對應的位掩碼設定為和對應事件一致,比如同時擁有EPOLLIN和EPOLLHUP,那麼event的值將會是00010001,所以利用與位運算是不是就能判斷event是否具有某個事件了。因為1只有與1進行與運算結果才為1。
func IsReadableEvent(event uint32) bool {
if event&syscall.EPOLLIN != 0 {
return true
}
return false
}
第二點是如何讀取連線的資料, 我們後續要達到的目的是在同一個事件迴圈裡能處理多個連線,所以要保證讀取連線中的資料時不能阻塞,通過呼叫golang的net.Conn下的read方法是阻塞的,其read實現最終會呼叫到下面