作者: Grey
原文地址:
部落格園:Netty 學習(七):NioEventLoop 對應執行緒的建立和啟動原始碼說明
CSDN:Netty 學習(七):NioEventLoop 對應執行緒的建立和啟動原始碼說明
在 Netty 伺服器端程式碼中,我們一般會建立了兩個 NioEventLoopGroup:bossGroup 和 workerGroup
其中: bossGroup
用於監聽埠,接收新連線的執行緒組;workerGroup
用於處理每一個連線的資料讀寫的執行緒組。
NioEventLoop 的啟動入口在AbstractUnsafe
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
......
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
其中inEventLoop()
方法呼叫的是AbstractEventExecutor
的實現
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
而這個實現又呼叫了子類SingleThreadEventExecutor
的如下方法
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
在伺服器端剛啟動的時候,Thread.currentThread()
就是當前 main 方法對應的主執行緒,而this.thread
還沒有開始賦值,所以此時為null,
所以eventLoop.inEventLoop()
在一開始呼叫的時候,返回的是 false,進入AbstractUnsafe
的如下else
邏輯中
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
......
AbstractChannel.this.eventLoop = eventLoop;
// 首次執行的時候 eventLoop.inEventLoop() 返回 false,執行 else 邏輯
if (eventLoop.inEventLoop()) {
......
} else {
......
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
......
}
}
其中executor
方法對應的是SingleThreadEventExecutor
的execute
方法
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
......
}
}
if (!addTaskWakesUp && immediate) {
......
}
}
inEventLoop()
經過上述分析,為false
,所以執行startThread()
方法
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
這裡主要的邏輯就是判斷執行緒是否啟動,如果沒有啟動,就呼叫doStartThread()
啟動。doStartThread()
的邏輯是
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
...
SingleThreadEventExecutor.this.run();
......
}
});
}
通過一個成員變數thread
來儲存ThreadPerTaskExecutor
建立出來的執行緒(即:FastThreadLocalThread),NioEventLoop 儲存完執行緒的參照之後,隨即呼叫 run 方法。
workGroup 對應的 NioEventLoop 建立的執行緒主要做如下事情
執行一次事件輪詢。首先輪詢註冊到 Reactor 執行緒對應的 Selector 上的所有 Channel 的 IO 事件。
處理產生 IO 事件的 Channel。如果有讀寫或者新連線接入事件,則處理:
處理任務佇列。
以上三個步驟分別對應了下述三個方法
事件輪詢呼叫了NioEventLoop
的如下方法
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
呼叫的是NioEventLoop
的如下方法
private void processSelectedKeys() {
if (selectedKeys != null) {
// 處理優化過的 SelectedKeys
processSelectedKeysOptimized();
} else {
// 處理正常的 SelectedKeys
processSelectedKeysPlain(selector.selectedKeys());
}
}
上述兩個分支分別處理了不同型別的 key:重點關注優化過的 SelectedKeys,selectedKeys 在 NioEventLoop 中是一個SelectedSelectionKeySet
物件,這個物件雖然叫Set
,但是底層使用了陣列
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
......
}
add 方法的主要流程是:
將SelectionKey塞到該陣列的尾部;
更新該陣列的邏輯長度+1;
如果該陣列的邏輯長度等於陣列的物理長度,就將該陣列擴容。
待程式執行一段時間後,等陣列的長度足夠長,每次在輪詢到 NIO 事件的時候,Netty 只需要O(1)
的時間複雜度就能將SelectionKey
塞到set
中去,而 JDK 底層使用的HashSet,put的時間複雜度最少是O(1),最差是O(n)。
進入processSelectedKeysOptimized
方法
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
// Need to flip the optimized selectedKeys to get the right reference to the array
// and reset the index to -1 which will then set to 0 on the for loop
// to start over again.
//
// See https://github.com/netty/netty/issues/1523
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
主要是三個步驟:
第一步,取出 IO 事件及對應的 Channel。其中selectedKeys[i] = null;
的目的是防止記憶體漏失
第二步,處理 Channel
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
Netty 的輪詢序號產生器制其實是將 AbstractNioChannel 內部的 JDK 類 SelectableChannel 物件註冊到 JDK 類 Selector 物件上,並且將 AbstractNioChannel 作為SelectableChannel 物件的一個 attachment 附屬上,這樣在 JDK 輪詢出某條 SelectableChannel 有 IO 事件發生時,就可以直接取出 AbstractNioChannel 進行後續操作。
在Netty的Channel中,有兩大型別的Channel,
一個是NioServerSocketChannel,由boss NioEventLoopGroup負責處理;
一個是NioSocketChannel,由worker NioEventLoop負責處理,
所以:
(1)對於boss NioEventLoop來說,輪詢到的是連線事件,後續通過NioServerSocketChannel的Pipeline將連線交給一個worker NioEventLoop處理;
(2)對於worker NioEventLoop來說,輪詢到的是讀寫事件,後續通過NioSocketChannel的Pipeline將讀取到的資料傳遞給每個ChannelHandler來處理。
第三步,判斷是否需要再一次輪詢
由needsToSelectAgain
變數控制,needsToSelectAgain
變數在如下方法中被呼叫,在NioEventLoop
中
private static final int CLEANUP_INTERVAL = 256;
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
cancel方法是用於將key取消,並且在被取消的key到達CLEANUP_INTERVAL
的時候,設定needsToSelectAgain
為 true,CLEANUP_INTERVAL
預設值為256。
也就是說,對於每個NioEventLoop而言,每隔256個Channel從Selector上移除的時候,就標記needsToSelectAgain為true,然後將SelectedKeys的內部陣列全部清空,方便JVM垃圾回收,然後呼叫selectAgain重新填裝SelectionKeys陣列。
呼叫的是如下方法
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = getCurrentTimeNanos();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
一路追蹤下去,進入SingleThreadEventExecutor
的 offerTask()
方法
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
Netty 內部使用一個 taskQueue 將Task儲存起來。這個 taskQueue 其實是一個 MPSC Queue,每一個 NioEventLoop 都與它一一對應。
接下來執行SingleThreadEventExecutor
的 runAllTasks()
方法
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = getCurrentTimeNanos();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = getCurrentTimeNanos();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
主要流程如下:
1.NioEventLoop在執行過程中不斷檢測是否有事件發生,如果有事件發生就處理,處理完事件之後再處理外部執行緒提交過來的非同步任務。
2.在檢測是否有事件發生的時候,為了保證非同步任務的及時處理,只要有任務要處理,就立即停止事件檢測,隨即處理任務。
3.外部執行緒非同步執行的任務分為兩種:定時任務和普通任務,分別落地到 MpscQueue 和 PriorityQueue ,而 PriorityQueue 中的任務最終都會填充到 MpscQueue 中處理。
4.Netty每隔64個任務檢查一次是否該退出任務迴圈。
完整程式碼見:hello-netty
本文所有圖例見:processon: Netty學習筆記
更多內容見:Netty專欄
本文來自部落格園,作者:Grey Zeng,轉載請註明原文連結:https://www.cnblogs.com/greyzeng/p/16750342.html