作者: Grey
原文地址:
部落格園:Netty 學習(六):建立 NioEventLoopGroup 的核心原始碼說明
CSDN:Netty 學習(六):建立 NioEventLoopGroup 的核心原始碼說明
基於 JDK 的 API 自己實現 NIO 程式設計,需要一個執行緒池來不斷監聽埠。接收到新連線之後,這條連線上資料的讀寫會在另外一個執行緒池中進行。
在 Netty 實現的伺服器端中, 有如下經典程式碼
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
// 設定伺服器端的執行緒模型。
// bossGroup 負責不斷接收新的連線,將新的連線交給 workerGroup 來處理。
b.group(bossGroup, workerGroup)
其中 bossGroup 對應的就是監聽埠的執行緒池,在繫結一個埠的情況下,這個執行緒池裡只有一個執行緒;workerGroup 對應的是連線的資料讀寫的執行緒。
通過 debug 並設定斷點的方式,我們來檢視下建立 NioEventLoopGroup 的核心過程,
在沒有指定執行緒數的情況下new NioEventLoopGroup()
會呼叫如下構造方法
public NioEventLoopGroup() {
this(0);
}
即傳入 0,然後一路跟下去,發現呼叫了MultithreadEventLoopGroup
的如下邏輯
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
由於我們傳入的nThreads == 0
,所以獲取DEFAULT_EVENT_LOOP_THREADS
的值,在MultithreadEventLoopGroup
中,DEFAULT_EVENT_LOOP_THREADS
的初始化邏輯如下
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
在nThreads == 0
的情況下,那麼 NioEventLoopGroup 的預設執行緒的個數為 CPU 的核數乘以 2,即:NettyRuntime.availableProcessors() * 2
。
繼續跟下去,可以看到 NioEventLoopGroup 呼叫了如下的構造方法,其核心程式碼如下
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
……
// 建立ThreadPerTaskExecutor:ThreadPerTaskExecutor表示每次呼叫execute()方法的時候,都會建立一個執行緒。
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
……
// 2.建立NioEventLoop:NioEventLoop對應執行緒池裡執行緒的概念,這裡其實就是用一個for迴圈建立的。
children = new EventExecutor[nThreads];
……
for (int i = 0; i < nThreads; i ++) {
……
children[i] = newChild(executor, args);
……
}
// 3.建立執行緒選擇器:執行緒選擇器的作用是確定每次如何從執行緒池中選擇一個執行緒,也就是每次如何從NioEventLoopGroup中選擇一個NioEventLoop。
chooser = chooserFactory.newChooser(children);
……
}
這個構造方法包括了三個內容
建立 ThreadPerTaskExecutor:ThreadPerTaskExecutor 主要是用來建立執行緒。
建立 NioEventLoop:NioEventLoop 對應執行緒池裡執行緒的概念。
建立執行緒選擇器:執行緒選擇器的作用是確定每次如何從執行緒池中選擇一個執行緒,也就是每次如何從 NioEventLoopGroup 中選擇一個 NioEventLoop。
首先,我們看 ThreadPerTaskExecutor 如何建立執行緒,核心程式碼如下
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
這裡的 threadFactory 就是前面傳入的newDefaultThreadFactory()
,這個方法定義了預設執行緒的一些基本資訊,一路追蹤到DefaultThreadFactory
中
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
ObjectUtil.checkNotNull(poolName, "poolName");
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
// 建立執行緒,將 JDK 的 Runnable 包裝成 FastThreadLocalRunnable
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
可以看到 Netty 的執行緒實體是由 ThreadPerTaskExecutor 建立的,ThreadPerTaskExecutor 每次執行 execute 的時候都會建立一個 FastThreadLocalThread 的執行緒實體。
接下來是建立 NioEventLoop,Netty 使用 for 迴圈來建立 nThreads 個 NioEventLoop,通過前面的分析,我們可能已經猜到,一個NioEventLoop對應一個執行緒實體,即 Netty 自己封裝的 FastThreadLocalThread。
來到 NioEventLoop 的構造方法
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
......
final SelectorTuple selectorTuple = openSelector();
......
}
即建立了一個 Selector,Selector 是 NIO 程式設計裡最核心的概念,一個 Selector 可以將多個連線繫結在一起,負責監聽這些連線的讀寫事件,即多路複用。
繼續往上呼叫構造方法
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
......
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
......
}
NioEventLoop 重寫了 taskQueue 的建立邏輯
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
即建立一個 MPSC 佇列,
MPSC 佇列,Selector,NioEventLoop,這三者均為一對一關係。
接下來是建立執行緒選擇器,
chooser = chooserFactory.newChooser(children);
這裡的選擇器是
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
中的DefaultEventExecutorChooserFactory.INSTANCE
,進入
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
Netty 通過判斷 NioEventLoopGroup 中的 NioEventLoop 是否是2的冪來建立不同的執行緒選擇器,不管是哪一種選擇器,最終效果都是從第一個 NioEvenLoop 遍歷到最後一個NioEventLoop,再從第一個開始,如此迴圈。GenericEventExecutorChooser 通過簡單的累加取模來實現迴圈的邏輯,而 PowerOfTowEventExecutorChooser 是通過位運算實現的。
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
......
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
......
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
......
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
......
}
最後總結一下,NioEventLoopGroup 的建立核心就三步
建立ThreadPerTaskExecutor;
建立NioEventLoop;
建立執行緒選擇器。
完整程式碼見:hello-netty
本文所有圖例見:processon: Netty學習筆記
更多內容見:Netty專欄
本文來自部落格園,作者:Grey Zeng,轉載請註明原文連結:https://www.cnblogs.com/greyzeng/p/16747333.html