Netty原始碼學習7——netty是如何傳送資料的

2023-12-03 21:00:20

零丶引入

系列文章目錄和關於我
經過《Netty原始碼學習4——伺服器端是處理新連線的&netty的reactor模式《Netty原始碼學習5——伺服器端是如何讀取資料的》,我們瞭解了netty伺服器端是如何建立連線,讀取使用者端資料的,通過《Netty原始碼學習6——netty編碼解碼器&粘包半包問題的解決》我們認識到編解碼在網路程式設計中的作用以及netty是如何解決TCP粘包,半包問題的。

那麼netty使用者端是如何傳送資料的,以及伺服器端是如何將響應傳送給使用者端的暱?

在我們之前編寫的小demo當中,有如下程式碼:

關鍵原始碼如下:

private void write(Object msg, boolean flush, ChannelPromise promise) {
   // 省略 部分
	
    //flush 表示是否需要flush,呼叫writeAndFlush的時候為true
    // 找到下一個ChannelHandlerContext
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    // 在eventLoop 中
    if (executor.inEventLoop()) {
        // 需要flush 那麼呼叫invokeWriteAndFlush
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        // 在eventLoop 中 那麼提交一個WriteTask到eventLoop中
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}

可以看到TailContext會通過 findContextOutbound 方法在當前 ChannelHandler 的前邊找到 ChannelOutboundHandler 型別並且覆蓋實現 write 回撥方法的 ChannelHandler 作為下一個要執行的物件。

然後如果當前執行的執行緒就是EventLoop執行緒,那麼直接呼叫,反之提交一個非同步任務,從而保證執行write的一定是 reactor 執行緒——保證執行緒安全性

如下是next.invokeWriteAndFlush的原始碼

最終事件會傳播到HeadContext進行處理(如果中間的ChannelHandler不截胡的話)

二丶write 原始碼解析

write 事件最終會由HeadContext進行處理

可以看到HeadContext#write其實就是使用Channel的Unsafe#write,其主要邏輯如下

ChannelOutboundBuffer#addMessage

ChannelOutboundBuffer 是 Netty 內部使用的一個資料結構,它用於儲存待傳送的出站資料。在 Netty 的網路框架中,當需要寫資料到網路時,資料並不會立即被傳送出去,而是首先被放入一個出站緩衝區中,即 ChannelOutboundBuffer。這個緩衝區負責管理和儲存所有待寫入通道的資料。

  • 批次傳送優化: ChannelOutboundBuffer 允許 Netty 批次地傳送資料,而不是每次寫操作都立即進行網路傳送。這樣可以減少系統呼叫次數,提高網路效率。
  • 流量控制: 它有助於實現流量控制,防止資料傳送過快,導致接收方處理不過來。
  • 緩衝區管理: 可以有效地管理記憶體,當資料被寫入網路後,及時釋放相應的記憶體。
  • 非同步處理: Netty 是非同步事件驅動的框架,使用 ChannelOutboundBuffer 可以將資料傳送的非同步化,提升處理效能

下面是向ChannelOutboundBuffer寫入messge的原始碼

1.addFlush

此方法只是負責更改flushedEntry 和 unflushedEntry 指標指向

將 flushedEntry 指標指向 unflushedEntry 指標表示的第一個未被 flush 的 Entry 節點。並將 unflushedEntry 指標置為空,準備開始 flush 傳送資料流程。

這樣在 flushedEntry 與 tailEntry 之間的 Entry 節點即為本次 flush 操作需要傳送的資料範圍。

public void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                flushedEntry = entry;
            }
            do {
                flushed ++;
                //如果當前entry對應的write操作被使用者取消,則釋放msg,並降低channelOutboundBuffer水位線
                if (!entry.promise.setUncancellable()) {
                  
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }

2.flush0

可以看到如果註冊了write到selector上,那麼不會進行flush,

如下是NioSockectChannel傳送資料的原始碼

@Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        //獲取jdk nio底層socketChannel
        SocketChannel ch = javaChannel();
        //最大寫入次數 預設為16 ,因為EventLoop可能單執行緒處理多Channel,需要雨露均沾
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // 如果全部資料已經寫完 則移除OP_WRITE事件並直接退出writeLoop
                clearOpWrite();             
                return;
            }

            // 獲取單次傳送最大位元組數
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            //Netty的DirectBuffer底層就是JDK的DirectByteBuffer
            // 將ChannelOutboundBuffer中快取的DirectBuffer轉換成JDK的ByteBuffer,
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            // ChannelOutboundBuffer中總共的DirectBuffer數
            int nioBufferCnt = in.nioBufferCount();

            switch (nioBufferCnt) {
                    // 真正進行傳送
               //java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)進行寫回
            }
        } while (writeSpinCount > 0);
        // 處理Socket可寫但已經寫滿16次還沒寫完的情況
     incompleteWrite(writeSpinCount < 0);
    }

可以看到

  • 如果資料全部寫完了,會呼叫clearOpWrite清除當前 Channel 在 Reactor 上註冊的 OP_WRITE 事件

    這意味著,不需要再監聽write來觸發flush了

  • 寫入的過程會寫入多次,並控制自旋次數,做到雨露均沾

如上是寫入的過程

  • 如果ByteBuffer個數為0,說明傳送的是FileRegion 型別,case 0 的分支主要就是用於處理網路檔案傳輸的情況

  • case1 和 default則呼叫jdk SocketChannel#write進行資料傳送,如果寫入的資料小於等於0,說明當前Socket傳送緩衝區滿了寫不進去了,則註冊OP_WRITE事件,等待Socket傳送緩衝區可寫時再寫

    觸發Write後,再Sockect寫緩衝區可寫後,會觸發對應事件,即可再NioEventLoop中進行處理,如下圖中會直接呼叫forceFlush

  • 完成傳送會呼叫adjustMaxBytesPerGatheringWrite進行調整

兩個分支分別表示

  • 期望寫入和真正寫入的相等,說明資料能全部寫入到 Socket 的寫緩衝區中了,那麼下次 write loop 就應該嘗試去寫入更多的資料。

    本次寫入的數量x2>maxBytesPerGatheringWrite 說明要寫的資料很多,那麼更新為本次 write loop 兩倍的寫入量大小

  • 如果本次寫入的資料還不及嘗試寫入資料的一半,說明Socket寫緩衝區容量不多了,嘗試縮容為一半

image-20231203174406700

  • 處理

    protected final void incompleteWrite(boolean setOpWrite) {
        
            if (setOpWrite) {
                //socket緩衝區已滿寫不進去的情況 註冊write事件
                setOpWrite();
            } else {
                //處理socket緩衝區依然可寫,但是寫了16次還沒寫完,提交flushTask非同步寫
                clearOpWrite();
                eventLoop().execute(flushTask);
    
            }
    

四丶總結

這一節中我們學習了netty寫入資料的流程,寫入資料時出站事件,一般最終將有HeadContext進行處理

  • write方法將寫入的資料轉換為DirectByteBuf包裝到ChannelOutboundBuffer中,並且記錄了對應的Promise實現非同步驅動,還可以減少系統呼叫

  • flush方法,呼叫jdk SocketChannel#write進行寫入,使用自旋次數控制,讓多個Channel的處理得到平衡,如果Socket 緩衝區滿無法在繼續寫入那麼會OP_WRITE 事件,等 Socket 緩衝區變的可寫時,epoll 通知 EventLoop執行緒繼續傳送。

    Socket 緩衝區可寫,寫滿 16 次但依然沒有寫完,這時候註冊非同步任務使用EventLoop執行緒進行非同步傳送。如果寫的時FileRegion型別,那麼會使用transferTo進行零拷貝寫入。