Netty實踐-處理基於流的傳輸


TCP/IP的基於流的傳輸中,接收的資料被儲存到通訊端接收緩衝器中。不幸的是,基於流的傳輸的緩衝器不是分組的佇列,而是位元組的佇列。 這意味著,即使將兩個訊息作為兩個獨立的資料包傳送,作業系統也不會將它們視為兩個訊息,而只是一組位元組(有點悲劇)。 因此,不能保證讀的是您在遠端定入的行資料。 例如,假設作業系統的TCP/IP堆疊已收到三個資料包:

由於基於流的協定的這種通用屬性,在應用程式中以下面的碎片形式(只是其中的一種)讀取它們的機會很高:

因此,接收部分,無論是伺服器側還是用戶端側,都應該將接收到的資料碎片整理成邏輯可由應用容易地理解的一個或多個有意義的影格。 在上述範例的情況下,接收的資料應該如下成影格:

針對上面的問題,下面列出了兩個解決方案。

第一個解決方案

現在我們回到TIME用戶端範例。在這裡有同樣的問題。 32位整數可以算是非常少量的資料量了,並且不可能經常被分段。 然而,問題是它可以分割,並且碎片的可能性將隨著流量增加而增加。

簡單的解決方案是建立一個內部累積緩衝區,並等待所有4個位元組被接收到內部緩衝區。 以下是修正的TimeClientHandler實現,它修復了問題:

package com.yiibai.netty.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();

        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelHandler有兩個生命週期偵聽器方法:handlerAdded()handlerRemoved()。 只要不會阻塞很長時間,您就可以執行任意初始化任務。

  2. 首先,所有接收到的資料應累加到buf中。

  3. 然後,處理程式必須檢查buf是否有足夠的資料(在此範例中為4個位元組),當足夠時就繼續進行實際的業務邏輯。否則,在有更多資料到達時Netty將再次呼叫channelRead()方法,最終累積到達4個位元組再執行實際的業務。

第二個解決方案

雖然第一個解決方案已經解決了TIME用戶端的問題,但修改的處理程式看起來不那麼乾淨。想象如果一個更複雜的協定,它由多個欄位組成,例如:可變長度欄位等。上面的ChannelInboundHandler實現很快就無法維護了。

可能已經注意到,可以向ChannelPipeline新增多個ChannelHandler,因此,可將一個單片的ChannelHandler拆分為多個模組,以降低應用程式的複雜性。 例如,可將TimeClientHandler拆分為兩個處理程式:

  • TimeDecoder處理碎片問題
  • TimeClientHandler的初始簡單版本

幸運的是,Netty提供了一個可延伸類,可以幫助我們方便地編寫:

package com.yiibai.netty.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoderChannelInboundHandler的一個實現,它使得處理碎片問題變得容易。

  2. ByteToMessageDecoder在接收到新資料時,使用內部維護的累積緩衝區呼叫decode()方法。

  3. decode()可以決定在累積緩衝區中沒有足夠資料的情況下不新增任何東西。 當接收到更多資料時,ByteToMessageDecoder將再次呼叫decode()

  4. 如果decode()將物件新增到out,則意味著解碼器成功地解碼了訊息。 ByteToMessageDecoder將丟棄累積緩衝區的讀取部分。要記住,不需要解碼多個訊息。 ByteToMessageDecoder將繼續呼叫decode()方法,直到它沒有再有任何東西新增。

現在我們有另一個處理程式插入ChannelPipeline,應該在TimeClient中修改ChannelInitializer實現:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

如果您喜歡折騰,也可以想嘗試使用ReplayDecoder,這簡化了解碼器更多的工作。但需要參考API參考以獲得更多資訊。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

此外,Netty提供了現成的解碼器,使我們能夠非常容易地實現大多數的協定,並幫助您避免使用一個單一的不可維護的處理程式實現。有關更多詳細範例,請參閱以下範例:

二進位制協定實現: Netty實踐-factorial伺服器
基於文字行的協定實現: Netty實踐-telnet伺服器