GO實現Redis:GO實現TCP伺服器(1)

2023-03-24 06:06:59

interface/tcp/Handler.go

type Handler interface {
   Handle(ctx context.Context, conn net.Conn)
   Close() error
}
  • Handler:業務邏輯的處理介面
    • Handle(ctx context.Context, conn net.Conn) 處理連線

tcp/server.go

type Config struct {
    Address string
}

func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
    closeChan := make(chan struct{})
    listen, err := net.Listen("tcp", cfg.Address)
    if err != nil {
       return err
   }
    logger.Info("start listen")
    ListenAndServe(listen, handler, closeChan)
    return nil
}

func ListenAndServe(listener net.Listener,
                    handler tcp.Handler,
                    closeChan <-chan struct{}) {
    ctx := context.Background()
    var waitDone sync.WaitGroup
    for true {
        conn, err := listener.Accept()
        if err != nil {
            break
        }
        logger.Info("accept link")
        waitDone.Add(1)
        go func() {
            defer func() {
                waitDone.Done()
            }()
            handler.Handler(ctx, conn)
        }()
    }
    waitDone.Wait()
}
  • Config:啟動tcp伺服器的設定
    • Address:監聽地址
  • ListenAndServe:ctx是上下文,可以傳遞一些引數。死迴圈中接收到新連線時,讓一個協程去處理連線
  • 如果listener.Accept()出錯了就會break跳出來,這時候需要等待已經服務的使用者端退出。使用WaitGroup等待客服端退出

func ListenAndServe(listener net.Listener,
                    handler tcp.Handler,
                    closeChan <-chan struct{}) {

    go func() {
       <-closeChan
       logger.Info("shutting down...")
       _ = listener.Close()
       _ = handler.Close()
   }()

    defer func() {
       _ = listener.Close()
       _ = handler.Close()
   }()

    ......
}

listener和handler在退出的時候需要關掉。如果使用者直接kill掉了程式,我們也需要關掉listener和handler,這時候要使用closeChan,一旦接收到關閉訊號,就執行關閉邏輯

func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {

    closeChan := make(chan struct{})
    sigCh := make(chan os.Signal)
    signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
    go func() {
       sig := <-sigCh
       switch sig {
          case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
          closeChan <- struct{}{}
      }
   }()
    listen, err := net.Listen("tcp", cfg.Address)
    if err != nil {
       return err
   }
    logger.Info("start listen")
    ListenAndServe(listen, handler, closeChan)
    return nil
}

當系統對程式傳送訊號時,sigCh會接收到訊號

tcp/echo.go

type EchoHandler struct {
   activeConn sync.Map
   closing    atomic.Boolean
}

EchoHandler:

  • activeConn:記錄連線
  • closing:是否正在關閉,有並行競爭,使用atomic.Boolean

type EchoClient struct {
   Conn    net.Conn
   Waiting wait.Wait
}

func (c *EchoClient) Close() error {
	c.Waiting.WaitWithTimeout(10 * time.Second)
	_ = c.Conn.Close()
	return nil
}

EchoClient:一個使用者端就是一個連線。Close方法關閉使用者端連線,超時時間設定為10s

func MakeHandler() *EchoHandler {
	return &EchoHandler{}
}

func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) {
   // 連線正在關閉,不接收新連線
   if h.closing.Get() {
      _ = conn.Close()
   }

   client := &EchoClient{
      Conn: conn,
   }
   h.activeConn.Store(client, struct{}{})

   reader := bufio.NewReader(conn)
   for {
      msg, err := reader.ReadString('\n')
      if err != nil {
         if err == io.EOF {
            logger.Info("connection close")
            h.activeConn.Delete(client)
         } else {
            logger.Warn(err)
         }
         return
      }
      // 正在處理業務,不要關掉
      client.Waiting.Add(1)
      // 將資料原封不動寫回去,測試
      b := []byte(msg)
      _, _ = conn.Write(b)
      client.Waiting.Done()
   }
}

func (h *EchoHandler) Close() error {
   logger.Info("handler shutting down...")
   h.closing.Set(true)
   h.activeConn.Range(func(key interface{}, val interface{}) bool {
      client := key.(*EchoClient)
      _ = client.Close()
      return true
   })
   return nil
}
  • MakeEchoHandler:建立EchoHandler
  • Handle:處理使用者端的連線。
    • 1.連線正在關閉時,不接收新連線
    • 2.儲存新連線,value用空結構體
    • 3.使用快取區接收使用者發來的資料,使用\n作為結束的標誌
  • Close:將所有使用者端連線關掉

main.go

const configFile string = "redis.conf"

var defaultProperties = &config.ServerProperties{
   Bind: "0.0.0.0",
   Port: 6379,
}

func fileExists(filename string) bool {
   info, err := os.Stat(filename)
   return err == nil && !info.IsDir()
}

func main() {
   logger.Setup(&logger.Settings{
      Path:       "logs",
      Name:       "godis",
      Ext:        "log",
      TimeFormat: "2022-02-02",
   })

   if fileExists(configFile) {
      config.SetupConfig(configFile)
   } else {
      config.Properties = defaultProperties
   }

   err := tcp.ListenAndServeWithSignal(
      &tcp.Config{
         Address: fmt.Sprintf("%s:%d",
            config.Properties.Bind,
            config.Properties.Port),
      },
      EchoHandler.MakeHandler())
   if err != nil {
      logger.Error(err)
   }
}