Netty實戰(二)

2023-05-24 12:00:26

一、環境準備

Netty需要的執行環境很簡單,只有2個。

  • JDK 1.8+
  • Apache Maven 3.3.9+

二、Netty 使用者端/伺服器概覽


如圖,展示了一個我們將要編寫的 Echo 使用者端和伺服器應用程式。該圖展示是多個使用者端同時連線到一臺伺服器。所能夠支援的使用者端數量,在理論上,僅受限於系統的可用資源(以及所使用的 JDK 版本可能會施加的限制)。

Echo 使用者端和伺服器之間的互動是非常簡單的;在使用者端建立一個連線之後,它會向伺服器傳送一個或多個訊息,反過來伺服器又會將每個訊息回送給使用者端。雖然它本身看起來好像用處不大,但它充分地體現了使用者端/伺服器系統中典型的請求-響應互動模式

三、編寫 Echo 伺服器

所有的 Netty 伺服器都需要以下兩部分。

  • 至少一個 ChannelHandler—該元件實現了伺服器對從使用者端接收的資料的處理,即它的業務邏輯。
  • 引導—這是設定伺服器的啟動程式碼。至少,它會將伺服器繫結到它要監聽連線請求的埠上。

3.1 ChannelHandler 和業務邏輯

上一篇博文我們介紹了 Future 和回撥,並且闡述了它們在事件驅動設計中的應用。我們還討論了 ChannelHandler,它是一個介面族的父介面,它的實現負責接收並響應事件通知。

在 Netty 應用程式中,所有的資料處理邏輯都包含在這些核心抽象的實現中。因為你的 Echo 伺服器會響應傳入的訊息,所以它需要實現ChannelInboundHandler 介面,用來定義響應入站事件的方法。簡單的應用程式只需要用到少量的這些方法,所以繼承 ChannelInboundHandlerAdapter 類也就足夠了,它提供了ChannelInboundHandler 的預設實現。

我們將要用到的方法是:

  • channelRead() :對於每個傳入的訊息都要呼叫;
  • channelReadComplete() : 通知ChannelInboundHandler最後一次對channelRead()的呼叫是當前批次讀取中的最後一條訊息;
  • exceptionCaught() :在讀取操作期間,有異常丟擲時會呼叫。

該 Echo 伺服器的 ChannelHandler 實現是 EchoServerHandler,如程式碼:

package com.example.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author lhd
 * @date 2023/05/16 15:05
 * @notes Netty Echo伺服器端簡單邏輯
 */

//表示channel可以並多個範例共用,它是執行緒安全的
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        //將訊息列印到控制檯
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
        //將收到的訊息寫給傳送者,而不沖刷出站訊息
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //將未決訊息沖刷到遠端節點,並且關閉該 Channe
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //列印異常堆疊跟蹤
        cause.printStackTrace();
        //關閉該channel
        ctx.close();
    }
}

ChannelInboundHandlerAdapter 有一個直觀的 API,並且它的每個方法都可以被重寫以掛鉤到事件生命週期的恰當點上。

因為需要處理所有接收到的資料,所以我們重寫了 channelRead() 方法。在這個伺服器應用程式中,我們將資料簡單地回送給了遠端節點。

重寫 exceptionCaught() 方法允許我們對 Throwable 的任何子型別做出反應,在這裡你記錄了異常並關閉了連線。

雖然一個更加完善的應用程式也許會嘗試從異常中恢復,但在這個場景下,只是通過簡單地關閉連線來通知遠端節點發生了錯誤。

ps:如果不捕獲異常,會發生什麼呢?

每個 Channel 都擁有一個與之相關聯的 ChannelPipeline,其持有一個 ChannelHandler 的範例鏈。在預設的情況下,ChannelHandler 會把對它的方法的呼叫轉發給鏈中的下一個 ChannelHandler。因此,如果 exceptionCaught()方法沒有被該鏈中的某處實現,那麼所接收的異常將會被傳遞到 ChannelPipeline 的尾端並被記錄。為此,你的應用程式應該提供至少有一個實現exceptionCaught()方法的 ChannelHandler。

除了 ChannelInboundHandlerAdapter 之外,還有很多需要學習ChannelHandler的子型別和實現。這些之後會一一說明,目前,我們只關注:

  • 針對不同型別的事件來呼叫 ChannelHandler;
  • 應用程式通過實現或者擴充套件 ChannelHandler 來掛鉤到事件的生命週期,並且提供自定義的應用程式邏輯;
  • 在架構上,ChannelHandler 有助於保持業務邏輯與網路處理程式碼的分離。這簡化了開發過程,因為程式碼必須不斷地演化以響應不斷變化的需求。

3.2 引導伺服器

下面我們準備開始構建伺服器。構建伺服器涉及到兩個內容:

  • 繫結到伺服器將在其上監聽並接受傳入連線請求的埠;
  • 設定 Channel,以將有關的入站訊息通知給 EchoServerHandler 範例。
package com.example.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * @author lhd
 * @date 2023/05/16 15:21
 * @notes Netty引導伺服器
 */
public class EchoServer {

    public static void main(String[] args) throws Exception {
        //呼叫伺服器的 start()方法
        new EchoServer().start();
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //建立EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //建立ServerBootstra
            ServerBootstrap b = new ServerBootstrap();
            //指定伺服器監視埠
             int port = 8080;
            b.group(group)
                    //指定所使用的 NIO 傳輸 Channel
                    //因為我們正在使用的是 NIO 傳輸,所以你指定了 NioEventLoopGroup 來接受和處理新的連線,
                    // 並且將 Channel 的型別指定為 NioServerSocketChannel 。
                    .channel(NioServerSocketChannel.class)
                    //使用指定的埠設定通訊端地址
                    //將本地地址設定為一個具有選定埠的 InetSocketAddress 。伺服器將繫結到這個地址以監聽新的連線請求
                    .localAddress(new InetSocketAddress(port))
                    //新增一個EchoServerHandler 到子Channel的 ChannelPipeline
                    //這裡使用了一個特殊的類——ChannelInitializer。這是關鍵。
                    // 當一個新的連線被接受時,一個新的子 Channel 將會被建立,而 ChannelInitializer 將會把一個你的
                    //EchoServerHandler 的範例新增到該 Channel 的 ChannelPipeline 中。正如我們之前所解釋的,
                    // 這個 ChannelHandler 將會收到有關入站訊息的通知。
                    .childHandler(new ChannelInitializer<SocketChannel>(){
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //EchoServerHandler 被標註為 @Shareable,所以我們可以總是使用同樣的範例
                            //實際上所有使用者端都是使用的同一個EchoServerHandler
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            //非同步地繫結伺服器,呼叫 sync()方法阻塞等待直到繫結完成
            //sync()方法的呼叫將導致當前 Thread阻塞,一直到繫結操作完成為止
            ChannelFuture f = b.bind().sync();
            //獲取 Channel 的CloseFuture,並且阻塞當前線
            //該應用程式將會阻塞等待直到伺服器的 Channel關閉(因為你在 Channel 的 CloseFuture 上呼叫了 sync()方法)
            f.channel().closeFuture().sync();
        } finally {
            //關閉 EventLoopGroup,釋放所有的資源,包括所有被建立的執行緒
            group.shutdownGracefully().sync();
        }
    }
}

我們總結一下伺服器實現中的重要步驟。下面這些是伺服器的主要程式碼元件:

  • EchoServerHandler 實現了業務邏輯;
  • main()方法引導了伺服器;
    引導過程中所需要的步驟如下:
    • 建立一個 ServerBootstrap 的範例以引導和繫結伺服器;
    • 建立並分配一個 NioEventLoopGroup 範例以進行事件的處理,如接受新連線以及讀/寫資料;
    • 指定伺服器繫結的原生的 InetSocketAddress;
    • 使用一個 EchoServerHandler 的範例初始化每一個新的 Channel;
    • 呼叫 ServerBootstrap.bind()方法以繫結伺服器。

到此我們的引導伺服器已經完成。

四、編寫 Echo 使用者端

Echo 使用者端將會:
(1)連線到伺服器;
(2)傳送一個或者多個訊息;
(3)對於每個訊息,等待並接收從伺服器發回的相同的訊息;
(4)關閉連線。
編寫使用者端所涉及的兩個主要程式碼部分也是業務邏輯和引導,和你在伺服器中看到的一樣。

4.1 通過 ChannelHandler 實現使用者端邏輯

如同伺服器,使用者端將擁有一個用來處理資料的 ChannelInboundHandler。在這個場景下,我們將擴充套件 SimpleChannelInboundHandler 類以處理所有必須的任務。這要求重寫下面的方法:

  • channelActive() : 在到伺服器的連線已經建立之後將被呼叫;
  • channelRead0() : 當從伺服器接收到一條訊息時被呼叫;
  • exceptionCaught() :在處理過程中引發異常時被呼叫。

具體程式碼可以參考如下:

package com.example.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * @author lhd
 * @date 2023/05/16 15:45
 * @notes Netty 簡單的使用者端邏輯
 */

//標記該類的範例可以被多個 Channel 共用
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    //當被通知 Channel是活躍的時候,傳送一條訊息
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }

    //記錄已接收訊息的轉儲
    @Override
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
        System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
    }

    //在發生異常時,記錄錯誤並關閉Channel
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

首先,我們重寫了 channelActive() 方法,其將在一個連線建立時被呼叫。這確保了資料將會被儘可能快地寫入伺服器,其在這個場景下是一個編碼了字串"Netty rocks!"的位元組緩衝區。

接下來,我們重寫了 channelRead0() 方法。每當接收資料時,都會呼叫這個方法。由伺服器傳送的訊息可能會被分塊接收。也就是說,如果伺服器傳送了 5 位元組,那麼不能保證這 5 位元組會被一次性接收。即使是對於這麼少量的資料,channelRead0()方法也可能會被呼叫兩次,第一次使用一個持有 3 位元組的 ByteBuf(Netty 的位元組容器),第二次使用一個持有 2 位元組的 ByteBuf。作為一個面向流的協定,TCP 保證了位元組陣列將會按照伺服器傳送它們的順序被接收。

ps:所以channelRead0()的呼叫次數不一定等於伺服器釋出訊息的次數

重寫的第三個方法是 exceptionCaught()。如同在 EchoServerHandler(3.1中的程式碼範例)中所示,記錄 Throwable,關閉 Channel,在這個場景下,終止到伺服器的連線。

ps:為什麼使用者端繼承SimpleChannelInboundHandler 而不是ChannelInboundHandler?

在使用者端,當 channelRead0()方法完成時,我們已經有了傳入訊息,並且已經處理完它了。當該方法返回時,SimpleChannelInboundHandler 負責釋放指向儲存該訊息的 ByteBuf 的記憶體參照。

在 EchoServerHandler 中,我們仍然需要將傳入訊息回送給傳送者,而 write()操作是非同步的,直到 channelRead()方法返回後可能仍然沒有完成。為此,EchoServerHandler擴充套件了 ChannelInboundHandlerAdapter,其在這個時間點上不會釋放訊息。訊息在 EchoServerHandler 的 channelReadComplete()方法中,當 writeAndFlush()方法被呼叫時被釋放。

4.2 引導使用者端

引導使用者端類似於引導伺服器,不同的是,使用者端是使用主機和埠引數來連線遠端地址,也就是這裡的 Echo 伺服器的地址,而不是繫結到一個一直被監聽的埠。

package com.example.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * @author lhd
 * @date 2023/05/16 15:59
 * @notes 引導使用者端
 */
public class EchoClient {
  
    public void start() throws Exception {
        //指定 EventLoopGroup 以處理使用者端事件;需要適用於 NIO 的實現
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //建立 Bootstrap
            Bootstrap b = new Bootstrap();
            b.group(group)
                    //適用於 NIO 傳輸的 Channel 型別
                    .channel(NioSocketChannel.class)
                    //設定伺服器的InetSocketAddress
                    .remoteAddress(new InetSocketAddress("127.0.0.1", 8080))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //在建立Channel時,向 ChannelPipeline中新增一個 EchoClientHandler 範例
                            ch.pipeline().addLast(new EchoClientHandler());}
                    });
            //連線到遠端節點,阻塞等待直到連線完成
            ChannelFuture f = b.connect().sync();
            //阻塞,直到Channel 關閉
            f.channel().closeFuture().sync();
        } finally {
            //關閉執行緒池並且釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }
 public static void main(String[] args) throws Exception {
        new EchoClient().start();
    }
}

總結一下要點:

  • 為初始化使用者端,建立了一個 Bootstrap 範例;
  • 為進行事件處理分配了一個 NioEventLoopGroup 範例,其中事件處理包括建立新的連線以及處理入站和出站資料;
  • 為伺服器連線建立了一個 InetSocketAddress 範例;
  • 當連線被建立時,一個 EchoClientHandler 範例會被安裝到(該 Channel 的)
    ChannelPipeline 中;
  • 在一切都設定完成後,呼叫 Bootstrap.connect()方法連線到遠端節點;

綜上使用者端的構建已經完成。

五、構建和執行 Echo 伺服器和使用者端

將我們上面的程式碼複製到IDEA中執行,先啟動伺服器端在啟動使用者端會得到以下預期效果:

伺服器端控制檯列印:

使用者端控制檯列印:

我們關閉伺服器端後,使用者端控制檯列印:

因為伺服器端關閉,觸發了使用者端 EchoClientHandler 中的exceptionCaught()方法,列印出了異常堆疊並關閉了連線。

這只是一個簡單的應用程式,但是它可以伸縮到支援數千個並行連線——每秒可以比普通的基於通訊端的 Java 應用程式處理多得多的訊息。