interface/tcp/Handler.go
// Handler is the application-level contract the TCP server talks to.
type Handler interface {
	// Handle serves one accepted connection; the server invokes it once per
	// connection with the server's context.
	Handle(ctx context.Context, conn net.Conn)

	// Close shuts the handler down and reports any error from doing so.
	Close() error
}
tcp/server.go
// Config holds the settings needed to start the TCP server.
type Config struct {
	// Address is the "host:port" string handed to net.Listen.
	Address string
}
// ListenAndServeWithSignal opens a TCP listener on cfg.Address and hands it,
// together with handler, to ListenAndServe. It blocks until ListenAndServe
// returns.
//
// The error from net.Listen is returned unchanged; otherwise the result is nil.
// In this first draft nothing ever sends on the close channel.
func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
	ln, err := net.Listen("tcp", cfg.Address)
	if err != nil {
		return err
	}

	done := make(chan struct{})
	logger.Info("start listen")
	ListenAndServe(ln, handler, done)
	return nil
}
// ListenAndServe accepts connections from listener and serves each one on its
// own goroutine via handler.Handle. The accept loop stops at the first Accept
// error, after which the function waits for every in-flight handler goroutine
// to return.
//
// closeChan is accepted for interface compatibility but is not consulted in
// this version of the function.
func ListenAndServe(listener net.Listener,
	handler tcp.Handler,
	closeChan <-chan struct{}) {
	ctx := context.Background()
	var waitDone sync.WaitGroup
	for {
		conn, err := listener.Accept()
		if err != nil {
			// Accept fails once the listener is closed; stop accepting.
			break
		}
		logger.Info("accept link")
		waitDone.Add(1)
		go func() {
			defer waitDone.Done()
			// The interface method is Handle, not Handler.
			handler.Handle(ctx, conn)
		}()
	}
	// Let connections that are still being served finish before returning.
	waitDone.Wait()
}
// ListenAndServe (revised excerpt): adds shutdown handling around the serving
// logic. The listener and the handler are closed both when a value arrives on
// closeChan and when the function returns.
func ListenAndServe(listener net.Listener,
handler tcp.Handler,
closeChan <-chan struct{}) {
// Background goroutine: block until a close notification arrives, then close
// the listener and the handler.
go func() {
<-closeChan
logger.Info("shutting down...")
_ = listener.Close()
_ = handler.Close()
}()
// Also close both when this function returns for any other reason.
defer func() {
_ = listener.Close()
_ = handler.Close()
}()
// The remainder of the function body is elided in this excerpt.
......
}
listener和handler在程式退出時都需要關閉。除了正常退出之外,如果使用者直接kill掉程式,我們同樣需要關閉listener和handler;這時就要用到closeChan:一旦從closeChan接收到關閉訊號,就執行關閉邏輯。
// ListenAndServeWithSignal opens a TCP listener on cfg.Address and serves it
// with handler through ListenAndServe, blocking until ListenAndServe returns.
// When the process receives SIGHUP, SIGQUIT, SIGTERM or SIGINT, a close
// notification is sent to ListenAndServe over the close channel.
//
// The error from net.Listen is returned unchanged; otherwise the result is nil.
func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
	// Listen first so a bind failure returns before any signal handling or
	// goroutine has been set up (nothing is left behind on error).
	listener, err := net.Listen("tcp", cfg.Address)
	if err != nil {
		return err
	}

	closeChan := make(chan struct{})
	// signal.Notify never blocks when delivering, so an unbuffered channel can
	// silently drop the signal; a buffer of one is enough for a single
	// shutdown notification.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		sig := <-sigCh
		switch sig {
		case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
			closeChan <- struct{}{}
		}
	}()

	logger.Info("start listen")
	ListenAndServe(listener, handler, closeChan)
	return nil
}
當作業系統向程式傳送訊號時,sigCh會接收到該訊號;若是SIGHUP、SIGQUIT、SIGTERM或SIGINT,就向closeChan傳送關閉通知。
tcp/echo.go
// EchoHandler is a handler that writes every received line back to the
// client; it exists for testing the TCP server.
type EchoHandler struct {
	// activeConn holds the connected clients; keys are *EchoClient and the
	// values are empty structs.
	activeConn sync.Map

	// closing is set once Close has been called; Handle checks it before
	// serving a connection.
	closing atomic.Boolean
}
EchoHandler:
// EchoClient represents one client connection served by EchoHandler.
type EchoClient struct {
	// Conn is the underlying network connection.
	Conn net.Conn

	// Waiting is incremented around each echo write so that Close can wait
	// for in-flight work before closing Conn.
	Waiting wait.Wait
}
// Close waits for in-flight work on the client to finish, giving up after ten
// seconds, and then closes the underlying connection.
//
// It returns the error reported by the connection's Close, if any, instead of
// discarding it.
func (c *EchoClient) Close() error {
	c.Waiting.WaitWithTimeout(10 * time.Second)
	return c.Conn.Close()
}
EchoClient:一個使用者端對應一個連線。Close方法會先等待正在處理的業務完成(最多等待10秒),然後關閉使用者端連線。
// MakeHandler returns a new, zero-valued EchoHandler.
func MakeHandler() *EchoHandler {
	return new(EchoHandler)
}
// Handle serves a single connection: every newline-terminated message read
// from conn is written back unchanged. It returns when a read fails (EOF is
// logged as a normal close, any other error as a warning).
//
// If the handler is already closing, the connection is closed and rejected
// immediately. Otherwise the client is registered in activeConn for the
// lifetime of the call and the connection is closed on exit.
func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) {
	// The handler is shutting down: refuse the new connection and stop here
	// instead of going on to serve a connection that was just closed.
	if h.closing.Get() {
		_ = conn.Close()
		return
	}

	client := &EchoClient{
		Conn: conn,
	}
	h.activeConn.Store(client, struct{}{})
	// Always unregister the client and release the connection, whatever the
	// reason for leaving the read loop; previously this happened only on EOF
	// and the connection itself was never closed.
	defer func() {
		h.activeConn.Delete(client)
		// Best effort: the connection may already have been closed by
		// EchoHandler.Close during shutdown.
		_ = conn.Close()
	}()

	reader := bufio.NewReader(conn)
	for {
		msg, err := reader.ReadString('\n')
		if err != nil {
			if err == io.EOF {
				logger.Info("connection close")
			} else {
				logger.Warn(err)
			}
			return
		}
		// Mark work in progress so Close does not cut the connection
		// in the middle of a reply.
		client.Waiting.Add(1)
		// Echo the data back unchanged (test behavior). A failed write is
		// deliberately ignored: the next read will surface the broken
		// connection.
		_, _ = conn.Write([]byte(msg))
		client.Waiting.Done()
	}
}
// Close marks the handler as closing and then closes every client currently
// stored in activeConn. Errors from individual clients are ignored; the
// method always returns nil.
func (h *EchoHandler) Close() error {
	logger.Info("handler shutting down...")
	h.closing.Set(true)

	closeClient := func(k interface{}, _ interface{}) bool {
		_ = k.(*EchoClient).Close()
		// Returning true keeps Range iterating over the remaining clients.
		return true
	}
	h.activeConn.Range(closeClient)
	return nil
}
main.go
// configFile is the name of the configuration file main looks for.
const configFile string = "redis.conf"
// defaultProperties is the configuration main falls back to when configFile
// does not exist: bind on all interfaces, port 6379.
var defaultProperties = &config.ServerProperties{
	Bind: "0.0.0.0",
	Port: 6379,
}
// fileExists reports whether filename can be stat'ed and is not a directory.
// Any error from os.Stat (including "not found") yields false.
func fileExists(filename string) bool {
	info, err := os.Stat(filename)
	if err != nil {
		return false
	}
	return !info.IsDir()
}
// main sets up logging, loads the configuration (from configFile when it
// exists, otherwise defaultProperties) and runs the TCP server with the echo
// handler until it stops. A startup error is logged.
func main() {
	logger.Setup(&logger.Settings{
		Path: "logs",
		Name: "godis",
		Ext:  "log",
		// Go time layouts must be written in terms of the reference time
		// (2006-01-02 15:04:05); "2022-02-02" is not a valid date layout.
		// NOTE(review): assumes the logger passes this to time.Format — confirm.
		TimeFormat: "2006-01-02",
	})

	if fileExists(configFile) {
		config.SetupConfig(configFile)
	} else {
		config.Properties = defaultProperties
	}

	err := tcp.ListenAndServeWithSignal(
		&tcp.Config{
			Address: fmt.Sprintf("%s:%d",
				config.Properties.Bind,
				config.Properties.Port),
		},
		// MakeHandler is declared in tcp/echo.go, so it is qualified by the
		// tcp package rather than by the EchoHandler type name.
		tcp.MakeHandler())
	if err != nil {
		logger.Error(err)
	}
}