手寫RPC框架之泛化呼叫

2023-06-27 15:01:00

一、背景

前段時間瞭解了泛化呼叫這個玩意兒,又想到自己之前寫過一個RPC框架(參考《手寫一個RPC框架》),於是便想小試牛刀。

二、泛化呼叫簡介

什麼是泛化呼叫

泛化呼叫就是在不依賴服務方介面jar包的情況下進行呼叫,包括對呼叫方法的泛化、引數的泛化和返回值的泛化。

泛化呼叫的使用場景

常規的PRC呼叫都是使用者端依賴伺服器端提供的介面jar包,然後利用動態代理技術,像呼叫本地方法一樣呼叫遠端方法,但是有些場景下使用者端無法依賴jar包,也要呼叫遠端方法,這時就需要用到泛化呼叫了。

常規的使用場景包括:

  1. 閘道器,如果服務內部呼叫使用RPC協定,對外暴露HTTP介面,這時就需要在閘道器做協定轉換(HTTP轉RPC協定),但是閘道器不可能依賴所有介面的jar包,只能採用泛化呼叫。

  2. 測試平臺

實現方案

實現方案有兩種:

  1. 第一種是基於Java Bean的泛化呼叫,例如dubbo的泛化呼叫會將引數轉換成JavaBeanDescriptor,程式碼可以參考GenericFilter。
  2. 第二種是基於序列化中間體的泛化呼叫,如sofa-rpc,使用了sofa-hessian序列化框架,sofa-hessian是在hessian序列化框架基礎上進行二次開發的,抽象出了序列化中間體,如GenericObject、GenericMap、GenericArray等。

三、開發實現

3.1 類圖

使用者端

服務註冊和發現

3.2 資料傳輸過程

3.3 使用者端實現

首先定義一個泛化呼叫介面GenericService

public interface GenericService {

    /**
     * 泛化呼叫
     * @param methodName
     * @param parameterTypeNames
     * @param args
     * @return
     */
    Object $invoke(String methodName, String[] parameterTypeNames, Object[] args);
}

注意: 這裡引數型別parameterTypeNames用的是引數型別名稱陣列而不是Class陣列,是因為泛化呼叫使用者端可能不存在對應的類。

預設實現類DefaultGenericService

/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2023/6/15
 */
public class DefaultGenericService implements GenericService {

    private MethodInvoker methodInvoker;

    private String interfaceClassName;

    public DefaultGenericService(MethodInvoker methodInvoker, String interfaceClassName) {
        this.methodInvoker = methodInvoker;
        this.interfaceClassName = interfaceClassName;
    }


    @Override
    public Object $invoke(String methodName, String[] parameterTypeNames, Object[] args) {
        return methodInvoker.$invoke(interfaceClassName, methodName, parameterTypeNames, args, true);
    }


}

因為DefaultGenericService是介面維度的,所以我們還需要一個工廠類去建立它的範例,同時為了避免重複建立物件,還要快取介面維度的範例(享元模式)。

/**
 * @Author: Ship
 * @Description:
 * @Date: Created in 2023/6/15
 */
public final class GenericServiceFactory {

    /**
     * 範例快取,key:介面類名
     */
    private static final Map<String, GenericService> INSTANCE_MAP = new ConcurrentHashMap<>();

    private GenericServiceFactory() {}

    /**
     * @param interfaceClassName
     * @return
     */
    public static GenericService getInstance(String interfaceClassName) {
        return INSTANCE_MAP.computeIfAbsent(interfaceClassName, clz -> {
            MethodInvoker methodInvoker = SpringContextHolder.getBean(MethodInvoker.class);
            DefaultGenericService genericService = new DefaultGenericService(methodInvoker, interfaceClassName);
            return genericService;
        });
    }
}

MethodInvoker維護了使用者端呼叫伺服器端的核心邏輯,同時相容泛化呼叫和普通RPC呼叫這兩種呼叫方式。

實現類DefaultMethodInvoker

public class DefaultMethodInvoker implements MethodInvoker {

    private ServerDiscoveryManager serverDiscoveryManager;

    private NetClient netClient;

    private LoadBalance loadBalance;

    public DefaultMethodInvoker(ServerDiscoveryManager serverDiscoveryManager, NetClient netClient, LoadBalance loadBalance) {
        this.serverDiscoveryManager = serverDiscoveryManager;
        this.netClient = netClient;
        this.loadBalance = loadBalance;
    }

    @Override
    public Object $invoke(String interfaceClassName, String methodName, String[] parameterTypeNames, Object[] args, Boolean generic) {
        // 1.獲得服務資訊
        String serviceName = interfaceClassName;
        List<Service> services = serverDiscoveryManager.getServiceList(serviceName);
        Service service = loadBalance.chooseOne(services);
        // 2.構造request物件
        RpcRequest request = new RpcRequest();
        request.setRequestId(UUID.randomUUID().toString());
        request.setServiceName(service.getName());
        request.setMethod(methodName);
        request.setParameters(args);
        request.setParameterTypeNames(parameterTypeNames);
        request.setGeneric(generic);
        // 3.協定層編組
        MessageProtocol messageProtocol = MessageProtocolsManager.get(service.getProtocol());
        RpcResponse response = netClient.sendRequest(request, service, messageProtocol);
        if (response == null) {
            throw new RpcException("the response is null");
        }
        // 6.結果處理
        if (RpcStatusEnum.ERROR.getCode().equals(response.getRpcStatus())) {
            throw response.getException();
        }
        if (RpcStatusEnum.NOT_FOUND.getCode().equals(response.getRpcStatus())) {
            throw new RpcException(" service not found!");
        }
        return response.getReturnValue();
    }
}

3.4 伺服器端實現

RequestHandler的核心邏輯就是利用反射呼叫對應的方法

public class RequestHandler {

    private MessageProtocol protocol;


    private ServerRegister serverRegister;

    public RequestHandler(MessageProtocol protocol, ServerRegister serverRegister) {
        this.protocol = protocol;
        this.serverRegister = serverRegister;
    }


    public byte[] handleRequest(byte[] data) throws Exception {
        // 1.解組訊息
        RpcRequest req = this.protocol.unmarshallingRequest(data);

        // 2.查詢服務對應
        ServiceObject so = serverRegister.getServiceObject(req.getServiceName());

        RpcResponse response = null;

        if (so == null) {
            response = new RpcResponse(RpcStatusEnum.NOT_FOUND);

        } else {
            try {
                // 3.反射呼叫對應的方法過程
                Method method = so.getClazz().getMethod(req.getMethod(), ReflectUtils.convertToParameterTypes(req.getParameterTypeNames()));
                Object returnValue = method.invoke(so.getObj(), req.getParameters());
                response = new RpcResponse(RpcStatusEnum.SUCCESS);
                if (req.getGeneric()) {
                    response.setReturnValue(RpcResponseUtils.handlerReturnValue(returnValue));
                } else {
                    response.setReturnValue(returnValue);
                }
            } catch (Exception e) {
                response = new RpcResponse(RpcStatusEnum.ERROR);
                String errMsg = JSON.toJSONString(e);
                response.setException(new RpcException(errMsg));
            }
        }
        // 編組響應訊息
        response.setRequestId(req.getRequestId());
        return this.protocol.marshallingResponse(response);
    }


}

可以看到這裡針對泛化呼叫的返回值作了特殊處理,因為如果返回的是POJO物件的話使用者端是沒有對應的類的,那麼如何泛化處理呢?

分了三種情況處理:

  1. 如果是JDK的基本型別包裝類,如Long、Integer則直接不處理返回。
  2. 如果是原始型別如int、long,則報錯不支援。
  3. 如果是POJO自定義物件,則轉換成Map返回給使用者端。

服務註冊和發現部分的程式碼就不貼了,有興趣可以自行檢視,程式碼地址

四、測試

4.1 功能測試

  1. 伺服器端provider專案提供兩個根據id查詢使用者的介面,如下
public interface UserService {
    ApiResult<User> getUser(Long id);


    String getUserString(Long id);
}
  1. 建立SpringBoot工程consumer-v2,並新增ship-rpc-spring-boot-starter依賴
        <dependency>
            <groupId>io.github.2ysp</groupId>
            <artifactId>ship-rpc-spring-boot-starter</artifactId>
            <version>1.0.1-RELEASE</version>
        </dependency>
  1. 編寫泛化呼叫測試介面GenericTestController
@RestController
@RequestMapping("/GenericTest")
public class GenericTestController {


    @GetMapping("/user")
    public String getUserString(@RequestParam("id") Long id) {
        //cn.sp.UserService.getUserString
        GenericService instance = GenericServiceFactory.getInstance("cn.sp.UserService");
        Object result = instance.$invoke("getUserString", new String[]{"java.lang.Long"}, new Object[]{id});
        return result.toString();
    }


    @GetMapping("")
    public String getUser(@RequestParam("id") Long id) {
        //cn.sp.UserService.getUser
        GenericService instance = GenericServiceFactory.getInstance("cn.sp.UserService");
        Object result = instance.$invoke("getUser", new String[]{"java.lang.Long"}, new Object[]{id});
        return result.toString();
    }
}
  1. 本地依次啟動nacos,provider和consumer-v2工程


控制檯能看到註冊的服務說明provider啟動成功。

  1. postman請求介面http://localhost:8081/GenericTest/user?id=1,返回如下說明調通了
{"code":200,"data":{"gender":2,"id":1,"name":"XX","webSite":"www.aa.com"},"message":"success"}

然後在cn.sp.GenericTestController#getUser方法打斷點,再請求介面http://localhost:8081/GenericTest?id=1


可以看出介面正確返回了,並且把ApiResult物件轉換成了Map。

4.2 壓測

壓測環境:

MacBook Pro 13英寸

處理器 2.3 GHz 四核Intel Core i7

記憶體 16 GB 3733 MHz LPDDR4X

一個生產者一個消費者

壓測工具:

wrk

壓測命令:

wrk -c 100 -t 20 -d 10s http://localhost:8081/GenericTest?id=1

用100個連結,20個執行緒壓測10秒鐘

壓測結果:

Running 10s test @ http://localhost:8081/GenericTest?id=1
  20 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    28.12ms   19.11ms 175.97ms   71.66%
    Req/Sec   185.58     35.41   272.00     78.75%
  37000 requests in 10.03s, 7.13MB read
Requests/sec:   3689.17
Transfer/sec:    728.30KB

可以看到QPS大概能達到3600多,還是不錯的。

五、總結

希望本篇文章能幫助你瞭解泛化呼叫,這次除了增加了泛化呼叫的功能外,還對以前的程式碼進行了重構,包括增加Nacos註冊中心支援,增加hessian序列化協定,包結構優化等,後面有時間會該框架增加更多功能。

參考:

RPC框架泛化呼叫原理及轉轉的實踐
https://nacos.io/zh-cn/docs/sdk.html