Netty 學習(五):伺服器端啟動核心流程原始碼說明

2022-09-30 18:00:10

Netty 學習(五):伺服器端啟動核心流程原始碼說明

作者: Grey

原文地址:

部落格園:Netty 學習(五):伺服器端啟動核心流程原始碼說明

CSDN:Netty 學習(五):伺服器端啟動核心流程原始碼說明

說明

本文使用的 Netty 版本是 4.1.82.Final,

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.82.Final</version>
        </dependency>

伺服器端在啟動的時候,主要流程有如下幾個

  1. 建立伺服器端的 Channel

  2. 初始化伺服器端的 Channel

  3. 註冊 Selector

  4. 埠繫結

我們可以寫一個簡單的伺服器端程式碼,通過 Debug 的方式檢視這幾個關鍵流程的核心程式碼。

package source;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 程式碼閱讀
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2022/9/12
 * @since
 */
public final class SimpleServer {
    public static void main(String[] args) throws InterruptedException {
        // EventLoopGroup: 伺服器端的執行緒模型外觀類。這個執行緒要做的事情
        // 就是不停地檢測IO事件,處理IO事件,執行任務。
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 伺服器端的一個啟動輔助類。通過給它設定一系列引數來繫結埠啟動服務。
            ServerBootstrap b = new ServerBootstrap();
            b
                    // 設定伺服器端的執行緒模型。
                    // bossGroup 負責不斷接收新的連線,將新的連線交給 workerGroup 來處理。
                    .group(bossGroup, workerGroup)
                    // 設定伺服器端的 IO 型別是 NIO。Netty 通過指定 Channel 的型別來指定 IO 型別。
                    .channel(NioServerSocketChannel.class)
                    // 伺服器端啟動過程中,需要經過哪些流程。
                    .handler(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) {
                            System.out.println("channelActive");
                        }

                        @Override
                        public void channelRegistered(ChannelHandlerContext ctx) {
                            System.out.println("channelRegistered");
                        }

                        @Override
                        public void handlerAdded(ChannelHandlerContext ctx) {
                            System.out.println("handlerAdded");
                        }
                    })
                    // 用於設定一系列 Handler 來處理每個連線的資料
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {

                        }
                    });
            // 繫結埠同步等待。等伺服器端啟動完畢,才會進入下一行程式碼
            ChannelFuture f = b.bind(8888).sync();
            // 等待伺服器端關閉埠繫結,這裡的作用是讓程式不會退出
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

通過

ChannelFuture f = b.bind(8888).sync();

bind方法,進入原始碼進行檢視。

首先,進入的是AbstractBootstrap中,呼叫的最關鍵的方法是如下兩個:

……
    private ChannelFuture doBind(final SocketAddress localAddress) {
        ……
        final ChannelFuture regFuture = initAndRegister();
        ……
        doBind0(regFuture, channel, localAddress, promise);
        ……
    }
……

進入initAndResgister()方法中

……
    final ChannelFuture initAndRegister() {
        ……
        // channel 的新建
        channel = channelFactory.newChannel();
        // channel 的初始化
        init(channel);
        ……
    }
……

這裡完成了 Channel 的新建和初始化,Debug 進去,發現channelFactory.newChannel()實際上是呼叫了ReflectiveChannelFactorynewChannel方法,

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
……
    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ……
        this.constructor = clazz.getConstructor();
        ……
    }

    @Override
    public T newChannel() {
        ……
        return constructor.newInstance();
        ……
    }
……
}

這裡呼叫了反射方法,其實就是將伺服器端程式碼中的這一行.channel(NioServerSocketChannel.class)中的NioServerSocketChannel.class傳入進行物件建立,建立一個NioServerSocketChannel範例。

在建立NioServerSocketChannel的時候,呼叫了NioServerSocketChannel的構造方法,構造方法的主要邏輯如下

……
    public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
        this(newChannel(provider, family));
    }
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
        ……
            ServerSocketChannel channel =
                    SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
            return channel == null ? provider.openServerSocketChannel() : channel;
        ……
    }
……

其中provider.openServerSocketChannel()就是呼叫底層 JDK 的 API,獲取了 JDK 底層的java.nio.channels.ServerSocketChannel

通過super(null, channel, SelectionKey.OP_ACCEPT);一路跟蹤進去,進入AbstractNioChannel中,

   protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        ……
        ch.configureBlocking(false);
        ……
    }

關鍵程式碼是ch.configureBlocking(false),設定 I/O 模型為非阻塞模式。

通過super(parent)跟蹤上去,

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

其中 id 是 Netty 中每條 Channel 的唯一標識。

以上就是伺服器端 Channel 的建立過程。

接下來是伺服器端 Channel 的初始化過程,回到AbstractBootstrap.initAndResgister()方法

……
    final ChannelFuture initAndRegister() {
        ……
        // channel 的新建
        channel = channelFactory.newChannel();
        // channel 的初始化
        init(channel);
        ……
    }
……

其中的init(channel)方法就是伺服器端的 Channel 的初始化過程,Debug 進入,發現是呼叫了ServerBootstrap.init(channel)方法,


    @Override
    void init(Channel channel) {
        ……
        // 設定一些 Channel 的屬性和設定資訊
        ……
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

其核心程式碼如上,主要用於定義伺服器端啟動過程中需要執行哪些邏輯。主要分為兩塊:

  1. 一塊是新增使用者自定義的處理邏輯到伺服器端啟動流程。

  2. 另一塊是新增一個特殊的處理邏輯,ServerBootstrapAcceptor 是一個接入器,接受新請求,把新的請求傳遞給某個事件迴圈器。

以上就是伺服器端的 Channel 的初始化過程。接下來是伺服器端 Channel 的註冊 Selector 的過程。

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

在這個步驟中,我們可以看到關於 JDK 底層的操作

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

首先拿到在前面過程中建立的 JDK 底層的 Channel,然後呼叫 JDK 的 register() 方法,將 this 也即 NioServerSocketChannel 物件當作 attachment 繫結到 JDK 的 Selector 上,這樣後續從 Selector 拿到對應的事件之後,就可以把 Netty 領域的 Channel 拿出來。

接下來是伺服器端繫結埠的邏輯,見AbstractBootstrap中的doBind0方法

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

圖例

本文所有圖例見:processon: Netty學習筆記

程式碼

hello-netty

更多內容見:Netty專欄

參考資料

跟閃電俠學 Netty:Netty 即時聊天實戰與底層原理

深度解析Netty原始碼