前段時間瞭解了泛化呼叫這個玩意兒,又想到自己之前寫過一個RPC框架(參考《手寫一個RPC框架》),於是便想小試牛刀。
什麼是泛化呼叫
泛化呼叫就是在不依賴服務方介面jar包的情況下進行呼叫,包括對呼叫方法的泛化、引數的泛化和返回值的泛化。
泛化呼叫的使用場景
常規的PRC呼叫都是使用者端依賴伺服器端提供的介面jar包,然後利用動態代理技術,像呼叫本地方法一樣呼叫遠端方法,但是有些場景下使用者端無法依賴jar包,也要呼叫遠端方法,這時就需要用到泛化呼叫了。
常規的使用場景包括:
閘道器,如果服務內部呼叫使用RPC協定,對外暴露HTTP介面,這時就需要在閘道器做協定轉換(HTTP轉RPC協定),但是閘道器不可能依賴所有介面的jar包,只能採用泛化呼叫。
測試平臺
實現方案
實現方案有兩種:
使用者端
服務註冊和發現
首先定義一個泛化呼叫介面GenericService
public interface GenericService {
/**
* 泛化呼叫
* @param methodName
* @param parameterTypeNames
* @param args
* @return
*/
Object $invoke(String methodName, String[] parameterTypeNames, Object[] args);
}
注意: 這裡引數型別parameterTypeNames用的是引數型別名稱陣列而不是Class陣列,是因為泛化呼叫使用者端可能不存在對應的類。
預設實現類DefaultGenericService
/**
* @Author: Ship
* @Description:
* @Date: Created in 2023/6/15
*/
public class DefaultGenericService implements GenericService {
private MethodInvoker methodInvoker;
private String interfaceClassName;
public DefaultGenericService(MethodInvoker methodInvoker, String interfaceClassName) {
this.methodInvoker = methodInvoker;
this.interfaceClassName = interfaceClassName;
}
@Override
public Object $invoke(String methodName, String[] parameterTypeNames, Object[] args) {
return methodInvoker.$invoke(interfaceClassName, methodName, parameterTypeNames, args, true);
}
}
因為DefaultGenericService是介面維度的,所以我們還需要一個工廠類去建立它的範例,同時為了避免重複建立物件,還要快取介面維度的範例(享元模式)。
/**
* @Author: Ship
* @Description:
* @Date: Created in 2023/6/15
*/
public final class GenericServiceFactory {
/**
* 範例快取,key:介面類名
*/
private static final Map<String, GenericService> INSTANCE_MAP = new ConcurrentHashMap<>();
private GenericServiceFactory() {}
/**
* @param interfaceClassName
* @return
*/
public static GenericService getInstance(String interfaceClassName) {
return INSTANCE_MAP.computeIfAbsent(interfaceClassName, clz -> {
MethodInvoker methodInvoker = SpringContextHolder.getBean(MethodInvoker.class);
DefaultGenericService genericService = new DefaultGenericService(methodInvoker, interfaceClassName);
return genericService;
});
}
}
MethodInvoker維護了使用者端呼叫伺服器端的核心邏輯,同時相容泛化呼叫和普通RPC呼叫這兩種呼叫方式。
實現類DefaultMethodInvoker
public class DefaultMethodInvoker implements MethodInvoker {
private ServerDiscoveryManager serverDiscoveryManager;
private NetClient netClient;
private LoadBalance loadBalance;
public DefaultMethodInvoker(ServerDiscoveryManager serverDiscoveryManager, NetClient netClient, LoadBalance loadBalance) {
this.serverDiscoveryManager = serverDiscoveryManager;
this.netClient = netClient;
this.loadBalance = loadBalance;
}
@Override
public Object $invoke(String interfaceClassName, String methodName, String[] parameterTypeNames, Object[] args, Boolean generic) {
// 1.獲得服務資訊
String serviceName = interfaceClassName;
List<Service> services = serverDiscoveryManager.getServiceList(serviceName);
Service service = loadBalance.chooseOne(services);
// 2.構造request物件
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setServiceName(service.getName());
request.setMethod(methodName);
request.setParameters(args);
request.setParameterTypeNames(parameterTypeNames);
request.setGeneric(generic);
// 3.協定層編組
MessageProtocol messageProtocol = MessageProtocolsManager.get(service.getProtocol());
RpcResponse response = netClient.sendRequest(request, service, messageProtocol);
if (response == null) {
throw new RpcException("the response is null");
}
// 6.結果處理
if (RpcStatusEnum.ERROR.getCode().equals(response.getRpcStatus())) {
throw response.getException();
}
if (RpcStatusEnum.NOT_FOUND.getCode().equals(response.getRpcStatus())) {
throw new RpcException(" service not found!");
}
return response.getReturnValue();
}
}
RequestHandler的核心邏輯就是利用反射呼叫對應的方法
public class RequestHandler {
private MessageProtocol protocol;
private ServerRegister serverRegister;
public RequestHandler(MessageProtocol protocol, ServerRegister serverRegister) {
this.protocol = protocol;
this.serverRegister = serverRegister;
}
public byte[] handleRequest(byte[] data) throws Exception {
// 1.解組訊息
RpcRequest req = this.protocol.unmarshallingRequest(data);
// 2.查詢服務對應
ServiceObject so = serverRegister.getServiceObject(req.getServiceName());
RpcResponse response = null;
if (so == null) {
response = new RpcResponse(RpcStatusEnum.NOT_FOUND);
} else {
try {
// 3.反射呼叫對應的方法過程
Method method = so.getClazz().getMethod(req.getMethod(), ReflectUtils.convertToParameterTypes(req.getParameterTypeNames()));
Object returnValue = method.invoke(so.getObj(), req.getParameters());
response = new RpcResponse(RpcStatusEnum.SUCCESS);
if (req.getGeneric()) {
response.setReturnValue(RpcResponseUtils.handlerReturnValue(returnValue));
} else {
response.setReturnValue(returnValue);
}
} catch (Exception e) {
response = new RpcResponse(RpcStatusEnum.ERROR);
String errMsg = JSON.toJSONString(e);
response.setException(new RpcException(errMsg));
}
}
// 編組響應訊息
response.setRequestId(req.getRequestId());
return this.protocol.marshallingResponse(response);
}
}
可以看到這裡針對泛化呼叫的返回值作了特殊處理,因為如果返回的是POJO物件的話使用者端是沒有對應的類的,那麼如何泛化處理呢?
分了三種情況處理:
服務註冊和發現部分的程式碼就不貼了,有興趣可以自行檢視,程式碼地址。
public interface UserService {
ApiResult<User> getUser(Long id);
String getUserString(Long id);
}
<dependency>
<groupId>io.github.2ysp</groupId>
<artifactId>ship-rpc-spring-boot-starter</artifactId>
<version>1.0.1-RELEASE</version>
</dependency>
@RestController
@RequestMapping("/GenericTest")
public class GenericTestController {
@GetMapping("/user")
public String getUserString(@RequestParam("id") Long id) {
//cn.sp.UserService.getUserString
GenericService instance = GenericServiceFactory.getInstance("cn.sp.UserService");
Object result = instance.$invoke("getUserString", new String[]{"java.lang.Long"}, new Object[]{id});
return result.toString();
}
@GetMapping("")
public String getUser(@RequestParam("id") Long id) {
//cn.sp.UserService.getUser
GenericService instance = GenericServiceFactory.getInstance("cn.sp.UserService");
Object result = instance.$invoke("getUser", new String[]{"java.lang.Long"}, new Object[]{id});
return result.toString();
}
}
控制檯能看到註冊的服務說明provider啟動成功。
{"code":200,"data":{"gender":2,"id":1,"name":"XX","webSite":"www.aa.com"},"message":"success"}
然後在cn.sp.GenericTestController#getUser方法打斷點,再請求介面http://localhost:8081/GenericTest?id=1
可以看出介面正確返回了,並且把ApiResult
壓測環境:
MacBook Pro 13英寸
處理器 2.3 GHz 四核Intel Core i7
記憶體 16 GB 3733 MHz LPDDR4X
一個生產者一個消費者
壓測工具:
wrk
壓測命令:
wrk -c 100 -t 20 -d 10s http://localhost:8081/GenericTest?id=1
用100個連結,20個執行緒壓測10秒鐘
壓測結果:
Running 10s test @ http://localhost:8081/GenericTest?id=1
20 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 28.12ms 19.11ms 175.97ms 71.66%
Req/Sec 185.58 35.41 272.00 78.75%
37000 requests in 10.03s, 7.13MB read
Requests/sec: 3689.17
Transfer/sec: 728.30KB
可以看到QPS大概能達到3600多,還是不錯的。
希望本篇文章能幫助你瞭解泛化呼叫,這次除了增加了泛化呼叫的功能外,還對以前的程式碼進行了重構,包括增加Nacos註冊中心支援,增加hessian序列化協定,包結構優化等,後面有時間會該框架增加更多功能。
參考:
RPC框架泛化呼叫原理及轉轉的實踐
https://nacos.io/zh-cn/docs/sdk.html