1、NIO 的類庫和 API 繁雜,使用麻煩:需要熟練掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
2、需要具備其他的額外技能:要熟悉 Java 多執行緒程式設計,因為 NIO 程式設計涉及到 Reactor 模式,你必須對多執行緒和網路程式設計非常熟悉,才能編寫出高質量的 NIO 程式。
3、開發工作量和難度都非常大:例如使用者端面臨斷連重連、網路閃斷、半包讀寫、失敗快取、網路擁塞和異常流的處理等等。
4、JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它會導致 Selector 空輪詢,最終導致 CPU 100%。直到 JDK 1.7版本該問題仍舊存在,沒有被根本解決。
官網:https://netty.io/
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients
Netty 對 JDK 自帶的 NIO 的 API 進行了封裝,解決了上述問題。
1、設計優雅:適用於各種傳輸型別的統一 API 阻塞和非阻塞 Socket;基於靈活且可延伸的事件模型,可以清晰地分離關注點;高度可客製化的執行緒模型 - 單執行緒,一個或多個執行緒池.
2、使用方便:詳細記錄的 Javadoc,使用者指南和範例;沒有其他依賴項,JDK 5(Netty 3.x)或 6(Netty 4.x)就足夠了。
3、高效能、吞吐量更高:延遲更低;減少資源消耗;最小化不必要的記憶體複製。
4、安全:完整的 SSL/TLS 和 StartTLS 支援。
5、社群活躍、不斷更新:社群活躍,版本迭代週期短,發現的 Bug 可以被及時修復,同時,更多的新功能會被加入
1、netty 版本分為 netty3.x 和 netty4.x、netty5.x
2、因為 Netty5 出現重大 bug,已經被官網廢棄了,目前推薦使用的是 Netty4.x 的穩定版本
3、目前在官網可下載的版本 netty3.x netty4.0.x 和 netty4.1.x
4、netty下載地址: https://bintray.com/netty/downloads/netty/
1、不同的執行緒模式,對程式的效能有很大影響,為了搞清 Netty 執行緒模式,我們來系統的講解下 各個執行緒模式,最後看看 Netty 執行緒模型有什麼優越性.
2、目前存在的執行緒模型有:
- 傳統阻塞 I/O 服務模型
- Reactor 模式
3、根據 Reactor 的數量和處理資源池執行緒的數量不同,有 3 種典型的實現
- 單 Reactor 單執行緒;
- 單 Reactor 多執行緒;
- 主從 Reactor 多執行緒
4、Netty 執行緒模式
(
Netty
主要基於主從 Reactor 多執行緒模型
做了一定的改進,其中主從 Reactor 多執行緒模型有多個 Reactor)
- 黃色的框表示物件, 藍色的框表示執行緒
- 白色的框表示方法(API)
- 採用阻塞 IO 模式獲取輸入的資料
- 每個連線都需要獨立的執行緒完成資料的輸入,業務處理,資料返回
- 當並行數很大,就會建立大量的執行緒,
佔用很大系統資源
- 連線建立後,如果當前執行緒暫時沒有資料可讀,該執行緒會阻塞在 read 操作,造成執行緒資源浪費
基於I/O 複用模型
:
多個連線共用一個阻塞物件,應用程式只需要在一個阻塞物件等待,無需阻塞等待所有連線。
當某個連線有新的資料可以處理時,作業系統通知應用程式,執行緒從阻塞狀態返回,開始進行業務處理Reactor 對應的叫法:
- 反應器模式
- 分發者模式(Dispatcher)
- 通知者模式(notifier)
基於執行緒池複用執行緒資源:
不必再為每個連線建立執行緒,將連線完成後的業務處理任務分配給執行緒進行處理,一個執行緒可以處理多個連線的業務
。
對上圖說明:
1、Reactor 模式,通過一個或多個輸入同時傳遞給服務處理器的模式(基於事件驅動)
2、伺服器端程式處理傳入的多個請求,並將它們同步分派到相應的處理執行緒, 因此 Reactor 模式也叫 Dispatcher模式
3、Reactor 模式使用 IO 複用監聽事件, 收到事件後,分發給某個執行緒(程序), 這點就是網路伺服器高並行處理關鍵
1、Reactor:
Reactor 在一個單獨的執行緒中執行,負責監聽和分發事件,分發給適當的處理程式來對 IO 事件做出反應。它就像公司的電話接線員,它接聽來自客戶的電話並將線路轉移到適當的聯絡人;2、Handlers:
處理程式執行 I/O 事件要完成的實際事件,類似於客戶想要與之交談的公司中的實際官員。Reactor通過排程適當的處理程式來響應 I/O 事件,處理程式執行非阻塞操作。
根據 Reactor 的數量和處理資源池執行緒的數量不同,有 3 種典型的實現:
- 單 Reactor 單執行緒
- 單 Reactor 多執行緒
- 主從 Reactor 多執行緒
原理圖,並使用 NIO 群聊系統驗證
1、Select 是前面 I/O 複用模型介紹的標準網路程式設計 API,可以實現應用程式通過一個阻塞物件監聽多路連線請求
2、Reactor 物件通過 Select 監控使用者端請求事件,收到事件後通過 Dispatch 進行分發
3、如果是建立連線請求事件,則由 Acceptor 通過 Accept 處理連線請求,然後建立一個 Handler 物件處理連線完成後的後續業務處理
4、如果不是建立連線事件,則 Reactor 會分發呼叫連線對應的 Handler 來響應
5、Handler 會完成 Read→業務處理→Send 的完整業務流程 →再返回給Client
結合範例:
伺服器端用一個執行緒通過多路複用搞定所有的 IO 操作(包括連線,讀、寫等),編碼簡單,清晰明瞭,但是如果使用者端連線數量較多,將無法支撐,前面的 NIO 案例就屬於這種模型。
1、優點:
模型簡單,沒有多執行緒、程序通訊、競爭的問題,全部都在一個執行緒中完成
2、缺點:
效能問題
,只有一個執行緒,無法完全發揮多核 CPU 的效能。Handler 在處理某個連線上的業務時,整個程序無法處理其他連線事件,很容易導致效能瓶頸可靠性問題
,執行緒意外終止,或者進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障
3、使用場景:
使用者端的數量有限,業務處理非常快速,比如 Redis 在業務處理的時間複雜度 O(1) 的情況
Handler將具體的業務處理Worker執行緒池分層出去,並通過開闢新的執行緒去完成
1、Reactor 物件通過 select 監控使用者端請求事件, 收到事件後,通過 dispatch 進行分發
2、如果建立連線請求, 則右 Acceptor 通過accept 處理連線請求, 然後建立一個 Handler 物件處理完成連線後的各種事件
3、如果不是連線請求,則由 reactor 分發呼叫連線對應的 handler 來處理
4、handler 只負責響應事件,不做具體的業務處理, 通過 read 讀取資料後,會分發給後面的 worker 執行緒池的某個執行緒處理業務
5、worker 執行緒池會分配獨立執行緒完成真正的業務,並將結果返回給 handler
6、handler 收到響應後,通過 send 將結果返回給 client
1、優點:
可以充分的利用
多核 cpu 的處理能力
2、缺點:
多執行緒資料共用和存取比較
複雜
, reactor 處理所有的事件的監聽和響應在
單執行緒執行
, 在高並行場景容易出現效能瓶頸
.
多加了一層派發層並採用新開執行緒(Reactor子執行緒,SubReactor),分為了3層,獨立開
針對單 Reactor 多執行緒模型中,Reactor 在單執行緒中執行,高並行場景下容易成為效能瓶頸,可以讓 Reactor 在多執行緒中執行
- Reactor 主執行緒 MainReactor 物件通過 select 監聽連線事件, 收到事件後,通過 Acceptor 處理連線事件
- 當 Acceptor 處理連線事件後,MainReactor 將連線分配給 SubReactor
- subreactor 將連線加入到連線佇列進行監聽,並建立 handler 進行各種事件處理
- 當有新事件發生時, subreactor 就會呼叫對應的 handler 處理
- handler 通過 read 讀取資料,分發給後面的 worker 執行緒處理
- worker 執行緒池分配獨立的 worker 執行緒進行業務處理,並返回結果
- handler 收到響應的結果後,再通過 send 將結果返回給 client
- Reactor 主執行緒可以對應多個 Reactor 子執行緒, 即 MainRecator 可以關聯多個 SubReactor
1、優點:
父執行緒與子執行緒的資料互動簡單職責明確
父執行緒與子執行緒的資料互動簡單
,Reactor 主執行緒只需要把新連線傳給子執行緒,子執行緒無需返回資料。
2、缺點:
程式設計複雜
度較高結合範例:
- 單 Reactor 單執行緒:前臺接待員和服務員是同一個人,全程為顧客服務
- 單 Reactor 多執行緒:1 個前臺接待員,多個服務員,接待員只負責接待
- 主從 Reactor 多執行緒:多個前臺接待員,多個服務生
- 響應快,不必為單個同步時間所阻塞,雖然 Reactor 本身依然是同步的
- 可以最大程度的避免複雜的多執行緒及同步問題
- 避免了多執行緒/程序的切換開銷
- 擴充套件性好,可以方便的通過增加 Reactor 範例個數來充分利用 CPU 資源
- 複用性好,Reactor 模型本身與具體事件處理邏輯無關,具有很高的複用性
Netty 主要基於主從 Reactors 多執行緒模型(如圖)做了一定的改進,其中主從 Reactor 多執行緒模型有多個 Reactor
- BossGroup 執行緒維護 Selector , 只關注 Accecpt
- 當接收到 Accept 事件,獲取到對應的 SocketChannel, 封裝成 NIOScoketChannel 並註冊到 Worker 執行緒(事件迴圈), 並進行維護
- 當 Worker 執行緒監聽到 selector 中通道發生自己感興趣的事件後,就進行處理(就由handler), 注意 handler 已經加入到通道
1、Netty 抽象出兩組執行緒池
- BossGroup 專門負責接收使用者端的連線
- WorkerGroup 專門負責網路的讀寫
2、BossGroup 和 WorkerGroup 型別都是 NioEventLoopGroup
3、NioEventLoopGroup 相當於一個事件迴圈組, 這個組中含有多個事件迴圈 ,每一個事件迴圈是 NioEventLoop
4、NioEventLoop 表示一個不斷迴圈的執行處理任務的執行緒, 每個 NioEventLoop 都有一個selector , 用於監聽繫結在其上的 socket 的網路通訊
5、NioEventLoopGroup 可以有多個執行緒, 即可以含有多個 NioEventLoop
6、每個 Boss NioEventLoop 迴圈執行的步驟有 3 步
- 輪詢 accept 事件
- 處理 accept 事件 , 與 client 建立連線 , 生成 NioScocketChannel , 並將其註冊到某個 worker NIOEventLoop 上的 selector
- 處理任務佇列的任務 , 即 runAllTasks
7、每個 Worker NIOEventLoop 迴圈執行的步驟
- 輪詢 read, write 事件
- 處理 i/o 事件, 即 read , write 事件,在對應 NioScocketChannel 處理
- 處理任務佇列的任務 , 即 runAllTasks
8、每個Worker NIOEventLoop 處理業務時,會使用pipeline(管道), pipeline 中包含了 channel , 即通過pipeline可以獲取到對應通道, 管道中維護了很多的 處理器
1、引入netty依賴
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.20.Final</version> </dependency>
2、NettyServer伺服器端
package com.sun.netty.nettySimple; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyServer { public static void main(String[] args) throws InterruptedException { /* 建立BossGroup 和 WorkerGroup 說明: 1、建立兩個執行緒組 BossGroup 和 WorkerGroup 2、 BossGroup:只處理連線請求 WorkerGroup: 處理和使用者端業務處理 3、兩個執行緒組都是自旋 */ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 建立伺服器啟動物件,設定啟動引數 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 鏈式程式設計進行設定 serverBootstrap.group(bossGroup, workerGroup) // 設定兩個執行緒組 .channel(NioServerSocketChannel.class) // 設定伺服器使用的通道 .option(ChannelOption.SO_BACKLOG, 128) // 設定執行緒佇列等待連線個數 .childOption(ChannelOption.SO_KEEPALIVE, true) // 設定保持活動連線狀態 .childHandler(new ChannelInitializer<SocketChannel>() { // 建立一個通道初始化物件 // 給pipeline設定處理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("伺服器端準備完畢..."); // 伺服器端繫結埠並同步處理,返回ChannelFuture物件,啟動伺服器端 ChannelFuture channelFuture = serverBootstrap.bind(9998).sync(); // 對關閉通道進行監聽 channelFuture.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
3、NettyServerHandler伺服器端處理器
package com.sun.netty.nettySimple; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyServer { public static void main(String[] args) throws InterruptedException { /* 建立BossGroup 和 WorkerGroup 說明: 1、建立兩個執行緒組 BossGroup 和 WorkerGroup 2、 BossGroup:只處理連線請求 WorkerGroup: 處理和使用者端業務處理 3、兩個執行緒組都是自旋 */ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 建立伺服器啟動物件,設定啟動引數 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 鏈式程式設計進行設定 serverBootstrap.group(bossGroup, workerGroup) // 設定兩個執行緒組 .channel(NioServerSocketChannel.class) // 設定伺服器使用的通道 .option(ChannelOption.SO_BACKLOG, 128) // 設定執行緒佇列等待連線個數 .childOption(ChannelOption.SO_KEEPALIVE, true) // 設定保持活動連線狀態 .childHandler(new ChannelInitializer<SocketChannel>() { // 建立一個通道初始化物件 // 給pipeline設定處理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("伺服器端準備完畢..."); // 伺服器端繫結埠並同步處理,返回ChannelFuture物件,啟動伺服器端 ChannelFuture channelFuture = serverBootstrap.bind(9998).sync(); // 對關閉通道進行監聽 channelFuture.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
4、NettyClient使用者端
package com.sun.netty.nettySimple; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @Title: NettyClient * @Author sunguoqiang * @Package com.sun.netty.nettySimple * @Date 2022/12/29 16:28 * @description: */ public class NettyClient { public static void main(String[] args) throws InterruptedException { // 使用者端需要一個事件迴圈組 NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); try { //建立使用者端的啟動物件 Bootstrap bootstrap = new Bootstrap(); //設定啟動引數 bootstrap.group(eventExecutors) //設定執行緒組 .channel(NioSocketChannel.class) //設定使用者端通道實現類 .handler(new ChannelInitializer<SocketChannel>() { //建立一個通道初始化物件 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler()); //加入自己的處理器 } }); System.out.println("使用者端準備完畢..."); // 指定使用者端連線的伺服器地址 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9998).sync(); // 對關閉通道進行監聽 channelFuture.channel().closeFuture().sync(); } finally { // 優雅關閉 eventExecutors.shutdownGracefully(); } } }
5、NettyClient使用者端處理器
package com.sun.netty.nettySimple; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; /** * @Title: NettyClientHandler * @Author sunguoqiang * @Package com.sun.netty.nettySimple * @Date 2022/12/29 16:47 * @description: */ public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 通道就緒就會觸發該方法 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client ctx:"+ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("我是使用者端...", StandardCharsets.UTF_8)); } /** * 接受伺服器端返回的訊息,當通道有讀取事件時就觸發 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf= (ByteBuf) msg; System.out.println("來自伺服器端的訊息:"+buf.toString(StandardCharsets.UTF_8)); System.out.println("伺服器端地址:"+ctx.channel().remoteAddress()); } /** * 例外處理 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.channel().close(); } }
執行結果:
1、使用者程式自定義的普通任務
2、使用者自定義定時任務
3、非當前 Reactor 執行緒呼叫 Channel 的各種方法
例如在推播系統的業務執行緒裡面,根據使用者的標識,找到對應的 Channel 參照,然後呼叫 Write 類方法向該使用者推播訊息,就會進入到這種場景。最終的 Write 會提交到任務佇列中後被非同步消費
4、程式碼
package com.sun.netty.nettySimple; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import java.util.concurrent.TimeUnit; /** * 伺服器端處理器,自定義handler需要繼承netty規定好的某個HandlerAdapter介面卡才能生效 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 讀取資料事件(這裡可以讀取使用者端傳送來的訊息) * 1、ChannelHandlerContext ctx:上下文物件。含有管道pipeline、通道channel、地址等 * 2、Object msg:使用者端傳送來的資料 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server ctx:" + ctx); //將msg轉為ByteBuffer(這個ByteBuf和nio的ByteBuffer是有區別的) ByteBuf buf = (ByteBuf) msg; System.out.println("使用者端傳送訊息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("使用者端地址為:" + ctx.channel().remoteAddress()); // 使用者程式自定義普通任務,該任務提交到taskQueue自定義非同步任務 ctx.channel().eventLoop().execute(()->{ try { Thread.sleep(10*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("使用者自定義普通任務,taskQueue...",CharsetUtil.UTF_8)); }catch (Exception e){ e.printStackTrace(); } }); // 使用者自定義定時任務,該任務提交到scheduleTaskQueue中 ctx.channel().eventLoop().schedule(()->{ try { Thread.sleep(10*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("使用者自定義定時任務,scheduleTaskQueue...",CharsetUtil.UTF_8)); }catch (Exception e){ e.printStackTrace(); } },10, TimeUnit.SECONDS); } /** * 資料讀取完畢事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { /** * 1、writeAndFlush是 write+Flush方法的合併 * 2、將資料寫入快取並重新整理 * 3、對傳送的資料進行編碼 */ ctx.writeAndFlush(Unpooled.copiedBuffer("我是伺服器端...",CharsetUtil.UTF_8)); } /** * 發生例外處理,傳送異常事件,一般是關閉通道 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); } }
說明:
1、Netty 抽象出兩組執行緒池,
1、BossGroup 專門負責接收使用者端連線,
2、WorkerGroup 專門負責網路讀寫操作。
2、NioEventLoop 表示一個不斷迴圈執行處理任務的執行緒,每個 NioEventLoop 都有一個 selector,用於監聽繫結在其上的 socket 網路通道。
3、NioEventLoop 內部採用序列化設計,從訊息的讀取->解碼->處理->編碼->傳送,始終由 IO 執行緒 NioEventLoop負責
- NioEventLoopGroup 下包含多個 NioEventLoop
- 每個 NioEventLoop 中包含有一個 Selector,一個 taskQueue
- 每個 NioEventLoop 的 Selector 上可以註冊監聽多個 NioChannel
- 每個 NioChannel 只會繫結在唯一的 NioEventLoop 上
- 每個 NioChannel 都繫結有一個自己的 ChannelPipeline
1、非同步的概念和同步相對。
2、當一個非同步過程呼叫發出後,呼叫者不能立刻得到結果。
3、實際處理這個呼叫的元件在完成後,通過狀態、通知和回撥來通知呼叫者。
4、Netty 中的 I/O 操作是非同步的,包括 Bind、Write、Connect 等操作會簡單的返回一個 ChannelFuture。
5、呼叫者並不能立刻獲得結果,而是通過 Future-Listener 機制
,使用者可以方便的主動獲取或者通過通知機制獲得IO 操作結果
6、Netty 的非同步模型是建立在 future 和 callback 的之上的。callback 就是回撥。重點說 Future,它的核心思想是:
假設一個方法 fun,計算過程可能非常耗時,等待 fun 返回顯然不合適。那麼可以在呼叫 fun 的時候,立馬返回一個 Future,後續可以通過 Future 去監控方法 fun 的處理過程(即 : Future-Listener 機制)
1、表示 非同步的執行結果, 可以通過它提供的方法來檢測執行是否完成,比如檢索計算等等.
2、ChannelFuture 是一個介面 :*public interface ChannelFuture extends Future< Void >*我們可以新增監聽器,當監聽的事件發生時,就會通知到監聽器
說明
1、在使用 Netty 進行程式設計時,攔截操作和轉換出入站資料只需要您提供 callback 或利用 future 即可。這使得鏈式操作簡單、高效, 並有利於編寫可重用的、通用的程式碼。
2、Netty 框架的目標就是讓你的業務邏輯從網路基礎應用編碼中分離出來、解脫出來
1、當 Future 物件剛剛建立時,處於非完成狀態,呼叫者可以通過返回的 ChannelFuture 來獲取操作執行的狀態,註冊監聽函數來執行完成後的操作。
2、常見有如下操作
1、通過 isDone 方法來判斷當前操作是否完成;
2、通過isSuccess 方法來判斷已完成的當前操作是否成功;
3、通過 getCause 方法來獲取已完成的當前操作失敗的原因;
4、通過 isCancelled 方法來判斷已完成的當前操作是否被取消;
5、通過 addListener 方法來註冊監聽器,當操作已完成(isDone 方法返回完成),將會通知指定的監聽器;如果Future 物件已完成,則通知指定的監聽器
程式碼
繫結埠是非同步操作,當繫結操作處理完,將會呼叫相應的監聽器處理邏輯,該位於使用者端程式碼
ChannelFuture cf = bootstrap.connect("127.0.0.1", 6668).sync(); cf.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()){ System.out.println("監聽6668埠成功"); }else { System.out.println("監聽6668埠失敗"); } } });
1、範例要求:使用 IDEA 建立 Netty 專案
2、Netty 伺服器在 7000 埠監聽,瀏覽器發出請求 "http://localhost:7000/ "
3、伺服器可以回覆訊息給使用者端 "hello,我是阿昌的伺服器 " , 並對特定請求資源進行過濾.
這裡我們用瀏覽器來作為使用者端,所以就不需要寫使用者端的程式碼
HttpServer
package com.sun.netty.http; import com.sun.security.ntlm.Server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2023/1/4 10:55 **/ public class HttpServer { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerInitializer()); System.out.println("伺服器已啟動..."); ChannelFuture channelFuture = serverBootstrap.bind(9090).sync(); channelFuture.channel().closeFuture().sync(); } finally { workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
ServerInitializer
package com.sun.netty.http; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; /** * @Author: sunguoqiang * @Description: TODO * @DateTime: 2023/1/4 11:18 **/ public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast("myHttpServerCodec", new HttpServerCodec()) .addLast(new HttpServerHandler()); } }
HttpServerHandler
package com.sun.netty.http; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; /** * @Author: sunguoqiang * @Description: SimpleChannelInboundHandler:他就是ChannelInboundHandlerAdapter的子類 * HttpObject:表示使用者端和伺服器端相互通訊的資料被封裝成HttpObject型別 * @DateTime: 2023/1/4 11:24 **/ public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> { // 當有讀取事件就會觸發該事件,讀取使用者端資料 @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception { if (httpObject instanceof HttpRequest) { // 列印相關資訊 System.out.println("pipeline hashcode:" + channelHandlerContext.pipeline().hashCode()); System.out.println("msg(httpObject)型別:" + httpObject.getClass()); System.out.println("使用者端瀏覽器地址:" + channelHandlerContext.channel().remoteAddress()); System.out.println("請求URI:" + ((HttpRequest) httpObject).getUri()); // 過濾不響應uri請求 String uri = ((HttpRequest) httpObject).getUri(); if ("/favicon.ico".equals(uri)) { System.out.println("此[/favicon.ico]請求不處理."); return; } // 回覆資訊給使用者端瀏覽器[http協定] ByteBuf byteBuf = Unpooled.copiedBuffer("你好瀏覽器,我是server...", CharsetUtil.UTF_8); // 構造http響應,即HttpResponse響應 DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf); httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8"); httpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes()); // 將構建好的Response返回 channelHandlerContext.writeAndFlush(httpResponse); } } }
測試
因為http協定是短連線協定,所以每次請求完就會斷開連線,所以每次請求都會分配一個新的pipeline物件,與對應的channelSocket