Netty Protobuf處理粘包分析

2023-02-10 12:00:45

背景

最近訊息中介軟體專案進行聯調,我負責Server端,使用Java的Netty框架。同事負責Client端,使用Go的net包,訊息使用Protobuf序列化。聯調時Client傳送的訊息Server端解析出錯,經過分析發現是Server與Client粘包處理方式不一致導致,Server使用的是Protobuf提供的粘包處理方式,Client使用的是訊息頭定義長度的處理方式,探索一下Protobuf粘包處理方式有何不同。

編碼類

public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {

    @Override
    protected void encode(
            ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        int bodyLen = msg.readableBytes();
        int headerLen = computeRawVarint32Size(bodyLen);
        out.ensureWritable(headerLen + bodyLen);
        writeRawVarint32(out, bodyLen);
        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }

    /**
     * Writes protobuf varint32 to (@link ByteBuf).
     * @param out to be written to
     * @param value to be written
     */
    static void writeRawVarint32(ByteBuf out, int value) {
        while (true) {
            if ((value & ~0x7F) == 0) {
                out.writeByte(value);
                return;
            } else {
                out.writeByte((value & 0x7F) | 0x80);
                value >>>= 7;
            }
        }
    }

    /**
     * Computes size of protobuf varint32 after encoding.
     * @param value which is to be encoded.
     * @return size of value encoded as protobuf varint32.
     */
    static int computeRawVarint32Size(final int value) {
        if ((value & (0xffffffff <<  7)) == 0) {
            return 1;
        }
        if ((value & (0xffffffff << 14)) == 0) {
            return 2;
        }
        if ((value & (0xffffffff << 21)) == 0) {
            return 3;
        }
        if ((value & (0xffffffff << 28)) == 0) {
            return 4;
        }
        return 5;
    }
}

encode()方法

protected void encode(
            ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        // 獲取訊息長度
	int bodyLen = msg.readableBytes(); 
	// 計算表示訊息體長度所需的位元組數量
        int headerLen = computeRawVarint32Size(bodyLen);
	// 拿到所有需要寫入的資料長度,對緩衝區進行擴容
        out.ensureWritable(headerLen + bodyLen);
	// 將表示訊息體長度的位元組寫入緩衝區
        writeRawVarint32(out, bodyLen);
        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }

writeRawVarint32()方法

先看value & ~0x7F(value & 0x7F) | 0x80value >>>= 7這幾個看不懂的地方,&|~>>>=這些符號為計算機的位運運算元號,分別代表與、或、非、忽略符號位右移(a>>>=n 相當於 a = a>>>n

計算value & ~0x7F

分別假設value值為100200

100轉二進位制為01100100200轉二進位制為11001000

計算100 & ~0x7F

十進位制 十六進位制 運運算元


100 0x64 0 1 1 0 0 1 0 0
-128 ~0x7f & 1 0 0 0 0 0 0 0
0 0x00
0 0 0 0 0 0 0 0

計算200 & ~0x7F

十進位制 十六進位制 運運算元


200 0xc8 1 1 0 0 1 0 0 0
-128 ~0x7f & 1 0 0 0 0 0 0 0
128 0x80
1 0 0 0 0 0 0 0

這裡運算結果使用十進位制表示二進位制是不準確的,僅作參考,需要根據資料型別進行轉換,比如:10000000轉換為byte型別是-128,轉換為int是128

通過以上計算可以看出:

可以使用小於7個位表示的數位即可滿足條件,7個位可以表示$2^7=128$個數位,取值範圍是0~127,也就是說0~127可以滿足條件,這一步的目的是保證寫入表示訊息體長度的最後一位位元組是正數,後面會說到。

value=100滿足條件,所以向bytebuf中寫入位元組01100100,然後return方法結束。

value=200不滿足條件,那麼看(value & 0x7F) | 0x80這一步運算。

計算(value & 0x7F) | 0x80

十進位制 十六進位制 運運算元


200 0xc8 1 1 0 0 1 0 0 0
127 0x7f & 0 1 1 1 1 1 1 1
- - - - - - - - - - -
72 0x48
0 1 0 0 1 0 0 0
128 0x80 | 1 0 0 0 0 0 0 0
- - - - - - - - - - -
200 0xc8 1 1 0 0 1 0 0 0

計算結果還是200,我們分析一下步驟:

value & 0x7f:取出最後七個位,|0x80:將首位轉為1

即取出最後7個位,高位補1,正好一個位元組的長度,將11001000寫入bytebuf,再看value >>>= 7

計算value >>>= 7

十進位制 運運算元


200 1 1 0 0 1 0 0 0

>>>






1 1 0 0 1 0 0 0
1
0 0 0 0 0 0 0 1

忽略符號右移其實就是將後7位擠出去,在前邊補7個0。

計算機中一般首位是符號位,0表示正數,1表示負數。

這裡需要注意是>>表示右移,不改變符號,最高位與原來符號保持一致。 >>>是忽略符號位右移,最高位補0。

200 >>>= 7的結果為00000001,繼續走if判斷,滿足條件,將00000001寫入bytebuf

最終value=200寫入bytebuf的位元組是1100100000000001

至此,三個看不懂的位運算都理解了,那麼我們連起來看一下:

如果value可以用7個位元組表示(或者說是value在0~127範圍內),將value轉換為位元組寫入bytebuf,跳出迴圈,方法結束。

如果value不能用7個位元組表示(或者說是value不在0~127範圍內),取最後7個位,高位補1,寫入bytebuf中,右移7位(將剛才取出的7位刪掉),再次判斷是否滿足if條件,不滿足就繼續上面的操作,直到滿足條件為止。

總結一下writeRawVarint32方法,其實是把一個整數拆分成多個位元組,倒序寫入bytebuf中,如果將每個位元組轉換為byte型別,最後一個位元組總是正數,前面的位元組都是負數。我們可以猜測,接收訊息時以第一個正數為分割,將表示訊息體長度的位元組與訊息體位元組拆分開,再通過位運算將前者組合起來就得到了訊息體的長度。

computeRawVarint32Size()方法

我們在writeRawVarint32方法分析中瞭解了位運算,再看computeRawVarint32Size方法就很簡單了。

計算機表示負數

0xffffffff轉換為二進位制是11111111 1111111 11111111 11111111轉換為有有符號int型別是-1。為什麼是-1?

因為計算機使用二進位制可以做加法運算,但是沒辦法做減法運算,加上一個負數就相當於做了減法運算,現在問題是如何表示負數?

曾經有原碼錶示法、反碼錶示法,這裡不做贅述,現在使用的是二補數表示法。

二補數表示法是將正數的二進位制取反,然後在最後一位+1。

通過例子看一下:

有符號int型別的1用二進位制可以表示為00000000 0000000 00000000 0000001取反得到11111111 11111111 11111111 11111110+1得到11111111 11111111 11111111 11111111轉換為十六進位制是0xffffffff

計算value & (0xffffffff << 7)

<<表示左移,從左邊擠出去7個位,在右邊補7個0。

這裡仍然假設value分為為100,200。

// 計算(100 & (0xffffffff <<  7))
  00000000 0000000 00000000 01100100 // 100
& 11111111 1111111 11111111 10000000 // 0xffffffff <<  7 
  00000000 0000000 00000000 00000000 // 結果:0

// 計算(200 & (0xffffffff <<  7))
  00000000 0000000 00000000 11001000 // 200
& 11111111 1111111 11111111 10000000 // 0xffffffff <<  7
  00000000 0000000 00000000 10000000 // 結果:128

// 計算(200 & (0xffffffff <<  14))
  00000000 0000000 00000000 11001000 // 200
& 11111111 1111111 11000000 00000000 // 0xffffffff <<  14
  00000000 0000000 00000000 00000000 // 結果:0

從以上計算可以看出,如果value可以用小於7個位來表示,則左移7個位可以滿足,如果value可以用8~14個位來表示,左移14個位可以滿足。

100、200計算結果分別為1、2,與writeRawVarint32方法寫入的位元組數量一致。

writeRawVarint32是方法7個7個的取出位,這裡按7個位來計算所需位元組數量,最終返回表示訊息體長度的位元組數量。

解碼類

public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {

    // TODO maxFrameLength + safe skip + fail-fast option
    //      (just like LengthFieldBasedFrameDecoder)

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        in.markReaderIndex();
        int preIndex = in.readerIndex();
        int length = readRawVarint32(in);
        if (preIndex == in.readerIndex()) {
            return;
        }
        if (length < 0) {
            throw new CorruptedFrameException("negative length: " + length);
        }

        if (in.readableBytes() < length) {
            in.resetReaderIndex();
        } else {
            out.add(in.readRetainedSlice(length));
        }
    }

    /**
     * Reads variable length 32bit int from buffer
     *
     * @return decoded int if buffers readerIndex has been forwarded else nonsense value
     */
    private static int readRawVarint32(ByteBuf buffer) {
        if (!buffer.isReadable()) {
            return 0;
        }
        buffer.markReaderIndex();
        byte tmp = buffer.readByte();
        if (tmp >= 0) {
            return tmp;
        } else {
            int result = tmp & 127;
            if (!buffer.isReadable()) {
                buffer.resetReaderIndex();
                return 0;
            }
            if ((tmp = buffer.readByte()) >= 0) {
                result |= tmp << 7;
            } else {
                result |= (tmp & 127) << 7;
                if (!buffer.isReadable()) {
                    buffer.resetReaderIndex();
                    return 0;
                }
                if ((tmp = buffer.readByte()) >= 0) {
                    result |= tmp << 14;
                } else {
                    result |= (tmp & 127) << 14;
                    if (!buffer.isReadable()) {
                        buffer.resetReaderIndex();
                        return 0;
                    }
                    if ((tmp = buffer.readByte()) >= 0) {
                        result |= tmp << 21;
                    } else {
                        result |= (tmp & 127) << 21;
                        if (!buffer.isReadable()) {
                            buffer.resetReaderIndex();
                            return 0;
                        }
                        result |= (tmp = buffer.readByte()) << 28;
                        if (tmp < 0) {
                            throw new CorruptedFrameException("malformed varint.");
                        }
                    }
                }
            }
            return result;
        }
    }
}

readRawVarint32()方法

方法就不細看了,驗證一下我們之前的猜測。還是使用value=200寫入的1100100000000001位元組舉例看一下。

private static int readRawVarint32(ByteBuf buffer) {
        if (!buffer.isReadable()) {
            return 0;
        }
        buffer.markReaderIndex();
	// 讀取第一個位元組
        byte tmp = buffer.readByte(); // tmp = 11001000 首位是1,是個負數,小於0
	// 判斷是否大於等於0,
	// 大於等於0說明是最後一個表示訊息體長度的位元組,直接return
        if (tmp >= 0) {
            return tmp;
        } else {
	    // 小於0 tmp & 127 取出後七位
            int result = tmp & 127; // result = 11001000 & 01111111 = 01001000
            if (!buffer.isReadable()) {
                buffer.resetReaderIndex();
                return 0;
            }
	    // 再取第二個位元組,判斷是否大於等於0 
            if ((tmp = buffer.readByte()) >= 0) { //tmp = 00000001  
		// 這一步操作相當於是把上一步取出的7個位元組拿出來拼在tmp<<7的後面
                result |= tmp << 7;
		// result = 01001000 | tmp << 7
		// tmp << 7 = 10000000
                // 01001000 | 10000000 = 11001000 轉換為int型別是200
            }
	    // 後面的程式碼與以上步驟大同小異,不再贅述了
            return result;
        }
    }

總結

涉及到的基礎知識:計算機表示整數、位運算、進位制轉換

一般處理粘包的方式有三種:

  1. 定長訊息:每次傳送訊息的長度固定,比如,總是傳送100個位元組。

  2. 特殊符號分割:以特殊字元作為分隔符,讀到特殊字元時,認為上一條訊息結束。

  3. 訊息頭定義長度:在訊息體前增加訊息體的長度,一般使用四個位元組,讀取訊息時先讀取四個位元組,得到訊息體長度,再根據長度讀取訊息。

Netty Protobuf提供的處理粘包處理方式是在訊息體前加正負數,並且以第一個正數作為分割。可以說是訊息頭定義長度方式+特殊符號分割方式的結合版。巧妙利用二進位制的位運算和計算機表示整數的特點實現動態訊息長度,傳送較短訊息時可以比訊息頭定義長度的方式節省1-3個位元組。

部落格小白的第一篇檔案,如有錯誤,還望指正。