在前面《Netty原始碼學習4——伺服器端是處理新連線的&netty的reactor模式》的學習中,我們瞭解到伺服器端是如何處理新連線的,即註冊ServerSocketChannel對accept事件感興趣,然後包裝ServerSocketChannel為NioServerSockectChannel,最後由主Reactor在迴圈中利用selector進行IO多路複用產生事件,如果產生accept事件那麼呼叫ServerSocketChannel#accept將其結果(SocketChannel)包裝為NioSockectChannel,然後傳播channelRead事件,然後由ServerBootstrapAcceptor 將NioSockectChannel註冊到子Reactor中。
也就是說ServerBootstrapAcceptor 是派活的大哥,屬於main reactor,而真正幹活的是子reactor中的NioEventLoop,它們會負責後續的資料讀寫與寫入。
這一篇我們就來學習NioSockectChannel是如何讀取資料的。
原始碼學習的入口和伺服器端處理accept事件一致
可以看到子reactor執行緒讀取使用者端傳送的資料,使用的是NioSockectChannelUnsafe#read方法。如下是read方法原始碼:
public final void read() {
final ChannelConfig config = config();
// 省略read-half相關處理
final ChannelPipeline pipeline = pipeline();
// ByteBuf分配器,預設為堆外記憶體+池化分配器
final ByteBufAllocator allocator = config.getAllocator();
// allocHandle用來控制下面讀取流程
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// allocHandle使用allocator來分配記憶體給byteBuf
byteBuf = allocHandle.allocate(allocator);
// doReadBytes讀取資料到byteBuf,記錄最後一次讀取的位元組數
allocHandle.lastBytesRead(doReadBytes(byteBuf));
// 小於0==>通道已到達流結束
if (allocHandle.lastBytesRead() <= 0) {
// 釋放byteBuf
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// 記錄讀取次數+1
allocHandle.incMessagesRead(1);
readPending = false;
// 觸發channelRead
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());//判斷是否繼續讀
allocHandle.readComplete();
// 觸發readComplete
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
// 省略
} finally {
// 省略
}
}
可以看到每次讀取到資料都會觸發channelRead,讀取完畢後會觸發readComplete
我們的業務邏輯就需要自己實現ChannelHandler#channelRead和channelReadComplete進行資料處理(解碼,執行業務操作,編碼,寫回)
可以看到是否繼續讀取使用者端傳送資料,是由allocHandle.continueReading()決定的,並且讀取使用者端的資料會存放到ByteBuf中,ByteBuf的分配是allocHandle.allocate(allocator)
來控制
我們先忽略ByteBufAllocator和RecvByteBufAllocator ,直接看看doReadBytes(byteBuf)
是如何讀取的資料
最終呼叫setBytes進行讀取
下面我們看看ByteBufAllocator和RecvByteBufAllocator 在這個過程中取到了什麼作用
ByteBufAllocator的主要作用是分配和回收ByteBuf物件,以及管理記憶體的分配和釋放。
如上是ByteBufAllocator的具體實現
AbstractByteBufAllocator:這是一個抽象類,提供了一些通用的方法和邏輯,用於建立和管理ByteBuf範例。它是UnpooledByteBufAllocator和PooledByteBufAllocator的基礎類別。
UnpooledByteBufAllocator:這是ByteBufAllocator的預設實現。它採用非池化的方式進行記憶體分配,每次都會建立新的ByteBuf物件,不會使用記憶體池。
PooledByteBufAllocator:這是使用記憶體池的ByteBufAllocator實現。它通過重用記憶體池中的ByteBuf物件來提高效能和記憶體利用率。PooledByteBufAllocator可以根據需求使用池化和非池化的ByteBuf範例。
PreferredDirectByteBufAllocator:偏好使用直接記憶體的分配器,ByteBufAllocator#buffer並沒有說明是堆內還是堆外,PreferredDirectByteBufAllocator會優先使用堆內(裝飾器模式)
PreferHeapByteBufAllocator:偏好使用堆內記憶體的分配器
ByteBufAllocator是真正分配記憶體產生ByteBuf的分配器,但是在網路io中通常需要根據讀取資料的多少動態調整ByteBuf的。預設情況下netty在讀取使用者端資料的時候使用的是AdaptiveRecvByteBufAllocator
,顧名思義可以調整ByteBuf的RecvByteBufAllocator 實現。
也就是說,ByteBufAllocator是真正負責記憶體分配的,RecvByteBufAllocator是負責根據網路IO情況去呼叫ByteBufAllocator調整ByteBuf的。
其內部還有一個Handler,Handler才是真正實現這些邏輯的類,這樣做法的好處在於解耦合——RecvByteBufAllocator和Handler是鬆耦合的,多個RecvByteBufAllocator可以基於相同的Handler。
如下是 AdaptiveRecvByteBufAllocator#HandleImpl在讀取使用者端資料的過程中取到的作用:
可以看到真正分配ByteBuf的是ByteBufAllocator,而大小是AdaptiveRecvByteBufAllocator#HandleImpl使用guess方法猜測出來的
首次guess會返回預設的值(2048)後續該方法根據之前讀取資料的多少來「猜」這次使用多大ByteBuf比較合適
這個猜其實就是返回記憶體中記錄的下一次大小,那麼是怎麼實現猜測的過程的暱?
可以看到在記錄讀取數量的時候,如果是滿載而歸(比如上一次猜需要2048位元組,由於使用者端傳送資料很多,讀滿了ByteBuf)會呼叫record進行記錄和調整
可以看到容量大小記錄在了SIZE_TABLE中,SIZE_TABLE的初始化如下
可以看到AdaptiveRecvByteBufAllocator#HandlerImpl調整的策略有以下特點
continueReading會判斷是否繼續讀取:需要開啟自動讀,且maybe存在更多資料需要讀取,且累計讀取訊息數小於最大訊息數,且上一次讀到了資料
自動讀:預設情況下,Netty的Channel是處於自動讀取模式的。這意味著當有新資料可讀時,Netty會自動觸發讀事件,從Channel中讀取資料並傳遞給下一個處理器進行處理。自動讀適合在高吞吐量的場景開啟,但是如果處理資料的速度跟不上讀取資料速度會出現資料堆積,記憶體佔用過高,rt增加的問題。
maybe存在更多資料需要讀取:
其實就是判斷上一次讀取的位元組數和預估的數量是否相等,也就是是否滿載而歸
累計讀取訊息數小於最大訊息數
雖然一個NioServerChannel只會繫結到一個執行緒,但是一個執行緒可以註冊多個NioServerChannel,so如果一個使用者端瘋狂發資料, 伺服器端不做干預,將導致這個執行緒上的其他Channel永遠得不到處理
so netty設定maxMessagePerRead(單次read最多可以讀取多少訊息——指回圈讀取ServerChannel多少次)
這一篇我們看了NioServerChannel是如何讀取資料的,其Unsafe依賴JDK原生的SocketChannel#read(ByteBuffer)來讀取資料,但是netty在此之上做了如下優化
使用ByteBufAllocator優化ByteBuf的分配,預設使用池化的直接記憶體策略
記憶體池這一篇沒用做過多學習,後續單獨學習
使用AdaptiveRecvByteBufAllocator對讀取過程進行優化
這一篇我們看到每一次迴圈讀取NioSocketChannel資料後會觸發channelRead,讀取完畢後會觸發readComplete,
我們的業務邏輯就需要自己實現ChannelHandler#channelRead和channelReadComplete進行資料處理(解碼,執行業務操作,編碼,寫回)
那麼netty中有哪些內建的編碼解碼器暱?下一篇我們再來嘮嘮