本系列Netty原始碼解析文章基於 4.1.56.Final版本
對於一個高效能網路通訊框架來說,最最重要也是最核心的工作就是如何高效的接收使用者端連線,這就好比我們開了一個飯店,那麼迎接客人就是飯店最重要的工作,我們要先把客人迎接進來,不能讓客人一看人多就走掉,只要客人進來了,哪怕菜做的慢一點也沒關係。
本文筆者就來為大家介紹下netty這塊最核心的內容,看看netty是如何高效的接收使用者端連線的。
下圖為筆者在一個月黑風高天空顯得那麼深邃遙遠的夜晚,閒來無事,於是捧起Netty關於如何接收連線這部分原始碼細細品讀的時候,意外的發現了一個影響Netty接收連線吞吐的一個Bug。
於是筆者就在Github提了一個Issue#11708,闡述了下這個Bug產生的原因以及導致的結果並和Netty的作者一起討論了下修復措施。如上圖所示。
Issue#11708:https://github.com/netty/netty/issues/11708
這裡先不詳細解釋這個Issue,也不建議大家現在就開啟這個Issue檢視,筆者會在本文的介紹中隨著原始碼深入的解讀慢慢的為大家一層一層地撥開迷霧。
之所以在文章的開頭把這個拎出來,筆者是想讓大家帶著懷疑,審視,欣賞,崇敬,敬畏的態度來一起品讀世界頂級程式設計師編寫的程式碼。由衷的感謝他們在這一領域做出的貢獻。
好了,問題丟擲來後,我們就帶著這個疑問來開始本文的內容吧~~~
按照老規矩,再開始本文的內容之前,我們先來回顧下前邊幾篇文章的概要內容幫助大家梳理一個框架全貌出來。
筆者這裡再次想和讀者朋友們強調的是本文可以獨立觀看,並不依賴前邊系列文章的內容,只是大家如果對相關細節部分感興趣的話,可以在閱讀完本文之後在去回看相關文章。
在前邊的系列文章中,筆者為大家介紹了驅動Netty整個框架運轉的核心引擎Reactor的建立,啟動,執行的全流程。從現在開始Netty的整個核心框架就開始運轉起來開始工作了,本文要介紹的主要內容就是Netty在啟動之後要做的第一件事件:監聽埠地址,高效接收使用者端連線。
在《聊聊Netty那些事兒之從核心角度看IO模型》一文中,我們是從整個網路框架的基石IO模型的角度整體闡述了下Netty的IO執行緒模型。
而Netty中的Reactor正是IO執行緒在Netty中的模型定義。Reactor在Netty中是以Group的形式出現的,分為:
主Reactor執行緒組也就是我們在啟動程式碼中設定的EventLoopGroup bossGroup
,main reactor group中的reactor主要負責監聽使用者端連線事件,高效的處理使用者端連線。也是本文我們要介紹的重點。
從Reactor執行緒組也就是我們在啟動程式碼中設定的EventLoopGroup workerGroup
,sub reactor group中的reactor主要負責處理使用者端連線上的IO事件,以及非同步任務的執行。
最後我們得出Netty的整個IO模型如下:
本文我們討論的重點就是MainReactorGroup的核心工作上圖中所示的步驟1,步驟2,步驟3。
在從整體上介紹完Netty的IO模型之後,我們又在《Reactor在Netty中的實現(建立篇)》中完整的介紹了Netty框架的骨架主從Reactor組的搭建過程,闡述了Reactor是如何被建立出來的,並介紹了它的核心元件如下圖所示:
thread
即為Reactor中的IO執行緒,主要負責監聽IO事件,處理IO任務,執行非同步任務。
selector
則是JDK NIO對作業系統底層IO多路複用技術實現的封裝。用於監聽IO就緒事件。
taskQueue
用於儲存Reactor需要執行的非同步任務,這些非同步任務可以由使用者在業務執行緒中向Reactor提交,也可以是Netty框架提交的一些自身核心的任務。
scheduledTaskQueue
則是儲存Reactor中執行的定時任務。代替了原有的時間輪來執行延時任務。
tailQueue
儲存了在Reactor需要執行的一些尾部收尾任務,在普通任務執行完後 Reactor執行緒會執行尾部任務,比如對Netty 的執行狀態做一些統計資料,例如任務迴圈的耗時、佔用實體記憶體的大小等等
在骨架搭建完畢之後,我們隨後又在在《詳細圖解Netty Reactor啟動全流程》》一文中介紹了本文的主角伺服器端NioServerSocketChannel的建立,初始化,繫結埠地址,向main reactor註冊監聽OP_ACCEPT事件
的完整過程。
main reactor如何處理OP_ACCEPT事件將會是本文的主要內容。
自此Netty框架的main reactor group已經啟動完畢,開始準備監聽OP_accept事件,當用戶端連線上來之後,OP_ACCEPT事件活躍,main reactor開始處理OP_ACCEPT事件接收使用者端連線了。
而netty中的IO事件分為:OP_ACCEPT事件,OP_READ事件,OP_WRITE事件和OP_CONNECT事件,netty對於IO事件的監聽和處理統一封裝在Reactor模型中,這四個IO事件的處理過程也是我們後續文章中要單獨拿出來介紹的,本文我們聚焦OP_ACCEPT事件的處理。
而為了讓大家能夠對IO事件的處理有一個完整性的認識,筆者寫了《一文聊透Netty核心引擎Reactor的運轉架構》這篇文章,在文章中詳細介紹了Reactor執行緒的整體執行框架。
Reactor執行緒會在一個死迴圈中996不停的運轉,在迴圈中會不斷的輪詢監聽Selector上的IO事件,當IO事件活躍後,Reactor從Selector上被喚醒轉去執行IO就緒事件的處理,在這個過程中我們引出了上述四種IO事件的處理入口函數。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//獲取Channel的底層操作類Unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
......如果SelectionKey已經失效則關閉對應的Channel......
}
try {
//獲取IO就緒事件
int readyOps = k.readyOps();
//處理Connect事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
//移除對Connect事件的監聽,否則Selector會一直通知
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//觸發channelActive事件處理Connect事件
unsafe.finishConnect();
}
//處理Write事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//處理Read事件或者Accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
本文筆者將會為大家重點介紹OP_ACCEPT事件
的處理入口函數unsafe.read()
的整個原始碼實現。
當用戶端連線完成三次握手之後,main reactor中的selector產生OP_ACCEPT事件
活躍,main reactor隨即被喚醒,來到了OP_ACCEPT事件
的處理入口函數開始接收使用者端連線。
當Main Reactor
輪詢到NioServerSocketChannel
上的OP_ACCEPT事件
就緒時,Main Reactor執行緒就會從JDK Selector
上的阻塞輪詢APIselector.select(timeoutMillis)
呼叫中返回。轉而去處理NioServerSocketChannel
上的OP_ACCEPT事件
。
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
..............省略.................
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
..............處理OP_CONNECT事件.................
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
..............處理OP_WRITE事件.................
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//本文重點處理OP_ACCEPT事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
處理IO就緒事件的入口函數processSelectedKey
中的引數AbstractNioChannel ch
正是Netty伺服器端NioServerSocketChannel
。因為此時的執行執行緒為main reactor執行緒,而main reactor上註冊的正是netty伺服器端NioServerSocketChannel負責監聽埠地址,接收使用者端連線。
通過ch.unsafe()
獲取到的NioUnsafe操作類正是NioServerSocketChannel中對底層JDK NIO ServerSocketChannel的Unsafe底層操作類。
Unsafe介面
是Netty對Channel底層操作行為的封裝,比如NioServerSocketChannel的底層Unsafe操作類乾的事情就是繫結埠地址
,處理OP_ACCEPT事件
。
這裡我們看到,Netty將OP_ACCEPT事件
處理的入口函數封裝在NioServerSocketChannel
裡的底層操作類Unsafe的read
方法中。
而NioServerSocketChannel中的Unsafe操作類實現型別為NioMessageUnsafe
定義在上圖繼承結構中的AbstractNioMessageChannel父類別中
。
下面我們到NioMessageUnsafe#read
方法中來看下Netty對OP_ACCPET事件
的具體處理過程:
我們還是按照老規矩,先從整體上把整個OP_ACCEPT事件的邏輯處理框架提取出來,讓大家先總體俯視下流程全貌,然後在針對每個核心點位進行各個擊破。
main reactor執行緒是在一個do...while{...}
迴圈read loop中不斷的呼叫JDK NIO serverSocketChannel.accept()
方法來接收完成三次握手的使用者端連線NioSocketChannel
的,並將接收到的使用者端連線NioSocketChannel臨時儲存在List<Object> readBuf
集合中,後續會伺服器端NioServerSocketChannel的pipeline中通過ChannelRead事件來傳遞,最終會在ServerBootstrapAcceptor這個ChannelHandler中被處理初始化,並將其註冊到Sub Reator Group中。
這裡的read loop迴圈會被限定只能讀取16次,當main reactor從NioServerSocketChannel中讀取使用者端連線NioSocketChannel的次數達到16次之後,無論此時是否還有使用者端連線都不能在繼續讀取了。
因為我們在《一文聊透Netty核心引擎Reactor的運轉架構》一文中提到,netty對reactor執行緒壓榨的比較狠,要乾的事情很多,除了要監聽輪詢IO就緒事件,處理IO就緒事件,還需要執行使用者和netty框架本省提交的非同步任務和定時任務。
所以這裡的main reactor執行緒不能在read loop中無限制的執行下去,因為還需要分配時間去執行非同步任務,不能因為無限制的接收使用者端連線而耽誤了非同步任務的執行。所以這裡將read loop的迴圈次數限定為16次。
如果main reactor執行緒在read loop中讀取使用者端連線NioSocketChannel的次數已經滿了16次,即使此時還有使用者端連線未接收,那麼main reactor執行緒也不會再去接收了,而是轉去執行非同步任務,當非同步任務執行完畢後,還會在回來執行剩餘接收連線的任務。
main reactor執行緒退出read loop迴圈的條件有兩個:
在限定的16次讀取中,已經沒有新的使用者端連線要接收了。退出迴圈。
從NioServerSocketChannel中讀取使用者端連線的次數達到了16次,無論此時是否還有使用者端連線都需要退出迴圈。
以上就是Netty在接收使用者端連線時的整體核心邏輯,下面筆者將這部分邏輯的核心原始碼實現框架提取出來,方便大家根據上述核心邏輯與原始碼中的處理模組對應起來,還是那句話,這裡只需要總體把握核心處理流程,不需要讀懂每一行程式碼,筆者會在文章的後邊分模組來各個擊破它們。
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//存放連線建立後,建立的使用者端SocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
//必須在Main Reactor執行緒中執行
assert eventLoop().inEventLoop();
//注意下面的config和pipeline都是伺服器端ServerSocketChannel中的
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//建立接收資料Buffer分配器(用於分配容量大小合適的byteBuffer用來容納接收資料)
//在接收連線的場景中,這裡的allocHandle只是用於控制read loop的迴圈讀取建立連線的次數。
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//底層呼叫NioServerSocketChannel->doReadMessages 建立使用者端SocketChannel
int localRead = doReadMessages(readBuf);
//已無新的連線可接收則退出read loop
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//統計在當前事件迴圈中已經讀取到得Message數量(建立連線的個數)
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());//判斷是否已經讀滿16次
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//在NioServerSocketChannel對應的pipeline中傳播ChannelRead事件
//初始化使用者端SocketChannel,並將其繫結到Sub Reactor執行緒組中的一個Reactor上
pipeline.fireChannelRead(readBuf.get(i));
}
//清除本次accept 建立的使用者端SocketChannel集合
readBuf.clear();
allocHandle.readComplete();
//觸發readComplete事件傳播
pipeline.fireChannelReadComplete();
....................省略............
} finally {
....................省略............
}
}
}
}
}
這裡首先要通過斷言 assert eventLoop().inEventLoop()
確保處理接收使用者端連線的執行緒必須為Main Reactor 執行緒。
而main reactor中主要註冊的是伺服器端NioServerSocketChannel,主要負責處理OP_ACCEPT事件
,所以當前main reactor執行緒是在NioServerSocketChannel中執行接收連線的工作。
所以這裡我們通過config()
獲取到的是NioServerSocketChannel的屬性設定類NioServerSocketChannelConfig
,它是在Reactor的啟動階段被建立出來的。
public NioServerSocketChannel(ServerSocketChannel channel) {
//父類別AbstractNioChannel中儲存JDK NIO原生ServerSocketChannel以及要監聽的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中設定用於Channel接收資料用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
同理這裡通過pipeline()
獲取到的也是NioServerSocketChannel中的pipeline
。它會在NioServerSocketChannel向main reactor註冊成功之後被初始化。
前邊提到main reactor執行緒會被限定只能在read loop中向NioServerSocketChannel讀取16次使用者端連線,所以在開始read loop之前,我們需要建立一個能夠儲存記錄讀取次數的物件,在每次read loop迴圈之後,可以根據這個物件來判斷是否結束read loop。
這個物件就是這裡的 RecvByteBufAllocator.Handle allocHandle
專門用於統計read loop中接收使用者端連線的次數,以及判斷是否該結束read loop轉去執行非同步任務。
當這一切準備就緒之後,main reactor執行緒就開始在do{....}while(...)
迴圈中接收使用者端連線了。
在 read loop中通過呼叫doReadMessages函數
接收完成三次握手的使用者端連線,底層會呼叫到JDK NIO ServerSocketChannel的accept方法,從核心全連線佇列中取出使用者端連線。
返回值localRead
表示接收到了多少使用者端連線,使用者端連線通過accept方法只會一個一個的接收,所以這裡的localRead
正常情況下都會返回1
,當localRead <= 0
時意味著已經沒有新的使用者端連線可以接收了,本次main reactor接收使用者端的任務到這裡就結束了,跳出read loop。開始新的一輪IO事件的監聽處理。
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
隨後會將接收到的使用者端連線佔時存放到List<Object> readBuf
集合中。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//存放連線建立後,建立的使用者端SocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
}
呼叫allocHandle.incMessagesRead
統計本次事件迴圈中接收到的使用者端連線個數,最後在read loop末尾通過allocHandle.continueReading
判斷是否達到了限定的16次。從而決定main reactor執行緒是繼續接收使用者端連線還是轉去執行非同步任務。
main reactor執行緒退出read loop的兩個條件:
在限定的16次讀取中,已經沒有新的使用者端連線要接收了。退出迴圈。
從NioServerSocketChannel中讀取使用者端連線的次數達到了16次,無論此時是否還有使用者端連線都需要退出迴圈。
當滿足以上兩個退出條件時,main reactor執行緒就會退出read loop,由於在read loop中接收到的使用者端連線全部暫存在List<Object> readBuf
集合中,隨後開始遍歷readBuf,在NioServerSocketChannel的pipeline中傳播ChannelRead事件。
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//NioServerSocketChannel對應的pipeline中傳播read事件
//io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor.channelRead
//初始化使用者端SocketChannel,並將其繫結到Sub Reactor執行緒組中的一個Reactor上
pipeline.fireChannelRead(readBuf.get(i));
}
最終pipeline中的ChannelHandler(ServerBootstrapAcceptor)會響應ChannelRead事件,並在相應回撥函數中初始化使用者端NioSocketChannel,並將其註冊到Sub Reactor Group中。此後使用者端NioSocketChannel繫結到的sub reactor就開始監聽處理使用者端連線上的讀寫事件了。
Netty整個接收使用者端的邏輯過程如下圖步驟1,2,3所示。
以上內容就是筆者提取出來的整體流程框架,下面我們來將其中涉及到的重要核心模組拆開,一個一個詳細解讀下。
Reactor在處理對應Channel上的IO資料時,都會採用一個ByteBuffer
來接收Channel上的IO資料。而本小節要介紹的RecvByteBufAllocator正是用來分配ByteBuffer的一個分配器。
還記得這個RecvByteBufAllocator
在哪裡被建立的嗎??
在《聊聊Netty那些事兒之Reactor在Netty中的實現(建立篇)》一文中,在介紹NioServerSocketChannel
的建立過程中提到,對應Channel的設定類NioServerSocketChannelConfig也會隨著NioServerSocketChannel的建立而建立。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
在建立NioServerSocketChannelConfig
的過程中會建立RecvByteBufAllocator
。
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
這裡我們看到NioServerSocketChannel中的RecvByteBufAllocator實際型別為AdaptiveRecvByteBufAllocator
,顧名思義,這個型別的RecvByteBufAllocator可以根據Channel上每次到來的IO資料大小來自適應動態調整ByteBuffer的容量。
對於伺服器端NioServerSocketChannel來說,它上邊的IO資料就是使用者端的連線,它的長度和型別都是固定的,所以在接收使用者端連線的時候並不需要這樣的一個ByteBuffer來接收,我們會將接收到的使用者端連線存放在List<Object> readBuf
集合中
對於使用者端NioSocketChannel來說,它上邊的IO資料時使用者端傳送來的網路資料,長度是不定的,所以才會需要這樣一個可以根據每次IO資料的大小來自適應動態調整容量的ByteBuffer來接收。
那麼看起來這個RecvByteBufAllocator和本文的主題不是很關聯,因為在接收連線的過程中並不會怎麼用到它,這個類筆者還會在後面的文章中詳細介紹,之所以這裡把它拎出來單獨介紹是因為它和本文開頭提到的Bug有關係,這個Bug就是由這個類引起的。
在本文中,我們是通過NioServerSocketChannel中的unsafe底層操作類來獲取RecvByteBufAllocator.Handle的
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) {
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}
}
我們看到最終會在NioServerSocketChannel的設定類NioServerSocketChannelConfig中獲取到AdaptiveRecvByteBufAllocator
public class DefaultChannelConfig implements ChannelConfig {
//用於Channel接收資料用的buffer分配器 型別為AdaptiveRecvByteBufAllocator
private volatile RecvByteBufAllocator rcvBufAllocator;
}
AdaptiveRecvByteBufAllocator
中會建立自適應動態調整容量的ByteBuffer分配器。
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
@Override
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);
}
private final class HandleImpl extends MaxMessageHandle {
.................省略................
}
}
這裡的newHandle
方法返回的具體型別為MaxMessageHandle
,這個MaxMessageHandle
裡邊儲存了每次從Channel
中讀取IO資料
的容量指標,方便下次讀取時分配合適大小的buffer
。
每次在使用allocHandle
前需要呼叫allocHandle.reset(config);
重置裡邊的統計指標。
public abstract class MaxMessageHandle implements ExtendedHandle {
private ChannelConfig config;
//每次事件輪詢時,最多讀取16次
private int maxMessagePerRead;
//本次事件輪詢總共讀取的message數,這裡指的是接收連線的數量
private int totalMessages;
//本次事件輪詢總共讀取的位元組數
private int totalBytesRead;
@Override
public void reset(ChannelConfig config) {
this.config = config;
//預設每次最多讀取16次
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}
}
ServerBootstrap
中通過ChannelOption.MAX_MESSAGES_PER_READ
選項設定。ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.MAX_MESSAGES_PER_READ, 自定義次數)
allocHandle.incMessagesRead
增加記錄接收到的連線個數。 @Override
public final void incMessagesRead(int amt) {
totalMessages += amt;
}
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
if (bytes > 0) {
totalBytesRead += bytes;
}
}
MaxMessageHandler中還有一個非常重要的方法就是在每次read loop末尾會呼叫allocHandle.continueReading()
方法來判斷讀取連線次數是否已滿16次,來決定main reactor執行緒是否退出迴圈。
do {
//底層呼叫NioServerSocketChannel->doReadMessages 建立使用者端SocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//統計在當前事件迴圈中已經讀取到得Message數量(建立連線的個數)
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
紅框中圈出來的兩個判斷條件和本文主題無關,我們這裡不需要關注,筆者會在後面的文章詳細介紹。
totalMessages < maxMessagePerRead
:在本文的接收使用者端連線場景中,這個條件用於判斷main reactor執行緒在read loop中的讀取次數是否超過了16次。如果超過16次就會返回false,main reactor執行緒退出迴圈。
totalBytesRead > 0
:用於判斷當用戶端NioSocketChannel上的OP_READ事件活躍時,sub reactor執行緒在read loop中是否讀取到了網路資料。
以上內容就是RecvByteBufAllocator.Handle在接收使用者端連線場景下的作用,大家這裡仔細看下這個allocHandle.continueReading()
方法退出迴圈的判斷條件,再結合整個do{....}while(...)
接收連線迴圈體,感受下是否哪裡有些不對勁?Bug即將出現~~~
netty不論是在本文中處理接收使用者端連線的場景還是在處理接收使用者端連線上的網路資料場景都會在一個do{....}while(...)
迴圈read loop中不斷的處理。
同時也都會利用在上一小節中介紹的RecvByteBufAllocator.Handle
來記錄每次read loop接收到的連線個數和從連線上讀取到的網路資料大小。
從而在read loop的末尾都會通過allocHandle.continueReading()
方法判斷是否應該退出read loop迴圈結束連線的接收流程或者是結束連線上資料的讀取流程。
無論是用於接收使用者端連線的main reactor也好還是用於接收使用者端連線上的網路資料的sub reactor也好,它們的執行框架都是一樣的,只不過是具體分工不同。
所以netty這裡想用統一的RecvByteBufAllocator.Handle
來處理以上兩種場景。
而RecvByteBufAllocator.Handle
中的totalBytesRead
欄位主要記錄sub reactor執行緒在處理使用者端NioSocketChannel中OP_READ事件活躍時,總共在read loop中讀取到的網路資料,而這裡是main reactor執行緒在接收使用者端連線所以這個欄位並不會被設定。totalBytesRead欄位的值在本文中永遠會是0
。
所以無論同時有多少個使用者端並行連線到伺服器端上,在接收連線的這個read loop中永遠只會接受一個連線就會退出迴圈,因為allocHandle.continueReading()方法
中的判斷條件totalBytesRead > 0
永遠會返回false
。
do {
//底層呼叫NioServerSocketChannel->doReadMessages 建立使用者端SocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//統計在當前事件迴圈中已經讀取到得Message數量(建立連線的個數)
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
而netty的本意是在這個read loop迴圈中儘可能多的去接收使用者端的並行連線,同時又不影響main reactor執行緒執行非同步任務。但是由於這個Bug,main reactor在這個迴圈中只執行一次就結束了。這也一定程度上就影響了netty的吞吐。
讓我們想象下這樣的一個場景,當有16個使用者端同時並行連線到了伺服器端,這時NioServerSocketChannel上的OP_ACCEPT事件
活躍,main reactor從Selector上被喚醒,隨後執行OP_ACCEPT事件
的處理。
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
............省略.........
case SelectStrategy.BUSY_WAIT:
............省略.........
case SelectStrategy.SELECT:
............監聽輪詢IO事件.........
default:
}
} catch (IOException e) {
............省略.........
}
............處理IO就緒事件.........
............執行非同步任務.........
}
}
但是由於這個Bug的存在,main reactor在接收使用者端連線的這個read loop中只接收了一個使用者端連線就匆匆返回了。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
do {
int localRead = doReadMessages(readBuf);
.........省略...........
} while (allocHandle.continueReading());
}
然後根據下圖中這個Reactor的執行結構去執行非同步任務,隨後繞一大圈又會回到NioEventLoop#run
方法中重新發起一輪OP_ACCEPT事件輪詢。
由於現在還有15個使用者端並行連線沒有被接收,所以此時Main Reactor執行緒並不會在selector.select()
上阻塞,最終繞一圈又會回到NioMessageUnsafe#read
方法的do{.....}while()
迴圈。在接收一個連線之後又退出迴圈。
本來我們可以在一次read loop中把這16個並行的使用者端連線全部接收完畢的,因為這個Bug,main reactor需要不斷的發起OP_ACCEPT事件的輪詢,繞了很大一個圈子。同時也增加了許多不必要的selector.select()系統呼叫開銷
這時大家在看這個Issue#11708中的討論是不是就清晰很多了~~
Issue#11708:https://github.com/netty/netty/issues/11708
筆者在寫這篇文章的時候,Netty最新版本是4.1.68.final,這個Bug在4.1.69.final中被修復。
由於該Bug產生的原因正是因為伺服器端NioServerSocketChannel(用於監聽埠地址和接收使用者端連線)和 使用者端NioSocketChannel(用於通訊)中的Config設定類混用了同一個ByteBuffer分配器AdaptiveRecvByteBufAllocator
而導致的。
所以在新版本修復中專門為伺服器端ServerSocketChannel中的Config設定類引入了一個新的ByteBuffer分配器ServerChannelRecvByteBufAllocator
,專門用於伺服器端ServerSocketChannel接收使用者端連線的場景。
在ServerChannelRecvByteBufAllocator
的父類別DefaultMaxMessagesRecvByteBufAllocator
中引入了一個新的欄位ignoreBytesRead
,用於表示是否忽略網路位元組的讀取,在建立伺服器端Channel設定類NioServerSocketChannelConfig的時候,這個欄位會被賦值為true
。
當main reactor執行緒在read loop迴圈中接收使用者端連線的時候。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
do {
int localRead = doReadMessages(readBuf);
.........省略...........
} while (allocHandle.continueReading());
}
在read loop迴圈的末尾就會採用從ServerChannelRecvByteBufAllocator
中建立的MaxMessageHandle#continueReading
方法來判斷讀取連線次數是否超過了16次。由於這裡的ignoreBytesRead == true
這回我們就會忽略totalBytesRead == 0
的情況,從而使得接收連線的read loop得以繼續地執行下去。在一個read loop中一次性把16個連線全部接收完畢。
以上就是對這個Bug產生的原因,以及發現的過程,最後修復的方案一個全面的介紹,因此筆者也出現在了netty 4.1.69.final版本釋出公告裡的thank-list中。哈哈,真是令人開心的一件事情~~~
通過以上對netty接收使用者端連線的全流程分析和對這個Bug來龍去脈以及修復方案的介紹,大家現在一定已經理解了整個接收連線的流程框架。
接下來筆者就把這個流程中涉及到的一些核心模組在單獨拎出來從細節入手,為大家各個擊破~~~
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
}
javaChannel()
獲取封裝在Netty伺服器端NioServerSocketChannel
中的JDK 原生 ServerSocketChannel
。 @Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}
JDK NIO 原生
的ServerSocketChannel
的accept方法
獲取JDK NIO 原生
使用者端連線SocketChannel
。 public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
這一步就是我們在《聊聊Netty那些事兒之從核心角度看IO模型》介紹到的呼叫監聽Socket
的accept方法
,核心會基於監聽Socket
建立出來一個新的Socket
專門用於與使用者端之間的網路通訊這個我們稱之為使用者端連線Socket
。這裡的ServerSocketChannel
就類似於監聽Socket
。SocketChannel
就類似於使用者端連線Socket
。
由於我們在建立NioServerSocketChannel
的時候,會將JDK NIO 原生
的ServerSocketChannel
設定為非阻塞
,所以這裡當ServerSocketChannel
上有使用者端連線時就會直接建立SocketChannel
,如果此時並沒有使用者端連線時accept呼叫
就會立刻返回null
並不會阻塞。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//設定Channel為非阻塞 配合IO多路複用模型
ch.configureBlocking(false);
} catch (IOException e) {
..........省略.............
}
}
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
.........省略.......
}
return 0;
}
}
這裡會根據ServerSocketChannel
的accept
方法獲取到JDK NIO 原生
的SocketChannel
(用於底層真正與使用者端通訊的Channel),來建立Netty中的NioSocketChannel
。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
}
建立使用者端NioSocketChannel
的過程其實和之前講的建立伺服器端NioServerSocketChannel
大體流程是一樣的,我們這裡只對使用者端NioSocketChannel
和伺服器端NioServerSocketChannel
在建立過程中的不同之處做一個對比。
具體細節部分大家可以在回看下《詳細圖解Netty Reactor啟動全流程》一文中關於
NioServerSocketChannel
的建立的詳細細節。
在我們介紹Reactor的建立文章中,我們提到Netty中的Channel
是具有層次的。由於使用者端NioSocketChannel是在main reactor接收連線時在伺服器端NioServerSocketChannel中被建立的,所以在建立使用者端NioSocketChannel的時候會通過建構函式指定了parent屬性為NioServerSocketChanel
。並將JDK NIO 原生
的SocketChannel
封裝進Netty的使用者端NioSocketChannel
中。
而在Reactor啟動過程中建立NioServerSocketChannel
的時候parent屬性
指定是null
。因為它就是頂層的Channel
,負責建立使用者端NioSocketChannel
。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
使用者端NioSocketChannel向Sub Reactor註冊的是SelectionKey.OP_READ事件
,而伺服器端NioServerSocketChannel向Main Reactor註冊的是SelectionKey.OP_ACCEPT事件
。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
}
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
public NioServerSocketChannel(ServerSocketChannel channel) {
//父類別AbstractNioChannel中儲存JDK NIO原生ServerSocketChannel以及要監聽的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中設定用於Channel接收資料用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
使用者端NioSocketChannel
繼承的是AbstractNioByteChannel
,而伺服器端NioServerSocketChannel
繼承的是AbstractNioMessageChannel
。
它們繼承的這兩個抽象類一個字首是Byte
,一個字首是Message
有什麼區別嗎??
使用者端
NioSocketChannel
主要處理的是伺服器端與使用者端的通訊,這裡涉及到接收使用者端傳送來的資料,而Sub Reactor執行緒
從NioSocketChannel
中讀取的正是網路通訊資料單位為Byte
。
伺服器端
NioServerSocketChannel
主要負責處理OP_ACCEPT事件
,建立用於通訊的使用者端NioSocketChannel
。這時候使用者端與伺服器端還沒開始通訊,所以Main Reactor執行緒
從NioServerSocketChannel
的讀取物件為Message
。這裡的Message
指的就是底層的SocketChannel
使用者端連線。
以上就是NioSocketChannel
與NioServerSocketChannel
建立過程中的不同之處,後面的過程就一樣了。
SocketChannel
,並將其底層的IO模型設定為非阻塞
,儲存需要監聽的IO事件OP_READ
。 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//設定Channel為非阻塞 配合IO多路複用模型
ch.configureBlocking(false);
} catch (IOException e) {
}
}
channelId
,建立使用者端NioSocketChannel的底層操作類NioByteUnsafe
,建立pipeline。 protected AbstractChannel(Channel parent) {
this.parent = parent;
//channel全域性唯一ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe用於底層socket的讀寫操作
unsafe = newUnsafe();
//為channel分配獨立的pipeline用於IO事件編排
pipeline = newChannelPipeline();
}
AdaptiveRecvByteBufAllocator
。 public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
在Bug修復後的版本中伺服器端NioServerSocketChannel的RecvByteBufAllocator型別設定為
ServerChannelRecvByteBufAllocator
最終我們得到的使用者端NioSocketChannel
結構如下:
在前邊介紹接收連線的整體核心流程框架的時候,我們提到main reactor執行緒是在一個do{.....}while(...)
迴圈read loop中不斷的呼叫ServerSocketChannel#accept
方法來接收使用者端的連線。
當滿足退出read loop迴圈的條件有兩個:
在限定的16次讀取中,已經沒有新的使用者端連線要接收了。退出迴圈。
從NioServerSocketChannel中讀取使用者端連線的次數達到了16次,無論此時是否還有使用者端連線都需要退出迴圈。
main reactor就會退出read loop迴圈,此時接收到的使用者端連線NioSocketChannel暫存與List<Object> readBuf
集合中。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
try {
try {
do {
........省略.........
//底層呼叫NioServerSocketChannel->doReadMessages 建立使用者端SocketChannel
int localRead = doReadMessages(readBuf);
........省略.........
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
........省略.........
} finally {
........省略.........
}
}
}
隨後main reactor執行緒會遍歷List<Object> readBuf
集合中的NioSocketChannel,並在NioServerSocketChannel的pipeline中傳播ChannelRead事件。
最終ChannelRead事件
會傳播到ServerBootstrapAcceptor
中,這裡正是Netty處理使用者端連線的核心邏輯所在。
ServerBootstrapAcceptor
主要的作用就是初始化使用者端NioSocketChannel
,並將使用者端NioSocketChannel註冊到Sub Reactor Group
中,並監聽OP_READ事件
。
在ServerBootstrapAcceptor 中會初始化使用者端NioSocketChannel的這些屬性。
比如:從Reactor組EventLoopGroup childGroup
,用於初始化NioSocketChannel
中的pipeline
用到的ChannelHandler childHandler
,以及NioSocketChannel
中的一些childOptions
和childAttrs
。
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//向用戶端NioSocketChannel的pipeline中
//新增在啟動設定類ServerBootstrap中設定的ChannelHandler
child.pipeline().addLast(childHandler);
//利用設定的屬性初始化使用者端NioSocketChannel
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
/**
* 1:在Sub Reactor執行緒組中選擇一個Reactor繫結
* 2:將使用者端SocketChannel註冊到繫結的Reactor上
* 3:SocketChannel註冊到sub reactor中的selector上,並監聽OP_READ事件
* */
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
正是在這裡,netty會將我們在《詳細圖解Netty Reactor啟動全流程》的啟動範例程式中在ServerBootstrap中設定的使用者端NioSocketChannel的所有屬性(child字首設定)初始化到NioSocketChannel中。
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//建立主從Reactor執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//設定主從Reactor
.channel(NioServerSocketChannel.class)//設定主Reactor中的channel型別
.option(ChannelOption.SO_BACKLOG, 100)//設定主Reactor中channel的option選項
.handler(new LoggingHandler(LogLevel.INFO))//設定主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {//設定從Reactor中註冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 繫結埠啟動服務,開始監聽accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
以上範例程式碼中通過ServerBootstrap設定的NioSocketChannel相關屬性,會在Netty啟動並開始初始化NioServerSocketChannel
的時候將ServerBootstrapAcceptor
的建立初始化工作封裝成非同步任務
,然後在NioServerSocketChannel
註冊到Main Reactor
中成功後執行。
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
@Override
void init(Channel channel) {
................省略................
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
................省略................
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}
在經過ServerBootstrapAccptor#chanelRead回撥
的處理之後,此時使用者端NioSocketChannel中pipeline的結構為:
隨後會將初始化好的使用者端NioSocketChannel向Sub Reactor Group中註冊,並監聽OP_READ事件
。
如下圖中的步驟3所示:
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
使用者端NioSocketChannel向Sub Reactor Group註冊的流程完全和伺服器端NioServerSocketChannel向Main Reactor Group註冊流程一樣。
關於伺服器端NioServerSocketChannel的註冊流程,筆者已經在《詳細圖解Netty Reactor啟動全流程》一文中做出了詳細的介紹,對相關細節感興趣的同學可以在回看下。
這裡筆者在帶大家簡要回顧下整個註冊過程並著重區別對比使用者端NioSocetChannel與伺服器端NioServerSocketChannel註冊過程中不同的地方。
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventExecutor next() {
return chooser.next();
}
}
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
//註冊channel到繫結的Reactor上
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//unsafe負責channel底層的各種操作
promise.channel().unsafe().register(this, promise);
return promise;
}
}
當時我們在介紹NioServerSocketChannel
的註冊過程時,這裡的promise.channel()
為NioServerSocketChannel
。底層的unsafe操作類為NioMessageUnsafe
。
此時這裡的promise.channel()
為NioSocketChannel
。底層的unsafe操作類為NioByteUnsafe
。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
..............省略....................
//此時這裡的eventLoop為Sub Reactor
AbstractChannel.this.eventLoop = eventLoop;
/**
* 執行channel註冊的操作必須是Reactor執行緒來完成
*
* 1: 如果當前執行執行緒是Reactor執行緒,則直接執行register0進行註冊
* 2:如果當前執行執行緒是外部執行緒,則需要將register0註冊操作 封裝程非同步Task 由Reactor執行緒執行
* */
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
..............省略....................
}
}
}
注意此時傳遞進來的EventLoop eventLoop為Sub Reactor。
但此時的執行執行緒為Main Reactor執行緒
,並不是Sub Reactor執行緒(此時還未啟動)。
所以這裡的eventLoop.inEventLoop()
返回的是false
。
在else分支
中向繫結的Sub Reactor提交註冊NioSocketChannel
的任務。
當註冊任務提交後,此時繫結的
Sub Reactor執行緒
啟動。
我們又來到了Channel註冊的老地方register0方法
。在《詳細圖解Netty Reactor啟動全流程》中我們花了大量的篇幅介紹了這個方法。這裡我們只對比NioSocketChannel
與NioServerSocketChannel
不同的地方。
private void register0(ChannelPromise promise) {
try {
................省略..................
boolean firstRegistration = neverRegistered;
//執行真正的註冊操作
doRegister();
//修改註冊狀態
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
if (isActive()) {
if (firstRegistration) {
//觸發channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
................省略..................
}
}
這裡 doRegister()方法
將NioSocketChannel註冊到Sub Reactor中的Selector
上。
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...............省略...............
}
}
}
}
這裡是Netty使用者端NioSocketChannel
與JDK NIO 原生 SocketChannel關聯的地方。此時註冊的IO事件
依然是0
。目的也是隻是為了獲取NioSocketChannel在Selector中的SelectionKey
。
同時通過SelectableChannel#register
方法將Netty自定義的NioSocketChannel(這裡的this指標)附著在SelectionKey的attechment屬性上,完成Netty自定義Channel與JDK NIO Channel的關係繫結。這樣在每次對Selector進行IO就緒事件輪詢時,Netty 都可以從 JDK NIO Selector返回的SelectionKey中獲取到自定義的Channel物件(這裡指的就是NioSocketChannel)。
隨後呼叫pipeline.invokeHandlerAddedIfNeeded()
回撥使用者端NioSocketChannel上pipeline中的所有ChannelHandler的handlerAdded方法
,此時pipeline
的結構中只有一個ChannelInitializer
。最終會在ChannelInitializer#handlerAdded
回撥方法中初始化使用者端NioSocketChannel
的pipeline
。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
if (initChannel(ctx)) {
//初始化工作完成後,需要將自身從pipeline中移除
removeState(ctx);
}
}
}
protected abstract void initChannel(C ch) throws Exception;
}
關於對Channel中pipeline的詳細初始化過程,對細節部分感興趣的同學可以回看下《詳細圖解Netty Reactor啟動全流程》
此時使用者端NioSocketChannel中的pipeline中的結構就變為了我們自定義的樣子,在範例程式碼中我們自定義的ChannelHandler
為EchoServerHandler
。
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
當用戶端NioSocketChannel中的pipeline初始化完畢後,netty就開始呼叫safeSetSuccess(promise)方法
回撥regFuture
中註冊的ChannelFutureListener
,通知使用者端NioSocketChannel已經成功註冊到Sub Reactor上了。
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
在伺服器端NioServerSocketChannel註冊的時候我們會在listener中向Main Reactor提交
bind繫結埠地址任務
。但是在NioSocketChannel
註冊的時候,只會在listener
中處理一下注冊失敗的情況。
當Sub Reactor執行緒通知ChannelFutureListener註冊成功之後,隨後就會呼叫pipeline.fireChannelRegistered()
在使用者端NioSocketChannel的pipeline中傳播ChannelRegistered事件
。
這裡筆者重點要強調下,在之前介紹NioServerSocketChannel註冊的時候,我們提到因為此時NioServerSocketChannel並未繫結埠地址,所以這時的NioServerSocketChannel並未啟用,這裡的isActive()
返回false
。register0方法
直接返回。
伺服器端NioServerSocketChannel判斷是否啟用的標準為埠是否繫結成功。
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
public boolean isActive() {
return isOpen() && javaChannel().socket().isBound();
}
}
使用者端
NioSocketChannel
判斷是否啟用的標準為是否處於Connected狀態
。那麼顯然這裡肯定是處於connected狀態
的。
@Override
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
NioSocketChannel
已經處於connected狀態
,這裡並不需要繫結埠,所以這裡的isActive()
返回true
。
if (isActive()) {
/**
* 使用者端SocketChannel註冊成功後會走這裡,在channelActive事件回撥中註冊OP_READ事件
* */
if (firstRegistration) {
//觸發channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
.......省略..........
}
}
}
最後呼叫pipeline.fireChannelActive()
在NioSocketChannel中的pipeline傳播ChannelActive事件
,最終在pipeline
的頭結點HeadContext
中響應並註冊OP_READ事件
到Sub Reactor
中的Selector
上。
public abstract class AbstractNioChannel extends AbstractChannel { {
@Override
protected void doBeginRead() throws Exception {
..............省略................
final int interestOps = selectionKey.interestOps();
/**
* 1:ServerSocketChannel 初始化時 readInterestOp設定的是OP_ACCEPT事件
* 2:SocketChannel 初始化時 readInterestOp設定的是OP_READ事件
* */
if ((interestOps & readInterestOp) == 0) {
//註冊監聽OP_ACCEPT或者OP_READ事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
注意這裡的
readInterestOp
為使用者端NioSocketChannel
在初始化時設定的OP_READ事件
。
到這裡,Netty中的Main Reactor
接收連線的整個流程,我們就介紹完了,此時Netty中主從Reactor組的結構就變為:
本文我們介紹了NioServerSocketChannel
處理使用者端連線事件的整個過程。
接收連線的整個處理框架。
影響Netty接收連線吞吐的Bug產生的原因,以及修復的方案。
建立並初始化使用者端NioSocketChannel
。
初始化NioSocketChannel
中的pipeline
。
使用者端NioSocketChannel
向Sub Reactor
註冊的過程
其中我們也對比了NioServerSocketChannel
與NioSocketChannel
在建立初始化以及後面向Reactor
註冊過程中的差異之處。
當用戶端NioSocketChannel
接收完畢並向Sub Reactor
註冊成功後,那麼接下來Sub Reactor
就開始監聽註冊其上的所有使用者端NioSocketChannel
的OP_READ事件
,並等待使用者端向伺服器端傳送網路資料。
後面Reactor
的主角就該變為Sub Reactor
以及註冊在其上的使用者端NioSocketChannel
了。
下篇文章,我們將會討論Netty是如何接收網路資料的~~~~ 我們下篇文章見~~
歡迎關注公眾號:bin的技術小屋