系列文章目錄和關於我
經過《Netty原始碼學習4——伺服器端是處理新連線的&netty的reactor模式和《Netty原始碼學習5——伺服器端是如何讀取資料的》,我們瞭解了netty伺服器端是如何建立連線,讀取使用者端資料的,通過《Netty原始碼學習6——netty編碼解碼器&粘包半包問題的解決》我們認識到編解碼在網路程式設計中的作用以及netty是如何解決TCP粘包,半包問題的。
那麼netty使用者端是如何傳送資料的,以及伺服器端是如何將響應傳送給使用者端的暱?
在我們之前編寫的小demo當中,有如下程式碼:
關鍵原始碼如下:
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 省略 部分
//flush 表示是否需要flush,呼叫writeAndFlush的時候為true
// 找到下一個ChannelHandlerContext
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
// 在eventLoop 中
if (executor.inEventLoop()) {
// 需要flush 那麼呼叫invokeWriteAndFlush
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// 在eventLoop 中 那麼提交一個WriteTask到eventLoop中
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
可以看到TailContext會通過 findContextOutbound 方法在當前 ChannelHandler 的前邊找到 ChannelOutboundHandler 型別並且覆蓋實現 write 回撥方法的 ChannelHandler 作為下一個要執行的物件。
然後如果當前執行的執行緒就是EventLoop執行緒,那麼直接呼叫,反之提交一個非同步任務,從而保證執行write的一定是 reactor 執行緒——保證執行緒安全性
如下是next.invokeWriteAndFlush
的原始碼
最終事件會傳播到HeadContext進行處理(如果中間的ChannelHandler不截胡的話)
write 事件最終會由HeadContext進行處理
可以看到HeadContext#write其實就是使用Channel的Unsafe#write,其主要邏輯如下
ChannelOutboundBuffer 是 Netty 內部使用的一個資料結構,它用於儲存待傳送的出站資料。在 Netty 的網路框架中,當需要寫資料到網路時,資料並不會立即被傳送出去,而是首先被放入一個出站緩衝區中,即 ChannelOutboundBuffer。這個緩衝區負責管理和儲存所有待寫入通道的資料。
下面是向ChannelOutboundBuffer寫入messge的原始碼
此方法只是負責更改flushedEntry 和 unflushedEntry 指標指向
將 flushedEntry 指標指向 unflushedEntry 指標表示的第一個未被 flush 的 Entry 節點。並將 unflushedEntry 指標置為空,準備開始 flush 傳送資料流程。
這樣在 flushedEntry 與 tailEntry 之間的 Entry 節點即為本次 flush 操作需要傳送的資料範圍。
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
flushed ++;
//如果當前entry對應的write操作被使用者取消,則釋放msg,並降低channelOutboundBuffer水位線
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
可以看到如果註冊了write到selector上,那麼不會進行flush,
如下是NioSockectChannel傳送資料的原始碼
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
//獲取jdk nio底層socketChannel
SocketChannel ch = javaChannel();
//最大寫入次數 預設為16 ,因為EventLoop可能單執行緒處理多Channel,需要雨露均沾
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// 如果全部資料已經寫完 則移除OP_WRITE事件並直接退出writeLoop
clearOpWrite();
return;
}
// 獲取單次傳送最大位元組數
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
//Netty的DirectBuffer底層就是JDK的DirectByteBuffer
// 將ChannelOutboundBuffer中快取的DirectBuffer轉換成JDK的ByteBuffer,
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
// ChannelOutboundBuffer中總共的DirectBuffer數
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
// 真正進行傳送
//java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)進行寫回
}
} while (writeSpinCount > 0);
// 處理Socket可寫但已經寫滿16次還沒寫完的情況
incompleteWrite(writeSpinCount < 0);
}
可以看到
如果資料全部寫完了,會呼叫clearOpWrite清除當前 Channel 在 Reactor 上註冊的 OP_WRITE 事件
這意味著,不需要再監聽write來觸發flush了
寫入的過程會寫入多次,並控制自旋次數,做到雨露均沾
如上是寫入的過程
如果ByteBuffer個數為0,說明傳送的是FileRegion 型別,case 0
的分支主要就是用於處理網路檔案傳輸的情況
case1 和 default則呼叫jdk SocketChannel#write進行資料傳送,如果寫入的資料小於等於0,說明當前Socket傳送緩衝區滿了寫不進去了,則註冊OP_WRITE事件,等待Socket傳送緩衝區可寫時再寫
觸發Write後,再Sockect寫緩衝區可寫後,會觸發對應事件,即可再NioEventLoop中進行處理,如下圖中會直接呼叫forceFlush
完成傳送會呼叫adjustMaxBytesPerGatheringWrite進行調整
兩個分支分別表示
期望寫入和真正寫入的相等,說明資料能全部寫入到 Socket 的寫緩衝區中了,那麼下次 write loop 就應該嘗試去寫入更多的資料。
本次寫入的數量x2>maxBytesPerGatheringWrite 說明要寫的資料很多,那麼更新為本次 write loop 兩倍的寫入量大小
如果本次寫入的資料還不及嘗試寫入資料的一半,說明Socket寫緩衝區容量不多了,嘗試縮容為一半
處理
protected final void incompleteWrite(boolean setOpWrite) {
if (setOpWrite) {
//socket緩衝區已滿寫不進去的情況 註冊write事件
setOpWrite();
} else {
//處理socket緩衝區依然可寫,但是寫了16次還沒寫完,提交flushTask非同步寫
clearOpWrite();
eventLoop().execute(flushTask);
}
這一節中我們學習了netty寫入資料的流程,寫入資料時出站事件,一般最終將有HeadContext進行處理
write方法將寫入的資料轉換為DirectByteBuf包裝到ChannelOutboundBuffer中,並且記錄了對應的Promise實現非同步驅動,還可以減少系統呼叫
flush方法,呼叫jdk SocketChannel#write進行寫入,使用自旋次數控制,讓多個Channel的處理得到平衡,如果Socket 緩衝區滿無法在繼續寫入那麼會OP_WRITE 事件,等 Socket 緩衝區變的可寫時,epoll 通知 EventLoop執行緒繼續傳送。
Socket 緩衝區可寫,寫滿 16 次但依然沒有寫完,這時候註冊非同步任務使用EventLoop執行緒進行非同步傳送。如果寫的時FileRegion型別,那麼會使用transferTo進行零拷貝寫入。