Netty 學習(四):ChannelHandler 的事件傳播和生命週期

2022-09-20 06:02:22

Netty 學習(四):ChannelHandler 的事件傳播和生命週期

作者: Grey

原文地址:

部落格園:Netty 學習(四):ChannelHandler 的事件傳播和生命週期

CSDN:Netty 學習(四):ChannelHandler 的事件傳播和生命週期

ChannelHandler 的事件傳播

在通訊使用者端和伺服器端,處理的流程大致有如下步驟

輸入---> 解碼 ---> 根據不同的訊息指令解析封包 ---> 編碼 ---> 輸出

在『根據不同的訊息指令解析封包』這個步驟中,經常需要用if-else來判斷不同的指令型別並進行解析。邏輯一旦複雜,就會讓程式碼變的極為臃腫,難以維護。

Netty 中的 Pipeline 和 ChannelHandler 就是用來解決這個問題,它通過責任鏈設計模式來組織程式碼邏輯,並且能夠支援邏輯的動態新增和刪除。

在 Netty 框架中,一個連線對應一個 Channel,這個 Channel 的所有處理邏輯都在 ChannelPipeline 的物件裡,ChannelPipeline 是雙向連結串列結構,它和 Channel 之間是一對一的關係。這個雙向連結串列每個節點都是一個 ChannelHandlerContext 物件,這個物件可以獲得和 Channel 相關的所有上下文資訊。

範例圖如下

ChannelHandler 包括兩個子介面:ChannelInboundHandler 和 ChannelOutboundHandler,分別用於處理讀資料和寫資料的邏輯。

我們可以寫一個範例來說明 ChannelHandler 的事件傳播順序(包含 ChannelInboundHandler 和 ChannelOutboundHandler)

在伺服器端設定如下

      ch.pipeline().addLast(new InHandlerA());
      ch.pipeline().addLast(new InHandlerB());
      ch.pipeline().addLast(new InHandlerC());
      ch.pipeline().addLast(new OutHandlerA());
      ch.pipeline().addLast(new OutHandlerB());
      ch.pipeline().addLast(new OutHandlerC());

其中 InHandlerA 程式碼如下(InHandlerB 和 InHandlerC 類似)

package snippet.chat.server.inbound;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2022/9/19
 * @since
 */
public class InHandlerA extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("in-A:" + msg);
        super.channelRead(ctx, msg);
    }
}

OutHandlerA 程式碼如下(OutHandlerB 和 OutHandlerC 類似)

package snippet.chat.server.outbound;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2022/9/19
 * @since
 */
public class OutHandlerA extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("out-A:" + msg);
        super.write(ctx, msg, promise);
    }
}

執行伺服器端和使用者端,使用使用者端向伺服器端傳送一些資料,可以看到如下紀錄檔

in-A:PooledUnsafeDirectByteBuf(ridx: 0, widx: 108, cap: 2048)
in-B:PooledUnsafeDirectByteBuf(ridx: 0, widx: 108, cap: 2048)
in-C:PooledUnsafeDirectByteBuf(ridx: 0, widx: 108, cap: 2048)
......
out-C:PooledUnsafeDirectByteBuf(ridx: 0, widx: 39, cap: 256)
out-B:PooledUnsafeDirectByteBuf(ridx: 0, widx: 39, cap: 256)
out-A:PooledUnsafeDirectByteBuf(ridx: 0, widx: 39, cap: 256)

由此可以知:inboundHandler 的新增順序和執行順序一致,而 outboundHandler 的新增順序和執行順序相反。 如下圖範例

ChannelHandler 的生命週期

可以用程式碼來說明 ChannelHandler 的生命週期,我們基於 ChannelInboundHandlerAdapter,定義了一個 LifeCycleTestHandler,完整程式碼如下

package snippet.chat.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2022/9/19
 * @since
 */
public class LifeCycleTestHandler extends ChannelInboundHandlerAdapter {
    // 這個回撥方法表示當前Channel的所有邏輯處理已經和某個NIO執行緒建立了繫結關係,接收新的連線,然後建立一個執行緒來處理這個連線的讀寫,只不過在Netty裡使用了執行緒池的方式,
    // 只需要從執行緒池裡去抓一個執行緒繫結在這個Channel上即可。這裡的NIO執行緒通常指NioEventLoop
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 繫結到執行緒(NioEventLoop):channelRegistered()");
        super.channelRegistered(ctx);
    }

    // 這個回撥表明與這個連線對應的NIO執行緒移除了對這個連線的處理。
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 取消執行緒(NioEventLoop)的繫結:channelUnregistered()");
        super.channelUnregistered(ctx);
    }

    // 當Channel的所有業務邏輯鏈準備完畢(即Channel的Pipeline中已經新增完所有的Handler),
// 以及繫結好一個NIO執行緒之後,這個連線才真正被啟用,接下來就會回撥到此方法。
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 準備就緒:channelActive()");
        super.channelActive(ctx);
    }

    // 這個連線在TCP層面已經不再是ESTABLISH狀態了。
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 被關閉:channelInactive()");
        super.channelInactive(ctx);
    }

    // 使用者端向伺服器端傳送資料,每次都會回撥此方法,表示有資料可讀。
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channel 有資料可讀:channelRead()");
        super.channelRead(ctx, msg);
    }

    // 伺服器端每讀完一次完整的資料,都回撥該方法,表示資料讀取完畢。
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 某次資料讀完:channelReadComplete()");
        super.channelReadComplete(ctx);
    }

    // 表示在當前Channel中,已經成功新增了一個Handler處理器。
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("邏輯處理器被新增:handlerAdded()");
        super.handlerAdded(ctx);
    }

    // 我們給這個連線新增的所有業務邏輯處理器都被移除。
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("邏輯處理器被移除:handlerRemoved()");
        super.handlerRemoved(ctx);
    }
}


我們在伺服器端新增這個 Handler,然後啟動伺服器端和使用者端,可以看到服務檯首先輸出如下紀錄檔

邏輯處理器被新增:handlerAdded()
channel 繫結到執行緒(NioEventLoop):channelRegistered()
channel 準備就緒:channelActive()
channel 有資料可讀:channelRead()
Mon Sep 19 22:49:49 CST 2022: 收到使用者端登入請求……
Mon Sep 19 22:49:49 CST 2022: 登入成功!
channel 某次資料讀完:channelReadComplete()

由紀錄檔可以看到,ChannelHandler 執行順序為:

handlerAdded()->channelRegistered()->channelActive()->channelRead()->channelReadComplete()

關閉使用者端,保持伺服器端不關閉,在伺服器端此時觸發了 Channel 的關閉,列印紀錄檔如下

channel 被關閉:channelInactive()
channel 取消執行緒(NioEventLoop)的繫結:channelUnregistered()
邏輯處理器被移除:handlerRemoved()

如上述紀錄檔可知,ChannelHandler 的執行順序是

channelInactive()->channelUnregistered()->handlerRemoved()

整個 ChannelHandler 的生命週期如下圖所示

圖例

本文所有圖例見:processon: Netty學習筆記

程式碼

hello-netty

更多內容見:Netty專欄

參考資料

跟閃電俠學 Netty:Netty 即時聊天實戰與底層原理

深度解析Netty原始碼