RPC是遠過程呼叫(Remote Procedure Call)的縮寫形式,其區別於一個程式內部基本的過程呼叫(或者叫函數/方法呼叫)。
隨著應用程式變得越來越複雜,在單個機器上中僅通過一個程序來執行整個應用程式的方式已經難以滿足現實中日益增長的需求。
開發者對應用程式進行模組化的拆分,以分散式部署的方式來降低程式整體的複雜度和提升效能方面的可拓展性(分而治之的思想)。
拆分後部署在不同機器上的各個模組無法像之前那樣通過記憶體定址的方式來互相存取,而是需要通過網路來進行通訊。
RPC最主要的功能就是在提供不同模組服務間的網路通訊能力的同時,又儘可能的不丟失本地呼叫時語意的簡潔性。rpc可以認為是分散式系統中類似人體經絡一樣的基礎設施,因此有必要對其工作原理有一定的瞭解。
要學習rpc的原理,理論上最好的辦法就是去看流行的開源框架原始碼。但dubbo這樣成熟的rpc框架由於已經迭代了很多年,為了滿足多樣的需求而有著複雜的架構和龐大的程式碼量。對於普通初學者來說往往很難從層層抽象封裝中把握住關於rpc框架最核心的內容。
MyRpc是我最近在學習MIT6.824分散式系統公開課時,使用java並基於netty實現的一個簡易rpc框架,實現的過程中許多地方都參考了dubbo以及一些demo級別的rpc框架。
MyRpc是demo級別的框架,理解起來會輕鬆不少。在對基礎的rpc實現原理有一定了解後,能對後續研究dubbo等開源rpc框架帶來很大的幫助。
目前MyRpc實現了以下功能
限於篇幅,以上功能會拆分為兩篇部落格分別介紹。其中前3個功能實現了基本的對等通訊的rpc功能,將在本篇部落格中結合原始碼詳細分析。
MyRpc架構圖
MyRpc是以netty為基礎的,下面展示一個最基礎的netty使用者端/伺服器端互動的demo。
netty伺服器端:
/**
* 最原始的netty伺服器端demo
* */
public class PureNettyServer {
public static void main(String[] args) throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
EventLoopGroup workerGroup = new NioEventLoopGroup(8,new DefaultThreadFactory("NettyServerWorker", true));
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
// 實際呼叫業務方法的處理器
.addLast("serverHandler", new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf requestByteBuf) {
String requestStr = requestByteBuf.toString(CharsetUtil.UTF_8);
System.out.println("PureNettyServer read request=" + JsonUtil.json2Obj(requestStr, User.class));
// 伺服器端響應echo
ByteBuf byteBuf = Unpooled.copiedBuffer("echo:" + requestStr,CharsetUtil.UTF_8);
channelHandlerContext.writeAndFlush(byteBuf);
}
})
;
}
});
ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 8888).sync();
System.out.println("netty server started!");
// 一直阻塞在這裡
channelFuture.channel().closeFuture().sync();
}
}
netty使用者端:
/**
* 最原始的netty使用者端demo
* */
public class PureNettyClient {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8,
new DefaultThreadFactory("NettyClientWorker", true));
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
.addLast("clientHandler", new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf responseByteBuf) {
String responseStr = responseByteBuf.toString(CharsetUtil.UTF_8);
System.out.println("PureNettyClient received response=" + responseStr);
}
})
;
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
Channel channel = channelFuture.sync().channel();
// 傳送一個user物件的json串
User user = new User("Tom",10);
ByteBuf requestByteBuf = Unpooled.copiedBuffer(JsonUtil.obj2Str(user), CharsetUtil.UTF_8);
channel.writeAndFlush(requestByteBuf);
System.out.println("netty client send request success!");
channelFuture.channel().closeFuture().sync();
}
}
上面展示了一個最基礎的netty網路通訊的demo,似乎一個對等的傳輸功能已經得到了良好的實現。
但作為一個rpc框架,還需要解決tcp傳輸層基於位元組流的訊息黏包/拆包問題。
作業系統實現的傳輸層tcp協定中,向上層的應用保證盡最大可能的(best effort delivery)、可靠的傳輸位元組流,但並不關心實際傳輸的封包是否總是符合應用層的要求。
一個封包中可能同時存在黏包問題和拆包問題(如下圖所示)
黏包拆包示意圖
解決黏包/拆包問題最核心的思路是,如何知道一個應用層完整請求的邊界。
對於黏包問題,基於邊界可以獨立的拆分出每一個請求;對於拆包問題,如果發現收到的封包末尾沒有邊界,則繼續等待新的封包,直到發現邊界後再一併上交給應用程式。
主流的解決黏包拆包的應用層協定設計方案有三種:
介紹 | 優點 | 缺點 | |
---|---|---|---|
1.基於固定長度的協定 | 每個訊息都是固定的大小,如果實際上小於固定值,則需要填充 | 簡單;易於實現 | 固定值過大,填充會浪費大量傳輸頻寬;固定值過小則限制了可用的訊息體大小 |
2.基於特殊分隔符的協定 | 約定一個特殊的分隔符,以這個分割符為訊息邊界 | 簡單;且訊息體長度是可變的,效能好 | 訊息體的業務資料不允許包含這個特殊分隔符,否則會錯誤的拆分封包。因此相容性較差 |
3.基於業務資料長度編碼的協定 | 設計一個固定大小的訊息請求頭(比如固定16位元組、20位元組大小),在訊息請求頭中包含實際的業務訊息體長度 | 訊息體長度可變,效能好;對業務資料內容無限制,相容性也好 | 實現起來稍顯複雜 |
對於流行的rpc框架,一般都是選用效能與相容性皆有的方案3:即自己設計一個固定大小的、包含了請求體長度欄位的請求頭。MyRpc參考dubbo,也設計了一個固定16位元組大小的請求頭(裡面有幾個欄位暫時沒用上)。
請求頭: MessageHeader
/**
* 共16位元組的請求頭
* */
public class MessageHeader implements Serializable {
public static final int MESSAGE_HEADER_LENGTH = 16;
public static final int MESSAGE_SERIALIZE_TYPE_LENGTH = 5;
public static final short MAGIC = (short)0x2233;
// ================================ 訊息頭 =================================
/**
* 魔數(佔2位元組)
* */
private short magicNumber = MAGIC;
/**
* 訊息標識(0代表請求事件;1代表響應事件, 佔1位)
* @see MessageFlagEnums
* */
private Boolean messageFlag;
/**
* 是否是雙向請求(0代表oneWay請求;1代表twoWay請求)
* (雙向代表使用者端會等待伺服器端的響應,單向則請求傳送完成後即向上層返回成功)
* */
private Boolean twoWayFlag;
/**
* 是否是心跳訊息(0代表正常訊息;1代表心跳訊息, 佔1位)
* */
private Boolean eventFlag;
/**
* 訊息體序列化型別(佔5位,即所支援的序列化型別不得超過2的5次方,32種)
* @see MessageSerializeType
* */
private Boolean[] serializeType;
/**
* 響應狀態(佔1位元組)
* */
private byte responseStatus;
/**
* 訊息的唯一id(佔8位元組)
* */
private long messageId;
/**
* 業務資料長度(佔4位元組)
* */
private int bizDataLength;
}
完整的訊息物件: MessageProtocol
public class MessageProtocol<T> implements Serializable {
/**
* 請求頭
* */
private MessageHeader messageHeader;
/**
* 請求體(實際的業務訊息物件)
* */
private T bizDataBody;
}
MyRpc訊息範例圖
/**
* rpc請求物件
* */
public class RpcRequest implements Serializable {
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
/**
* 訊息的唯一id(佔8位元組)
* */
private final long messageId;
/**
* 介面名
* */
private String interfaceName;
/**
* 方法名
* */
private String methodName;
/**
* 引數型別陣列(每個引數一項)
* */
private Class<?>[] parameterClasses;
/**
* 實際引數物件陣列(每個引數一項)
* */
private Object[] params;
public RpcRequest() {
// 每個請求物件生成時都自動生成單機全域性唯一的自增id
this.messageId = INVOKE_ID.getAndIncrement();
}
}
/**
* rpc響應物件
* */
public class RpcResponse implements Serializable {
/**
* 訊息的唯一id(佔8位元組)
* */
private long messageId;
/**
* 返回值
*/
private Object returnValue;
/**
* 異常值
*/
private Exception exceptionValue;
}
在上一節的netty demo中的訊息處理器中,一共做了兩件事情;一是將原始封包的位元組流轉化成了應用程式所需的String物件;二是拿到String物件後進行響應的業務處理(比如列印在控制檯上)。
而netty框架允許設定多個訊息處理器組成鏈條,按約定的順序處理出站/入站的訊息;因此從模組化的出發,應該將編碼/解碼的邏輯和實際業務的處理拆分成多個處理器。
在自定義的訊息編碼器、解碼器中進行應用層請求/響應資料的序列化/反序列化,同時處理上述的黏包/拆包問題。
編解碼工具類
public class MessageCodecUtil {
/**
* 報文協定編碼
* */
public static <T> void messageEncode(MessageProtocol<T> messageProtocol, ByteBuf byteBuf) {
MessageHeader messageHeader = messageProtocol.getMessageHeader();
// 寫入魔數
byteBuf.writeShort(MessageHeader.MAGIC);
// 寫入訊息標識
byteBuf.writeBoolean(messageHeader.getMessageFlag());
// 寫入單/雙向標識
byteBuf.writeBoolean(messageHeader.getTwoWayFlag());
// 寫入訊息事件標識
byteBuf.writeBoolean(messageHeader.getEventFlag());
// 寫入序列化型別
for(boolean b : messageHeader.getSerializeType()){
byteBuf.writeBoolean(b);
}
// 寫入響應狀態
byteBuf.writeByte(messageHeader.getResponseStatus());
// 寫入訊息uuid
byteBuf.writeLong(messageHeader.getMessageId());
// 序列化訊息體
MyRpcSerializer myRpcSerializer = MyRpcSerializerManager.getSerializer(messageHeader.getSerializeType());
byte[] bizMessageBytes = myRpcSerializer.serialize(messageProtocol.getBizDataBody());
// 獲得並寫入訊息正文長度
byteBuf.writeInt(bizMessageBytes.length);
// 寫入訊息正文內容
byteBuf.writeBytes(bizMessageBytes);
}
/**
* 報文協定header頭解碼
* */
public static MessageHeader messageHeaderDecode(ByteBuf byteBuf){
MessageHeader messageHeader = new MessageHeader();
// 讀取魔數
messageHeader.setMagicNumber(byteBuf.readShort());
// 讀取訊息標識
messageHeader.setMessageFlag(byteBuf.readBoolean());
// 讀取單/雙向標識
messageHeader.setTwoWayFlag(byteBuf.readBoolean());
// 讀取訊息事件標識
messageHeader.setEventFlag(byteBuf.readBoolean());
// 讀取序列化型別
Boolean[] serializeTypeBytes = new Boolean[MessageHeader.MESSAGE_SERIALIZE_TYPE_LENGTH];
for(int i=0; i<MessageHeader.MESSAGE_SERIALIZE_TYPE_LENGTH; i++){
serializeTypeBytes[i] = byteBuf.readBoolean();
}
messageHeader.setSerializeType(serializeTypeBytes);
// 讀取響應狀態
messageHeader.setResponseStatus(byteBuf.readByte());
// 讀取訊息uuid
messageHeader.setMessageId(byteBuf.readLong());
// 讀取訊息正文長度
int bizDataLength = byteBuf.readInt();
messageHeader.setBizDataLength(bizDataLength);
return messageHeader;
}
/**
* 報文協定正文body解碼
* */
public static <T> T messageBizDataDecode(MessageHeader messageHeader, ByteBuf byteBuf, Class<T> messageBizDataType){
// 讀取訊息正文
byte[] bizDataBytes = new byte[messageHeader.getBizDataLength()];
byteBuf.readBytes(bizDataBytes);
// 反序列化訊息體
MyRpcSerializer myRpcSerializer = MyRpcSerializerManager.getSerializer(messageHeader.getSerializeType());
return (T) myRpcSerializer.deserialize(bizDataBytes,messageBizDataType);
}
}
自定義編碼器: NettyEncoder
public class NettyEncoder<T> extends MessageToByteEncoder<MessageProtocol<T>> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol<T> messageProtocol, ByteBuf byteBuf) {
// 繼承自MessageToByteEncoder中,只需要將編碼後的資料寫入引數中指定的byteBuf中即可
// MessageToByteEncoder原始碼邏輯中會自己去將byteBuf寫入channel的
MessageCodecUtil.messageEncode(messageProtocol,byteBuf);
}
}
自定義解碼器: NettyDecoder
/**
* netty 解碼器
*/
public class NettyDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(NettyDecoder.class);
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list){
// 儲存讀取前的讀指標
int beforeReadIndex = byteBuf.readerIndex();
do{
try {
MessageDecodeResult messageDecodeResult = decodeHeader(byteBuf);
if (messageDecodeResult.isNeedMoreData()) {
// 出現拆包沒有讀取到一個完整的rpc請求,還原byteBuf讀指標,等待下一次讀事件
byteBuf.readerIndex(beforeReadIndex);
break;
} else {
// 正常解析完一個完整的message,交給後面的handler處理
list.add(messageDecodeResult.getMessageProtocol());
}
}catch (Exception e){
// 比如decodeHeader裡json序列化失敗了等等.直接跳過這個封包不還原了
logger.error("NettyDecoder error!",e);
}
// 迴圈,直到整個ByteBuf讀取完
}while(byteBuf.isReadable());
}
private MessageDecodeResult decodeHeader(ByteBuf byteBuf){
int readable = byteBuf.readableBytes();
if(readable < MessageHeader.MESSAGE_HEADER_LENGTH){
// 無法讀取到一個完整的header,說明出現了拆包,等待更多的資料
return MessageDecodeResult.needMoreData();
}
// 讀取header頭
MessageHeader messageHeader = MessageCodecUtil.messageHeaderDecode(byteBuf);
int bizDataLength = messageHeader.getBizDataLength();
if(byteBuf.readableBytes() < bizDataLength){
// 無法讀取到一個完整的正文內容,說明出現了拆包,等待更多的資料
return MessageDecodeResult.needMoreData();
}
// 基於訊息型別標識,解析rpc正文物件
boolean messageFlag = messageHeader.getMessageFlag();
if(messageFlag == MessageFlagEnums.REQUEST.getCode()){
RpcRequest rpcRequest = MessageCodecUtil.messageBizDataDecode(messageHeader,byteBuf,RpcRequest.class);
MessageProtocol<RpcRequest> messageProtocol = new MessageProtocol<>(messageHeader,rpcRequest);
// 正確的解析完一個rpc請求訊息
return MessageDecodeResult.decodeSuccess(messageProtocol);
}else{
RpcResponse rpcResponse = MessageCodecUtil.messageBizDataDecode(messageHeader,byteBuf,RpcResponse.class);
MessageProtocol<RpcResponse> messageProtocol = new MessageProtocol<>(messageHeader,rpcResponse);
// 正確的解析完一個rpc響應訊息
return MessageDecodeResult.decodeSuccess(messageProtocol);
}
}
}
demo的服務範例:
public class User implements Serializable {
private String name;
private Integer age;
}
public interface UserService {
User getUserFriend(User user, String message);
}
public class UserServiceImpl implements UserService {
@Override
public User getUserFriend(User user, String message) {
System.out.println("execute getUserFriend, user=" + user + ",message=" + message);
// demo返回一個不同的user物件回去
return new User(user.getName() + ".friend", user.getAge() + 1);
}
}
netty伺服器端:
public class RpcServer {
private static final Map<String,Object> interfaceImplMap = new HashMap<>();
static{
/**
* 簡單一點設定死實現
* */
interfaceImplMap.put(UserService.class.getName(), new UserServiceImpl());
}
public static void main(String[] args) throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
EventLoopGroup workerGroup = new NioEventLoopGroup(8,new DefaultThreadFactory("NettyServerWorker", true));
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
// 編碼、解碼處理器
.addLast("encoder", new NettyEncoder<>())
.addLast("decoder", new NettyDecoder())
// 實際呼叫業務方法的處理器
.addLast("serverHandler", new SimpleChannelInboundHandler<MessageProtocol<RpcRequest>>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol<RpcRequest> msg) {
// 找到原生的方法進行呼叫,並獲得返回值(demo,簡單起見直接同步呼叫)
MessageProtocol<RpcResponse> result = handlerRpcRequest(msg);
// 將返回值響應給使用者端
ctx.writeAndFlush(result);
}
});
}
});
ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 8888).sync();
System.out.println("netty server started!");
// 一直阻塞在這裡
channelFuture.channel().closeFuture().sync();
}
private static MessageProtocol<RpcResponse> handlerRpcRequest(MessageProtocol<RpcRequest> rpcRequestMessageProtocol){
long requestMessageId = rpcRequestMessageProtocol.getMessageHeader().getMessageId();
MessageHeader messageHeader = new MessageHeader();
messageHeader.setMessageId(requestMessageId);
messageHeader.setMessageFlag(MessageFlagEnums.RESPONSE.getCode());
messageHeader.setTwoWayFlag(false);
messageHeader.setEventFlag(false);
messageHeader.setSerializeType(rpcRequestMessageProtocol.getMessageHeader().getSerializeType());
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setMessageId(requestMessageId);
try {
// 反射呼叫具體的實現方法
Object result = invokeTargetService(rpcRequestMessageProtocol.getBizDataBody());
// 設定返回值
rpcResponse.setReturnValue(result);
}catch (Exception e){
// 呼叫具體實現類時,出現異常,設定異常的值
rpcResponse.setExceptionValue(e);
}
return new MessageProtocol<>(messageHeader,rpcResponse);
}
private static Object invokeTargetService(RpcRequest rpcRequest) throws Exception {
String interfaceName = rpcRequest.getInterfaceName();
Object serviceImpl = interfaceImplMap.get(interfaceName);
// 按照請求裡的方法名和參數列找到對應的方法
final Method method = serviceImpl.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterClasses());
// 傳遞引數,反射呼叫該方法並返回結果
return method.invoke(serviceImpl, rpcRequest.getParams());
}
}
netty使用者端:
public class RpcClientNoProxy {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8,
new DefaultThreadFactory("NettyClientWorker", true));
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
// 編碼、解碼處理器
.addLast("encoder", new NettyEncoder<>())
.addLast("decoder", new NettyDecoder())
.addLast("clientHandler", new SimpleChannelInboundHandler<MessageProtocol>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol) {
System.out.println("PureNettyClient received messageProtocol=" + messageProtocol);
}
})
;
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
Channel channel = channelFuture.sync().channel();
// 構造訊息物件
MessageProtocol<RpcRequest> messageProtocol = buildMessage();
// 傳送訊息
channel.writeAndFlush(messageProtocol);
System.out.println("RpcClientNoProxy send request success!");
channelFuture.channel().closeFuture().sync();
}
private static MessageProtocol<RpcRequest> buildMessage(){
// 構造請求
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setInterfaceName("myrpc.demo.common.service.UserService");
rpcRequest.setMethodName("getUserFriend");
rpcRequest.setParameterClasses(new Class[]{User.class,String.class});
User user = new User("Jerry",10);
String message = "hello hello!";
rpcRequest.setParams(new Object[]{user,message});
// 構造協定頭
MessageHeader messageHeader = new MessageHeader();
messageHeader.setMessageFlag(MessageFlagEnums.REQUEST.getCode());
messageHeader.setTwoWayFlag(false);
messageHeader.setEventFlag(true);
messageHeader.setSerializeType(MessageSerializeType.JSON.getCode());
messageHeader.setMessageId(rpcRequest.getMessageId());
return new MessageProtocol<>(messageHeader,rpcRequest);
}
}
netty處理流程圖
截止目前,我們已經實現了一個對等rpc使用者端/伺服器端互動的功能,但是使用者端這邊的邏輯依然比較複雜(buildMessage方法)。
前面提到,rpc中很重要的功能就是保持本地呼叫時語意的簡潔性,即使用者端實際使用時是希望直接用以下這種方式來進行呼叫,而不是去繁瑣的處理底層的網路互動邏輯。
User user = new User("Jerry",10);
String message = "hello hello!";
// 發起rpc呼叫並獲得返回值
User userFriend = userService.getUserFriend(user,message);
System.out.println("userService.getUserFriend result=" + userFriend);
rpc框架需要遮蔽掉構造底層訊息傳送/接受,序列化/反序列化相關的複雜性,而這時候就需要引入代理模式(動態代理)了。
在MyRpc的底層,我們將使用者端需要呼叫的一個服務(比如UserService)抽象為Consumer物件,伺服器端的一個具體服務實現抽象為Provider物件。
其中包含了對應的服務的類以及對應的服務地址,使用者端這邊使用jdk的動態代理生成代理物件,將複雜的、需要遮蔽的訊息處理/網路互動等邏輯都封裝在這個代理物件中。
public class Consumer<T> {
private Class<?> interfaceClass;
private T proxy;
private Bootstrap bootstrap;
private URLAddress urlAddress;
public Consumer(Class<?> interfaceClass, Bootstrap bootstrap, URLAddress urlAddress) {
this.interfaceClass = interfaceClass;
this.bootstrap = bootstrap;
this.urlAddress = urlAddress;
ClientDynamicProxy clientDynamicProxy = new ClientDynamicProxy(bootstrap,urlAddress);
this.proxy = (T) Proxy.newProxyInstance(
clientDynamicProxy.getClass().getClassLoader(),
new Class[]{interfaceClass},
clientDynamicProxy);
}
public T getProxy() {
return proxy;
}
public Class<?> getInterfaceClass() {
return interfaceClass;
}
}
public class ConsumerBootstrap {
private final Map<Class<?>,Consumer<?>> consumerMap = new HashMap<>();
private final Bootstrap bootstrap;
private final URLAddress urlAddress;
public ConsumerBootstrap(Bootstrap bootstrap, URLAddress urlAddress) {
this.bootstrap = bootstrap;
this.urlAddress = urlAddress;
}
public <T> Consumer<T> registerConsumer(Class<T> clazz){
if(!consumerMap.containsKey(clazz)){
Consumer<T> consumer = new Consumer<>(clazz,this.bootstrap,this.urlAddress);
consumerMap.put(clazz,consumer);
return consumer;
}
throw new MyRpcException("duplicate consumer! clazz=" + clazz);
}
}
public class Provider<T> {
private Class<?> interfaceClass;
private T ref;
private URLAddress urlAddress;
}
/**
* 使用者端動態代理
* */
public class ClientDynamicProxy implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(ClientDynamicProxy.class);
private final Bootstrap bootstrap;
private final URLAddress urlAddress;
public ClientDynamicProxy(Bootstrap bootstrap, URLAddress urlAddress) {
this.bootstrap = bootstrap;
this.urlAddress = urlAddress;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object localMethodResult = processLocalMethod(proxy,method,args);
if(localMethodResult != null){
// 處理toString等物件自帶方法,不發起rpc呼叫
return localMethodResult;
}
logger.debug("ClientDynamicProxy before: methodName=" + method.getName());
// 構造請求和協定頭
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setInterfaceName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterClasses(method.getParameterTypes());
rpcRequest.setParams(args);
MessageHeader messageHeader = new MessageHeader();
messageHeader.setMessageFlag(MessageFlagEnums.REQUEST.getCode());
messageHeader.setTwoWayFlag(false);
messageHeader.setEventFlag(true);
messageHeader.setSerializeType(GlobalConfig.messageSerializeType.getCode());
messageHeader.setResponseStatus((byte)'a');
messageHeader.setMessageId(rpcRequest.getMessageId());
logger.debug("ClientDynamicProxy rpcRequest={}", JsonUtil.obj2Str(rpcRequest));
ChannelFuture channelFuture = bootstrap.connect(urlAddress.getHost(),urlAddress.getPort()).sync();
Channel channel = channelFuture.sync().channel();
// 通過Promise,將netty的非同步轉為同步,參考dubbo DefaultFuture
DefaultFuture<RpcResponse> defaultFuture = DefaultFutureManager.createNewFuture(channel,rpcRequest);
channel.writeAndFlush(new MessageProtocol<>(messageHeader,rpcRequest));
logger.debug("ClientDynamicProxy writeAndFlush success, wait result");
// 呼叫方阻塞在這裡
RpcResponse rpcResponse = defaultFuture.get();
logger.debug("ClientDynamicProxy defaultFuture.get() rpcResponse={}",rpcResponse);
return processRpcResponse(rpcResponse);
}
private Object processLocalMethod(Object proxy, Method method, Object[] args) throws Exception {
// 處理toString等物件自帶方法,不發起rpc呼叫
if (method.getDeclaringClass() == Object.class) {
return method.invoke(proxy, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return proxy.toString();
} else if ("hashCode".equals(methodName)) {
return proxy.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return proxy.equals(args[0]);
}
// 返回null標識非本地方法,需要進行rpc呼叫
return null;
}
private Object processRpcResponse(RpcResponse rpcResponse){
if(rpcResponse.getExceptionValue() == null){
// 沒有異常,return正常的返回值
return rpcResponse.getReturnValue();
}else{
// 有異常,往外丟擲去
throw new MyRpcRemotingException(rpcResponse.getExceptionValue());
}
}
}
/**
* 使用者端 rpc響應處理器
*/
public class NettyRpcResponseHandler extends SimpleChannelInboundHandler<MessageProtocol<RpcResponse>> {
private static final Logger logger = LoggerFactory.getLogger(NettyRpcResponseHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol<RpcResponse> rpcResponseMessageProtocol) throws Exception {
logger.debug("NettyRpcResponseHandler channelRead0={}",JsonUtil.obj2Str(rpcResponseMessageProtocol));
// 觸發使用者端的future,令其同步阻塞的執行緒得到結果
DefaultFutureManager.received(rpcResponseMessageProtocol.getBizDataBody());
}
}
public class DefaultFutureManager {
private static Logger logger = LoggerFactory.getLogger(DefaultFutureManager.class);
public static final Map<Long,DefaultFuture> DEFAULT_FUTURE_CACHE = new ConcurrentHashMap<>();
public static void received(RpcResponse rpcResponse){
Long messageId = rpcResponse.getMessageId();
logger.debug("received rpcResponse={},DEFAULT_FUTURE_CACHE={}",rpcResponse,DEFAULT_FUTURE_CACHE);
DefaultFuture defaultFuture = DEFAULT_FUTURE_CACHE.remove(messageId);
if(defaultFuture != null){
logger.debug("remove defaultFuture success");
if(rpcResponse.getExceptionValue() != null){
// 例外處理
defaultFuture.completeExceptionally(rpcResponse.getExceptionValue());
}else{
// 正常返回
defaultFuture.complete(rpcResponse);
}
}else{
logger.debug("remove defaultFuture fail");
}
}
public static DefaultFuture createNewFuture(Channel channel, RpcRequest rpcRequest){
DefaultFuture defaultFuture = new DefaultFuture(channel,rpcRequest);
return defaultFuture;
}
}
public class RpcClientProxy {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8, new DefaultThreadFactory("NettyClientWorker", true));
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
// 編碼、解碼處理器
.addLast("encoder", new NettyEncoder<>())
.addLast("decoder", new NettyDecoder())
// 響應處理器
.addLast("clientHandler", new NettyRpcResponseHandler())
;
}
});
ConsumerBootstrap consumerBootstrap = new ConsumerBootstrap(bootstrap, new URLAddress("127.0.0.1", 8888));
Consumer<UserService> userServiceConsumer = consumerBootstrap.registerConsumer(UserService.class);
// 獲得UserService的代理物件
UserService userService = userServiceConsumer.getProxy();
User user = new User("Jerry", 10);
String message = "hello hello!";
// 發起rpc呼叫並獲得返回值
User userFriend = userService.getUserFriend(user, message);
System.out.println("userService.getUserFriend result=" + userFriend);
}
}
可以看到,引入了代理模式後的使用方式就變得簡單很多了。
到這一步,我們已經實現了一個對等的rpc通訊的能力,並且如部落格開頭中所提到的,沒有喪失本地呼叫語意的簡潔性。