作者: Grey
原文地址:
新連線的接入分為3個過程
檢測到有新連線。
將新連線註冊到 worker 執行緒。
註冊新連線的讀事件。
檢測新連線的程式碼在NioEventLoop
中的processSelectedKey()
方法中
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
......
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
......
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
.....
}
啟動一個 Netty 伺服器端和 Netty 使用者端,在unsafe.read()
這一行打斷點,可以得到這裡的unsafe
就是NioMessageUnsafe
,進入NioMessageUnsafe
的read()
方法,
這個方法主要做的事情就是:建立,設定並繫結NioSocketChannel
。
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
......
do {
// 建立`NioSocketChannel`
int localRead = doReadMessages(readBuf);
......
} while (continueReading(allocHandle));
......
// 設定並繫結 NioSocketChannel
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
......
}
}
建立NioSocketChannel
呼叫的是doReadMessages()
方法,通過Debug,可以看到doReadMessage()
來自於NioServerSocketChannel
中
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
可以看到此時呼叫的是 Java 底層的accept()
方法,建立了一條 JDK 層面的Channel
, Netty 將其封裝成自定義的NioSocketChannel
,並加入一個List
。
繼續 Debug,進入 NioSocketChannel 的構造方法中,呼叫的是AbstractNioByteChannel
的構造方法
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
這個方法類似在 NIO 程式設計中,註冊 OP_READ 事件,表示 Channel 對讀事件感興趣。
接下來是設定並繫結NioSocketChannel
,處理每個NioSocketChannel
,通過 Debug 可以來到AbstractUnsafe
的register0()
方法
private void register0(ChannelPromise promise) {
// 註冊Selector
doRegister();
// 執行 handler
pipeline.invokeHandlerAddedIfNeeded();
// 傳播 ChannelRegistered事件
pipeline.fireChannelRegistered();
// 註冊讀事件
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
}
這個方法主要完成的事情就是:
將NioSocketChannel
註冊到Selector
上
設定自定義的Handler
。
將連線註冊事件傳播下去,呼叫了每個Handler
的channelRegistered
方法。
註冊讀事件。
完整程式碼見:hello-netty
本文所有圖例見:processon: Netty學習筆記
更多內容見:Netty專欄
本文來自部落格園,作者:Grey Zeng,轉載請註明原文連結:https://www.cnblogs.com/greyzeng/p/16755179.html