本系列Netty原始碼解析文章基於 4.1.56.Final版本
大家第一眼看到這幅流程圖,是不是腦瓜子嗡嗡的呢?
大家先不要驚慌,問題不大,本文筆者的目的就是要讓大家清晰的理解這幅流程圖,從而深刻的理解Netty Reactor的啟動全流程,包括其中涉及到的各種程式碼設計實現細節。
在上篇文章《聊聊Netty那些事兒之Reactor在Netty中的實現(建立篇)》中我們詳細介紹了Netty伺服器端核心引擎元件主從Reactor組模型 NioEventLoopGroup
以及Reactor模型 NioEventLoop
的建立過程。最終我們得到了netty Reactor模型的執行骨架如下:
現在Netty伺服器端程式的骨架是搭建好了,本文我們就基於這個骨架來深入剖析下Netty伺服器端的啟動過程。
我們繼續回到上篇文章提到的Netty伺服器端程式碼模板中,在建立完主從Reactor執行緒組:bossGroup
,workerGroup
後,接下來就開始設定Netty伺服器端的啟動輔助類ServerBootstrap
了。
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//建立主從Reactor執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//設定主從Reactor
.channel(NioServerSocketChannel.class)//設定主Reactor中的channel型別
.option(ChannelOption.SO_BACKLOG, 100)//設定主Reactor中channel的option選項
.handler(new LoggingHandler(LogLevel.INFO))//設定主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {//設定從Reactor中註冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 繫結埠啟動服務,開始監聽accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
在上篇文章中我們對程式碼模板中涉及到ServerBootstrap
的一些設定方法做了簡單的介紹,大家如果忘記的話,可以在返回去回顧一下。
ServerBootstrap類
其實沒有什麼特別的邏輯,主要是對Netty啟動過程中需要用到的一些核心資訊進行設定管理,比如:
Netty的核心引擎元件主從Reactor執行緒組: bossGroup,workerGroup
。通過ServerBootstrap#group方法
設定。
Netty伺服器端使用到的Channel型別:NioServerSocketChannel
,通過ServerBootstrap#channel方法
設定。
以及設定NioServerSocketChannel
時用到的SocketOption
。SocketOption
用於設定底層JDK NIO Socket的一些選項。通過ServerBootstrap#option方法
進行設定。
主ReactorGroup中的MainReactor管理的Channel型別為
NioServerSocketChannel
,如圖所示主要用來監聽埠,接收使用者端連線,為使用者端建立初始化NioSocketChannel
,然後採用round-robin
輪詢的方式從圖中從ReactorGroup中選擇一個SubReactor與該使用者端NioSocketChannel
進行繫結。
從ReactorGroup中的SubReactor管理的Channel型別為
NioSocketChannel
,它是netty中定義使用者端連線的一個模型,每個連線對應一個。如圖所示SubReactor負責監聽處理繫結在其上的所有NioSocketChannel
上的IO事件。
NioServerSocketChannel
和使用者端NioSocketChannel
對應pipeline
中指定的ChannelHandler
。用於後續Channel向Reactor註冊成功之後,初始化Channel裡的pipeline。不管是伺服器端用到的
NioServerSocketChannel
還是使用者端用到的NioSocketChannel
,每個Channel範例
都會有一個Pipeline
,Pipeline
中有多個ChannelHandler
用於編排處理對應Channel
上感興趣的IO事件
。
ServerBootstrap
結構中包含了netty伺服器端程式啟動的所有設定資訊,在我們介紹啟動流程之前,先來看下ServerBootstrap
的原始碼結構:
ServerBootstrap
的繼承結構比較簡單,繼承層次的職責分工也比較明確。
ServerBootstrap
主要負責對主從Reactor執行緒組
相關的設定進行管理,其中帶child字首的設定方法
是對從Reactor執行緒組
的相關設定管理。從Reactor執行緒組
中的Sub Reactor
負責管理的使用者端NioSocketChannel
相關設定儲存在ServerBootstrap
結構中。
父類別AbstractBootstrap
則是主要負責對主Reactor執行緒組
相關的設定進行管理,以及主Reactor執行緒組
中的Main Reactor
負責處理的伺服器端ServerSocketChannel
相關的設定管理。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//設定主從Reactor
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//Main Reactor執行緒組
volatile EventLoopGroup group;
//Sub Reactor執行緒組
private volatile EventLoopGroup childGroup;
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//父類別管理主Reactor執行緒組
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
}
ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//用於建立ServerSocketChannel ReflectiveChannelFactory
private volatile ChannelFactory<? extends C> channelFactory;
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
}
在向ServerBootstrap
設定伺服器端ServerSocketChannel
的channel
方法中,其實是建立了一個ChannelFactory
工廠範例ReflectiveChannelFactory
,在Netty伺服器端啟動的過程中,會通過這個ChannelFactory
去建立相應的Channel
範例。
我們可以通過這個方法來設定netty的IO模型,下面為ServerSocketChannel
在不同IO模型下的實現:
BIO | NIO | AIO |
---|---|---|
OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
EventLoopGroup
Reactor執行緒組在不同IO模型下的實現:
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
我們只需要將IO模型
的這些核心介面對應的實現類字首
改為對應IO模型
的字首,就可以輕鬆在Netty中完成對IO模型
的切換。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
//NioServerSocketChannelde 構造器
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
//反射獲取NioServerSocketChannel的構造器
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
//建立NioServerSocketChannel範例
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}
從類的簽名我們可以看出,這個工廠類是通過泛型
加反射
的方式來建立對應的Channel
範例。
T extends Channel
表示的是要通過工廠類建立的Channel型別
,這裡我們初始化的是NioServerSocketChannel
。ReflectiveChannelFactory
的構造器中通過反射
的方式獲取NioServerSocketChannel
的構造器。newChannel
方法中通過構造器反射建立NioServerSocketChannel
範例。注意這時只是設定階段,NioServerSocketChannel
此時並未被建立。它是在啟動的時候才會被建立出來。
ServerBootstrap b = new ServerBootstrap();
//設定被MainReactor管理的NioServerSocketChannel的Socket選項
b.option(ChannelOption.SO_BACKLOG, 100)
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
//serverSocketChannel中的ChannelOption設定
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> B option(ChannelOption<T> option, T value) {
ObjectUtil.checkNotNull(option, "option");
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
}
無論是伺服器端的NioServerSocketChannel
還是使用者端的NioSocketChannel
它們的相關底層Socket選項ChannelOption
設定全部存放於一個Map
型別的資料結構中。
由於使用者端NioSocketChannel
是由從Reactor執行緒組
中的Sub Reactor
來負責處理,所以涉及到使用者端NioSocketChannel
所有的方法和設定全部是以child
字首開頭。
ServerBootstrap b = new ServerBootstrap();
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//使用者端SocketChannel對應的ChannelOption設定
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
ObjectUtil.checkNotNull(childOption, "childOption");
synchronized (childOptions) {
if (value == null) {
childOptions.remove(childOption);
} else {
childOptions.put(childOption, value);
}
}
return this;
}
}
相關的底層Socket選項,netty全部列舉在ChannelOption類中,筆者這裡就不一一列舉了,在本系列後續相關的文章中,筆者還會為大家詳細的介紹這些引數的作用。
public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
..................省略..............
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
..................省略..............
}
//serverSocketChannel中pipeline裡的handler(主要是acceptor)
private volatile ChannelHandler handler;
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
向NioServerSocketChannel
中的Pipeline
新增ChannelHandler
分為兩種方式:
顯式新增:
顯式新增的方式是由使用者在main執行緒中通過ServerBootstrap#handler
的方式新增。如果需要新增多個ChannelHandler
,則可以通過ChannelInitializer
向pipeline
中進行新增。關於
ChannelInitializer
後面筆者會有詳細介紹,這裡大家只需要知道ChannelInitializer
是一種特殊的ChannelHandler
,用於初始化pipeline
。適用於向pipeline中新增多個ChannelHandler的場景。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//設定主從Reactor
.channel(NioServerSocketChannel.class)//設定主Reactor中的channel型別
.handler(new ChannelInitializer<NioServerSocketChannel>() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(channelhandler1)
.addLast(channelHandler2)
......
.addLast(channelHandler3);
}
})
隱式新增:
隱式新增主要新增的就是主ReactorGroup
的核心元件也就是下圖中的acceptor
,Netty中的實現為ServerBootstrapAcceptor
,本質上也是一種ChannelHandler
,主要負責在使用者端連線建立好後,初始化使用者端NioSocketChannel
,在從Reactor執行緒組中
選取一個Sub Reactor
,將使用者端NioSocketChannel
註冊到Sub Reactor
中的selector
上。隱式新增
ServerBootstrapAcceptor
是由Netty框架在啟動的時候負責新增,使用者無需關心。
在本例中,NioServerSocketChannel
的PipeLine
中只有兩個ChannelHandler
,一個由使用者在外部顯式新增的LoggingHandler
,另一個是由Netty框架隱式新增的ServerBootstrapAcceptor
。
其實我們在實際專案使用的過程中,不會向netty伺服器端
NioServerSocketChannel
新增額外的ChannelHandler,NioServerSocketChannel
只需要專心做好自己最重要的本職工作接收使用者端連線就好了。這裡額外新增一個LoggingHandler
只是為了向大家展示ServerBootstrap
的設定方法。
final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//設定從Reactor中註冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
//socketChannel中pipeline中的處理handler
private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
向用戶端NioSocketChannel
中的Pipeline
裡新增ChannelHandler
完全是由使用者自己控制顯式新增,新增的數量不受限制。
由於在Netty的IO執行緒模型
中,是由單個Sub Reactor執行緒
負責執行使用者端NioSocketChannel
中的Pipeline
,一個Sub Reactor執行緒
負責處理多個NioSocketChannel
上的IO事件
,如果Pipeline
中的ChannelHandler
新增的太多,就會影響Sub Reactor執行緒
執行其他NioSocketChannel
上的Pipeline
,從而降低IO處理效率
,降低吞吐量。
所以Pipeline
中的ChannelHandler
不易新增過多,並且不能再ChannelHandler
中執行耗時的業務處理任務。
在我們通過ServerBootstrap
設定netty伺服器端啟動資訊的時候,無論是向伺服器端NioServerSocketChannel
的pipeline中新增ChannelHandler,還是向用戶端NioSocketChannel
的pipeline中新增ChannelHandler,當涉及到多個ChannelHandler新增的時候,我們都會用到ChannelInitializer
,那麼這個ChannelInitializer
究竟是何方聖神,為什麼要這樣做呢?我們接著往下看~~
首先ChannelInitializer
它繼承於ChannelHandler
,它自己本身就是一個ChannelHandler,所以它可以新增到childHandler
中。
其他的父類別大家這裡可以不用管,後面文章中筆者會一一為大家詳細介紹。
那為什麼不直接新增ChannelHandler
而是選擇用ChannelInitializer
呢?
這裡主要有兩點原因:
前邊我們提到,使用者端NioSocketChannel
是在伺服器端accept連線後,在伺服器端NioServerSocketChannel
中被建立出來的。但是此時我們正處於設定ServerBootStrap
階段,伺服器端還沒有啟動,更沒有使用者端連線上來,此時使用者端NioSocketChannel
還沒有被建立出來,所以也就沒辦法向用戶端NioSocketChannel
的pipeline中新增ChannelHandler
。
使用者端NioSocketChannel
中Pipeline
裡可以新增任意多個ChannelHandler
,但是Netty框架無法預知使用者到底需要新增多少個ChannelHandler
,所以Netty框架提供了回撥函數ChannelInitializer#initChannel
,使使用者可以自定義ChannelHandler
的新增行為。
當用戶端NioSocketChannel
註冊到對應的Sub Reactor
上後,緊接著就會初始化NioSocketChannel
中的Pipeline
,此時Netty框架會回撥ChannelInitializer#initChannel
執行使用者自定義的新增邏輯。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//當channelRegister事件發生時,呼叫initChannel初始化pipeline
if (initChannel(ctx)) {
.................省略...............
} else {
.................省略...............
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
//此時客戶單NioSocketChannel已經建立並初始化好了
initChannel((C) ctx.channel());
} catch (Throwable cause) {
.................省略...............
} finally {
.................省略...............
}
return true;
}
return false;
}
protected abstract void initChannel(C ch) throws Exception;
.................省略...............
}
這裡由netty框架回撥的ChannelInitializer#initChannel方法
正是我們自定義的新增邏輯。
final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//設定從Reactor中註冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
到此為止,Netty伺服器端啟動所需要的必要設定資訊,已經全部存入ServerBootStrap
啟動輔助類中。
接下來要做的事情就是伺服器端的啟動了。
// Start the server. 繫結埠啟動服務,開始監聽accept事件
ChannelFuture f = serverBootStrap.bind(PORT).sync();
經過前面的鋪墊終於來到了本文的核心內容----Netty伺服器端的啟動過程。
如程式碼模板中的範例所示,Netty伺服器端的啟動過程封裝在io.netty.bootstrap.AbstractBootstrap#bind(int)
函數中。
接下來我們看一下Netty伺服器端在啟動過程中究竟幹了哪些事情?
大家看到這副啟動流程圖先不要慌,接下來的內容筆者會帶大家各個擊破它,在文章的最後保證讓大家看懂這副流程圖。
我們先來從netty伺服器端啟動的入口函數開始我們今天的原始碼解析旅程:
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
//校驗Netty核心元件是否設定齊全
validate();
//伺服器端開始啟動,繫結埠地址,接收使用者端連線
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//非同步建立,初始化,註冊ServerSocketChannel到main reactor上
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
........serverSocketChannel向Main Reactor註冊成功後開始繫結埠....,
} else {
//如果此時註冊操作沒有完成,則向regFuture新增operationComplete回撥函數,註冊成功後回撥。
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
........serverSocketChannel向Main Reactor註冊成功後開始繫結埠....,
});
return promise;
}
}
Netty伺服器端的啟動流程總體如下:
建立伺服器端NioServerSocketChannel
並初始化。
將伺服器端NioServerSocketChannel
註冊到主Reactor執行緒組
中。
註冊成功後,開始初始化NioServerSocketChannel
中的pipeline,然後在pipeline中觸發channelRegister事件。
隨後由NioServerSocketChannel
繫結埠地址。
繫結埠地址成功後,向NioServerSocketChannel
對應的Pipeline
中觸發傳播ChannelActive事件
,在ChannelActive事件回撥
中向Main Reactor
註冊OP_ACCEPT事件
,開始等待使用者端連線。伺服器端啟動完成。
當netty伺服器端啟動成功之後,最終我們會得到如下結構的陣型,開始枕戈待旦,準備接收使用者端的連線,Reactor開始運轉。
接下來,我們就來看下Netty原始碼是如何實現以上步驟的~~
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//建立NioServerSocketChannel
//ReflectiveChannelFactory通過泛型,反射,工廠的方式靈活建立不同型別的channel
channel = channelFactory.newChannel();
//初始化NioServerSocketChannel
init(channel);
} catch (Throwable t) {
..............省略.................
}
//向MainReactor註冊ServerSocketChannel
ChannelFuture regFuture = config().group().register(channel);
..............省略.................
return regFuture;
}
從函數命名中我們可以看出,這個函數主要做的事情就是首先建立NioServerSocketChannel
,並對NioServerSocketChannel
進行初始化,最後將NioServerSocketChannel
註冊到Main Reactor
中。
還記得我們在介紹ServerBootstrap
啟動輔助類設定伺服器端ServerSocketChannel
型別的時候提到的工廠類ReflectiveChannelFactory
嗎?
因為當時我們在設定ServerBootstrap
啟動輔助類的時候,還沒到啟動階段,而設定階段並不是建立具體ServerSocketChannel
的時機。
所以Netty通過工廠模式
將要建立的ServerSocketChannel
的型別(通過泛型指定)以及 建立的過程(封裝在newChannel函數中
)統統先封裝在工廠類ReflectiveChannelFactory
中。
ReflectiveChannelFactory
通過泛型
,反射
,工廠
的方式靈活
建立不同型別的channel
等待建立時機來臨,我們呼叫儲存在ServerBootstrap
中的channelFactory
直接進行建立。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}
下面我們來看下NioServerSocketChannel
的構建過程:
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
//SelectorProvider(用於建立Selector和Selectable Channels)
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
//建立JDK NIO ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
//ServerSocketChannel相關的設定
private final ServerSocketChannelConfig config;
public NioServerSocketChannel(ServerSocketChannel channel) {
//父類別AbstractNioChannel中儲存JDK NIO原生ServerSocketChannel以及要監聽的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中設定用於Channel接收資料用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
首先呼叫newSocket
建立JDK NIO 原生ServerSocketChannel
,這裡呼叫了SelectorProvider#openServerSocketChannel
來建立JDK NIO 原生ServerSocketChannel
,我們在上篇文章《聊聊Netty那些事兒之Reactor在Netty中的實現(建立篇)》中詳細的介紹了SelectorProvider
相關內容,當時是用SelectorProvider
來建立Reactor
中的Selector
。大家還記得嗎??
通過父類別構造器設定NioServerSocketChannel
感興趣的IO事件
,這裡設定的是SelectionKey.OP_ACCEPT
事件。並將JDK NIO 原生ServerSocketChannel
封裝起來。
建立Channel
的設定類NioServerSocketChannelConfig
,在設定類中封裝了對Channel底層
的一些設定行為,以及JDK中的ServerSocket
。以及建立NioServerSocketChannel
接收資料用的Buffer
分配器AdaptiveRecvByteBufAllocator
。
NioServerSocketChannelConfig
沒什麼重要的東西,我們這裡也不必深究,它就是管理NioServerSocketChannel
相關的設定,這裡唯一需要大家注意的是這個用於Channel
接收資料用的Buffer分配器
AdaptiveRecvByteBufAllocator,我們後面在介紹Netty如何接收連線的時候還會提到。
NioServerSocketChannel
的整體構建過程介紹完了,現在我們來按照繼承層次再回過頭來看下NioServerSocketChannel
的層次構建,來看下每一層都建立了什麼,封裝了什麼,這些資訊都是Channel
的核心資訊,所以有必要了解一下。
在NioServerSocketChannel
的建立過程中,我們主要關注繼承結構圖中紅框標註的三個類,其他的我們佔時先不用管。
其中AbstractNioMessageChannel類
主要是對NioServerSocketChannel
底層讀寫行為的封裝和定義,比如accept接收使用者端連線。這個我們後續會介紹到,這裡我們並不展開。
public abstract class AbstractNioChannel extends AbstractChannel {
//JDK NIO原生Selectable Channel
private final SelectableChannel ch;
// Channel監聽事件集合 這裡是SelectionKey.OP_ACCEPT事件
protected final int readInterestOp;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//設定Channel為非阻塞 配合IO多路複用模型
ch.configureBlocking(false);
} catch (IOException e) {
.............省略................
}
}
}
封裝由SelectorProvider
建立出來的JDK NIO原生ServerSocketChannel
。
封裝Channel
在建立時指定感興趣的IO事件
,對於NioServerSocketChannel
來說感興趣的IO事件
為OP_ACCEPT事件
。
設定JDK NIO原生ServerSocketChannel
為非阻塞模式, 配合IO多路複用模型。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//channel是由建立層次的,比如ServerSocketChannel 是 SocketChannel的 parent
private final Channel parent;
//channel全域性唯一ID machineId+processId+sequence+timestamp+random
private final ChannelId id;
//unsafe用於封裝對底層socket的相關操作
private final Unsafe unsafe;
//為channel分配獨立的pipeline用於IO事件編排
private final DefaultChannelPipeline pipeline;
protected AbstractChannel(Channel parent) {
this.parent = parent;
//channel全域性唯一ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe用於定義實現對Channel的底層操作
unsafe = newUnsafe();
//為channel分配獨立的pipeline用於IO事件編排
pipeline = newChannelPipeline();
}
}
Netty中的Channel建立
是有層次的,這裡的parent屬性
用來儲存上一級的Channel
,比如這裡的NioServerSocketChannel
是頂級Channel
,所以它的parent = null
。使用者端NioSocketChannel
是由NioServerSocketChannel
建立的,所以它的parent = NioServerSocketChannel
。
為Channel
分配全域性唯一的ChannelId
。ChannelId
由機器Id(machineId
),程序Id(processId
),序列號(sequence
),時間戳(timestamp
),亂數(random
)構成
private DefaultChannelId() {
data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
int i = 0;
// machineId
System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
i += MACHINE_ID.length;
// processId
i = writeInt(i, PROCESS_ID);
// sequence
i = writeInt(i, nextSequence.getAndIncrement());
// timestamp (kind of)
i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
// random
int random = PlatformDependent.threadLocalRandom().nextInt();
i = writeInt(i, random);
assert i == data.length;
hashCode = Arrays.hashCode(data);
}
NioServerSocketChannel
的底層操作類Unsafe
。這裡建立的是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
。
Unsafe
為Channel介面
的一個內部介面,用於定義實現對Channel底層的各種操作,Unsafe介面
定義的操作行為只能由Netty框架的Reactor執行緒
呼叫,使用者執行緒禁止呼叫。
interface Unsafe {
//分配接收資料用的Buffer
RecvByteBufAllocator.Handle recvBufAllocHandle();
//伺服器端繫結的埠地址
SocketAddress localAddress();
//遠端地址
SocketAddress remoteAddress();
//channel向Reactor註冊
void register(EventLoop eventLoop, ChannelPromise promise);
//伺服器端繫結埠地址
void bind(SocketAddress localAddress, ChannelPromise promise);
//使用者端連線伺服器端
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
//關閉channle
void close(ChannelPromise promise);
//讀資料
void beginRead();
//寫資料
void write(Object msg, ChannelPromise promise);
}
NioServerSocketChannel
分配獨立的pipeline
用於IO事件編排。pipeline
其實是一個ChannelHandlerContext
型別的雙向連結串列。頭結點HeadContext
,尾結點TailContext
。ChannelHandlerContext
中包裝著ChannelHandler
。
ChannelHandlerContext
儲存 ChannelHandler上下文資訊,用於事件傳播。後面筆者會單獨開一篇文章介紹,這裡我們還是聚焦於啟動主線。
這裡只是為了讓大家簡單理解pipeline
的一個大致的結構,後面會寫一篇文章專門詳細講解pipeline
。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
到了這裡NioServerSocketChannel
就建立完畢了,我們來回顧下它到底包含了哪些核心資訊。
void init(Channel channel) {
//向NioServerSocketChannelConfig設定ServerSocketChannelOption
setChannelOptions(channel, newOptionsArray(), logger);
//向netty自定義的NioServerSocketChannel設定attributes
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
//獲取從Reactor執行緒組
final EventLoopGroup currentChildGroup = childGroup;
//獲取用於初始化使用者端NioSocketChannel的ChannelInitializer
final ChannelHandler currentChildHandler = childHandler;
//獲取使用者設定的使用者端SocketChannel的channelOption以及attributes
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
//向NioServerSocketChannel中的pipeline新增初始化ChannelHandler的邏輯
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap中使用者指定的channelHandler
ChannelHandler handler = config.handler();
if (handler != null) {
//LoggingHandler
pipeline.addLast(handler);
}
//新增用於接收使用者端連線的acceptor
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
向NioServerSocketChannelConfig
設定ServerSocketChannelOption
。
向netty自定義的NioServerSocketChannel
設定ChannelAttributes
Netty自定義的SocketChannel
型別均繼承AttributeMap
介面以及DefaultAttributeMap
類,正是它們定義了ChannelAttributes
。用於向Channel
新增使用者自定義的一些資訊。
這個
ChannelAttributes
的用處大有可為,Netty後邊的許多特性都是依靠這個ChannelAttributes
來實現的。這裡先賣個關子,大家可以自己先想一下可以用這個ChannelAttributes
做哪些事情?
獲取從Reactor執行緒組childGroup
,以及用於初始化使用者端NioSocketChannel
的ChannelInitializer
,ChannelOption
,ChannelAttributes
,這些資訊均是由使用者在啟動的時候向ServerBootstrap
新增的使用者端NioServerChannel
設定資訊。這裡用這些資訊來初始化ServerBootstrapAcceptor
。因為後續會在ServerBootstrapAcceptor
中接收使用者端連線以及建立NioServerChannel
。
向NioServerSocketChannel
中的pipeline
新增用於初始化pipeline
的ChannelInitializer
。
問題來了,這裡為什麼不乾脆直接將ChannelHandler
新增到pipeline
中,而是又使用到了ChannelInitializer
呢?
其實原因有兩點:
為了保證執行緒安全
地初始化pipeline
,所以初始化的動作需要由Reactor執行緒
進行,而當前執行緒是使用者程式
的啟動Main執行緒
並不是
Reactor執行緒。這裡不能立即初始化。
初始化Channel
中pipeline
的動作,需要等到Channel
註冊到對應的Reactor
中才可以進行初始化,當前只是建立好了NioServerSocketChannel
,但並未註冊到Main Reactor
上。
初始化
NioServerSocketChannel
中pipeline
的時機是:當NioServerSocketChannel
註冊到Main Reactor
之後,繫結埠地址之前。
前邊在介紹
ServerBootstrap
設定childHandler
時也用到了ChannelInitializer
,還記得嗎??
問題又來了,大家注意下ChannelInitializer#initChannel
方法,在該初始化回撥方法中,新增LoggingHandler是直接向pipeline中新增,而新增Acceptor為什麼不是直接新增而是封裝成非同步任務呢?
這裡先給大家賣個關子,筆者會在後續流程中為大家解答~~~~~
此時NioServerSocketChannel
中的pipeline
結構如下圖所示:
從ServerBootstrap
獲取主Reactor執行緒組NioEventLoopGroup
,將NioServerSocketChannel
註冊到NioEventLoopGroup
中。
ChannelFuture regFuture = config().group().register(channel);
下面我們來看下具體的註冊過程:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventExecutor next() {
return chooser.next();
}
//獲取繫結策略
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
//採用輪詢round-robin的方式選擇Reactor
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
Netty通過next()
方法根據上篇文章《聊聊Netty那些事兒之Reactor在Netty中的實現(建立篇)》提到的channel到reactor的繫結策略
,從ReactorGroup
中選取一個Reactor進行註冊繫結。之後Channel
生命週期內的所有 IO 事件
都由這個 Reactor
負責處理,如 accept、connect、read、write
等 IO 事件。
一個
channel
只能繫結到一個Reactor
上,一個Reactor
負責監聽多個channel
。
由於這裡是
NioServerSocketChannle
向Main Reactor
進行註冊繫結,所以Main Reactor
主要負責處理的IO事件
是OP_ACCEPT
事件。
向Reactor
進行註冊的行為定義在NioEventLoop
的父類別SingleThreadEventLoop
中,印象模糊的同學可以在回看下上篇文章中的NioEventLoop繼承結構
小節內容。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
//註冊channel到繫結的Reactor上
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//unsafe負責channel底層的各種操作
promise.channel().unsafe().register(this, promise);
return promise;
}
}
通過NioServerSocketChannel
中的Unsafe類
執行底層具體的註冊動作。
protected abstract class AbstractUnsafe implements Unsafe {
/**
* 註冊Channel到繫結的Reactor上
* */
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
//EventLoop的型別要與Channel的型別一樣 Nio Oio Aio
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//在channel上設定繫結的Reactor
AbstractChannel.this.eventLoop = eventLoop;
/**
* 執行channel註冊的操作必須是Reactor執行緒來完成
*
* 1: 如果當前執行執行緒是Reactor執行緒,則直接執行register0進行註冊
* 2:如果當前執行執行緒是外部執行緒,則需要將register0註冊操作 封裝程非同步Task 由Reactor執行緒執行
* */
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...............省略...............
}
}
}
}
首先檢查NioServerSocketChannel
是否已經完成註冊。如果以完成註冊,則直接設定代表註冊操作結果的ChannelPromise
為fail狀態
。
通過isCompatible
方法驗證Reactor模型EventLoop
是否與Channel
的型別匹配。NioEventLoop
對應於NioServerSocketChannel
。
上篇文章我們介紹過 Netty對三種
IO模型
:Oio,Nio,Aio
的支援,使用者可以通過改變Netty核心類的字首輕鬆切換IO模型
。isCompatible
方法目的就是需要保證Reactor
和Channel
使用的是同一種IO模型
。
在Channel
中儲存其繫結的Reactor範例
。
執行Channel
向Reactor
註冊的動作必須要確保是在Reactor執行緒
中執行。
Reactor執行緒
則直接執行註冊動作register0
Reactor執行緒
,則需要將註冊動作register0
封裝成非同步任務,存放在Reactor
中的taskQueue
中,等待Reactor執行緒
執行。當前執行執行緒並不是
Reactor執行緒
,而是使用者程式的啟動執行緒Main執行緒
。
上篇文章中我們在介紹NioEventLoopGroup
的建立過程中提到了一個構造器引數executor
,它用於啟動Reactor執行緒
,型別為ThreadPerTaskExecutor
。
當時筆者向大家賣了一個關子~~「Reactor執行緒是何時啟動的?」
那麼現在就到了為大家揭曉謎底的時候了~~
Reactor執行緒
的啟動是在向Reactor
提交第一個非同步任務的時候啟動的。
Netty中的主Reactor執行緒組NioEventLoopGroup
中的Main ReactorNioEventLoop
是在使用者程式Main執行緒
向Main Reactor
提交用於註冊NioServerSocketChannel
的非同步任務時開始啟動。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
接下來我們關注下NioEventLoop
的execute方法
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
//當前執行緒是否為Reactor執行緒
boolean inEventLoop = inEventLoop();
//addTaskWakesUp = true addTask喚醒Reactor執行緒執行任務
addTask(task);
if (!inEventLoop) {
//如果當前執行緒不是Reactor執行緒,則啟動Reactor執行緒
//這裡可以看出Reactor執行緒的啟動是通過 向NioEventLoop新增非同步任務時啟動的
startThread();
.....................省略.....................
}
.....................省略.....................
}
}
首先將非同步任務task
新增到Reactor
中的taskQueue
中。
判斷當前執行緒是否為Reactor執行緒
,此時當前執行執行緒為使用者程式啟動執行緒,所以這裡呼叫startThread
啟動Reactor執行緒
。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
//定義Reactor執行緒狀態
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
//Reactor執行緒狀態 初始為 未啟動狀態
private volatile int state = ST_NOT_STARTED;
//Reactor執行緒狀態列位state 原子更新器
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
}
Reactor執行緒
初始化狀態為ST_NOT_STARTED
,首先CAS
更新狀態為ST_STARTED
doStartThread
啟動Reactor執行緒
啟動失敗的話,需要將Reactor執行緒
狀態改回ST_NOT_STARTED
//ThreadPerTaskExecutor 用於啟動Reactor執行緒
private final Executor executor;
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//Reactor執行緒開始啟動
SingleThreadEventExecutor.this.run();
success = true;
}
................省略..............
}
這裡就來到了ThreadPerTaskExecutor
型別的executor
的用武之地了。
Reactor執行緒
的核心工作之前介紹過:輪詢所有註冊其上的Channel中的IO就緒事件
,處理對應Channel上的IO事件
,執行非同步任務
。Netty將這些核心工作封裝在io.netty.channel.nio.NioEventLoop#run
方法中。NioEventLoop#run
封裝在非同步任務中,提交給executor
執行,Reactor
執行緒至此開始工作了就。public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
@Override
public void execute(Runnable command) {
//啟動Reactor執行緒
threadFactory.newThread(command).start();
}
}
此時Reactor執行緒
已經啟動,後面的工作全部都由這個Reactor執行緒
來負責執行了。
而使用者啟動執行緒在向Reactor
提交完NioServerSocketChannel
的註冊任務register0
後,就逐步退出呼叫堆疊,回退到最開始的啟動入口處ChannelFuture f = b.bind(PORT).sync()
。
此時Reactor
中的任務佇列中只有一個任務register0
,Reactor執行緒
啟動後,會從任務佇列中取出任務執行。
至此NioServerSocketChannel
的註冊工作正式拉開帷幕~~
//true if the channel has never been registered, false otherwise
private boolean neverRegistered = true;
private void register0(ChannelPromise promise) {
try {
//檢視註冊操作是否已經取消,或者對應channel已經關閉
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//執行真正的註冊操作
doRegister();
//修改註冊狀態
neverRegistered = false;
registered = true;
//回撥pipeline中新增的ChannelInitializer的handlerAdded方法,在這裡初始化channelPipeline
pipeline.invokeHandlerAddedIfNeeded();
//設定regFuture為success,觸發operationComplete回撥,將bind操作放入Reactor的任務佇列中,等待Reactor執行緒執行。
safeSetSuccess(promise);
//觸發channelRegister事件
pipeline.fireChannelRegistered();
//對於伺服器端ServerSocketChannel來說 只有繫結埠地址成功後 channel的狀態才是active的。
//此時繫結操作作為非同步任務在Reactor的任務佇列中,繫結操作還沒開始,所以這裡的isActive()是false
if (isActive()) {
if (firstRegistration) {
//觸發channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
............省略.............
}
}
register0
是驅動整個Channel
註冊繫結流程的關鍵方法,下面我們來看下它的核心邏輯:
首先需要檢查Channel
的註冊動作是否在Reactor執行緒
外被取消了已經!promise.setUncancellable()
。檢查要註冊的Channel
是否已經關閉!ensureOpen(promise)
。如果Channel
已經關閉或者註冊操作已經被取消,那麼就直接返回,停止註冊流程。
呼叫doRegister()
方法,執行真正的註冊操作。最終實現在AbstractChannel
的子類AbstractNioChannel
中,這個我們一會在介紹,先關注整體流程。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
/**
* Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
*
* Sub-classes may override this method
*/
protected void doRegister() throws Exception {
// NOOP
}
}
Channel
向Reactor
註冊完畢後,呼叫pipeline.invokeHandlerAddedIfNeeded()
方法,觸發回撥pipeline中新增的ChannelInitializer的handlerAdded方法,在handlerAdded方法中利用前面提到的ChannelInitializer
初始化ChannelPipeline
。初始化
ChannelPipeline
的時機是當Channel
向對應的Reactor
註冊成功後,在handlerAdded事件回撥
中利用ChannelInitializer
進行初始化。
regFuture
為Success
,並回撥註冊在regFuture
上的ChannelFutureListener#operationComplete
方法,在operationComplete
回撥方法中將繫結操作
封裝成非同步任務,提交到Reactor
的taskQueue
中。等待Reactor
的執行。還記得這個
regFuture
在哪裡出現的嗎?它是在哪裡被建立,又是在哪裡新增的ChannelFutureListener
呢? 大家還有印象嗎?回憶不起來也沒關係,筆者後面還會提到
pipeline.fireChannelRegistered()
在pipeline
中觸發channelRegister事件
。
pipeline
中channelHandler
的channelRegistered方法
被回撥。
NioServerSocketChannel
來說, 只有繫結埠地址成功
後 channel的狀態才是active
的。此時繫結操作
在regFuture
上註冊的ChannelFutureListener#operationComplete
回撥方法中被作為非同步任務提交到了Reactor
的任務佇列中,Reactor執行緒
還沒開始
執行繫結任務
。所以這裡的isActive()
是false
。當
Reactor執行緒
執行完register0方法
後,才會去執行繫結任務
。
下面我們來看下register0
方法中這些核心步驟
的具體實現:
public abstract class AbstractNioChannel extends AbstractChannel {
//channel註冊到Selector後獲得的SelectKey
volatile SelectionKey selectionKey;
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...............省略....................
}
}
}
}
呼叫底層JDK NIO Channel
方法java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object)
,將NettyNioServerSocketChannel
中包裝的JDK NIO ServerSocketChannel
註冊到Reactor
中的JDK NIO Selector
上。
簡單介紹下SelectableChannel#register
方法引數的含義:
Selector:
表示JDK NIO Channel
將要向哪個Selector
進行註冊。
int ops:
表示Channel
上感興趣的IO事件
,當對應的IO事件就緒
時,Selector
會返回Channel
對應的SelectionKey
。
SelectionKey
可以理解為Channel
在Selector
上的特殊表示形式,SelectionKey
中封裝了Channel
感興趣的IO事件集合~~~interestOps
,以及IO就緒的事件集合~~readyOps
, 同時也封裝了對應的JDK NIO Channel
以及註冊的Selector
。最後還有一個重要的屬性attachment
,可以允許我們在SelectionKey
上附加一些自定義的物件。
Object attachment:
向SelectionKey
中新增使用者自定義的附加物件。這裡
NioServerSocketChannel
向Reactor
中的Selector
註冊的IO事件
為0
,這個操作的主要目的是先獲取到Channel
在Selector
中對應的SelectionKey
,完成註冊。當繫結操作完成後,在去向SelectionKey
新增感興趣的IO事件
~~~OP_ACCEPT事件
。
同時通過
SelectableChannel#register
方法將Netty自定義的NioServerSocketChannel
(這裡的this
指標)附著在SelectionKey
的attechment
屬性上,完成Netty自定義Channel
與JDK NIOChannel
的關係繫結。這樣在每次對Selector
進行IO就緒事件
輪詢時,Netty 都可以從JDK NIO Selector
返回的SelectionKey
中獲取到自定義的Channel
物件(這裡指的就是NioServerSocketChannel
)。
當NioServerSocketChannel
註冊到Main Reactor
上的Selector
後,Netty通過呼叫pipeline.invokeHandlerAddedIfNeeded()
開始回撥NioServerSocketChannel
中pipeline
裡的ChannelHandler的handlerAdded方法
。
此時NioServerSocketChannel
的pipeline
結構如下:
此時pipeline
中只有在初始化NioServerSocketChannel
時新增的ChannelInitializer
。
我們來看下ChannelInitializer
中handlerAdded回撥方法
具體作了哪些事情~~
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
if (initChannel(ctx)) {
//初始化工作完成後,需要將自身從pipeline中移除
removeState(ctx);
}
}
}
//ChannelInitializer範例是被所有的Channel共用的,用於初始化ChannelPipeline
//通過Set集合儲存已經初始化的ChannelPipeline,避免重複初始化同一ChannelPipeline
private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
new ConcurrentHashMap<ChannelHandlerContext, Boolean>());
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
//初始化完畢後,從pipeline中移除自身
pipeline.remove(this);
}
}
return true;
}
return false;
}
//匿名類實現,這裡指定具體的初始化邏輯
protected abstract void initChannel(C ch) throws Exception;
private void removeState(final ChannelHandlerContext ctx) {
//從initMap防重Set集合中刪除ChannelInitializer
if (ctx.isRemoved()) {
initMap.remove(ctx);
} else {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
initMap.remove(ctx);
}
});
}
}
}
ChannelInitializer
中的初始化邏輯比較簡單明瞭:
首先要判斷必須是當前Channel
已經完成註冊後,才可以進行pipeline
的初始化。ctx.channel().isRegistered()
呼叫ChannelInitializer
的匿名類指定的initChannel
執行自定義的初始化邏輯。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap中使用者指定的channelHandler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
還記得在初始化
NioServerSocketChannel
時。io.netty.bootstrap.ServerBootstrap#init
方法中向pipeline
中新增的ChannelInitializer
嗎?
initChannel 方法
後,ChannelPipeline
的初始化就結束了,此時ChannelInitializer
就沒必要再繼續呆在pipeline中了
,所需要將ChannelInitializer
從pipeline
中刪除。pipeline.remove(this)
當初始化完pipeline
時,此時pipeline
的結構再次發生了變化:
此時Main Reactor
中的任務佇列taskQueue
結構變化為:
新增ServerBootstrapAcceptor
的任務是在初始化NioServerSocketChannel
的時候向main reactor提交過去的。還記得嗎?
在本小節《Netty伺服器端的啟動》的最開始,我們介紹了伺服器端啟動的入口函數io.netty.bootstrap.AbstractBootstrap#doBind
,在函數的最開頭呼叫了initAndRegister()
方法用來建立並初始化NioServerSocketChannel
,之後便會將NioServerSocketChannel
註冊到Main Reactor
中。
註冊的操作是一個非同步的過程,所以在initAndRegister()
方法呼叫後返回一個代表註冊結果的ChannelFuture regFuture
。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private ChannelFuture doBind(final SocketAddress localAddress) {
//非同步建立,初始化,註冊ServerSocketChannel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
//如果註冊完成,則進行繫結操作
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//新增註冊完成 回撥函數
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
...............省略...............
// 註冊完成後,Reactor執行緒回撥這裡
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
}
之後會向ChannelFuture regFuture
新增一個註冊完成後的回撥函數~~~~ ChannelFutureListener
。在回撥函數operationComplete
中開始發起綁埠地址流程
。
那麼這個回撥函數在什麼時候?什麼地方發起的呢??
讓我們在回到本小節的主題register0
方法的流程中:
當呼叫doRegister()
方法完成NioServerSocketChannel
向Main Reactor
的註冊後,緊接著會呼叫pipeline.invokeHandlerAddedIfNeeded()
方法中觸發ChannelInitializer#handlerAdded
回撥中對pipeline
進行初始化。
最後在safeSetSuccess
方法中,開始回撥註冊在regFuture
上的ChannelFutureListener
。
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
@Override
public boolean trySuccess() {
return trySuccess(null);
}
@Override
public boolean trySuccess(V result) {
return setSuccess0(result);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
//回撥註冊在promise上的listeners
notifyListeners();
}
return true;
}
return false;
}
safeSetSuccess
的邏輯比較簡單,首先設定regFuture
結果為success
,並且回撥註冊在regFuture
上的ChannelFutureListener
。
需要提醒的是,執行
safeSetSuccess
方法,以及後邊回撥regFuture
上的ChannelFutureListener
這些動作都是由Reactor執行緒
執行的。
關於Netty中的
Promise模型
後邊我會在寫一篇專門的文章進行分析,這裡大家只需清楚大體的流程即可。不必在意過多的細節。
下面我們把視角切換到regFuture
上的ChannelFutureListener
回撥中,看看在Channel
註冊完成後,Netty又會做哪些事情?
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
}
這裡Netty又將繫結埠地址
的操作封裝成非同步任務,提交給Reactor
執行。
但是這裡有一個問題,其實此時執行doBind0
方法的執行緒正是Reactor執行緒
,那為什麼不直接在這裡去執行bind操作
,而是再次封裝成非同步任務提交給Reactor
中的taskQueue
呢?
反正最終都是由Reactor執行緒
執行,這其中又有什麼分別呢?
經過上小節的介紹我們知道,bind0
方法的呼叫是由io.netty.channel.AbstractChannel.AbstractUnsafe#register0
方法在將NioServerSocketChannel
註冊到Main Reactor
之後,並且NioServerSocketChannel
的pipeline
已經初始化完畢後,通過safeSetSuccess
方法回撥過來的。
這個過程全程是由Reactor執行緒
來負責執行的,但是此時register0
方法並沒有執行完畢,還需要執行後面的邏輯。
而繫結邏輯需要在註冊邏輯執行完之後執行,所以在doBind0
方法中Reactor執行緒
會將繫結操作
封裝成非同步任務先提交給taskQueue
中儲存,這樣可以使Reactor執行緒
立馬從safeSetSuccess
中返回,繼續執行剩下的register0
方法邏輯。
private void register0(ChannelPromise promise) {
try {
................省略............
doRegister();
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//觸發channelRegister事件
pipeline.fireChannelRegistered();
if (isActive()) {
................省略............
}
} catch (Throwable t) {
................省略............
}
}
當Reactor執行緒
執行完register0
方法後,就會從taskQueue
中取出非同步任務執行。
此時Reactor執行緒
中的taskQueue
結構如下:
Reactor執行緒
會先取出位於taskQueue
隊首的任務執行,這裡是指向NioServerSocketChannel
的pipeline
中新增ServerBootstrapAcceptor
的非同步任務。此時NioServerSocketChannel
中pipeline
的結構如下:
Reactor執行緒
執行繫結任務。對Channel
的操作行為全部定義在ChannelOutboundInvoker介面中
。
public interface ChannelOutboundInvoker {
/**
* Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
* completes, either because the operation was successful or because of an error.
*
*/
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
}
bind
方法由子類AbstractChannel
實現。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
}
呼叫pipeline.bind(localAddress, promise)
在pipeline
中傳播bind事件
,觸發回撥pipeline
中所有ChannelHandler
的bind方法
。
事件在pipeline
中的傳播具有方向性:
inbound事件
從HeadContext
開始逐個向後傳播直到TailContext
。outbound事件
則是反向傳播,從TailContext
開始反向向前傳播直到HeadContext
。
inbound事件
只能被pipeline
中的ChannelInboundHandler
響應處理
outbound事件
只能被pipeline
中的ChannelOutboundHandler
響應處理
然而這裡的bind事件
在Netty中被定義為outbound事件
,所以它在pipeline
中是反向傳播。先從TailContext
開始反向傳播直到HeadContext
。
然而bind
的核心邏輯也正是實現在HeadContext
中。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
//觸發AbstractChannel->bind方法 執行JDK NIO SelectableChannel 執行底層繫結操作
unsafe.bind(localAddress, promise);
}
}
在HeadContext#bind
回撥方法中,呼叫Channel
裡的unsafe
操作類執行真正的繫結操作。
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
.................省略................
//這時channel還未啟用 wasActive = false
boolean wasActive = isActive();
try {
//io.netty.channel.socket.nio.NioServerSocketChannel.doBind
//呼叫具體channel實現類
doBind(localAddress);
} catch (Throwable t) {
.................省略................
return;
}
//繫結成功後 channel啟用 觸發channelActive事件傳播
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//pipeline中觸發channelActive事件
pipeline.fireChannelActive();
}
});
}
//回撥註冊在promise上的ChannelFutureListener
safeSetSuccess(promise);
}
protected abstract void doBind(SocketAddress localAddress) throws Exception;
}
NioServerSocketChannel
具體實現的doBind
方法,通過JDK NIO 原生 ServerSocketChannel
執行底層的繫結操作。 @Override
protected void doBind(SocketAddress localAddress) throws Exception {
//呼叫JDK NIO 底層SelectableChannel 執行繫結操作
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
判斷是否為首次繫結,如果是的話將觸發pipeline中的ChannelActive事件
封裝成非同步任務放入Reactor
中的taskQueue
中。
執行safeSetSuccess(promise)
,回撥註冊在promise
上的ChannelFutureListener
。
還是同樣的問題,當前執行執行緒已經是Reactor執行緒
了,那麼為何不直接觸發pipeline
中的ChannelActive
事件而是又封裝成非同步任務呢??
因為如果直接在這裡觸發ChannelActive事件
,那麼Reactor執行緒
就會去執行pipeline
中的ChannelHandler
的channelActive事件回撥
。
這樣的話就影響了safeSetSuccess(promise)
的執行,延遲了
註冊在promise
上的ChannelFutureListener
的回撥。
到現在為止,Netty伺服器端就已經完成了繫結埠地址的操作,NioServerSocketChannel
的狀態現在變為Active
。
最後還有一件重要的事情要做,我們接著來看pipeline
中對channelActive事件
處理。
channelActive事件
在Netty中定義為inbound事件
,所以它在pipeline
中的傳播為正向傳播,從HeadContext
一直到TailContext
為止。
在channelActive事件
回撥中需要觸發向Selector
指定需要監聽的IO事件
~~OP_ACCEPT事件
。
這塊的邏輯主要在HeadContext
中實現。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//pipeline中繼續向後傳播channelActive事件
ctx.fireChannelActive();
//如果是autoRead 則自動觸發read事件傳播
//在read回撥函數中 觸發OP_ACCEPT註冊
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
//如果是autoRead 則觸發read事件傳播
channel.read();
}
}
//AbstractChannel
public Channel read() {
//觸發read事件
pipeline.read();
return this;
}
@Override
public void read(ChannelHandlerContext ctx) {
//觸發註冊OP_ACCEPT或者OP_READ事件
unsafe.beginRead();
}
}
HeadContext
中的channelActive
回撥中觸發pipeline
中的read事件
。read事件
再次傳播到HeadContext
時,觸發HeadContext#read
方法的回撥。在read回撥
中呼叫channel
底層操作類unsafe
的beginRead
方法向selector
註冊監聽OP_ACCEPT事件
。protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void beginRead() {
assertEventLoop();
//channel必須是Active
if (!isActive()) {
return;
}
try {
// 觸發在selector上註冊channel感興趣的監聽事件
doBeginRead();
} catch (final Exception e) {
.............省略..............
}
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//子類負責繼承實現
protected abstract void doBeginRead() throws Exception;
}
斷言判斷執行該方法的執行緒必須是Reactor執行緒
。
此時NioServerSocketChannel
已經完成埠地址的繫結操作,isActive() = true
呼叫doBeginRead
實現向Selector
註冊監聽事件OP_ACCEPT
public abstract class AbstractNioChannel extends AbstractChannel {
//channel註冊到Selector後獲得的SelectKey
volatile SelectionKey selectionKey;
// Channel監聽事件集合
protected final int readInterestOp;
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
/**
* 1:ServerSocketChannel 初始化時 readInterestOp設定的是OP_ACCEPT事件
* */
if ((interestOps & readInterestOp) == 0) {
//新增OP_ACCEPT事件到interestOps集合中
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
前邊提到在NioServerSocketChannel
在向Main Reactor
中的Selector
註冊後,會獲得一個SelectionKey
。這裡首先要獲取這個SelectionKey
。
從SelectionKey
中獲取NioServerSocketChannel
感興趣的IO事件集合 interestOps
,當時在註冊的時候interestOps
設定為0
。
將在NioServerSocketChannel
初始化時設定的readInterestOp = OP_ACCEPT
,設定到SelectionKey
中的interestOps
集合中。這樣Reactor
中的Selector
就開始監聽interestOps
集合中包含的IO事件
了。
Main Reactor
中主要監聽的是OP_ACCEPT事件
。
流程走到這裡,Netty伺服器端就真正的啟動起來了,下一步就開始等待接收使用者端連線了。大家此刻在來回看這副啟動流程圖,是不是清晰了很多呢?
此時Netty的Reactor模型
結構如下:
本文我們通過圖解原始碼的方式完整地介紹了整個Netty伺服器端啟動流程,並介紹了在啟動過程中涉及到的ServerBootstrap
相關的屬性以及設定方式。NioServerSocketChannel
的建立初始化過程以及類的繼承結構。
其中重點介紹了NioServerSocketChannel
向Reactor
的註冊過程以及Reactor執行緒
的啟動時機和pipeline
的初始化時機。
最後介紹了NioServerSocketChannel
繫結埠地址的整個流程。
上述介紹的這些流程全部是非同步操作,各種回撥繞來繞去的,需要反覆回想下,讀非同步程式碼就是這樣,需要理清各種回撥之間的關係,並且時刻提醒自己當前的執行執行緒是什麼?
好了,現在Netty伺服器端已經啟動起來,接著就該接收使用者端連線了,我們下篇文章見~~~~