Netty 學習(七):NioEventLoop 對應執行緒的建立和啟動原始碼說明

2022-10-03 15:00:26

Netty 學習(七):NioEventLoop 對應執行緒的建立和啟動原始碼說明

作者: Grey

原文地址:

部落格園:Netty 學習(七):NioEventLoop 對應執行緒的建立和啟動原始碼說明

CSDN:Netty 學習(七):NioEventLoop 對應執行緒的建立和啟動原始碼說明

說明

在 Netty 伺服器端程式碼中,我們一般會建立了兩個 NioEventLoopGroup:bossGroup 和 workerGroup

其中: bossGroup用於監聽埠,接收新連線的執行緒組;workerGroup 用於處理每一個連線的資料讀寫的執行緒組。

bossGroup 建立第一個 NioEventLoop 執行緒

NioEventLoop 的啟動入口在AbstractUnsafe

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ......
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

其中inEventLoop()方法呼叫的是AbstractEventExecutor的實現

    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

而這個實現又呼叫了子類SingleThreadEventExecutor的如下方法

    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

在伺服器端剛啟動的時候,Thread.currentThread()就是當前 main 方法對應的主執行緒,而this.thread還沒有開始賦值,所以此時為null,

所以eventLoop.inEventLoop()在一開始呼叫的時候,返回的是 false,進入AbstractUnsafe的如下else邏輯中

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ......
            AbstractChannel.this.eventLoop = eventLoop;
            // 首次執行的時候 eventLoop.inEventLoop() 返回 false,執行 else 邏輯
            if (eventLoop.inEventLoop()) {
                ......
            } else {
               ......
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
               ......
            }
        }

其中executor方法對應的是SingleThreadEventExecutorexecute方法

    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown()) {
                ......
            }
        }

        if (!addTaskWakesUp && immediate) {
           ......
        }
    }

inEventLoop()經過上述分析,為false,所以執行startThread()方法

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

這裡主要的邏輯就是判斷執行緒是否啟動,如果沒有啟動,就呼叫doStartThread()啟動。doStartThread()的邏輯是

private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                ...
                SingleThreadEventExecutor.this.run();
                ......
            }
        });
    }

通過一個成員變數thread來儲存ThreadPerTaskExecutor建立出來的執行緒(即:FastThreadLocalThread),NioEventLoop 儲存完執行緒的參照之後,隨即呼叫 run 方法。

workGroup 對應的 NioEventLoop 建立執行緒和啟動

workGroup 對應的 NioEventLoop 建立的執行緒主要做如下事情

  1. 執行一次事件輪詢。首先輪詢註冊到 Reactor 執行緒對應的 Selector 上的所有 Channel 的 IO 事件。

  2. 處理產生 IO 事件的 Channel。如果有讀寫或者新連線接入事件,則處理:

  3. 處理任務佇列。

以上三個步驟分別對應了下述三個方法

事件輪詢

事件輪詢呼叫了NioEventLoop的如下方法

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            return selector.select();
        }
        // Timeout will only be 0 if deadline is within 5 microsecs
        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

處理 IO 事件的 Channel

呼叫的是NioEventLoop的如下方法

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            // 處理優化過的 SelectedKeys
            processSelectedKeysOptimized();
        } else {
            // 處理正常的 SelectedKeys
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

上述兩個分支分別處理了不同型別的 key:重點關注優化過的 SelectedKeys,selectedKeys 在 NioEventLoop 中是一個SelectedSelectionKeySet物件,這個物件雖然叫Set,但是底層使用了陣列

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }

    ......
}

add 方法的主要流程是:

  1. 將SelectionKey塞到該陣列的尾部;

  2. 更新該陣列的邏輯長度+1;

  3. 如果該陣列的邏輯長度等於陣列的物理長度,就將該陣列擴容。

待程式執行一段時間後,等陣列的長度足夠長,每次在輪詢到 NIO 事件的時候,Netty 只需要O(1)的時間複雜度就能將SelectionKey塞到set中去,而 JDK 底層使用的HashSet,put的時間複雜度最少是O(1),最差是O(n)。

進入processSelectedKeysOptimized方法

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }

                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See https://github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

主要是三個步驟:

第一步,取出 IO 事件及對應的 Channel。其中selectedKeys[i] = null;的目的是防止記憶體漏失

第二步,處理 Channel

if (a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
} else {
    @SuppressWarnings("unchecked")
    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
}

Netty 的輪詢序號產生器制其實是將 AbstractNioChannel 內部的 JDK 類 SelectableChannel 物件註冊到 JDK 類 Selector 物件上,並且將 AbstractNioChannel 作為SelectableChannel 物件的一個 attachment 附屬上,這樣在 JDK 輪詢出某條 SelectableChannel 有 IO 事件發生時,就可以直接取出 AbstractNioChannel 進行後續操作。

在Netty的Channel中,有兩大型別的Channel,

一個是NioServerSocketChannel,由boss NioEventLoopGroup負責處理;

一個是NioSocketChannel,由worker NioEventLoop負責處理,

所以:

(1)對於boss NioEventLoop來說,輪詢到的是連線事件,後續通過NioServerSocketChannel的Pipeline將連線交給一個worker NioEventLoop處理;

(2)對於worker NioEventLoop來說,輪詢到的是讀寫事件,後續通過NioSocketChannel的Pipeline將讀取到的資料傳遞給每個ChannelHandler來處理。

第三步,判斷是否需要再一次輪詢

needsToSelectAgain變數控制,needsToSelectAgain變數在如下方法中被呼叫,在NioEventLoop

private static final int CLEANUP_INTERVAL = 256;
    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }

cancel方法是用於將key取消,並且在被取消的key到達CLEANUP_INTERVAL的時候,設定needsToSelectAgain為 true,CLEANUP_INTERVAL預設值為256。

也就是說,對於每個NioEventLoop而言,每隔256個Channel從Selector上移除的時候,就標記needsToSelectAgain為true,然後將SelectedKeys的內部陣列全部清空,方便JVM垃圾回收,然後呼叫selectAgain重新填裝SelectionKeys陣列。

處理任務佇列

呼叫的是如下方法

    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        if (ranAtLeastOne) {
            lastExecutionTime = getCurrentTimeNanos();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

一路追蹤下去,進入SingleThreadEventExecutorofferTask()方法

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }

Netty 內部使用一個 taskQueue 將Task儲存起來。這個 taskQueue 其實是一個 MPSC Queue,每一個 NioEventLoop 都與它一一對應。

接下來執行SingleThreadEventExecutorrunAllTasks()方法

   protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = getCurrentTimeNanos();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = getCurrentTimeNanos();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

主要流程如下:

1.NioEventLoop在執行過程中不斷檢測是否有事件發生,如果有事件發生就處理,處理完事件之後再處理外部執行緒提交過來的非同步任務。

2.在檢測是否有事件發生的時候,為了保證非同步任務的及時處理,只要有任務要處理,就立即停止事件檢測,隨即處理任務。

3.外部執行緒非同步執行的任務分為兩種:定時任務和普通任務,分別落地到 MpscQueue 和 PriorityQueue ,而 PriorityQueue 中的任務最終都會填充到 MpscQueue 中處理。

4.Netty每隔64個任務檢查一次是否該退出任務迴圈。

完整程式碼見:hello-netty

本文所有圖例見:processon: Netty學習筆記

更多內容見:Netty專欄

參考資料

跟閃電俠學 Netty:Netty 即時聊天實戰與底層原理

深度解析Netty原始碼