Dubbo原始碼閱讀分享系列文章,歡迎大家關注點贊
在網路互動中是以位元組流的形式傳遞的,對於位元組流都是二進位制格式,這樣我們就面臨一個問題就是如何轉化為我們可以識別的字元,協定就是來解決這個問題的,協定用通俗易懂地解釋就是通訊雙方需要遵循的約定。 在日常開發中,我們常見的網路傳輸協定有TCP、UDP、HTTP。常用的中介軟體也會定義對應的協定,如Redis、Mysql、Zookeeper等都有自己約定的協定,同樣Dubbo的通訊也採用一種協定,這些都是應用層協定,都是基於TCP或者UDP設計的。
應用層協定一般的形式有三種:定長協定、特殊結束符和變長協定,聊到這裡就可以丟擲來一個常見的面試題,如何解決網路通訊粘包和拆包的問題?該問題的解決方案也就是通過約定協定,下面我們就來聊聊這三種模式優缺點以及使用場景。
定長的協定是指協定內容的長度是固定的,比如協定byte長度是50,當從網路上讀取50個byte後,就進行decode解碼操作。
定長協定在讀取或者寫入時,效率比較高,因為資料大小都是確定的。
定長協定的缺點在於適應性不足,網路傳輸中傳輸的內容的大小不可能都是相同的,因此對於一些長度不夠的訊息,明顯過於的浪費頻寬。
特殊結束符就是在每次傳輸結束的時候使用一個特殊的結束符,在Redis中的協定採用了特殊結束符,使用者端和伺服器傳送的命令一律使用\r\n(CRLF)結尾。
與定長協定一樣讀取或者寫入時,效率比較高,同時解決定長協定的尷尬。
特殊結束符方式的問題是必須要有一個完整的訊息體才能進行傳輸,除此之外必須要防止使用者傳輸的資料不能同結束符相同,否則就會出現紊亂。
變長協定由定長以及不定長兩部分組成,定長部分一般是協定頭,此部分會包含變長部分的描述,變長協定我們經常使用的HTTP協定採用變長協定,HTTP請求報文格式是由三部分組成:
靈活性比較高,解決了定長協定以及特殊結束符的所有缺點。
複雜性比較高,需要自定義一套標準,所有訊息都需要按照該格式傳送以及解析。
Dubbo框架支援很多協定,預設採用Dubbo協定,Dubbo協定採用的是變長協定的設計,整體的格式如下:
Dubbo協定整體設計比較簡潔,能採用1個bit表示的,不會用一個byte來表示;此外請求頭和響應頭一致,整體採用一套解析標準就可以,程式碼實現起來相對簡單。
由於整體的設計相對簡潔,導致擴充套件性不夠;
在通訊篇中我們講過Codec2該介面,該介面提供了encode和decode個方法來實現訊息與位元組流之間的相互轉換,關於該介面的實現我們沒有講解,這裡我們來看看此部分和Dubbo協定有什麼關係。 AbstractCodec抽象類沒有實現Codec2中定義的介面方法,而是提供了幾個給子類用的基礎方法。
接下來我們就來聊聊子類如何被解析的,我們可以看到四個子類的繼承關係,重點介紹的是ExchangeCodec和DubboCodec,其他就是做一下簡單介紹。 TransportCodec該類已經被標註為棄用,該類內部也就是根據getSerialization方法選擇的序列化方法,對傳入訊息或ChannelBuffer進行序列化或反序列化。 TelnetCodec繼承了TransportCodec的能力,該類主要是提供了對Telnet命令處理的能力,該功能主要是對服務進行治理的功能,這裡後續我們畫一點時間來進行介紹。
ExchangeCodec繼承了TelnetCodec,在該類基礎上增加Dubbo協定頭的處理能力,接下來我們首先來看下其核心欄位,
//協定頭長度
protected static final int HEADER_LENGTH = 16;
//魔數 判斷是否是Dubbo協定
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
//設定請求響應標誌位
protected static final byte FLAG_REQUEST = (byte) 0x80;
//單向還是雙向標誌位
protected static final byte FLAG_TWOWAY = (byte) 0x40;
//是否事件訊息標誌位
protected static final byte FLAG_EVENT = (byte) 0x20;
//序列化協定標誌位
protected static final int SERIALIZATION_MASK = 0x1f;
通過核心欄位我們可以發現其實和我們介紹的Dubbo的協定是一致的,因此接下來的encode和decode就是對Dubbo協定頭的解密和編碼,我們來下看encode方法,在encode方法中會根據需要編碼的訊息型別進行分類, 分為三類:Request、Response、telenet,encodeRequest方法專門對Request物件進行編碼,encodeResponse方法對Response物件進行編碼。
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
//Request
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
//Response
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
//telenet
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel, req);
//儲存協定頭
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);
//設定協定頭標誌位
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
//記錄請求ID
Bytes.long2bytes(req.getId(), header, 4);
//序列化請求 並統計序列化以後位元組數
int savedWriteIndex = buffer.writerIndex();
//將寫入位置後移16位元
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
//請求序列化
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
//是否心跳檢查 為空就是心跳檢查
if (req.isHeartbeat()) {
// heartbeat request data is always null
bos.write(CodecSupport.getNullBytesOf(serialization));
} else {
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
//事件序列化
if (req.isEvent()) {
//事件序列化
encodeEventData(channel, out, req.getData());
} else {
//Dubbo請求序列化
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
}
bos.flush();
bos.close();
//獲取位元組數
int len = bos.writtenBytes();
//檢查位元組長度
checkPayload(channel, len);
//將位元組數寫入header陣列中
Bytes.int2bytes(len, header, 12);
//重置寫入位置
buffer.writerIndex(savedWriteIndex);
//寫入訊息頭
buffer.writeBytes(header);
//buffer寫出去的位置從writeIndex開始 加上header長度 資料長度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
int savedWriteIndex = buffer.writerIndex();
try {
//序列化
Serialization serialization = getSerialization(channel, res);
//協定頭 長度為16位元組
byte[] header = new byte[HEADER_LENGTH];
//魔數
Bytes.short2bytes(MAGIC, header);
//序列化方式
header[2] = serialization.getContentTypeId();
//心跳還是正常訊息
if (res.isHeartbeat()) {
header[2] |= FLAG_EVENT;
}
//響應狀態
byte status = res.getStatus();
header[3] = status;
//設定請求ID
Bytes.long2bytes(res.getId(), header, 4);
//寫入時候真需要加上協定頭長度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
//對響應資訊進行編碼
if (status == Response.OK) {
if(res.isHeartbeat()){
//心跳
bos.write(CodecSupport.getNullBytesOf(serialization));
}else {
//正常響應
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (res.isEvent()) {
encodeEventData(channel, out, res.getResult());
} else {
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
}
} else {
//錯誤訊息
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
out.writeUTF(res.getErrorMessage());
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
}
bos.flush();
bos.close();
//寫入的長度
int len = bos.writtenBytes();
//檢查訊息長度
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
//重置寫入位置
buffer.writerIndex(savedWriteIndex);
//寫入訊息頭
buffer.writeBytes(header);
//buffer寫出去的位置從writeIndex開始 加上header長度 資料長度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch (Throwable t) {
// clear buffer
buffer.writerIndex(savedWriteIndex);
// send error message to Consumer, otherwise, Consumer will wait till timeout.
if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
Response r = new Response(res.getId(), res.getVersion());
r.setStatus(Response.BAD_RESPONSE);
if (t instanceof ExceedPayloadLimitException) {
logger.warn(t.getMessage(), t);
try {
r.setErrorMessage(t.getMessage());
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
}
} else {
// FIXME log error message in Codec and handle in caught() of IoHanndler?
logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
try {
r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
}
}
}
// Rethrow exception
if (t instanceof IOException) {
throw (IOException) t;
} else if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
} else {
throw new RuntimeException(t.getMessage(), t);
}
}
}
ExchangeCodec的decode方法是encode方法的逆過程,會先檢查魔數,然後讀取協定頭和後續訊息的長度,最後根據協定頭中的各個標誌位構造相應的物件,以及反序列化資料。
在ExchangeCodecencode的encode方法中,不論是encodeRequest還是encodeResponse都呼叫encodeRequestData方法,該方法會對Boby內容進行編碼,該方法實現是在DubboCodec中,因此DubboCodec是對訊息體的編解碼,接下來我們來看下encodeRequestData和encodeResponseData方法的實現,
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
//dubbo服務版本
out.writeUTF(version);
// https://github.com/apache/dubbo/issues/6138
String serviceName = inv.getAttachment(INTERFACE_KEY);
if (serviceName == null) {
//服務path
serviceName = inv.getAttachment(PATH_KEY);
}
//服務名
out.writeUTF(serviceName);
//版本號
out.writeUTF(inv.getAttachment(VERSION_KEY));
//方法名
out.writeUTF(inv.getMethodName());
//方法型別描述
out.writeUTF(inv.getParameterTypesDesc());
Object[] args = inv.getArguments();
if (args != null) {
for (int i = 0; i < args.length; i++) {
//引數值
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
}
//附加屬性
out.writeAttachments(inv.getObjectAttachments());
}
@Override
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
Result result = (Result) data;
//檢驗版本
boolean attach = Version.isSupportResponseAttachment(version);
Throwable th = result.getException();
if (th == null) {
Object ret = result.getValue();
if (ret == null) {
//空結果
out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
} else {
//正常寫入
out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
out.writeObject(ret);
}
} else {
//異常
out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
out.writeThrowable(th);
}
if (attach) {
//Dubbo版本號
result.getObjectAttachments().put(DUBBO_VERSION_KEY, Version.getProtocolVersion());
out.writeAttachments(result.getObjectAttachments());
}
}
歡迎大家點點關注,點點贊!