作者:京東物流 王奕龍
Netty是一個非同步基於事件驅動的高效能網路通訊框架,可以看做是對NIO和BIO的封裝,並提供了簡單易用的API、Handler和工具類等,用以快速開發高效能、高可靠性的網路伺服器端和使用者端程式。
伺服器端啟動需要建立 ServerBootstrap
物件,並完成初始化執行緒模型,設定IO模型和新增業務處理邏輯(Handler) 。在新增業務處理邏輯時,呼叫的是 childHandler()
方法新增了一個ChannelInitializer
,程式碼範例如下
// 負責伺服器端的啟動
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 以下兩個物件可以看做是兩個執行緒組
// boss執行緒組負責監聽埠,接受新的連線
NioEventLoopGroup boss = new NioEventLoopGroup();
// worker執行緒組負責讀取資料
NioEventLoopGroup worker = new NioEventLoopGroup();
// 設定執行緒組並指定NIO模型
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
// 定義後續每個 新連線 的讀寫業務邏輯
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
// 新增業務處理邏輯
.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg);
}
});
}
});
// 繫結埠號
serverBootstrap.bind(2002);
通過呼叫 .channel(NioServerSocketChannel.class)
方法指定 Channel
型別為NIO型別,如果要指定為BIO型別,引數改成 OioServerSocketChannel.class
即可。
其中 nioSocketChannel.pipeline()
用來獲取 PipeLine
物件,呼叫方法 addLast()
新增必要的業務處理邏輯,這裡採用的是責任鏈模式,會將每個Handler作為一個節點進行處理。
使用者端與伺服器端啟動類似,不同的是,使用者端需要建立 Bootstrap
物件來啟動,並指定一個使用者端執行緒組,相同的是都需要完成初始化執行緒模型,設定IO模型和新增業務處理邏輯(Handler) , 程式碼範例如下
// 負責使用者端的啟動
Bootstrap bootstrap = new Bootstrap();
// 使用者端的執行緒模型
NioEventLoopGroup group = new NioEventLoopGroup();
// 指定執行緒組和NIO模型
bootstrap.group(group).channel(NioSocketChannel.class)
// handler() 方法封裝業務處理邏輯
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline()
// 新增業務處理邏輯
.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg);
}
});
}
});
// 連線伺服器端IP和埠
bootstrap.connect("127.0.0.1", 2002);
(注意:下文中內容均以伺服器端程式碼範例為準)
使用者端與伺服器端進行通訊,通訊的訊息是以二進位制位元組流的形式通過 Channel
進行傳遞的,所以當我們在使用者端封裝好Java業務物件後,需要將其按照協定轉換成位元組陣列,並且當伺服器端接受到該二進位制位元組流時,需要將其根據協定再次解碼成Java業務物件進行邏輯處理,這就是編碼和解碼的過程。Netty 為我們提供了MessageToByteEncoder
用於編碼,ByteToMessageDecoder
用於解碼。
用於將Java物件編碼成位元組陣列並寫入 ByteBuf
,程式碼範例如下
public class TcpEncoder extends MessageToByteEncoder<Message> {
/**
* 序列化器
*/
private final Serializer serializer;
public TcpEncoder(Serializer serializer) {
this.serializer = serializer;
}
/**
* 編碼的執行邏輯
*
* @param message 需要被編碼的訊息物件
* @param byteBuf 將位元組陣列寫入ByteBuf
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
// 通過自定義的序列化器將物件轉換成位元組陣列
byte[] bytes = serializer.serialize(message);
// 將位元組陣列寫入 ByteBuf 便完成了物件的編碼流程
byteBuf.writeBytes(bytes);
}
}
它用於將接收到的二進位制資料流解碼成Java物件,與上述程式碼類似,只不過是將該過程反過來了而已,程式碼範例如下
public class TcpDecoder extends ByteToMessageDecoder {
/**
* 序列化器
*/
private final Serializer serializer;
public TcpDecoder(Serializer serializer) {
this.serializer = serializer;
}
/**
* 解碼的執行邏輯
*
* @param byteBuf 接收到的ByteBuf物件
* @param list 任何完成解碼的Java物件新增到該List中即可
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
// 根據協定自定義的解碼邏輯將其解碼成Java物件
Message message = serializer.deSerialize(byteBuf);
// 解碼完成後新增到List中即可
list.add(message);
}
}
ByteBuf預設情況下使用的是堆外記憶體,不進行記憶體釋放會發生記憶體溢位。不過 ByteToMessageDecoder
和 MessageToByteEncoder
這兩個解碼和編碼Handler
會自動幫我們完成記憶體釋放的操作,無需再次手動釋放。因為我們實現的 encode()
和 decode()
方法只是這兩個 Handler
原始碼中執行的一個環節,最終會在 finally 程式碼塊中完成對記憶體的釋放,具體內容可閱讀 MessageToByteEncoder
中第99行 write()
方法原始碼。
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
// 接收到請求時進行解碼
.addLast(new TcpDecoder(serializer))
// 傳送請求時進行編碼
.addLast(new TcpEncoder(serializer));
}
});
在Netty框架中,使用者端與伺服器端的每個連線都對應著一個 Channel
,而這個 Channel
的所有處理邏輯都封裝在一個叫作ChannelPipeline
的物件裡。ChannelPipeline
是一個雙向連結串列,它使用的是責任鏈模式,每個連結串列節點都是一個 Handler
,能通它能獲取 Channel
相關的上下文資訊(ChannelHandlerContext)。
Netty為我們提供了多種讀取 Channel
中資料的 Handler
,其中比較常用的是 ChannelInboundHandlerAdapter
和SimpleChannelInboundHandler
,下文中我們以讀取心跳訊息為例。
如下為處理心跳業務邏輯的 Handler
,具體執行邏輯參考程式碼和註釋即可
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
/**
* channel中有資料可讀時,會回撥該方法
*
* @param msg 如果在該Handler前沒有解碼Handler節點處理,該物件型別為ByteBuf;否則為解碼後的Java物件
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Message message = (Message) msg;
// 處理心跳訊息
processHeartBeatMessage(message);
// 初始化Ack訊息
Message ackMessage = initialAckMessage();
// 回寫給使用者端
ctx.channel().writeAndFlush(ackMessage);
}
}
SimpleChannelInboundHandler
是ChannelInboundHandlerAdapter
的實現類,SimpleChannelInboundHandler
能夠指定泛型,這樣在處理業務邏輯時,便無需再新增上文程式碼中物件強轉的邏輯,這部分程式碼實現是在 SimpleChannelInboundHandler
的 channelRead()
方法中完成的,它是一個模版方法,我們僅僅需要實現 channelRead0()
方法即可,程式碼範例如下
public class HeartBeatHandler extends SimpleChannelInboundHandler<Message> {
/**
* @param msg 注意這裡的物件型別即為 Message
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
// 處理心跳訊息
processHeartBeatMessage(message);
// 初始化Ack訊息
Message ackMessage = initialAckMessage();
// 回寫給使用者端
ctx.channel().writeAndFlush(ackMessage);
}
}
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
// 接收到進行解碼
.addLast(new TcpDecoder(serializer))
// 心跳業務處理Handler
.addLast(new HeartBeatHandler())
// 傳送請求時進行編碼
.addLast(new TcpEncoder(serializer));
}
});
在 ChannelInboundHandlerAdapter
可以通過實現不同的方法來完成指定時機的方法回撥,具體可參考如下程式碼
public class LifeCycleHandler extends ChannelInboundHandlerAdapter {
/**
* 當檢測到新連線之後,呼叫 ch.pipeline().addLast(...); 之後的回撥
* 表示當前channel中成功新增了 Handler
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("邏輯處理器被新增時回撥:handlerAdded()");
super.handlerAdded(ctx);
}
/**
* 表示當前channel的所有邏輯處理已經和某個NIO執行緒建立了繫結關係
* 這裡的NIO執行緒通常指的是 NioEventLoop
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 繫結到執行緒(NioEventLoop)時回撥:channelRegistered()");
super.channelRegistered(ctx);
}
/**
* 當Channel的所有業務邏輯鏈準備完畢,連線被啟用時
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 準備就緒時回撥:channelActive()");
super.channelActive(ctx);
}
/**
* 使用者端向伺服器端傳送資料,表示有資料可讀時,就會回撥該方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channel 有資料可讀時回撥:channelRead()");
super.channelRead(ctx, msg);
}
/**
* 伺服器端每完整的讀完一次資料,都會回撥該方法
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 某次資料讀完時回撥:channelReadComplete()");
super.channelReadComplete(ctx);
}
// ---斷開連線時---
/**
* 該使用者端與伺服器端的連線被關閉時回撥
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 被關閉時回撥:channelInactive()");
super.channelInactive(ctx);
}
/**
* 對應的NIO執行緒移除了對這個連線的處理
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 取消執行緒(NioEventLoop) 的繫結時回撥: channelUnregistered()");
super.channelUnregistered(ctx);
}
/**
* 為該連線新增的所有業務邏輯Handler被移除時
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("邏輯處理器被移除時回撥:handlerRemoved()");
super.handlerRemoved(ctx);
}
}
即使我們傳送訊息的時候是以 ByteBuf
的形式傳送的,但是到了底層作業系統,仍然是以位元組流的形式對資料進行傳送的,而且伺服器端也以位元組流的形式讀取,因此在伺服器端對位元組流進行拼接時,可能就會造成傳送時 ByteBuf
與讀取時的 ByteBuf
不對等的情況,這就是所謂的粘包或半包現象。
以如下情況為例,當用戶端頻繁的向伺服器端傳送心跳訊息時,讀取到的ByteBuf資訊如下,其中一個心跳請求是用紅框圈出的部分
可以發現多個心跳請求"粘"在了一起,那麼我們需要對它進行拆包處理,否則只會讀取第一條心跳請求,之後的請求會全部失效
Netty 為我們提供了基於長度的拆包器LengthFieldBasedFrameDecoder
來進行拆包工作,它能對超過所需資料量的包進行拆分,也能在資料不足的時候等待讀取,直到資料足夠時,構成一個完整的封包並進行業務處理。
以標準介面檔案中的協定(圖示)為準,程式碼範例如下,其中的四個引數比較重要,詳細資訊可見註釋描述
public class SplitHandler extends LengthFieldBasedFrameDecoder {
/**
* 在協定中表示資料長度的欄位在位元組流首尾中的偏移量
*/
private static final Integer LENGTH_FIELD_OFFSET = 10;
/**
* 表示資料長度的位元組長度
*/
private static final Integer LENGTH_FIELD_LENGTH = 4;
/**
* 資料長度後邊的頭資訊中的位元組偏移量
*/
private static final Integer LENGTH_ADJUSTMENT = 10;
/**
* 表示從第一個位元組開始需要捨去的位元組數,在我們的協定中,不需要進行捨去
*/
private static final Integer INITIAL_BYTES_TO_STRIP = 0;
public SplitHandler() {
super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
}
}
之後將其新增到Handler中即可,如果遇到其他協定,更改其中引數或檢視 LengthFieldBasedFrameDecoder
的JavaDoc中詳細描述。
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
// 拆包Handler
.addLast(new SplitHandler())
// 接收到進行解碼
.addLast(new TcpDecoder(serializer))
// 心跳業務處理Handler
.addLast(new HeartBeatHandler())
// 傳送請求時進行編碼
.addLast(new TcpEncoder(serializer));
}
});
Netty 在每次有新連線到來的時候,都會呼叫 ChannelInitializer
的 initChannel()
方法,會將其中相關的 Handler
都建立一次,
如果其中的 Handler
是無狀態且能夠通用的,可以將其改成單例,這樣就能夠在每次連線建立時,避免多次建立相同的物件。
以如下伺服器端程式碼為例,包含如下Handler,可以將編碼解碼、以及業務處理Handler都定義成Spring單例bean的形式注入進來,這樣就能夠完成物件的複用而無需每次建立連線都建立相同的物件了
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 拆包Handler
.addLast(new SplitHandler())
// 紀錄檔Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解碼Handler
.addLast(new TcpDecoder(serializer))
// 心跳、格口狀態、裝置狀態、RFID上報、掃碼上報和分揀結果上報Handler
.addLast(new HeartBeatHandler(), new ChuteStatusHandler())
.addLast(new DeviceStatusReceiveHandler(), new RfidBindReceiveHandler())
.addLast(new ScanReceiveHandler(), new SortResultHandler())
// 編碼Handler
.addLast(new TcpEncoder(serializer));
}
});
改造完成後如下
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 拆包Handler
.addLast(new SplitHandler())
// 紀錄檔Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解碼Handler
.addLast(tcpDecoder)
// 心跳、格口狀態、裝置狀態、RFID上報、掃碼上報和分揀結果上報Handler
.addLast(heartBeatHandler, chuteStatusHandler)
.addLast(deviceStatusReceiveHandler, rfidBindReceiveHandler)
.addLast(scanReceiveHandler, sortResultHandler)
// 編碼Handler
.addLast(tcpEncoder);
}
});
不過需要注意在每個單例Handler的類上標註 @ChannelHandler.Sharable
註解,否則會丟擲如下異常
io.netty.channel.ChannelPipelineException: netty.book.practice.handler.server.LoginHandler is not a @Sharable handler, so can't be added or removed multiple times
另外,SplitHanlder
不能進行單例處理,因為它的內部實現與每個 Channel
都有關,每個 SplitHandler
都需要維持每個Channel
讀到的資料,即它是有狀態的。
對伺服器端來說,每次解碼出來的Java物件在多個業務處理 Handler
中只會經過一個其中 Handler
完成業務處理,那麼我們將所有業務相關的 Handler
封裝起來到一個Map中,每次只讓它經過必要的Handler而不是經過整個責任鏈,那麼便可以提高Netty處理請求的效能。
定義如下 ServerHandlers
單例bean,並使用 策略模式 將對應的 Handler
管理起來,每次處理時根據訊息型別獲取對應的 Handler
來完成業務邏輯
@ChannelHandler.Sharable
public class ServerHandlers extends SimpleChannelInboundHandler<Message> {
@Resourse
private HeartBeatHandler heartBeatHandler;
/**
* 策略模式封裝Handler,這樣就能在回撥 ServerHandler 的 channelRead0 方法時
* 找到具體的Handler,而不需要經過責任鏈的每個 Handler 節點,以此來提高效能
*/
private final Map<Command, SimpleChannelInboundHandler<Message>> map;
public ServerHandler() {
map = new HashMap();
// key: 訊息型別列舉 value: 對應的Handler
map.put(MessageType.HEART_BEAT, heartBeatHandler);
// ...
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
// 呼叫 channelRead() 方法完成業務邏輯處理
map.get(msg.getMessageType()).channelRead(ctx, msg);
}
}
改造完成後,伺服器端程式碼如下,因為我們封裝了平行的業務處理Handler
,所以程式碼很清爽
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 拆包Handler
.addLast(new SplitHandler())
// 紀錄檔Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解碼Handler
.addLast(tcpDecoder)
// serverHandlers 封裝了 心跳、格口狀態、裝置狀態、RFID上報、掃碼上報和分揀結果上報Handler
.addLast(serverHandlers)
// 編碼Handler
.addLast(tcpEncoder);
}
});
Netty 對編碼解碼提供了統一處理Handler是MessageToMessageCodec
,這樣我們就能將編碼和解碼的Handler合併成一個新增介面,程式碼範例如下
@ChannelHandler.Sharable
public class MessageCodecHandler extends MessageToMessageCodec<ByteBuf, Message> {
/**
* 序列化器
*/
@Resourse
private Serializer serializer;
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
// 將位元組陣列寫入 ByteBuf
ByteBuf byteBuf = ctx.alloc().ioBuffer();
serializer.serialize(byteBuf, msg);
// 這個編碼也需要新增到List中
out.add(byteBuf);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
// 根據協定自定義的解碼邏輯將其解碼成Java物件,並新增到List中
out.add(serializer.deSerialize(msg));
}
}
改造完成後,伺服器端程式碼如下,將其放在業務處理Handler前即可,呼叫完業務Handler邏輯,會執行編碼邏輯
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 拆包Handler
.addLast(new SplitHandler())
// 紀錄檔Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解碼、編碼Handler
.addLast(messageCodecHandler)
// serverHandlers 封裝了 心跳、格口狀態、裝置狀態、RFID上報、掃碼上報和分揀結果上報Handler
.addLast(serverHandlers);
}
});
對於耗時的業務操作,需要將它們都丟到業務執行緒池中去處理,因為單個NIO執行緒會管理很多 Channel
,只要有一個 Channel
中的 Handler
的 channelRead()
方法被業務邏輯阻塞,那麼它就會拖慢繫結在該NIO執行緒上的其他所有 Channel
。
為了避免上述情況,可以在包含長時間業務處理邏輯的Handler中建立一個執行緒池,並將其丟入執行緒池中進行執行,虛擬碼如下
protected void channelRead(ChannelHandlerContext ctx, Object message) {
threadPool.submit(new Runnable() {
// 耗時的業務處理邏輯
doSomethingSependTooMuchTime();
writeAndFlush();
});
}
如果底層的TCP連線已經斷開,但是另一端服務並沒有捕獲到,在某一端(使用者端或伺服器端)看來會認為這條連線仍然存在,這就是連線"假死"現象。這造成的問題就是,對於伺服器端來說,每個連線連線都會耗費CPU和記憶體資源,過多的假死連線會造成效能下降和服務崩潰;對使用者端來說,
連線假死會使得發往伺服器端的請求都會超時,所以需要儘可能避免假死現象的發生。
造成假死的原因可能是公網丟包、使用者端或伺服器端網路故障等,Netty為我們提供了 IdleStateHandler
來解決超時假死問題,範例程式碼如下
public class MyIdleStateHandler extends IdleStateHandler {
private static final int READER_IDLE_TIME = 15;
public MyIdleStateHandler() {
// 讀超時時間、寫超時時間、讀寫超時時間 指定0值不判斷超時
super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
System.out.println(READER_IDLE_TIME + "秒內沒有讀到資料,關閉連線");
ctx.channel().close();
}
}
其構造方法中有三個引數來分別指定讀、寫和讀寫超時時間,當指定0時不判斷超時,除此之外Netty也有專門用來處理讀和寫超時的Handler,分別為 ReadTimeoutHandler
, WriteTimeoutHandler
。
將其新增到伺服器端 Handler
的首位即可
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 超時判斷Handler
.addLast(new MyIdleStateHandler())
// 拆包Handler
.addLast(new SplitHandler())
// 紀錄檔Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解碼、編碼Handler
.addLast(messageCodecHandler)
// serverHandlers 封裝了 心跳、格口狀態、裝置狀態、RFID上報、掃碼上報和分揀結果上報Handler
.addLast(serverHandlers);
}
});
ChannelPipeline
與 Channel
密切相關,它可以看做是一條流水線,資料以位元組流的形式進來,經過不同 Handler
的"加工處理",
最終以位元組流的形式輸出。ChannelPipeline
在每條新連線建立的時候被建立,是一條雙向連結串列,其中每一個節點都是ChannelHadnlerContext
物件,能夠通過它拿到相關的上下文資訊,預設它有頭節點 HeadContext
和尾結點 TailContext
。
定義在 ChannelPipeline
中的 Handler 是可插拔的,能夠完成動態編織,呼叫 ctx.pipeline().remove()
方法可移除,呼叫 ctx.pipeline().addXxx()
方法可進行新增。
InboundHandler
與 OutboundHandler
處理的事件不同,前者處理 Inbound事件
,典型的就是讀取資料流並加工處理;後者會對呼叫 writeAndFlush()
方法的 Outbound事件
進行處理。
此外,兩者的傳播機制也是不同的:
InboundHandler
會從連結串列頭逐個向下呼叫,頭節點只是簡單的將該事件傳播下去(ctx.fireChannelRead(mug)
),執行過程中呼叫findContextInbound()
方法來尋找 InboundHandler
節點,直到 TailContext
節點執行方法完畢,結束呼叫。
一般自定義的 ChannelInboundHandler
都繼承自ChannelInboundHandlerAdapter
, 如果沒有覆蓋channelXxx()
相關方法,那麼該事件正常會遍歷雙向連結串列一直傳播到尾結點,否則就會在當前節點執行完結束;當然也可以呼叫 fireXxx()
方法讓事件從當前節點繼續向下傳播。
OutboundHandler
是從連結串列尾向連結串列頭呼叫,相當於反向遍歷 ChannelPipeline
雙向連結串列,Outbound事件
會先經過TailContext
尾節點,並在執行過程中不斷尋找OutboundHandler
節點加工處理,直到頭節點 HeadContext
呼叫 Unsafe.write()
方法結束。
異常的傳播機制和 Inbound事件
的傳播機制類似,在任何節點發生的異常都會向下一個節點傳遞。如果自定義的 Handler 沒有處理異常也沒有實現 exceptionCaught()
方法,最終則會落到 TailContext
節點,控制檯列印異常未處理的警告資訊。
通常例外處理,我們會定義一個例外處理器,繼承自ChannelDuplexHandler
,放在自定義連結串列節點的末尾,這樣就能夠一定捕獲和處理異常。
建立 new NioEventLoopGroup()
它的預設執行緒數是當前CPU執行緒數的2倍,最終會呼叫到如下原始碼
// 這裡計算的執行緒數量
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
跟進到構造方法的最終實現,會執行如下業務邏輯
其中在第2步建立 NioEventLoop
時,值得關注的是建立了一個 Selector
,以此來實現IO多路複用;另外它還建立了高效能 MPSC
(多生產者單消費者)佇列,藉助它來協調任務的非同步執行,如此單條執行緒(NioEventLoop)、Selector和MPSC它們三者是一對一的關係。而每條連線都對應一個 Channel
,每個 Channel
都繫結唯一一個 NioEventLoop
,因此單個連線的所有操作都是在一個執行緒中執行,是執行緒安全的。
第3步驟建立執行緒選擇器,它的作用是為連線在NioEventLoopGroup
中選擇一個 NioEventLoop
,並將該連線與 NioEventLoop
中的 Selector
完成繫結。
在底層有兩種選擇器的實現,分別是PowerOfTowEventExecutorChooser
和GenericEventExecutorChooser
,它們的原理都是從執行緒池裡迴圈選擇執行緒,不同的是前者計算迴圈的索引採用的是位運算而後者採用的是取餘運算。
原始碼位置 NioEventLoop
的 run()
方法, select
操作會不斷輪詢是否有IO事件發生,並且在輪詢過程中不斷檢查是否有任務需要執行,保證Netty任務佇列中的任務能夠及時執行,輪詢過程使用一個計數器避開了 JDK 的空輪詢Bug
在 Netty 的 Channel
中,有兩大型別的 Channel
,一個是 NioServerSocketChannel
,由 boss NioEventLoop 處理;另一個是 NioSocketChannel
,由worker NioEventLoop 處理,所以
注意任務的執行都是非同步的。
上文中提到了我們建立了高效能的MPSC
佇列,它是用來聚集非Reactor執行緒建立的任務的,NioEventLoop
會在執行的過程中不斷檢測是否有事件發生,如果有事件發生就處理,處理完事件之後再處理非Reactor執行緒建立的任務。在檢測是否有事件發生的時候,為了保證非同步任務的及時處理,只要有任務要處理,就會停止任務檢測,去處理任務,處理任務時是Reactor單執行緒執行。
當 boss Reactor執行緒檢測到 ACCEPT 事件之後,建立一個 NioSocketChannel
,並把使用者設定的 ChannelOption(Option引數設定)、ChannelAttr(Channel 引數)、ChannelHandler(ChannelInitializer)封裝到 NioSocketChannel
中。接著,使用執行緒選擇器在NioEventLoopGroup
中選擇一條 NioEventLoop
(執行緒),把 NioSocketChannel
中包裝的JDK Channel 當做Key,自身(NioSocketChannel)作為 attachment,註冊 NioEventLoop 對應的 Selector上。這樣,後續有讀寫事件發生,就可以直接獲取 attachment 來處理讀寫資料的邏輯。
簡單地說:IO多路複用是指可以在一個執行緒內處理多個連線的IO事件請求。以Java中的IO多路複用為例,伺服器端建立 Selector
物件不斷的呼叫 select()
方法來處理各個連線上的IO事件,之後將這些IO事件交給任務執行緒非同步去執行,這就達到了在一個執行緒內同時處理多個連線的IO請求事件的目的。