Netty需要的執行環境很簡單,只有2個。
如圖,展示了一個我們將要編寫的 Echo 使用者端和伺服器應用程式。該圖展示是多個使用者端同時連線到一臺伺服器。所能夠支援的使用者端數量,在理論上,僅受限於系統的可用資源(以及所使用的 JDK 版本可能會施加的限制)。
Echo 使用者端和伺服器之間的互動是非常簡單的;在使用者端建立一個連線之後,它會向伺服器傳送一個或多個訊息,反過來伺服器又會將每個訊息回送給使用者端。雖然它本身看起來好像用處不大,但它充分地體現了使用者端/伺服器系統中典型的請求-響應互動模式。
所有的 Netty 伺服器都需要以下兩部分。
上一篇博文我們介紹了 Future 和回撥,並且闡述了它們在事件驅動設計中的應用。我們還討論了 ChannelHandler,它是一個介面族的父介面,它的實現負責接收並響應事件通知。
在 Netty 應用程式中,所有的資料處理邏輯都包含在這些核心抽象的實現中。因為你的 Echo 伺服器會響應傳入的訊息,所以它需要實現ChannelInboundHandler 介面,用來定義響應入站事件的方法。簡單的應用程式只需要用到少量的這些方法,所以繼承 ChannelInboundHandlerAdapter 類也就足夠了,它提供了ChannelInboundHandler 的預設實現。
我們將要用到的方法是:
該 Echo 伺服器的 ChannelHandler 實現是 EchoServerHandler,如程式碼:
package com.example.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author lhd
* @date 2023/05/16 15:05
* @notes Netty Echo伺服器端簡單邏輯
*/
//表示channel可以並多個範例共用,它是執行緒安全的
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
//將訊息列印到控制檯
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
//將收到的訊息寫給傳送者,而不沖刷出站訊息
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//將未決訊息沖刷到遠端節點,並且關閉該 Channe
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//列印異常堆疊跟蹤
cause.printStackTrace();
//關閉該channel
ctx.close();
}
}
ChannelInboundHandlerAdapter 有一個直觀的 API,並且它的每個方法都可以被重寫以掛鉤到事件生命週期的恰當點上。
因為需要處理所有接收到的資料,所以我們重寫了 channelRead() 方法。在這個伺服器應用程式中,我們將資料簡單地回送給了遠端節點。
重寫 exceptionCaught() 方法允許我們對 Throwable 的任何子型別做出反應,在這裡你記錄了異常並關閉了連線。
雖然一個更加完善的應用程式也許會嘗試從異常中恢復,但在這個場景下,只是通過簡單地關閉連線來通知遠端節點發生了錯誤。
ps:如果不捕獲異常,會發生什麼呢?
每個 Channel 都擁有一個與之相關聯的 ChannelPipeline,其持有一個 ChannelHandler 的範例鏈。在預設的情況下,ChannelHandler 會把對它的方法的呼叫轉發給鏈中的下一個 ChannelHandler。因此,如果 exceptionCaught()方法沒有被該鏈中的某處實現,那麼所接收的異常將會被傳遞到 ChannelPipeline 的尾端並被記錄。為此,你的應用程式應該提供至少有一個實現exceptionCaught()方法的 ChannelHandler。
除了 ChannelInboundHandlerAdapter 之外,還有很多需要學習ChannelHandler的子型別和實現。這些之後會一一說明,目前,我們只關注:
下面我們準備開始構建伺服器。構建伺服器涉及到兩個內容:
package com.example.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/**
* @author lhd
* @date 2023/05/16 15:21
* @notes Netty引導伺服器
*/
public class EchoServer {
public static void main(String[] args) throws Exception {
//呼叫伺服器的 start()方法
new EchoServer().start();
}
public void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
//建立EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
try {
//建立ServerBootstra
ServerBootstrap b = new ServerBootstrap();
//指定伺服器監視埠
int port = 8080;
b.group(group)
//指定所使用的 NIO 傳輸 Channel
//因為我們正在使用的是 NIO 傳輸,所以你指定了 NioEventLoopGroup 來接受和處理新的連線,
// 並且將 Channel 的型別指定為 NioServerSocketChannel 。
.channel(NioServerSocketChannel.class)
//使用指定的埠設定通訊端地址
//將本地地址設定為一個具有選定埠的 InetSocketAddress 。伺服器將繫結到這個地址以監聽新的連線請求
.localAddress(new InetSocketAddress(port))
//新增一個EchoServerHandler 到子Channel的 ChannelPipeline
//這裡使用了一個特殊的類——ChannelInitializer。這是關鍵。
// 當一個新的連線被接受時,一個新的子 Channel 將會被建立,而 ChannelInitializer 將會把一個你的
//EchoServerHandler 的範例新增到該 Channel 的 ChannelPipeline 中。正如我們之前所解釋的,
// 這個 ChannelHandler 將會收到有關入站訊息的通知。
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception {
//EchoServerHandler 被標註為 @Shareable,所以我們可以總是使用同樣的範例
//實際上所有使用者端都是使用的同一個EchoServerHandler
ch.pipeline().addLast(serverHandler);
}
});
//非同步地繫結伺服器,呼叫 sync()方法阻塞等待直到繫結完成
//sync()方法的呼叫將導致當前 Thread阻塞,一直到繫結操作完成為止
ChannelFuture f = b.bind().sync();
//獲取 Channel 的CloseFuture,並且阻塞當前線
//該應用程式將會阻塞等待直到伺服器的 Channel關閉(因為你在 Channel 的 CloseFuture 上呼叫了 sync()方法)
f.channel().closeFuture().sync();
} finally {
//關閉 EventLoopGroup,釋放所有的資源,包括所有被建立的執行緒
group.shutdownGracefully().sync();
}
}
}
我們總結一下伺服器實現中的重要步驟。下面這些是伺服器的主要程式碼元件:
到此我們的引導伺服器已經完成。
Echo 使用者端將會:
(1)連線到伺服器;
(2)傳送一個或者多個訊息;
(3)對於每個訊息,等待並接收從伺服器發回的相同的訊息;
(4)關閉連線。
編寫使用者端所涉及的兩個主要程式碼部分也是業務邏輯和引導,和你在伺服器中看到的一樣。
如同伺服器,使用者端將擁有一個用來處理資料的 ChannelInboundHandler。在這個場景下,我們將擴充套件 SimpleChannelInboundHandler 類以處理所有必須的任務。這要求重寫下面的方法:
具體程式碼可以參考如下:
package com.example.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* @author lhd
* @date 2023/05/16 15:45
* @notes Netty 簡單的使用者端邏輯
*/
//標記該類的範例可以被多個 Channel 共用
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
//當被通知 Channel是活躍的時候,傳送一條訊息
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
//記錄已接收訊息的轉儲
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
}
//在發生異常時,記錄錯誤並關閉Channel
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
首先,我們重寫了 channelActive() 方法,其將在一個連線建立時被呼叫。這確保了資料將會被儘可能快地寫入伺服器,其在這個場景下是一個編碼了字串"Netty rocks!"的位元組緩衝區。
接下來,我們重寫了 channelRead0() 方法。每當接收資料時,都會呼叫這個方法。由伺服器傳送的訊息可能會被分塊接收。也就是說,如果伺服器傳送了 5 位元組,那麼不能保證這 5 位元組會被一次性接收。即使是對於這麼少量的資料,channelRead0()方法也可能會被呼叫兩次,第一次使用一個持有 3 位元組的 ByteBuf(Netty 的位元組容器),第二次使用一個持有 2 位元組的 ByteBuf。作為一個面向流的協定,TCP 保證了位元組陣列將會按照伺服器傳送它們的順序被接收。
ps:所以channelRead0()的呼叫次數不一定等於伺服器釋出訊息的次數
重寫的第三個方法是 exceptionCaught()。如同在 EchoServerHandler(3.1中的程式碼範例)中所示,記錄 Throwable,關閉 Channel,在這個場景下,終止到伺服器的連線。
ps:為什麼使用者端繼承SimpleChannelInboundHandler 而不是ChannelInboundHandler?
在使用者端,當 channelRead0()方法完成時,我們已經有了傳入訊息,並且已經處理完它了。當該方法返回時,SimpleChannelInboundHandler 負責釋放指向儲存該訊息的 ByteBuf 的記憶體參照。
在 EchoServerHandler 中,我們仍然需要將傳入訊息回送給傳送者,而 write()操作是非同步的,直到 channelRead()方法返回後可能仍然沒有完成。為此,EchoServerHandler擴充套件了 ChannelInboundHandlerAdapter,其在這個時間點上不會釋放訊息。訊息在 EchoServerHandler 的 channelReadComplete()方法中,當 writeAndFlush()方法被呼叫時被釋放。
引導使用者端類似於引導伺服器,不同的是,使用者端是使用主機和埠引數來連線遠端地址,也就是這裡的 Echo 伺服器的地址,而不是繫結到一個一直被監聽的埠。
package com.example.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
* @author lhd
* @date 2023/05/16 15:59
* @notes 引導使用者端
*/
public class EchoClient {
public void start() throws Exception {
//指定 EventLoopGroup 以處理使用者端事件;需要適用於 NIO 的實現
EventLoopGroup group = new NioEventLoopGroup();
try {
//建立 Bootstrap
Bootstrap b = new Bootstrap();
b.group(group)
//適用於 NIO 傳輸的 Channel 型別
.channel(NioSocketChannel.class)
//設定伺服器的InetSocketAddress
.remoteAddress(new InetSocketAddress("127.0.0.1", 8080))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//在建立Channel時,向 ChannelPipeline中新增一個 EchoClientHandler 範例
ch.pipeline().addLast(new EchoClientHandler());}
});
//連線到遠端節點,阻塞等待直到連線完成
ChannelFuture f = b.connect().sync();
//阻塞,直到Channel 關閉
f.channel().closeFuture().sync();
} finally {
//關閉執行緒池並且釋放所有的資源
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
new EchoClient().start();
}
}
總結一下要點:
綜上使用者端的構建已經完成。
將我們上面的程式碼複製到IDEA中執行,先啟動伺服器端在啟動使用者端會得到以下預期效果:
伺服器端控制檯列印:
使用者端控制檯列印:
我們關閉伺服器端後,使用者端控制檯列印:
因為伺服器端關閉,觸發了使用者端 EchoClientHandler 中的exceptionCaught()方法,列印出了異常堆疊並關閉了連線。
這只是一個簡單的應用程式,但是它可以伸縮到支援數千個並行連線——每秒可以比普通的基於通訊端的 Java 應用程式處理多得多的訊息。