歡迎關注公眾號:bin的技術小屋,本文圖片載入不出來的話可檢視公眾號原文
本系列Netty原始碼解析文章基於 4.1.56.Final版本
在前邊的系列文章中,筆者為大家詳細剖析了 Reactor 模型在 netty 中的建立,啟動,執行,接收連線,接收資料,傳送資料的完整流程,在詳細剖析整個 Reactor 模型如何在 netty 中實現的過程裡,我們或多或少的見到了 pipeline 的身影。
比如在 Reactor 啟動的過程中首先需要建立 NioServerSocketChannel ,在建立的過程中會為 NioServerSocketChannel 建立分配一個 pipeline ,用於對 OP_ACCEPT 事件的編排。
當 NioServerSocketChannel 向 main reactor 註冊成功後,會在 pipeline 中觸發 ChannelRegistered 事件的傳播。
當 NioServerSocketChannel 繫結埠成功後,會在 pipeline 中觸發 ChannelActive 事件的傳播。
又比如在 Reactor 接收連線的過程中,當用戶端發起一個連線並完成三次握手之後,連線對應的 Socket 會存放在核心中的全連線佇列中,隨後 JDK Selector 會通知 main reactor 此時 NioServerSocketChannel 上有 OP_ACCEPT 事件活躍,最後 main reactor 開始執行 NioServerSocketChannel 的底層操作類 NioMessageUnsafe#read 方法在 NioServerSocketChannel 中的 pipeline 中傳播 ChannelRead 事件。
最終會在 NioServerSocketChannel 的 pipeline 中的 ServerBootstrapAcceptor 中響應 ChannelRead 事件並建立初始化 NioSocketChannel ,隨後會為每一個新建立的 NioSocetChannel 建立分配一個獨立的 pipeline ,用於各自 NioSocketChannel 上的 IO 事件的編排。並向 sub reactor 註冊 NioSocketChannel ,隨後在 NioSocketChannel 的 pipeline 中傳播 ChannelRegistered 事件,最後傳播 ChannelActive 事件。
還有在《Netty如何高效接收網路資料》一文中,我們也提過當 sub reactor 讀取 NioSocketChannel 中來自使用者端的請求資料時,會在 NioSocketChannel 的 pipeline 中傳播 ChannelRead 事件,在一個完整的 read loop 讀取完畢後會傳播 ChannelReadComplete 事件。
在《一文搞懂Netty傳送資料全流程》一文中,我們講到了在使用者經過業務處理後,通過 write 方法和 flush 方法分別在 NioSocketChannel 的 pipeline 中傳播 write 事件和 flush 事件的過程。
筆者帶大家又回顧了一下在前邊系列文章中關於 pipeline 的使用場景,但是在這些系列文章中並未對 pipeline 相關的細節進行完整全面地描述,那麼本文筆者將為大家詳細的剖析下 pipeline 在 IO 事件的編排和傳播場景下的完整實現原理。
Netty 會為每一個 Channel 分配一個獨立的 pipeline ,pipeline 伴隨著 channel 的建立而建立。
前邊介紹到 NioServerSocketChannel 是在 netty 伺服器端啟動的過程中建立的。而 NioSocketChannel 的建立是在當 NioServerSocketChannel 上的 OP_ACCEPT 事件活躍時,由 main reactor 執行緒在 NioServerSocketChannel 中建立,並在 NioServerSocketChannel 的 pipeline 中對 OP_ACCEPT 事件進行編排時(圖中的 ServerBootstrapAcceptor 中)初始化的。
無論是建立 NioServerSocketChannel 裡的 pipeline 還是建立 NioSocketChannel 裡的 pipeline , 最終都會委託給它們的父類別 AbstractChannel 。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
//channel全域性唯一ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe用於底層socket的相關操作
unsafe = newUnsafe();
//為channel分配獨立的pipeline用於IO事件編排
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
....................
//pipeline中的頭結點
final AbstractChannelHandlerContext head;
//pipeline中的尾結點
final AbstractChannelHandlerContext tail;
//pipeline中持有對應channel的參照
private final Channel channel;
....................
protected DefaultChannelPipeline(Channel channel) {
//pipeline中持有對應channel的參照
this.channel = ObjectUtil.checkNotNull(channel, "channel");
............省略.......
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
....................
}
在前邊的系列文章中筆者多次提到過,pipeline 的結構是由 ChannelHandlerContext 型別的節點構成的雙向連結串列。其中頭結點為 HeadContext ,尾結點為 TailContext 。其初始結構如下:
private static final String HEAD_NAME = generateName0(HeadContext.class);
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
//headContext中持有對channel unsafe操作類的參照 用於執行channel底層操作
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
//持有channel unsafe操作類的參照,後續用於執行channel底層操作
unsafe = pipeline.channel().unsafe();
//設定channelHandler的狀態為ADD_COMPLETE
setAddComplete();
}
@Override
public ChannelHandler handler() {
return this;
}
.......................
}
我們知道雙向連結串列結構的 pipeline 中的節點元素為 ChannelHandlerContext ,既然 HeadContext 作為 pipeline 的頭結點,那麼它一定是 ChannelHandlerContext 型別的,所以它需要繼承實現 AbstractChannelHandlerContext ,相當於一個哨兵的作用,因為使用者可以以任意順序向 pipeline 中新增 ChannelHandler ,需要用 HeadContext 來固定指向第一個 ChannelHandlerContext 。
在《一文搞懂Netty傳送資料全流程》 一文中的《1. ChannelHandlerContext》小節中,筆者曾為大家詳細介紹過 ChannelHandlerContext 在 pipeline 中的作用,忘記的同學可以在回看下。
於此同時 HeadContext 又實現了 ChannelInboundHandler 和 ChannelOutboundHandler 介面,說明 HeadContext 即是一個 ChannelHandlerContext 又是一個 ChannelHandler ,它可以同時處理 Inbound 事件和 Outbound 事件。
我們也注意到 HeadContext 中持有了對應 channel 的底層操作類 unsafe ,這也說明 IO 事件在 pipeline 中的傳播最終會落在 HeadContext 中進行最後的 IO 處理。它是 Inbound 事件的處理起點,也是 Outbound 事件的處理終點。這裡也可以看出 HeadContext 除了起到哨兵的作用,它還承擔了對 channel 底層相關的操作。
比如我們在《Reactor在Netty中的實現(啟動篇)》中介紹的 NioServerSocketChannel 在向 main reactor 註冊完成後會觸發 ChannelRegistered 事件從 HeadContext 開始依次在 pipeline 中向後傳播。
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
//此時firstRegistration已經變為false,在pipeline.invokeHandlerAddedIfNeeded中已被呼叫過
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
以及 NioServerSocketChannel 在與埠繫結成功後會觸發 ChannelActive 事件從 HeadContext 開始依次在 pipeline 中向後傳播,並在 HeadContext 中通過 unsafe.beginRead() 註冊 OP_ACCEPT 事件到 main reactor 中。
@Override
public void read(ChannelHandlerContext ctx) {
//觸發註冊OP_ACCEPT或者OP_READ事件
unsafe.beginRead();
}
同理在 NioSocketChannel 在向 sub reactor 註冊成功後。會先後觸發 ChannelRegistered 事件和 ChannelActive 事件從 HeadContext 開始在 pipeline 中向後傳播。並在 HeadContext 中通過 unsafe.beginRead() 註冊 OP_READ 事件到 sub reactor 中。
@Override
public void channelActive(ChannelHandlerContext ctx) {
//pipeline中繼續向後傳播channelActive事件
ctx.fireChannelActive();
//如果是autoRead 則自動觸發read事件傳播
//在read回撥函數中 觸發OP_ACCEPT或者OP_READ事件註冊
readIfIsAutoRead();
}
在《一文搞懂Netty傳送資料全流程》中介紹的 write 事件和 flush 事件最終會在 pipeline 中從後向前一直傳播到 HeadContext ,並在 HeadContext 中相應事件回撥函數中呼叫 unsafe 類操作底層 channel 傳送資料。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
//到headContext這裡 msg的型別必須是ByteBuffer,也就是說必須經過編碼器將業務層寫入的實體編碼為ByteBuffer
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
從本小節的內容介紹中,我們可以看出在 Netty 中對於 Channel 的相關底層操作呼叫均是在 HeadContext 中觸發的。
private static final String TAIL_NAME = generateName0(TailContext.class);
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
//設定channelHandler的狀態為ADD_COMPLETE
setAddComplete();
}
@Override
public ChannelHandler handler() {
return this;
}
......................
}
同樣 TailContext 作為雙向連結串列結構的 pipeline 中的尾結點,也需要繼承實現 AbstractChannelHandlerContext 。但它同時又實現了 ChannelInboundHandler 。
這說明 TailContext 除了是一個 ChannelHandlerContext 同時也是一個 ChannelInboundHandler 。
TailContext 作為一個 ChannelHandlerContext 的作用是負責將 outbound 事件從 pipeline 的末尾一直向前傳播直到 HeadContext 。當然前提是使用者需要呼叫 channel 的相關 outbound 方法。
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
@Override
public final ChannelPipeline flush() {
tail.flush();
return this;
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
}
這裡我們可以看到,當我們在自定義 ChannelHandler 中呼叫 ctx.channel().write(msg)
時,會在 AbstractChannel 中觸發 pipeline.write(msg) ,最終在 DefaultChannelPipeline 中呼叫 tail.write(msg) 。使得 write 事件可以從 pipeline 的末尾開始向前傳播,其他 outbound 事件的傳播也是一樣的道理。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.channel().write(msg);
}
}
而我們自定義的 ChannelHandler 會被封裝在一個 ChannelHandlerContext 中從而加入到 pipeline 中,而這個用於裝載自定義 ChannelHandler 的 ChannelHandlerContext 與 TailContext 一樣本質也都是 ChannelHandlerContext ,只不過在 pipeline 中的位置不同罷了。
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
}
我們看到 ChannelHandlerContext 介面本身也會繼承 ChannelInboundInvoker
和 ChannelOutboundInvoker 介面,所以說 ContextHandlerContext 也可以觸發 inbound 事件和 outbound 事件,只不過表達的語意是在 pipeline 中從當前 ChannelHandler 開始向前或者向後傳播 outbound 事件或者 inbound 事件。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.write(msg);
}
}
這裡表示 write 事件從當前 EchoServerHandler 開始在 pipeline 中向前傳播直到 HeadContext 。
最後 TailContext 作為一個 ChannelInboundHandler 的作用就是為 inbound 事件在 pipeline 中的傳播做一個兜底的處理。
這裡提到的兜底處理是什麼意思呢?
比如我們前邊介紹到的,在 NioSocketChannel 向 sub reactor 註冊成功後之後觸發的 ChannelRegistered 事件和 ChannelActive 事件。或者在 reactor 執行緒讀取 NioSocketChannel 中的請求資料時所觸發的 channelRead 事件和 ChannelReadComplete 事件。
這些 inbound 事件都會首先從 HeadContext 開始在 pipeline 中一個一個的向後傳遞。
極端的情況是如果 pipeline 中所有 ChannelInboundHandler 中相應的 inbound 事件回撥方法均不對事件作出處理,並繼續向後傳播。如下範例程式碼所示:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.fireChannelReadComplete();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
}
最終這些 inbound 事件在 pipeline 中得不到處理,最後會傳播到 TailContext 中。
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
onUnhandledInboundChannelReadComplete();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
onUnhandledInboundChannelActive();
}
}
而在 TailContext 中需要對這些得不到任何處理的 inbound 事件做出最終處理。比如丟棄該 msg,並釋放所佔用的 directByteBuffer,以免發生記憶體洩露。
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(msg);
if (logger.isDebugEnabled()) {
logger.debug("Discarded message pipeline : {}. Channel : {}.",
ctx.pipeline().names(), ctx.channel());
}
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
在前邊的系列文章中,筆者多次介紹過,Netty 中的 IO 事件一共分為兩大類: inbound 類事件和 outbound 類事件。其實如果嚴格來分的話應該分為三類。第三種事件型別為 exceptionCaught 異常事件型別。
而 exceptionCaught 事件在事件傳播角度上來說和 inbound 類事件一樣,都是從 pipeline 的 HeadContext 開始一直向後傳遞或者從當前 ChannelHandler 開始一直向後傳遞直到 TailContext 。所以一般也會將 exceptionCaught 事件統一歸為 inbound 類事件。
而根據事件型別的分類,相應負責處理事件回撥的 ChannelHandler 也會被分為兩類:
ChannelInboundHandler
:主要負責響應處理 inbound 類事件回撥和 exceptionCaught 事件回撥。
ChannelOutboundHandler
:主要負責響應處理 outbound 類事件回撥。
那麼我們常說的 inbound 類事件和 outbound 類事件具體都包含哪些事件呢?
final class ChannelHandlerMask {
// inbound事件集合
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
// inbound 類事件相關掩碼
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
}
netty 會將其支援的所有非同步事件用掩碼來表示,定義在 ChannelHandlerMask 類中, netty 框架通過這些事件掩碼可以很方便的知道使用者自定義的 ChannelHandler 是屬於什麼型別的(ChannelInboundHandler or ChannelOutboundHandler )。
除此之外,inbound 類事件如此之多,使用者也並不是對所有的 inbound 類事件感興趣,使用者可以在自定義的 ChannelInboundHandler 中覆蓋自己感興趣的 inbound 事件回撥,從而達到針對特定 inbound 事件的監聽。
這些使用者感興趣的 inbound 事件集合同樣也會用掩碼的形式儲存在自定義 ChannelHandler 對應的 ChannelHandlerContext 中,這樣當特定 inbound 事件在 pipeline 中開始傳播的時候,netty 可以根據對應 ChannelHandlerContext 中儲存的 inbound 事件集合掩碼來判斷,使用者自定義的 ChannelHandler 是否對該 inbound 事件感興趣,從而決定是否執行使用者自定義 ChannelHandler 中的相應回撥方法或者跳過對該 inbound 事件不感興趣的 ChannelHandler 繼續向後傳播。
從以上描述中,我們也可以窺探出,Netty 引入 ChannelHandlerContext 來封裝 ChannelHandler 的原因,在程式碼設計上還是遵循單一職責的原則, ChannelHandler 是使用者接觸最頻繁的一個 netty 元件,netty 希望使用者能夠把全部注意力放在最核心的 IO 處理上,使用者只需要關心自己對哪些非同步事件感興趣並考慮相應的處理邏輯即可,而並不需要關心非同步事件在 pipeline 中如何傳遞,如何選擇具有執行條件的 ChannelHandler 去執行或者跳過。這些切面性質的邏輯,netty 將它們作為上下文資訊全部封裝在 ChannelHandlerContext 中由netty框架本身負責處理。
以上這些內容,筆者還會在事件傳播相關小節做詳細的介紹,之所以這裡引出,還是為了讓大家感受下利用掩碼進行集合操作的便利性,netty 中類似這樣的設計還有很多,比如前邊系列文章中多次提到過的,channel 再向 reactor 註冊 IO 事件時,netty 也是將 channel 感興趣的 IO 事件用掩碼的形式儲存於 SelectionKey 中的 int interestOps 中。
接下來筆者就為大家介紹下這些 inbound 事件,並梳理出這些 inbound 事件的觸發時機。方便大家根據各自業務需求靈活地進行監聽。
在本小節介紹的這些 inbound 類事件在 pipeline 中傳播的過程中,如果在相應事件回撥函數執行的過程中發生異常,那麼就會觸發對應 ChannelHandler 中的 exceptionCaught 事件回撥。
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
if (logger.isDebugEnabled()) {
logger.debug(
"An exception {}" +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:",
ThrowableUtil.stackTraceToString(error), cause);
} else if (logger.isWarnEnabled()) {
logger.warn(
"An exception '{}' [enable DEBUG level for full stacktrace] " +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:", error, cause);
}
}
} else {
fireExceptionCaught(cause);
}
}
當然使用者可以選擇在 exceptionCaught 事件回撥中是否執行 ctx.fireExceptionCaught(cause) 從而決定是否將 exceptionCaught 事件繼續向後傳播。
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
..........
ctx.fireExceptionCaught(cause);
}
當 netty 核心處理連線的接收,以及資料的讀取過程中如果發生異常,會在整個 pipeline 中觸發 exceptionCaught 事件的傳播。
這裡筆者為什麼要單獨強調在 inbound 事件傳播的過程中發生異常,才會回撥 exceptionCaught 呢 ?
因為 inbound 事件一般都是由 netty 核心觸發傳播的,而 outbound 事件一般都是由使用者選擇觸發的,比如使用者在處理完業務邏輯觸發的 write 事件或者 flush 事件。
而在使用者觸發 outbound 事件後,一般都會得到一個 ChannelPromise 。使用者可以向 ChannelPromise 新增各種 listener 。當 outbound 事件在傳播的過程中發生異常時,netty 會通知使用者持有的這個 ChannelPromise ,但不會觸發 exceptionCaught 的回撥。
比如我們在《一文搞懂Netty傳送資料全流程》一文中介紹到的在 write 事件傳播的過程中就不會觸發 exceptionCaught 事件回撥。只是去通知使用者的 ChannelPromise 。
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//呼叫當前ChannelHandler中的write方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
}
而 outbound 事件中只有 flush 事件的傳播是個例外,當 flush 事件在 pipeline 傳播的過程中發生異常時,會觸發對應異常 ChannelHandler 的 exceptionCaught 事件回撥。因為 flush 方法的簽名中不會給使用者返回 ChannelPromise 。
@Override
ChannelHandlerContext flush();
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
}
當 main reactor 在啟動的時候,NioServerSocketChannel 會被建立並初始化,隨後就會向main reactor註冊,當註冊成功後就會在 NioServerSocketChannel 中的 pipeline 中傳播 ChannelRegistered 事件。
當 main reactor 接收使用者端發起的連線後,NioSocketChannel 會被建立並初始化,隨後會向 sub reactor 註冊,當註冊成功後會在 NioSocketChannel 中的 pipeline 傳播 ChannelRegistered 事件。
private void register0(ChannelPromise promise) {
................
//執行真正的註冊操作
doRegister();
...........
//觸發channelRegister事件
pipeline.fireChannelRegistered();
.......
}
注意:此時對應的 channel 還沒有註冊 IO 事件到相應的 reactor 中。
當 NioServerSocketChannel 再向 main reactor 註冊成功並觸發 ChannelRegistered 事件傳播之後,隨後就會在 pipeline 中觸發 bind 事件,而 bind 事件是一個 outbound 事件,會從 pipeline 中的尾結點 TailContext 一直向前傳播最終在 HeadContext 中執行真正的繫結操作。
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
//觸發AbstractChannel->bind方法 執行JDK NIO SelectableChannel 執行底層繫結操作
unsafe.bind(localAddress, promise);
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
..............
doBind(localAddress);
...............
//繫結成功後 channel啟用 觸發channelActive事件傳播
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//HeadContext->channelActive回撥方法 執行註冊OP_ACCEPT事件
pipeline.fireChannelActive();
}
});
}
...............
}
當 netty 伺服器端 NioServerSocketChannel 繫結埠成功之後,才算是真正的 Active ,隨後觸發 ChannelActive 事件在 pipeline 中的傳播。
之前我們也提到過判斷 NioServerSocketChannel 是否 Active 的標準就是 : 底層 JDK Nio ServerSocketChannel 是否 open 並且 ServerSocket 是否已經完成繫結。
@Override
public boolean isActive() {
return isOpen() && javaChannel().socket().isBound();
}
而使用者端 NioSocketChannel 中觸發 ChannelActive 事件就會比較簡單,當 NioSocketChannel 再向 sub reactor 註冊成功並觸發 ChannelRegistered 之後,緊接著就會觸發 ChannelActive 事件在 pipeline 中傳播。
private void register0(ChannelPromise promise) {
................
//執行真正的註冊操作
doRegister();
...........
//觸發channelRegister事件
pipeline.fireChannelRegistered();
.......
if (isActive()) {
if (firstRegistration) {
//觸發channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
}
而使用者端 NioSocketChannel 是否 Active 的標識是:底層 JDK NIO
SocketChannel 是否 open 並且底層 socket 是否連線。毫無疑問,這裡的 socket 一定是 connected 。所以直接觸發 ChannelActive 事件。
@Override
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
注意:此時 channel 才會到相應的 reactor 中去註冊感興趣的 IO 事件。當用戶自定義的 ChannelHandler 接收到 ChannelActive 事件時,表明 IO 事件已經註冊到 reactor 中了。
當用戶端有新連線請求的時候,伺服器端的 NioServerSocketChannel 上的 OP_ACCEPT 事件會活躍,隨後 main reactor 會在一個 read loop 中不斷的呼叫 serverSocketChannel.accept() 接收新的連線直到全部接收完畢或者達到 read loop 最大次數 16 次。
在 NioServerSocketChannel 中,每 accept 一個新的連線,就會在 pipeline 中觸發 ChannelRead 事件。一個完整的 read loop 結束之後,會觸發 ChannelReadComplete 事件。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
......................
try {
do {
//底層呼叫NioServerSocketChannel->doReadMessages 建立使用者端SocketChannel
int localRead = doReadMessages(readBuf);
.................
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
pipeline.fireChannelReadComplete();
.................
}
}
當用戶端 NioSocketChannel 上有請求資料到來時,NioSocketChannel 上的 OP_READ 事件活躍,隨後 sub reactor 也會在一個 read loop 中對 NioSocketChannel 中的請求資料進行讀取直到讀取完畢或者達到 read loop 的最大次數 16 次。
在 read loop 的讀取過程中,每讀取一次就會在 pipeline 中觸發 ChannelRead 事件。當一個完整的 read loop 結束之後,會在 pipeline 中觸發 ChannelReadComplete 事件。
這裡需要注意的是當 ChannelReadComplete 事件觸發時,此時並不代表 NioSocketChannel 中的請求資料已經讀取完畢,可能的情況是傳送的請求資料太多,在一個 read loop 中讀取不完達到了最大限制次數 16 次,還沒全部讀取完畢就退出了 read loop 。一旦退出 read loop 就會觸發 ChannelReadComplete 事件。詳細內容可以檢視筆者的這篇文章《Netty如何高效接收網路資料》。
當我們處理完業務邏輯得到業務處理結果後,會呼叫 ctx.write(msg) 觸發 write 事件在 pipeline 中的傳播。
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.write(msg);
}
最終 netty 會將傳送資料 msg 寫入 NioSocketChannel 中的待傳送緩衝佇列 ChannelOutboundBuffer 中。並等待使用者呼叫 flush 操作從 ChannelOutboundBuffer 中將待傳送資料 msg ,寫入到底層 Socket 的傳送緩衝區中。
當對端的接收處理速度非常慢或者網路狀況極度擁塞時,使得 TCP 滑動視窗不斷的縮小,這就導致傳送端的傳送速度也變得越來越小,而此時使用者還在不斷的呼叫 ctx.write(msg) ,這就會導致 ChannelOutboundBuffer 會急劇增大,從而可能導致 OOM 。netty 引入了高低水位線來控制 ChannelOutboundBuffer 的記憶體佔用。
public final class WriteBufferWaterMark {
private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
}
當 ChanneOutboundBuffer 中的記憶體佔用量超過高水位線時,netty 就會將對應的 channel 置為不可寫狀態,並在 pipeline 中觸發 ChannelWritabilityChanged 事件。
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0) {
//觸發fireChannelWritabilityChanged事件 表示當前channel變為不可寫
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
當 ChannelOutboundBuffer 中的記憶體佔用量低於低水位線時,netty 又會將對應的 NioSocketChannel 設定為可寫狀態,並再次觸發 ChannelWritabilityChanged 事件。
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
使用者可在自定義 ChannelHandler 中通過 ctx.channel().isWritable() 判斷當前 channel 是否可寫。
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isWritable()) {
...........當前channel可寫.........
} else {
...........當前channel不可寫.........
}
}
netty 提供了一種事件擴充套件機制可以允許使用者自定義非同步事件,這樣可以使得使用者能夠靈活的定義各種複雜場景的處理機制。
下面我們來看下如何在 Netty 中自定義非同步事件。
public final class OurOwnDefinedEvent {
public static final OurOwnDefinedEvent INSTANCE = new OurOwnDefinedEvent();
private OurOwnDefinedEvent() { }
}
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
......省略.......
//事件在pipeline中從當前ChannelHandlerContext開始向後傳播
ctx.fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);
//事件從pipeline的頭結點headContext開始向後傳播
ctx.channel().pipeline().fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);
}
}
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (OurOwnDefinedEvent.INSTANCE == evt) {
.....自定義事件處理......
}
}
}
後續隨著我們原始碼解讀的深入,我們還會看到 Netty 自己本身也定義了許多 UserEvent 事件,我們後面還會在介紹,大家這裡只是稍微瞭解一下相關的用法即可。
當 Channel 被關閉之後會在 pipeline 中先觸發 ChannelInactive 事件的傳播然後在觸發 ChannelUnregistered 事件的傳播。
我們可以在 Inbound 型別的 ChannelHandler 中響應 ChannelInactive 和 ChannelUnregistered 事件。
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
......響應inActive事件...
//繼續向後傳播inActive事件
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
......響應Unregistered事件...
//繼續向後傳播Unregistered事件
super.channelUnregistered(ctx);
}
這裡和連線建立之後的事件觸發順序正好相反,連線建立之後是先觸發 ChannelRegistered 事件然後在觸發 ChannelActive 事件。
final class ChannelHandlerMask {
// outbound 事件的集合
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
// outbound 事件掩碼
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
}
和 Inbound 類事件一樣,Outbound 類事件也有對應的掩碼錶示。下面我們來看下 Outbound類事件的觸發時機:
大家這裡需要注意區分 read 事件和 ChannelRead 事件的不同。
ChannelRead 事件前邊我們已經介紹了,當 NioServerSocketChannel 接收到新連線時,會觸發 ChannelRead 事件在其 pipeline 上傳播。
當 NioSocketChannel 上有請求資料時,在 read loop 中讀取請求資料時會觸發 ChannelRead 事件在其 pipeline 上傳播。
而 read 事件則和 ChannelRead 事件完全不同,read 事件特指使 Channel 具備感知 IO 事件的能力。NioServerSocketChannel 對應的 OP_ACCEPT 事件的感知能力,NioSocketChannel 對應的是 OP_READ 事件的感知能力。
read 事件的觸發是在當 channel 需要向其對應的 reactor 註冊讀型別事件時(比如 OP_ACCEPT 事件 和 OP_READ 事件)才會觸發。read 事件的響應就是將 channel 感興趣的 IO 事件註冊到對應的 reactor 上。
比如 NioServerSocketChannel 感興趣的是 OP_ACCEPT 事件, NioSocketChannel 感興趣的是 OP_READ 事件。
在前邊介紹 ChannelActive 事件時我們提到,當 channel 處於 active 狀態後會在 pipeline 中傳播 ChannelActive 事件。而在 HeadContext 中的 ChannelActive 事件回撥中會觸發 Read 事件的傳播。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
//如果是autoRead 則觸發read事件傳播
channel.read();
}
}
@Override
public void read(ChannelHandlerContext ctx) {
//觸發註冊OP_ACCEPT或者OP_READ事件
unsafe.beginRead();
}
}
而在 HeadContext 中的 read 事件回撥中會呼叫 Channel 的底層操作類 unsafe 的 beginRead 方法,在該方法中會向 reactor 註冊 channel 感興趣的 IO 事件。對於 NioServerSocketChannel 來說這裡註冊的就是 OP_ACCEPT 事件,對於 NioSocketChannel 來說這裡註冊的則是 OP_READ 事件。
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
//註冊監聽OP_ACCEPT或者OP_READ事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
細心的同學可能注意到了 channel 對應的設定類中包含了一個 autoRead 屬性,那麼這個 autoRead 到底是幹什麼的呢?
其實這是 netty 為大家提供的一種背壓機制,用來防止 OOM ,想象一下當對端傳送資料非常多並且傳送速度非常快,而伺服器端處理速度非常慢,一時間消費不過來。而對端又在不停的大量傳送資料,伺服器端的 reactor 執行緒不得不在 read loop 中不停的讀取,並且為讀取到的資料分配 ByteBuffer 。而伺服器端業務執行緒又處理不過來,這就導致了大量來不及處理的資料佔用了大量的記憶體空間,從而導致 OOM 。
面對這種情況,我們可以通過 channelHandlerContext.channel().config().setAutoRead(false)
將 autoRead 屬性設定為 false 。隨後 netty 就會將 channel 中感興趣的讀型別事件從 reactor 中登出,從此 reactor 不會再對相應事件進行監聽。這樣 channel 就不會在讀取資料了。
這裡 NioServerSocketChannel 對應的是 OP_ACCEPT 事件, NioSocketChannel 對應的是 OP_READ 事件。
protected final void removeReadOp() {
SelectionKey key = selectionKey();
if (!key.isValid()) {
return;
}
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
key.interestOps(interestOps & ~readInterestOp);
}
}
而當伺服器端的處理速度恢復正常,我們又可以通過 channelHandlerContext.channel().config().setAutoRead(true)
將 autoRead 屬性設定為 true 。這樣 netty 會在 pipeline 中觸發 read 事件,最終在 HeadContext 中的 read 事件回撥方法中通過呼叫 unsafe#beginRead 方法將 channel 感興趣的讀型別事件重新註冊到對應的 reactor 中。
@Override
public ChannelConfig setAutoRead(boolean autoRead) {
boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
if (autoRead && !oldAutoRead) {
//autoRead從false變為true
channel.read();
} else if (!autoRead && oldAutoRead) {
//autoRead從true變為false
autoReadCleared();
}
return this;
}
read 事件可以理解為使 channel 擁有讀的能力,當有了讀的能力後, channelRead 就可以讀取具體的資料了。
write 事件和 flush 事件我們在《一文搞懂Netty傳送資料全流程》一文中已經非常詳盡的介紹過了,這裡筆者在帶大家簡單回顧一下。
write 事件和 flush 事件均由使用者在處理完業務請求得到業務結果後在業務執行緒中主動觸發。
使用者既可以通過 ChannelHandlerContext 觸發也可以通過 Channel 來觸發。
不同之處在於如果通過 ChannelHandlerContext 觸發,那麼 write 事件或者 flush 事件就會在 pipeline 中從當前 ChannelHandler 開始一直向前傳播直到 HeadContext 。
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
如果通過 Channel 觸發,那麼 write 事件和 flush 事件就會從 pipeline 的尾部節點 TailContext 開始一直向前傳播直到 HeadContext 。
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
ctx.channel().write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.channel().flush();
}
當然還有一個 writeAndFlush 方法,也會分為 ChannelHandlerContext 觸發和 Channel 的觸發。觸發 writeAndFlush 後,write 事件首先會在 pipeline 中傳播,最後 flush 事件在 pipeline 中傳播。
netty 對 write 事件的處理最終會將傳送資料寫入 Channel 對應的寫緩衝佇列 ChannelOutboundBuffer 中。此時資料並沒有傳送出去而是在寫緩衝佇列中快取,這也是 netty 實現非同步寫的核心設計。
最終通過 flush 操作從 Channel 中的寫緩衝佇列 ChannelOutboundBuffer 中獲取到待傳送資料,並寫入到 Socket 的傳送緩衝區中。
當用戶在 ChannelHandler 中呼叫如下方法對 Channel 進行關閉時,會觸發 Close 事件在 pipeline 中從後向前傳播。
//close事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.close();
//close事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().close();
我們可以在Outbound型別的ChannelHandler中響應close事件。
public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
.....使用者端channel關閉之前的處理回撥.....
//繼續向前傳播close事件
super.close(ctx, promise);
}
}
最終 close 事件會在 pipeline 中一直向前傳播直到頭結點 HeadConnect 中,並在 HeadContext 中完成連線關閉的操作,當連線完成關閉之後,會在 pipeline中先後觸發 ChannelInactive 事件和 ChannelUnregistered 事件。
使用者可呼叫如下程式碼將當前 Channel 從 Reactor 中登出掉。
//deregister事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.deregister();
//deregister事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().deregister();
我們可以在 Outbound 型別的 ChannelHandler 中響應 deregister 事件。
public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
.....使用者端channel取消註冊之前的處理回撥.....
//繼續向前傳播connect事件
super.deregister(ctx, promise);
}
}
最終 deRegister 事件會傳播至 pipeline 中的頭結點 HeadContext 中,並在 HeadContext 中完成底層 channel 取消註冊的操作。當 Channel 從 Reactor 上登出之後,從此 Reactor 將不會在監聽 Channel 上的 IO 事件,並觸發 ChannelUnregistered 事件在 pipeline 中傳播。
在 Netty 的使用者端中我們可以利用 NioSocketChannel 的 connect 方法觸發 connect 事件在 pipeline 中傳播。
//connect事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.connect(remoteAddress);
//connect事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().connect(remoteAddress);
我們可以在 Outbound 型別的 ChannelHandler 中響應 connect 事件。
public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
.....使用者端channel連線成功之前的處理回撥.....
//繼續向前傳播connect事件
super.connect(ctx, remoteAddress, localAddress, promise);
}
}
最終 connect 事件會在 pipeline 中的頭結點 headContext 中觸發底層的連線建立請求。當用戶端成功連線到伺服器端之後,會在使用者端 NioSocketChannel 的 pipeline 中傳播 channelActive 事件。
在 Netty 的使用者端中我們也可以呼叫 NioSocketChannel 的 disconnect 方法在 pipeline 中觸發 disconnect 事件,這會導致 NioSocketChannel 的關閉。
//disconnect事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.disconnect();
//disconnect事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().disconnect();
我們可以在 Outbound 型別的 ChannelHandler 中響應 disconnect 事件。
public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
.....使用者端channel即將關閉前的處理回撥.....
//繼續向前傳播disconnect事件
super.disconnect(ctx, promise);
}
}
最終 disconnect 事件會傳播到 HeadContext 中,並在 HeadContext 中完成底層的斷開連線操作,當用戶端斷開連線成功關閉之後,會在 pipeline 中先後觸發 ChannelInactive 事件和 ChannelUnregistered 事件。
在我們詳細介紹了全部的 inbound 類事件和 outbound 類事件的掩碼錶示以及事件的觸發和傳播路徑後,相信大家現在可以通過 ChannelInboundHandler 和 ChannelOutboundHandler 來根據具體的業務場景選擇合適的 ChannelHandler 型別以及監聽合適的事件來完成業務需求了。
本小節就該介紹一下自定義的 ChannelHandler 是如何新增到 pipeline 中的,netty 在這個過程中幫我們作了哪些工作?
final EchoServerHandler serverHandler = new EchoServerHandler();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.............
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
......可新增多個channelHandler......
}
});
以上是筆者簡化的一個 netty 伺服器端設定 ServerBootstrap 啟動類的一段範例程式碼。我們可以看到再向 channel 對應的 pipeline 中新增 ChannelHandler 是通過 ChannelPipeline#addLast 方法將指定 ChannelHandler 新增到 pipeline 的末尾處。
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
//向pipeline的末尾處批次新增多個channelHandler
ChannelPipeline addLast(ChannelHandler... handlers);
//指定channelHandler的executor,由指定的executor執行channelHandler中的回撥方法
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
//為channelHandler指定名稱
ChannelPipeline addLast(String name, ChannelHandler handler);
//為channelHandler指定executor和name
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
}
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
ObjectUtil.checkNotNull(handlers, "handlers");
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
}
最終 addLast 的這些過載方法都會呼叫到 DefaultChannelPipeline#addLast(EventExecutorGroup, String, ChannelHandler)
這個方法從而完成 ChannelHandler 的新增。
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//檢查同一個channelHandler範例是否允許被重複新增
checkMultiplicity(handler);
//建立channelHandlerContext包裹channelHandler並封裝執行傳播事件相關的上下文資訊
newCtx = newContext(group, filterName(name, handler), handler);
//將channelHandelrContext插入到pipeline中的末尾處。雙向連結串列操作
//此時channelHandler的狀態還是ADD_PENDING,只有當channelHandler的handlerAdded方法被回撥後,狀態才會為ADD_COMPLETE
addLast0(newCtx);
//如果當前channel還沒有向reactor註冊,則將handlerAdded方法的回撥新增進pipeline的任務佇列中
if (!registered) {
//這裡主要是用來處理ChannelInitializer的情況
//設定channelHandler的狀態為ADD_PENDING 即等待新增,當狀態變為ADD_COMPLETE時 channelHandler中的handlerAdded會被回撥
newCtx.setAddPending();
//向pipeline中新增PendingHandlerAddedTask任務,在任務中回撥handlerAdded
//當channel註冊到reactor後,pipeline中的pendingHandlerCallbackHead任務連結串列會被挨個執行
callHandlerCallbackLater(newCtx, true);
return this;
}
//如果當前channel已經向reactor註冊成功,那麼就直接回撥channelHandler中的handlerAddded方法
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
//這裡需要確保channelHandler中handlerAdded方法的回撥是在channel指定的executor中
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
//回撥channelHandler中的handlerAddded方法
callHandlerAdded0(newCtx);
return this;
}
這個方法的邏輯還是比較複雜的,涉及到很多細節,為了清晰地為大家講述,筆者這裡還是採用總分總的結構,先描述該方法的總體邏輯,然後在針對核心細節要點展開細節分析。
因為向 pipeline 中新增 channelHandler 的操作可能會在多個執行緒中進行,所以為了確保新增操作的執行緒安全性,這裡採用一個 synchronized 語句塊將整個新增邏輯包裹起來。
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
//只有標註@Sharable註解的channelHandler,才被允許同一個範例被新增進多個pipeline中
//注意:標註@Sharable之後,一個channelHandler的範例可以被新增到多個channel對應的pipeline中
//可能被多執行緒執行,需要確保執行緒安全
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
這裡大家需要注意的是,如果一個 ChannelHandler 被標註了 @Sharable 註解,這就意味著它的一個範例可以被多次新增進多個 pipeline 中(每個 channel 對應一個 pipeline 範例),而這多個不同的 pipeline 可能會被不同的 reactor 執行緒執行,所以在使用共用 ChannelHandler 的時候需要確保其執行緒安全性。
比如下面的範例程式碼:
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
.............需要確保執行緒安全.......
}
final EchoServerHandler serverHandler = new EchoServerHandler();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
..................
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});
EchoServerHandler 為我們自定義的 ChannelHandler ,它被 @Sharable 註解標註,全域性只有一個範例,被新增進多個 Channel 的 pipeline 中。從而會被多個 reactor 執行緒執行到。
為 ChannelHandler 建立其 ChannelHandlerContext ,用於封裝 ChannelHandler 的名稱,狀態資訊,執行上下文資訊,以及用於感知 ChannelHandler 在 pipeline 中的位置資訊。newContext 方法涉及的細節較多,後面我們單獨介紹。
通過 addLast0 將新建立出來的 ChannelHandlerContext 插入到 pipeline 中末尾處。方法的邏輯很簡單其實就是一個普通的雙向連結串列插入操作。
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
但是這裡大家需要注意的點是:雖然此時 ChannelHandlerContext 被物理的插入到了 pipeline 中,但是此時 channelHandler 的狀態依然為 INIT 狀態,從邏輯上來說並未算是真正的插入到 pipeline 中,需要等到 ChannelHandler 的 handlerAdded 方法被回撥時,狀態才變為 ADD_COMPLETE ,而只有 ADD_COMPLETE 狀態的 ChannelHandler 才能響應 pipeline 中傳播的事件。
在上篇文章《一文搞懂Netty傳送資料全流程》中的《3.1.5 觸發nextChannelHandler的write方法回撥》小節中我們也提過,在每次 write 事件或者 flush 事件傳播的時候,都需要通過 invokeHandler 方法來判斷 channelHandler 的狀態是否為 ADD_COMPLETE ,否則當前 channelHandler 則不能響應正在 pipeline 中傳播的事件。必須要等到對應的 handlerAdded 方法被回撥才可以,因為 handlerAdded 方法中可能包含一些 ChannelHandler 初始化的重要邏輯。
private boolean invokeHandler() {
// 這裡是一個優化點,netty 用一個區域性變數儲存 handlerState
// 目的是減少 volatile 變數 handlerState 的讀取次數
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
// 當前channelHandler雖然新增到pipeline中,但是並沒有呼叫handlerAdded
// 所以不能呼叫當前channelHandler中的回撥方法,只能繼續向前傳遞write事件
write(msg, promise);
}
}
private void invokeFlush() {
if (invokeHandler()) {
invokeFlush0();
} else {
//如果該ChannelHandler雖然加入到pipeline中但handlerAdded方法並未被回撥,則繼續向前傳遞flush事件
flush();
}
}
事實上不僅僅是 write 事件和 flush 事件在傳播的時候需要判斷 ChannelHandler 的狀態,所有的 inbound 類事件和 outbound 類事件在傳播的時候都需要通過 invokeHandler 方法來判斷當前 ChannelHandler 的狀態是否為 ADD_COMPLETE ,需要確保在 ChannelHandler 響應事件之前,它的 handlerAdded 方法被回撥。
這段邏輯主要用來處理 ChannelInitializer 的新增場景,因為目前只有 ChannelInitializer 這個特殊的 channelHandler 會在 channel 沒有註冊之前被新增進 pipeline 中
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
向 pipeline 的任務列表 pendingHandlerCallbackHead 中新增 PendingHandlerAddedTask 任務:
public class DefaultChannelPipeline implements ChannelPipeline {
// pipeline中的任務列表
private PendingHandlerCallback pendingHandlerCallbackHead;
// 向任務列表尾部新增PendingHandlerAddedTask
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
}
PendingHandlerAddedTask 任務負責回撥 ChannelHandler 中的 handlerAdded 方法。
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
...............
@Override
public void run() {
callHandlerAdded0(ctx);
}
...............
}
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.callHandlerAdded();
} catch (Throwable t) {
...............
}
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
callHandlerAdded0(newCtx);
如果當前執行執行緒並不是 ChannelHandler 指定的 executor ( !executor.inEventLoop() ),那麼就需要確保 handlerAdded 方法的回撥在 channel 指定的 executor 中進行。
private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
}
這裡需要注意的是需要在回撥 handlerAdded 方法之前將 ChannelHandler 的狀態提前設定為 ADD_COMPLETE 。 因為使用者可能在 ChannelHandler 中的 handerAdded 回撥中觸發一些事件,而如果此時 ChannelHandler 的狀態不是 ADD_COMPLETE 的話,就會停止對事件的響應,從而錯過事件的處理。
這種屬於一種使用者極端的使用情況。
final void callHandlerAdded() throws Exception {
if (setAddComplete()) {
handler().handlerAdded(this);
}
}
在介紹完 ChannelHandler 向 pipeline 新增的整個邏輯過程後,本小節我們來看下如何為 ChannelHandler 建立對應的 ChannelHandlerContext ,以及 ChannelHandlerContext 中具體包含了哪些上下文資訊。
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
................
//建立channelHandlerContext包裹channelHandler並封裝執行傳播相關的上下文資訊
newCtx = newContext(group, filterName(name, handler), handler);
................
}
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
}
在建立 ChannelHandlerContext 之前,需要做兩個重要的前置操作:
通過 filterName 方法為 ChannelHandlerContext 過濾出在 pipeline 中唯一的名稱。
如果使用者為 ChannelHandler 指定了特殊的 EventExecutorGroup ,這裡就需要通過 childExecutor 方法從指定的 EventExecutorGroup 中選出一個 EventExecutor 與 ChannelHandler 繫結。
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
// 如果沒有指定name,則會為handler預設生成一個name,該方法可確保預設生成的name在pipeline中不會重複
return generateName(handler);
}
// 如果指定了name,需要確保name在pipeline中是唯一的
checkDuplicateName(name);
return name;
}
如果使用者再向 pipeline 新增 ChannelHandler 的時候,為其指定了具體的名稱,那麼這裡需要確保使用者指定的名稱在 pipeline 中是唯一的。
private void checkDuplicateName(String name) {
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
/**
* 通過指定名稱在pipeline中查詢對應的channelHandler 沒有返回null
* */
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
如果使用者沒有為 ChannelHandler 指定名稱,那麼就需要為 ChannelHandler 在 pipeline 中預設生成一個唯一的名稱。
// pipeline中channelHandler對應的name快取
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() {
return new WeakHashMap<Class<?>, String>();
}
};
private String generateName(ChannelHandler handler) {
// 獲取pipeline中channelHandler對應的name快取
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
if (name == null) {
// 當前handler還沒對應的name快取,則預設生成:simpleClassName + #0
name = generateName0(handlerType);
cache.put(handlerType, name);
}
if (context0(name) != null) {
// 不斷重試名稱字尾#n + 1 直到沒有重複
String baseName = name.substring(0, name.length() - 1);
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}
private static String generateName0(Class<?> handlerType) {
return StringUtil.simpleClassName(handlerType) + "#0";
}
pipeline 中使用了一個 FastThreadLocal 型別的 nameCaches 來快取各種型別 ChannelHandler 的基礎名稱。後面會根據這個基礎名稱不斷的重試生成一個沒有衝突的正式名稱。快取 nameCaches 中的 key 表示特定的 ChannelHandler 型別,value 表示該特定型別的 ChannelHandler 的基礎名稱 simpleClassName + #0
。
自動為 ChannelHandler 生成預設名稱的邏輯是:
首先從快取中 nameCaches 獲取當前新增的 ChannelHandler 的基礎名稱 simpleClassName + #0
。
如果該基礎名稱 simpleClassName + #0
在 pipeline 中是唯一的,那麼就將基礎名稱作為 ChannelHandler 的名稱。
如果快取的基礎名稱在 pipeline 中不是唯一的,則不斷的增加名稱字尾 simpleClassName#1 ,simpleClassName#2 ...... simpleClassName#n
直到產生一個沒有重複的名稱。
雖然使用者不大可能將同一型別的 channelHandler 重複新增到 pipeline 中,但是 netty 為了防止這種反覆新增同一型別 ChannelHandler 的行為導致的名稱衝突,從而利用 nameCaches 來快取同一型別 ChannelHandler 的基礎名稱
simpleClassName + #0
,然後通過不斷的重試遞增名稱字尾,來生成一個在pipeline中唯一的名稱。
通過前邊的介紹我們瞭解到,當我們向 pipeline 新增 ChannelHandler 的時候,netty 允許我們為 ChannelHandler 指定特定的 executor 去執行 ChannelHandler 中的各種事件回撥方法。
通常我們會為 ChannelHandler 指定一個EventExecutorGroup,在建立ChannelHandlerContext 的時候,會通過 childExecutor 方法從 EventExecutorGroup 中選取一個 EventExecutor 來與該 ChannelHandler 繫結。
EventExecutorGroup 是 netty 自定義的一個執行緒池模型,其中包含多個 EventExecutor ,而 EventExecutor 在 netty 中是一個執行緒的執行模型。相關的具體實現和用法筆者已經在《Reactor在Netty中的實現(建立篇)》一文中給出了詳盡的介紹,忘記的同學可以在回顧下。
在介紹 executor 的繫結邏輯之前,這裡筆者需要先為大家介紹一個相關的重要引數:SINGLE_EVENTEXECUTOR_PER_GROUP
,預設為 true 。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.........
.childOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP,true)
我們知道在 netty 中,每一個 channel 都會對應一個獨立的 pipeline ,如果我們開啟了 SINGLE_EVENTEXECUTOR_PER_GROUP
引數,表示在一個 channel 對應的 pipeline 中,如果我們為多個 ChannelHandler 指定了同一個 EventExecutorGroup ,那麼這多個 channelHandler 只能繫結到 EventExecutorGroup 中的同一個 EventExecutor 上。
什麼意思呢??比如我們有下面一段初始化pipeline
的程式碼:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
........................
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(eventExecutorGroup,channelHandler1)
pipeline.addLast(eventExecutorGroup,channelHandler2)
pipeline.addLast(eventExecutorGroup,channelHandler3)
}
});
eventExecutorGroup 中包含 EventExecutor1,EventExecutor2 , EventExecutor3 三個執行執行緒。
假設此時第一個連線進來,在建立 channel1 後初始化 pipeline1 的時候,如果在開啟 SINGLE_EVENTEXECUTOR_PER_GROUP
引數的情況下,那麼在 channel1 對應的 pipeline1 中 channelHandler1,channelHandler2 , channelHandler3 繫結的 EventExecutor 均為 EventExecutorGroup 中的 EventExecutor1 。
第二個連線 channel2 對應的 pipeline2 中 channelHandler1 , channelHandler2 ,channelHandler3 繫結的 EventExecutor 均為 EventExecutorGroup 中的 EventExecutor2 。
第三個連線 channel3 對應的 pipeline3 中 channelHandler1 , channelHandler2 ,channelHandler3 繫結的 EventExecutor 均為 EventExecutorGroup 中的 EventExecutor3 。
以此類推........
如果在關閉 SINGLE_EVENTEXECUTOR_PER_GROUP
引數的情況下,
channel1 對應的 pipeline1 中 channelHandler1 會繫結到 EventExecutorGroup 中的 EventExecutor1 ,channelHandler2 會繫結到 EventExecutor2 ,channelHandler3 會繫結到 EventExecutor3 。
同理其他 channel 對應的 pipeline 中的 channelHandler 繫結邏輯同 channel1 。它們均會繫結到 EventExecutorGroup 中的不同 EventExecutor 中。
當我們瞭解了 SINGLE_EVENTEXECUTOR_PER_GROUP
引數的作用之後,再來看下面這段繫結邏輯就很容易理解了。
// 在每個pipeline中都會儲存EventExecutorGroup中繫結的執行緒
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
//如果沒有開啟SINGLE_EVENTEXECUTOR_PER_GROUP,則按順序從指定的EventExecutorGroup中為channelHandler分配EventExecutor
return group.next();
}
//獲取pipeline繫結到EventExecutorGroup的執行緒(在一個pipeline中會為每個指定的EventExecutorGroup繫結一個固定的執行緒)
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
//獲取該pipeline繫結在指定EventExecutorGroup中的執行緒
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}
如果我們並未特殊指定 ChannelHandler 的 executor ,那麼預設會是對應 channel 繫結的 reactor 執行緒負責執行該 ChannelHandler 。
如果我們未開啟 SINGLE_EVENTEXECUTOR_PER_GROUP
,netty 就會從我們指定的 EventExecutorGroup 中按照 round-robin 的方式為 ChannelHandler 繫結其中一個 eventExecutor 。
如果我們開啟了 SINGLE_EVENTEXECUTOR_PER_GROUP
,相同的 EventExecutorGroup 在同一個 pipeline 範例中的繫結關係是固定的。在 pipeline 中如果多個 channelHandler 指定了同一個 EventExecutorGroup ,那麼這些 channelHandler 的 executor 均會繫結到一個固定的 eventExecutor 上。
這種固定的繫結關係快取於每個 pipeline 中的 Map<EventExecutorGroup, EventExecutor> childExecutors 欄位中,key 是使用者為 channelHandler 指定的 EventExecutorGroup ,value 為該 EventExecutorGroup 在 pipeline 範例中的繫結 eventExecutor 。
接下來就是從 childExecutors 中獲取指定 EventExecutorGroup 在該 pipeline 範例中的繫結 eventExecutor,如果繫結關係還未建立,則通過 round-robin 的方式從 EventExecutorGroup 中選取一個 eventExecutor 進行繫結,並在 childExecutor 中快取繫結關係。
如果繫結關係已經建立,則直接為 ChannelHandler 指定繫結好的 eventExecutor。
在介紹完建立 ChannelHandlerContext 的兩個前置操作後,我們回頭來看下 ChannelHandlerContext 中包含了哪些具體的上下文資訊。
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
// ChannelHandlerContext包裹的channelHandler
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, handler.getClass());
//包裹的channelHandler
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
//對應channelHandler的名稱
private final String name;
//ChannelHandlerContext中持有pipeline的參照
private final DefaultChannelPipeline pipeline;
// channelHandler對應的executor 預設為reactor
final EventExecutor executor;
//channelHandlerContext中儲存channelHandler的執行條件掩碼(是什麼型別的ChannelHandler,對什麼事件感興趣)
private final int executionMask;
//false表示 當channelHandler的狀態為ADD_PENDING的時候,也可以響應pipeline中的事件
//true表示只有在channelHandler的狀態為ADD_COMPLETE的時候才能響應pipeline中的事件
private final boolean ordered;
//channelHandelr的狀態,初始化為INIT
private volatile int handlerState = INIT;
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
//channelHandlerContext中儲存channelHandler的執行條件掩碼(是什麼型別的ChannelHandler,對什麼事件感興趣)
this.executionMask = mask(handlerClass);
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
}
這裡筆者重點介紹 orderd 屬性和 executionMask 屬性,其他的屬性大家很容易理解。
ordered = executor == null || executor instanceof OrderedEventExecutor;
當我們不指定 channelHandler 的 executor 時或者指定的 executor 型別為 OrderedEventExecutor 時,ordered = true。
那麼這個 ordered 屬性對於 ChannelHandler 響應 pipeline 中的事件有什麼影響呢?
我們之前介紹過在 ChannelHandler 響應 pipeline 中的事件之前都會呼叫 invokeHandler() 方法來判斷是否回撥 ChannelHandler 的事件回撥方法還是跳過。
private boolean invokeHandler() {
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
當 ordered == false
時,channelHandler 的狀態為 ADD_PENDING 的時候,也可以響應 pipeline 中的事件。
當 ordered == true
時,只有在 channelHandler 的狀態為 ADD_COMPLETE 的時候才能響應 pipeline 中的事件
另一個重要的屬性 executionMask
儲存的是當前 ChannelHandler 的一些執行條件資訊掩碼,比如:
當前 ChannelHandler 是什麼型別的( ChannelInboundHandler or ChannelOutboundHandler ?)。
當前 ChannelHandler 對哪些事件感興趣(覆蓋了哪些事件回撥方法?)
private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
@Override
protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {
return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
}
};
static int mask(Class<? extends ChannelHandler> clazz) {
// 因為每建立一個channel就會初始化一個pipeline,這裡需要將ChannelHandler對應的mask快取
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
// 計算ChannelHandler對應的mask(什麼型別的ChannelHandler,對什麼事件感興趣)
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}
這裡需要一個 FastThreadLocal 型別的 MASKS 欄位來快取 ChannelHandler 對應的執行掩碼。因為 ChannelHandler 類一旦被定義出來它的執行掩碼就固定了,而 netty 需要接收大量的連線,建立大量的 channel ,併為這些 channel 初始化對應的 pipeline ,需要頻繁的記錄 channelHandler 的執行掩碼到 context 類中,所以這裡需要將掩碼快取起來。
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
//如果該ChannelHandler是Inbound型別的,則先將inbound事件全部設定進掩碼中
mask |= MASK_ALL_INBOUND;
//最後在對不感興趣的事件一一排除(handler中的事件回撥方法如果標註了@Skip註解,則認為handler對該事件不感興趣)
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
//如果handler為Outbound型別的,則先將全部outbound事件設定進掩碼中
mask |= MASK_ALL_OUTBOUND;
//最後對handler不感興趣的事件從掩碼中一一排除
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
//計算出的掩碼需要快取,因為每次向pipeline中新增該型別的handler的時候都需要獲取掩碼(建立一個channel 就需要為其初始化pipeline)
return mask;
}
計算 ChannelHandler 的執行掩碼 mask0 方法雖然比較長,但是邏輯卻十分簡單。在本文的第三小節《3. pipeline中的事件分類》中,筆者為大家詳細介紹了各種事件型別的掩碼錶示,這裡我來看下如何利用這些基本事件掩碼來計算出 ChannelHandler 的執行掩碼的。
如果 ChannelHandler 是 ChannelInboundHandler 型別的,那麼首先會將所有 Inbound 事件掩碼設定進執行掩碼 mask 中。
最後挨個遍歷所有 Inbound 事件,從掩碼集合 mask 中排除該 ChannelHandler 不感興趣的事件。這樣一輪下來,就得到了 ChannelHandler 的執行掩碼。
從這個過程中我們可以看到,ChannelHandler 的執行掩碼包含的是該 ChannelHandler 感興趣的事件掩碼集合。當事件在 pipeline 中傳播的時候,在 ChannelHandlerContext 中可以利用這個執行掩碼來判斷,當前 ChannelHandler 是否符合響應該事件的資格。
同理我們也可以計算出 ChannelOutboundHandler 型別的 ChannelHandler 對應的執行掩碼。
那麼 netty 框架是如何判斷出我們自定義的 ChannelHandler 對哪些事件感興趣,對哪些事件不感興趣的呢?
這裡我們以 ChannelInboundHandler 型別舉例說明,在本文第三小節中,筆者對所有 Inbound 型別的事件作了一個全面的介紹,但是在實際開發中,我們可能並不需要監聽所有的 Inbound 事件,可能只是需要監聽其中的一到兩個事件。
對於我們不感興趣的事件,我們只需要在其對應的回撥方法上標註 @Skip 註解即可,netty 就會認為該 ChannelHandler 對標註 @Skip 註解的事件不感興趣,當不感興趣的事件在 pipeline 傳播的時候,該 ChannelHandler 就不需要執行響應。
private static boolean isSkippable(
final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
Method m;
try {
// 首先檢視類中是否覆蓋實現了對應的事件回撥方法
m = handlerType.getMethod(methodName, paramTypes);
} catch (NoSuchMethodException e) {
if (logger.isDebugEnabled()) {
logger.debug(
"Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
}
return false;
}
return m != null && m.isAnnotationPresent(Skip.class);
}
});
}
那我們在編寫自定義 ChannelHandler 的時候是不是要在 ChannelInboundHandler 或者 ChannelOutboundHandler 介面提供的所有事件回撥方法上,對我們不感興趣的事件繁瑣地一一標註 @Skip 註解呢?
其實是不需要的,netty 為我們提供了 ChannelInboundHandlerAdapter 類和 ChannelOutboundHandlerAdapter 類,netty 事先已經在這些 Adapter 類中的事件回撥方法上全部標註了 @Skip 註解,我們在自定義實現 ChannelHandler 的時候只需要繼承這些 Adapter 類並覆蓋我們感興趣的事件回撥方法即可。
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Skip
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Skip
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Skip
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Skip
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Skip
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Skip
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
@Skip
@Override
@SuppressWarnings("deprecation")
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
}
從上個小節的內容中我們可以看到向 pipeline 中新增 ChannelHandler 的邏輯還是比較複雜的,涉及到的細節比較多。
那麼在瞭解了向 pipeline 中新增 ChannelHandler 的過程之後,從 pipeline 中刪除 ChannelHandler 的邏輯就變得很好理解了。
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
//從pipeline中刪除指定的channelHandler
ChannelPipeline remove(ChannelHandler handler);
//從pipeline中刪除指定名稱的channelHandler
ChannelHandler remove(String name);
//從pipeline中刪除特定型別的channelHandler
<T extends ChannelHandler> T remove(Class<T> handlerType);
}
netty 提供了以上三種方式從 pipeline 中刪除指定 ChannelHandler ,下面我們以第一種方式為例來介紹 ChannelHandler 的刪除過程。
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
}
首先需要通過 getContextOrDie 方法在 pipeline 中查詢到指定的 ChannelHandler 對應的 ChannelHandelrContext 。以便確認要刪除的 ChannelHandler 確實是存在於 pipeline 中。
context 方法是通過遍歷 pipeline 中的雙向連結串列來查詢要刪除的 ChannelHandlerContext 。
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
ObjectUtil.checkNotNull(handler, "handler");
// 獲取 pipeline 雙向連結串列結構的頭結點
AbstractChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
}
}
remove 方法的整體程式碼結構和 addLast0 方法的程式碼結構一樣,整體邏輯也是先從 pipeline 中的雙向連結串列結構中將指定的 ChanneHandlerContext 刪除,然後在處理被刪除的 ChannelHandler 中 handlerRemoved 方法的回撥。
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
synchronized (this) {
//從pipeline的雙向列表中刪除指定channelHandler對應的context
atomicRemoveFromHandlerList(ctx);
if (!registered) {
//如果此時channel還未向reactor註冊,則通過向pipeline中新增PendingHandlerRemovedTask任務
//在註冊之後回撥channelHandelr中的handlerRemoved方法
callHandlerCallbackLater(ctx, false);
return ctx;
}
//channelHandelr從pipeline中刪除後,需要回撥其handlerRemoved方法
//需要確保handlerRemoved方法在channelHandelr指定的executor中進行
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
callHandlerRemoved0(ctx);
return ctx;
}
private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
@Override
public void run() {
callHandlerRemoved0(ctx);
}
}
在執行 ChannelHandler 中 handlerRemoved 回撥的時候,需要對 ChannelHandler 的狀態進行判斷:只有當 handlerState 為 ADD_COMPLETE 的時候才能回撥 handlerRemoved 方法。
這裡表達的語意是隻有當 ChannelHanler 的 handlerAdded 方法被回撥之後,那麼在 ChannelHanler 被從 pipeline 中刪除的時候它的 handlerRemoved 方法才可以被回撥。
在 ChannelHandler 的 handlerRemove 方法被回撥之後,將 ChannelHandler 的狀態設定為 REMOVE_COMPLETE 。
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
try {
// 在這裡回撥 handlerRemoved 方法
ctx.callHandlerRemoved();
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
final void callHandlerRemoved() throws Exception {
try {
if (handlerState == ADD_COMPLETE) {
handler().handlerRemoved(this);
}
} finally {
// Mark the handler as removed in any case.
setRemoved();
}
}
final void setRemoved() {
handlerState = REMOVE_COMPLETE;
}
其實關於 pipeline 初始化的相關內容我們在《詳細圖解 Netty Reactor 啟動全流程》中已經簡要介紹了 NioServerSocketChannel 中的 pipeline 的初始化時機以及過程。
在《Netty 如何高效接收網路連線》中筆者也簡要介紹了 NioSocketChannel 中 pipeline 的初始化時機以及過程。
本小節筆者將結合這兩種型別的 Channel 來完整全面的介紹 pipeline 的整個初始化過程。
從前邊提到的這兩篇文章以及本文前邊的相關內容我們知道,Netty 提供了一個特殊的 ChannelInboundHandler 叫做 ChannelInitializer ,使用者可以利用這個特殊的 ChannelHandler 對 Channel 中的 pipeline 進行自定義的初始化邏輯。
如果使用者只希望在 pipeline 中新增一個固定的 ChannelHandler 可以通過如下程式碼直接新增。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//設定主從Reactor
...........
.handler(new LoggingHandler(LogLevel.INFO))
如果希望新增多個 ChannelHandler ,則可以通過 ChannelInitializer 來自定義新增邏輯。
由於使用 ChannelInitializer 初始化 NioServerSocketChannel 中 pipeline 的邏輯會稍微複雜一點,下面我們均以這個複雜的案例來講述 pipeline 的初始化過程。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//設定主從Reactor
...........
.handler(new ChannelInitializer<NioServerSocketChannel>() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
....自定義pipeline初始化邏輯....
ChannelPipeline p = ch.pipeline();
p.addLast(channelHandler1);
p.addLast(channelHandler2);
p.addLast(channelHandler3);
........
}
})
以上這些由使用者自定義的用於初始化 pipeline 的 ChannelInitializer ,被儲存至 ServerBootstrap 啟動類中的 handler 欄位中。用於後續的初始化呼叫
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable
private volatile ChannelHandler handler;
}
在伺服器端啟動的時候,會伴隨著 NioServeSocketChannel 的建立以及初始化,在初始化 NioServerSokcetChannel 的時候會將一個新的 ChannelInitializer 新增進 pipeline 中,在新的 ChannelInitializer 中才會將使用者自定義的 ChannelInitializer 新增進 pipeline 中,隨後才執行初始化過程。
Netty 這裡之所以引入一個新的 ChannelInitializer 來初始化 NioServerSocketChannel 中的 pipeline 的原因是需要相容前邊介紹的這兩種初始化 pipeline 的方式。
一種是直接使用一個具體的 ChannelHandler 來初始化 pipeline。
另一種是使用 ChannelInitializer 來自定義初始化 pipeline 邏輯。
忘記 netty 啟動過程的同學可以在回看下筆者的《詳細圖解 Netty Reactor 啟動全流程》這篇文章。
@Override
void init(Channel channel) {
.........
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap中使用者指定的channelHandler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
.........
}
});
}
注意此時 NioServerSocketChannel 並未開始向 Main Reactor 註冊,根據本文第四小節《4. 向 pipeline 新增 channelHandler 》中的介紹,此時向 pipeline 中新增這個新的 ChannelInitializer 之後,netty 會向 pipeline 的任務列表中新增 PendingHandlerAddedTask 。當 NioServerSocketChannel 向 Main Reactor 註冊成功之後,緊接著 Main Reactor 執行緒會呼叫這個 PendingHandlerAddedTask ,在任務中會執行這個新的 ChannelInitializer 的 handlerAdded 回撥。在這個回撥方法中會執行上邊 initChannel 方法裡的程式碼。
當 NioServerSocketChannel 在向 Main Reactor 註冊成功之後,就挨個執行 pipeline 中的任務列表中的任務。
private void register0(ChannelPromise promise) {
.........
boolean firstRegistration = neverRegistered;
//執行真正的註冊操作
doRegister();
//修改註冊狀態
neverRegistered = false;
registered = true;
//呼叫pipeline中的任務連結串列,執行PendingHandlerAddedTask
pipeline.invokeHandlerAddedIfNeeded();
.........
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// 執行 pipeline 任務列表中的 PendingHandlerAddedTask 任務。
callHandlerAddedForAllHandlers();
}
}
執行 pipeline 任務列表中的 PendingHandlerAddedTask 任務:
private void callHandlerAddedForAllHandlers() {
// pipeline 任務列表中的頭結點
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
// 挨個執行任務列表中的任務
while (task != null) {
//觸發 ChannelInitializer 的 handlerAdded 回撥
task.execute();
task = task.next;
}
}
最終在 PendingHandlerAddedTask 中執行 pipeline 中 ChannelInitializer 的 handlerAdded 回撥。
這個 ChannelInitializer 就是在初始化 NioServerSocketChannel 的 init 方法中向 pipeline 新增的 ChannelInitializer。
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
if (initChannel(ctx)) {
//初始化工作完成後,需要將自身從pipeline中移除
removeState(ctx);
}
}
}
}
在 handelrAdded 回撥中執行 ChannelInitializer 匿名類中 initChannel 方法,注意此時執行的 ChannelInitializer 類為在本小節開頭 init 方法中由 Netty 框架新增的 ChannelInitializer ,並不是使用者自定義的 ChannelInitializer 。
@Override
void init(Channel channel) {
.........
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap中使用者指定的ChannelInitializer
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
.........
}
});
}
執行完 ChannelInitializer 匿名類中 initChannel 方法後,需將 ChannelInitializer 從 pipeline 中刪除。並回撥 ChannelInitializer 的 handlerRemoved 方法。刪除過程筆者已經在第六小節《6. 從 pipeline 刪除 channelHandler》詳細介紹過了。
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
//執行ChannelInitializer匿名類中的initChannel方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
//初始化完畢後,從pipeline中移除自身
pipeline.remove(this);
}
}
return true;
}
return false;
}
當執行完 initChannel 方法後此時 pipeline 的結構如下圖所示:
當用戶的自定義 ChannelInitializer 被新增進 pipeline 之後,根據第四小節所講的新增邏輯,此時 NioServerSocketChannel 已經向 main reactor 成功註冊完畢,不再需要向 pipeine 的任務列表中新增 PendingHandlerAddedTask 任務,而是直接呼叫自定義 ChannelInitializer 中的 handlerAdded 回撥,和上面的邏輯一樣。不同的是這裡最終回撥至使用者自定義的初始化邏輯實現 initChannel 方法中。執行完使用者自定義的初始化邏輯之後,從 pipeline 刪除使用者自定義的 ChannelInitializer 。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//設定主從Reactor
...........
.handler(new ChannelInitializer<NioServerSocketChannel>() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
....自定義pipeline初始化邏輯....
ChannelPipeline p = ch.pipeline();
p.addLast(channelHandler1);
p.addLast(channelHandler2);
p.addLast(channelHandler3);
........
}
})
隨後 netty 會以非同步任務的形式向 pipeline 的末尾新增 ServerBootstrapAcceptor ,至此 NioServerSocketChannel 中 pipeline 的初始化工作就全部完成了。
在 7.1 小節中筆者舉的這個 pipeline 初始化的例子相對來說比較複雜,當我們把這個複雜例子的初始化邏輯搞清楚之後,NioSocketChannel 中 pipeline 的初始化過程就變的很簡單了。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//設定主從Reactor
...........
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
....自定義pipeline初始化邏輯....
ChannelPipeline p = ch.pipeline();
p.addLast(channelHandler1);
p.addLast(channelHandler2);
p.addLast(channelHandler3);
........
}
})
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//儲存使用者自定義ChannelInitializer
private volatile ChannelHandler childHandler;
}
在《Netty 如何高效接收網路連線》一文中我們介紹過,當用戶端發起連線,完成三次握手之後,NioServerSocketChannel 上的 OP_ACCEPT 事件活躍,隨後會在 NioServerSocketChannel 的 pipeline 中觸發 channelRead 事件。並最終在 ServerBootstrapAcceptor 中初始化使用者端 NioSocketChannel 。
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
...........
}
}
在這裡會將使用者自定義的 ChannelInitializer 新增進 NioSocketChannel 中的 pipeline 中,由於此時 NioSocketChannel 還沒有向 sub reactor 開始註冊。所以在向 pipeline 中新增 ChannelInitializer 的同時會伴隨著 PendingHandlerAddedTask 被新增進 pipeline 的任務列表中。
後面的流程大家應該很熟悉了,和我們在7.1小節中介紹的一模一樣,當 NioSocketChannel 再向 sub reactor 註冊成功之後,會執行 pipeline 中的任務列表中的 PendingHandlerAddedTask 任務,在 PendingHandlerAddedTask 任務中會回撥使用者自定義 ChannelInitializer 的 handelrAdded 方法,在該方法中執行 initChannel 方法,使用者自定義的初始化邏輯就封裝在這裡面。在初始化完 pipeline 後,將 ChannelInitializer 從 pipeline 中刪除,並回撥其 handlerRemoved 方法。
至此使用者端 NioSocketChannel 中 pipeline 初始化工作就全部完成了。
在本文第三小節《3. pipeline中的事件分類》中我們介紹了 Netty 事件型別共分為三大類,分別是 Inbound類事件,Outbound類事件,ExceptionCaught事件。並詳細介紹了這三類事件的掩碼錶示,和觸發時機,以及事件傳播的方向。
本小節我們就來按照 Netty 中非同步事件的分類從原始碼角度分析下事件是如何在 pipeline 中進行傳播的。
在第三小節中我們介紹了所有的 Inbound 類事件,這些事件在 pipeline 中的傳播邏輯和傳播方向都是一樣的,唯一的區別就是執行的回撥方法不同。
本小節我們就以 ChannelRead 事件的傳播為例,來說明 Inbound 類事件是如何在 pipeline 中進行傳播的。
第三小節中我們提到過,在 NioSocketChannel 中,ChannelRead 事件的觸發時機是在每一次 read loop 讀取資料之後在 pipeline 中觸發的。
do {
............
allocHandle.lastBytesRead(doReadBytes(byteBuf));
............
// 在使用者端NioSocketChannel的pipeline中觸發ChannelRead事件
pipeline.fireChannelRead(byteBuf);
} while (allocHandle.continueReading());
從這裡可以看到,任何 Inbound 類事件在 pipeline 中的傳播起點都是從 HeadContext 頭結點開始的。
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
.........
}
ChannelRead 事件從 HeadContext 開始在 pipeline 中傳播,首先就會回撥 HeadContext 中的 channelRead 方法。
在執行 ChannelHandler 中的相應事件回撥方法時,需要確保回撥方法的執行在指定的 executor 中進行。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
//需要保證channelRead事件回撥在channelHandler指定的executor中進行
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
在執行 HeadContext 的 channelRead 方法發生異常時,就會回撥 HeadContext 的 exceptionCaught 方法。並在相應的事件回撥方法中決定是否將事件繼續在 pipeline 中傳播。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}
}
在 HeadContext 中通過 ctx.fireChannelRead(msg) 繼續將 ChannelRead 事件在 pipeline 中向後傳播。
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
}
這裡的 findContextInbound 方法是整個 inbound 類事件在 pipeline 中傳播的核心所在。
因為我們現在需要繼續將 ChannelRead 事件在 pipeline 中傳播,所以我們目前的核心問題就是通過 findContextInbound 方法在 pipeline 中找到下一個對 ChannelRead 事件感興趣的 ChannelInboundHandler 。然後執行該 ChannelInboundHandler 的 ChannelRead 事件回撥。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
//需要保證channelRead事件回撥在channelHandler指定的executor中進行
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
ChannelRead 事件就這樣迴圈往復的一直在 pipeline 中傳播,在傳播的過程中只有對 ChannelRead 事件感興趣的 ChannelInboundHandler 才可以響應。其他型別的 ChannelHandler 則直接跳過。
如果 ChannelRead 事件在 pipeline 中傳播的過程中,沒有得到其他 ChannelInboundHandler 的有效處理,最終會被傳播到 pipeline 的末尾 TailContext 中。而在本文第二小節中,我們也提到過 TailContext 對於 inbound 事件存在的意義就是做一個兜底的處理。比如:列印紀錄檔,釋放 bytebuffer 。
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(ctx, msg);
}
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(msg);
if (logger.isDebugEnabled()) {
logger.debug("Discarded message pipeline : {}. Channel : {}.",
ctx.pipeline().names(), ctx.channel());
}
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
// 釋放DirectByteBuffer
ReferenceCountUtil.release(msg);
}
}
}
本小節要介紹的 findContextInbound 方法和我們在上篇文章《一文聊透 Netty 傳送資料全流程》中介紹的 findContextOutbound 方法均是 netty 非同步事件在 pipeline 中傳播的核心所在。
事件傳播的核心問題就是需要高效的在 pipeline 中按照事件的傳播方向,找到下一個具有響應事件資格的 ChannelHandler 。
比如:這裡我們在 pipeline 中傳播的 ChannelRead 事件,我們就需要在 pipeline 中找到下一個對 ChannelRead 事件感興趣的 ChannelInboundHandler ,並執行該 ChannelInboudnHandler 的 ChannelRead 事件回撥,在 ChannelRead 事件回撥中對事件進行業務處理,並決定是否通過 ctx.fireChannelRead(msg) 將 ChannelRead 事件繼續向後傳播。
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
引數 mask 表示我們正在傳播的 ChannelRead 事件掩碼 MASK_CHANNEL_READ 。
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
通過 ctx = ctx.next 在 pipeline 中找到下一個 ChannelHandler ,並通過 skipContext 方法判斷下一個 ChannelHandler 是否具有響應事件的資格。如果沒有則跳過繼續向後查詢。
比如:下一個 ChannelHandler 如果是一個 ChannelOutboundHandler,或者下一個 ChannelInboundHandler 對 ChannelRead 事件不感興趣,那麼就直接跳過。
該方法主要用來判斷下一個 ChannelHandler 是否具有 mask 代表的事件的響應資格。
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
引數 onlyMask 表示我們需要查詢的 ChannelHandler 型別,比如這裡我們正在傳播 ChannelRead 事件,它是一個 inbound 類事件,那麼必須只能由 ChannelInboundHandler 來響應處理,所以這裡傳入的 onlyMask 為 MASK_ONLY_INBOUND ( ChannelInboundHandler 的掩碼錶示)
ctx.executionMask 我們已經在《5.3 ChanneHandlerContext》小節中詳細介紹過了,當 ChannelHandler 被新增進 pipeline 中時,需要計算出該 ChannelHandler 感興趣的事件集合掩碼來,儲存在對應 ChannelHandlerContext 的 executionMask 欄位中。
首先會通過 ctx.executionMask & (onlyMask | mask)) == 0
來判斷下一個 ChannelHandler 型別是否正確,比如我們正在傳播 inbound 類事件,下一個卻是一個 ChannelOutboundHandler ,那麼肯定是要跳過的,繼續向後查詢。
如果下一個 ChannelHandler 的型別正確,那麼就會通過 (ctx.executionMask & mask) == 0
來判斷該 ChannelHandler 是否對正在傳播的 mask 事件感興趣。如果該 ChannelHandler 中覆蓋了 ChannelRead 回撥則執行,如果沒有覆蓋對應的事件回撥方法則跳過,繼續向後查詢,直到 TailContext 。
以上就是 skipContext 方法的核心邏輯,這裡表達的核心語意是:
如果 pipeline 中傳播的是 inbound 類事件,則必須由 ChannelInboundHandler 來響應,並且該 ChannelHandler 必須覆蓋實現對應的 inbound 事件回撥。
如果 pipeline 中傳播的是 outbound 類事件,則必須由 ChannelOutboundHandler 來響應,並且該 ChannelHandler 必須覆蓋實現對應的 outbound 事件回撥。
這裡大部分同學可能會對 ctx.executor() == currentExecutor
這個條件感到很疑惑。加上這個條件,其實對我們這裡的核心語意並沒有多大影響。
當 ctx.executor() == currentExecutor 也就是說前後兩個 ChannelHandler 指定的 executor 相同時,我們核心語意保持不變。
當 ctx.executor() != currentExecutor
也就是前後兩個 ChannelHandler 指定的 executor 不同時,語意變為:只要前後兩個 ChannelHandler 指定的 executor 不同,不管下一個ChannelHandler有沒有覆蓋實現指定事件的回撥方法,均不能跳過。 在這種情況下會執行到 ChannelHandler 的預設事件回撥方法,繼續在 pipeline 中傳遞事件。我們在《5.3 ChanneHandlerContext》小節提到過 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 會分別對 inbound 類事件回撥方法和 outbound 類事件回撥方法進行預設的實現。
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.disconnect(promise);
}
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.close(promise);
}
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
@Skip
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
而這裡之所以需要加入 ctx.executor() == currentExecutor
條件的判斷,是為了防止 HttpContentCompressor 在被指定不同的 executor 情況下無法正確的建立壓縮內容,導致的一些異常。但這個不是本文的重點,大家只需要理解這裡的核心語意就好,這種特殊情況的特殊處理了解一下就好。
關於 Outbound 類事件的傳播,筆者在上篇文章《一文搞懂 Netty 傳送資料全流程》中已經進行了詳細的介紹,本小節就不在贅述。
在最後我們來介紹下異常事件在 pipeline 中的傳播,ExceptionCaught 事件和 Inbound 類事件一樣都是在 pipeline 中從前往後開始傳播。
ExceptionCaught 事件的觸發有兩種情況:一種是 netty 框架內部產生的異常,這時 netty 會直接在 pipeline 中觸發 ExceptionCaught 事件的傳播。異常事件會在 pipeline 中從 HeadContext 開始一直向後傳播直到 TailContext。
比如 netty 在 read loop 中讀取資料時發生異常:
try {
...........
do {
............
allocHandle.lastBytesRead(doReadBytes(byteBuf));
............
//使用者端NioSocketChannel的pipeline中觸發ChannelRead事件
pipeline.fireChannelRead(byteBuf);
} while (allocHandle.continueReading());
...........
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
}
這時會 netty 會直接從 pipeline 中觸發 ExceptionCaught 事件的傳播。
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
.............
pipeline.fireExceptionCaught(cause);
.............
}
和 Inbound 類事件一樣,ExceptionCaught 事件會在 pipeline 中從 HeadContext 開始一直向後傳播。
@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
return this;
}
第二種觸發 ExceptionCaught 事件的情況是,當 Inbound 類事件或者 flush 事件在 pipeline 中傳播的過程中,在某個 ChannelHandler 中的事件回撥方法處理中發生異常,這時該 ChannelHandler 的 exceptionCaught 方法會被回撥。使用者可以在這裡處理異常事件,並決定是否通過 ctx.fireExceptionCaught(cause) 繼續向後傳播異常事件。
比如我們在 ChannelInboundHandler 中的 ChannelRead 回撥中處理業務請求時發生異常,就會觸發該 ChannelInboundHandler 的 exceptionCaught 方法。
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
//觸發channelHandler的exceptionCaught回撥
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
........
} else {
........
}
}
再比如:當我們在 ChannelOutboundHandler 中的 flush 回撥中處理業務結果傳送的時候發生異常,也會觸發該 ChannelOutboundHandler 的 exceptionCaught 方法。
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
}
我們可以在 ChannelHandler 的 exceptionCaught 回撥中進行例外處理,並決定是否通過 ctx.fireExceptionCaught(cause) 繼續向後傳播異常事件。
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
.........例外處理.......
ctx.fireExceptionCaught(cause);
}
@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
return this;
}
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
ObjectUtil.checkNotNull(cause, "cause");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeExceptionCaught(cause);
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeExceptionCaught(cause);
}
});
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
}
雖然 ExceptionCaught 事件和 Inbound 類事件在傳播方向都是在 pipeline 中從前向後傳播。但是大家這裡注意區分這兩個事件的區別。
在 Inbound 類事件傳播過程中是會查詢下一個具有事件響應資格的 ChannelInboundHandler 。遇到 ChannelOutboundHandler 會直接跳過。
而 ExceptionCaught 事件無論是在哪種型別的 channelHandler 中觸發的,都會從當前異常 ChannelHandler 開始一直向後傳播,ChannelInboundHandler 可以響應該異常事件,ChannelOutboundHandler 也可以響應該異常事件。
由於無論異常是在 ChannelInboundHandler 中產生的還是在 ChannelOutboundHandler 中產生的, exceptionCaught 事件都會在 pipeline 中是從前向後傳播,並且不關心 ChannelHandler 的型別。所以我們一般將負責統一例外處理的 ChannelHandler 放在 pipeline 的最後,這樣它對於 inbound 類異常和 outbound 類異常均可以捕獲得到。
本文涉及到的內容比較多,通過 netty 非同步事件在 pipeline 中的編排和傳播這條主線,我們相當於將之前的文章內容重新又回顧總結了一遍。
本文中我們詳細介紹了 pipeline 的組成結構,它主要是由 ChannelHandlerContext 型別節點組成的雙向連結串列。ChannelHandlerContext 包含了 ChannelHandler 執行上下文的資訊,從而可以使 ChannelHandler 只關注於 IO 事件的處理,遵循了單一原則和開閉原則。
此外 pipeline 結構中還包含了一個任務連結串列,用來存放執行 ChannelHandler 中的 handlerAdded 回撥和 handlerRemoved 回撥。pipeline 還持有了所屬 channel 的參照。
我們還詳細介紹了 Netty 中非同步事件的分類:Inbound 類事件,Outbound 類事件,ExceptionCaught 事件。並詳細介紹了每種分類下的所有事件的觸發時機和在 pipeline 中的傳播路徑。
最後介紹了 pipeline 的結構以及建立和初始化過程,以及對 pipeline 相關操作的原始碼實現。
中間我們又穿插介紹了 ChannelHanderContext 的結構,介紹了 ChannelHandlerContext 具體封裝了哪些關於 ChannelHandler 執行的上下文資訊。
本文的內容到這裡就結束了,感謝大家的觀看,我們下篇文章見~~~
歡迎關注公眾號:bin的技術小屋