自己動手實現rpc框架(二) 實現叢集間rpc通訊

2023-07-07 06:14:11

自己動手實現rpc框架(二) 實現叢集間rpc通訊

1. 叢集間rpc通訊

上一篇部落格中MyRpc框架實現了基本的對等rpc通訊功能。而在這篇部落格中我們需要實現MyRpc的叢集間rpc通訊功能。

上篇部落格的對等rpc通訊實現中,使用者端和伺服器端的ip地址和埠都是固定設定死的。而通常為了提升服務總負載,使用者端和伺服器端都是以叢集的方式部署的(水平拓展),使用者端和伺服器端的節點都不止1個。

叢集條件下出現了很多新的問題需要解決:

  1. 對於某一特定服務,使用者端該如何知道當前環境下哪些機器能提供這一服務?
  2. 伺服器端叢集中的某些節點如果發生了變化(比如老節點下線或宕機),使用者端該如何及時的感知到,而不會呼叫到已經停止服務的節點上?
  3. 存在多個伺服器端時,使用者端應該向哪一個伺服器端節點發起請求?怎樣才能使得每個伺服器端的負載儘量均衡,而不會讓某些伺服器端飢餓或者壓力過大。

2. 服務註冊/發現與註冊中心

  • 針對第一個問題,最先想到的自然是直接在每個使用者端都設定一個固定的伺服器端節點列表,但這一方案無法很好的解決伺服器端節點動態變化的問題。
    如果一個伺服器端節點下線了,就需要人工的去修改每個使用者端那裡維護的伺服器端節點列表的話,在叢集節點數量較多、伺服器端節點上下線頻繁的場景下是不可接受的。
  • 解決這一問題的思路是伺服器端節點資訊的中心化,將伺服器端節點的資訊都集中維護在一個地方。
    伺服器端在啟動成功後將自己的資訊註冊在上面(服務註冊),而使用者端也能實時的查詢出最新的伺服器端列表(服務發現)。
    這個統一維護伺服器端節點資訊的地方被叫做註冊中心,一般是以獨立服務的形式與rpc的伺服器端/使用者端機器部署在同一環境內。
  • 由於節點資訊的中心化,所以註冊中心需要具備高可用能力(叢集部署來提供容錯能力),避免單點故障而導致整個rpc叢集的不可用。
    同時在伺服器端節點因為一些原因不可用時能實時的感知並移除掉對應節點,同時通知監聽變更使用者端(解決第二個關於provider資訊實時性的問題)。
    因此zookeeper、eureka、nacos、etcd等等具備上述能力的中介軟體都被廣泛的用作rpc框架的註冊中心。
MyRpc整合註冊中心

MyRpc目前支援使用zookeeper作為註冊中心。
zookeeper作為一個高效能的分散式協調器,儲存的資料以ZNode節點樹的形式存在。ZNode節點有兩種屬性,有序/無序,持久/臨時。

  • rpc框架中一般設定一個持久的根路徑節點用於與zk上儲存其它的業務資料作區分(例如/my_rpc)。
  • 在根節點下有著代表著某一特定服務的子節點,其也是持久節點。服務子節點的路徑名是標識介面的唯一名稱(比如包名+類名:myrpc.demo.common.service.UserService)
  • 而服務節點下則可以儲存各種關於provider、consumer等等相關的後設資料。
    MyRpc中為了簡單起見,服務節點的子節點直接就是對應特定provider註冊的臨時節點。臨時節點中資料儲存了provider的ip/port等必要的資訊。
    由於是臨時節點,在provider因為各種故障而不可用而導致與zookeeper的連線斷開,zookeeper會在等待一小會後將該臨時節點刪除,並通知監聽該服務的使用者端以重新整理使用者端的對應設定。

MyRpc的zookeeper節點結構圖

註冊中心介面

/**
 * 註冊中心的抽象
 * */
public interface Registry {

    /**
     * 服務註冊
     * */
    void doRegistry(ServiceInfo serviceInfo);

    /**
     * 服務發現
     * */
    List<ServiceInfo> discovery(String serviceName);
}

zookeeper註冊中心實現(原始的zk使用者端)

/**
 * 簡易的zk註冊中心(原始的zk使用者端很多地方都需要使用者去處理異常,但為了更簡單的展示zk註冊中心的使用,基本上沒有處理這些異常情況)
 * */
public class ZookeeperRegistry implements Registry{

    private static final Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);

    private final ZooKeeper zooKeeper;

    private final ConcurrentHashMap<String,List<ServiceInfo>> serviceInfoCacheMap = new ConcurrentHashMap<>();

    public ZookeeperRegistry(String zkServerAddress) {
        try {
            this.zooKeeper = new ZooKeeper(zkServerAddress,2000, event -> {});

            // 確保root節點是一定存在的
            createPersistentNode(MyRpcRegistryConstants.BASE_PATH);
        } catch (Exception e) {
            throw new MyRpcException("init zkClient error",e);
        }
    }

    @Override
    public void doRegistry(ServiceInfo serviceInfo) {
        // 先建立永久的服務名節點
        createServiceNameNode(serviceInfo.getServiceName());
        // 再建立臨時的providerInfo節點
        createProviderInfoNode(serviceInfo);
    }

    @Override
    public List<ServiceInfo> discovery(String serviceName) {
        return serviceInfoCacheMap.computeIfAbsent(serviceName,(key)-> findProviderInfoList(serviceName));
    }

    private String getServiceNameNodePath(String serviceName){
        return MyRpcRegistryConstants.BASE_PATH + "/" + serviceName;
    }

    // ================================ zk工具方法 ==================================
    private void createServiceNameNode(String serviceName){
        try {
            String serviceNameNodePath = getServiceNameNodePath(serviceName);

            // 服務名節點是永久節點
            createPersistentNode(serviceNameNodePath);
            logger.info("createServiceNameNode success! serviceNameNodePath={}",serviceNameNodePath);
        } catch (Exception e) {
            throw new MyRpcException("createServiceNameNode error",e);
        }
    }

    private void createProviderInfoNode(ServiceInfo serviceInfo){
        try {
            String serviceNameNodePath = getServiceNameNodePath(serviceInfo.getServiceName());
            // 子節點用一個uuid做path防重複
            String providerInfoNodePath = serviceNameNodePath + "/" + UUID.randomUUID();

            String providerInfoJsonStr = JsonUtil.obj2Str(serviceInfo);
            // providerInfo節點是臨時節點(如果節點宕機了,zk的連線斷開一段時間後,臨時節點會被自動刪除)
            zooKeeper.create(providerInfoNodePath, providerInfoJsonStr.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            logger.info("createProviderInfoNode success! path={}",providerInfoNodePath);
        } catch (Exception e) {
            throw new MyRpcException("createProviderInfoNode error",e);
        }
    }

    private void createPersistentNode(String path){
        try {
            if (zooKeeper.exists(path, false) == null) {
                // 服務名節點是永久節點
                zooKeeper.create(path, "".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }catch (Exception e){
            throw new MyRpcException("createPersistentNode error",e);
        }
    }

    private List<ServiceInfo> findProviderInfoList(String serviceName){
        String serviceNameNodePath = getServiceNameNodePath(serviceName);

        List<ServiceInfo> serviceInfoList = new ArrayList<>();
        try {
            List<String> providerInfoPathList = zooKeeper.getChildren(serviceNameNodePath, new ZookeeperListener(serviceNameNodePath));
            for(String providerInfoPath : providerInfoPathList){
                try{
                    String fullProviderInfoPath = serviceNameNodePath + "/" + providerInfoPath;
                    byte[] data = zooKeeper.getData(fullProviderInfoPath,false,null);
                    String jsonStr = new String(data,StandardCharsets.UTF_8);
                    ServiceInfo serviceInfo = JsonUtil.json2Obj(jsonStr,ServiceInfo.class);

                    serviceInfoList.add(serviceInfo);
                }catch (Exception e){
                    logger.error("findProviderInfoList getData error",e);
                }
            }

            logger.info("findProviderInfoList={}",JsonUtil.obj2Str(serviceInfoList));
            return serviceInfoList;
        } catch (Exception e) {
            throw new MyRpcException("findProviderInfoList error",e);
        }
    }

    private class ZookeeperListener implements Watcher{
        private final String path;
        private final String serviceName;

        public ZookeeperListener(String serviceName) {
            this.path = getServiceNameNodePath(serviceName);
            this.serviceName = serviceName;
        }

        @Override
        public void process(WatchedEvent event) {
            logger.info("ZookeeperListener process! path={}",path);

            try {
                // 重新整理快取
                List<ServiceInfo> serviceInfoList = findProviderInfoList(path);
                serviceInfoCacheMap.put(serviceName,serviceInfoList);
            } catch (Exception e) {
                logger.error("ZookeeperListener getChildren error! path={}",path,e);
            }
        }
    }
}

zookeeper註冊中心實現(curator使用者端,通過自動重試等操作解決了原生使用者端的一些坑)

public class ZkCuratorRegistry implements Registry {
    private static final Logger logger = LoggerFactory.getLogger(ZkCuratorRegistry.class);

    private CuratorFramework curatorZkClient;

    private final ConcurrentHashMap<String, List<ServiceInfo>> serviceInfoCacheMap = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, PathChildrenCache> nodeCacheMap = new ConcurrentHashMap<>();


    public ZkCuratorRegistry(String zkServerAddress) {
        try {
            this.curatorZkClient = CuratorFrameworkFactory.newClient(zkServerAddress, new ExponentialBackoffRetry(3000, 1));
            this.curatorZkClient.start();
        } catch (Exception e) {
            throw new MyRpcException("init zkClient error", e);
        }
    }

    @Override
    public void doRegistry(ServiceInfo serviceInfo) {
        // 先建立永久的服務名節點
        createServiceNameNode(serviceInfo.getServiceName());
        // 再建立臨時的providerInfo節點
        createProviderInfoNode(serviceInfo);
    }

    @Override
    public List<ServiceInfo> discovery(String serviceName) {
        return this.serviceInfoCacheMap.computeIfAbsent(serviceName,(key)->{
            List<ServiceInfo> serviceInfoList = findProviderInfoList(serviceName);

            // 建立對子節點的監聽
            String serviceNodePath = getServiceNameNodePath(serviceName);
            PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorZkClient, serviceNodePath, true);
            try {
                pathChildrenCache.start();
                nodeCacheMap.put(serviceName,pathChildrenCache);
                pathChildrenCache.getListenable().addListener(new ZkCuratorListener(serviceName));
            } catch (Exception e) {
                throw new MyRpcException("PathChildrenCache start error!",e);
            }

            return serviceInfoList;
        });
    }

    private void createServiceNameNode(String serviceName) {
        try {
            String serviceNameNodePath = getServiceNameNodePath(serviceName);

            // 服務名節點是永久節點
            if (curatorZkClient.checkExists().forPath(serviceNameNodePath) == null) {
                curatorZkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(serviceNameNodePath);
            }

            logger.info("createServiceNameNode success! serviceNameNodePath={}", serviceNameNodePath);
        } catch (Exception e) {
            throw new MyRpcException("createServiceNameNode error", e);
        }
    }

    private void createProviderInfoNode(ServiceInfo serviceInfo) {
        try {
            String serviceNameNodePath = getServiceNameNodePath(serviceInfo.getServiceName());
            // 子節點用一個uuid做path防重複
            String providerInfoNodePath = serviceNameNodePath + "/" + UUID.randomUUID();

            String providerInfoJsonStr = JsonUtil.obj2Str(serviceInfo);

            // providerInfo節點是臨時節點(如果節點宕機了,zk的連線斷開一段時間後,臨時節點會被自動刪除)
            curatorZkClient.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(providerInfoNodePath, providerInfoJsonStr.getBytes(StandardCharsets.UTF_8));
            logger.info("createProviderInfoNode success! path={}", providerInfoNodePath);
        } catch (Exception e) {
            throw new MyRpcException("createProviderInfoNode error", e);
        }
    }

    private String getServiceNameNodePath(String serviceName) {
        return MyRpcRegistryConstants.BASE_PATH + "/" + serviceName;
    }

    private List<ServiceInfo> findProviderInfoList(String serviceName) {
        String serviceNameNodePath = getServiceNameNodePath(serviceName);

        try {
            List<String> providerInfoPathList = curatorZkClient.getChildren().forPath(serviceNameNodePath);
            List<ServiceInfo> serviceInfoList = new ArrayList<>();

            for(String providerInfoPath : providerInfoPathList){
                try{
                    String fullProviderInfoPath = serviceNameNodePath + "/" + providerInfoPath;
                    byte[] data = curatorZkClient.getData().forPath(fullProviderInfoPath);
                    String jsonStr = new String(data,StandardCharsets.UTF_8);
                    ServiceInfo serviceInfo = JsonUtil.json2Obj(jsonStr,ServiceInfo.class);

                    serviceInfoList.add(serviceInfo);
                }catch (Exception e){
                    logger.error("findProviderInfoList getData error",e);
                }
            }

            logger.info("findProviderInfoList={}",JsonUtil.obj2Str(serviceInfoList));
            return serviceInfoList;
        } catch (Exception e) {
            throw new MyRpcException("findProviderInfoList error",e);
        }
    }

    private class ZkCuratorListener implements PathChildrenCacheListener {
        private final String serviceName;

        public ZkCuratorListener(String serviceName) {
            this.serviceName = serviceName;
        }

        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
            logger.info("ZookeeperListener process! serviceName={}",serviceName);

            try {
                // 重新整理快取
                List<ServiceInfo> serviceInfoList = findProviderInfoList(serviceName);
                serviceInfoCacheMap.put(serviceName,serviceInfoList);
            } catch (Exception e) {
                logger.error("ZookeeperListener getChildren error! serviceName={}",serviceName,e);
            }
        }
    }
}

3. 負載均衡

現在使用者端已經能通過服務發現得到實時的provider集合了,那麼使用者端發起請求時應該如何決定向哪個provider發起請求以實現provider側的負載均衡呢?

  • 常見的負載均衡演演算法有很多,MyRpc抽象出了負載均衡演演算法的介面,並實現了最簡單的兩種負載均衡演演算法(無權重的純隨機 + roundRobin)。
  • 在實際的環境中,每個provider可能機器的設定、網路延遲、執行時的動態負載、請求處理的延遲等都各有不同,優秀的負載均衡演演算法能夠通過預先的設定和採集執行時的各項指標來計算出最優的請求順序。
    MyRpc實現的負載均衡演演算法在這裡只起到一個拋磚引玉的參考作用。

負載均衡介面

/**
 * 負載均衡選擇器
 * */
public interface LoadBalance {

    ServiceInfo select(List<ServiceInfo> serviceInfoList);
}

隨機負載均衡

/**
 * 無權重,純隨機的負載均衡選擇器
 * */
public class RandomLoadBalance implements LoadBalance{
    @Override
    public ServiceInfo select(List<ServiceInfo> serviceInfoList) {
        int selectedIndex = ThreadLocalRandom.current().nextInt(serviceInfoList.size());
        return serviceInfoList.get(selectedIndex);
    }
}
/**
 * 無權重的輪訓負載均衡(後續增加帶權重的輪訓)
 * */
public class SimpleRoundRobinBalance implements LoadBalance{

    private final AtomicInteger count = new AtomicInteger();

    @Override
    public ServiceInfo select(List<ServiceInfo> serviceInfoList) {
        if(serviceInfoList.isEmpty()){
            throw new MyRpcException("serviceInfoList is empty!");
        }

        // 考慮一下溢位,取絕對值
        int selectedIndex = Math.abs(count.getAndIncrement());
        return serviceInfoList.get(selectedIndex % serviceInfoList.size());
    }
}

4. 支援多種叢集服務呼叫方式

由於需要通過網路發起rpc呼叫,比起本地呼叫很容易因為網路波動、遠端機器故障等原因而導致呼叫失敗。
使用者端有時希望能通過重試等方式遮蔽掉可能出現的偶發錯誤,儘可能的保證rpc請求的成功率,最好rpc框架能解決這個問題。但另一方面,能夠安全重試的基礎是下游服務能夠做到冪等,否則重複的請求會帶來意想不到的後果,而不冪等的下游服務只能至多呼叫一次。
因此rpc框架需要能允許使用者不同的服務可以有不同的叢集服務呼叫方式,這樣冪等的服務可以設定成可自動重試N次的failover呼叫、只能呼叫1次的fast-fail呼叫或者廣播呼叫等等方式。

MyRpc的Invoker介面用於抽象上述的不同叢集呼叫方式,並簡單的實現了failover和fast-fail等多種呼叫方式(參考dubbo)。

public interface InvokerCallable {
    RpcResponse invoke(NettyClient nettyClient);
}
/**
 * 不同的叢集呼叫方式
 * */
public interface Invoker {

    RpcResponse invoke(InvokerCallable callable, String serviceName,
                                      Registry registry, LoadBalance loadBalance);
}
/**
 * 快速失敗,無論成功與否呼叫1次就返回
 * */
public class FastFailInvoker implements Invoker {

  private static final Logger logger = LoggerFactory.getLogger(FastFailInvoker.class);

  @Override
  public RpcResponse invoke(InvokerCallable callable, String serviceName,
                            Registry registry, LoadBalance loadBalance) {
    List<ServiceInfo> serviceInfoList = registry.discovery(serviceName);
    logger.debug("serviceInfoList.size={},serviceInfoList={}",serviceInfoList.size(), JsonUtil.obj2Str(serviceInfoList));
    NettyClient nettyClient = InvokerUtil.getTargetClient(serviceInfoList,loadBalance);
    logger.info("ClientDynamicProxy getTargetClient={}", nettyClient);

    // fast-fail,簡單的呼叫一次就行,有錯誤就直接向上拋
    return callable.invoke(nettyClient);
  }
}
/**
 * 故障轉移呼叫(如果呼叫出現了錯誤,則重試指定次數)
 * 1 如果重試過程中成功了,則快讀返回
 * 2 如果重試了指定次數後還是沒成功,則丟擲異常
 * */
public class FailoverInvoker implements Invoker {

    private static final Logger logger = LoggerFactory.getLogger(FailoverInvoker.class);

    private final int defaultRetryCount = 2;
    private final int retryCount;

    public FailoverInvoker() {
        this.retryCount = defaultRetryCount;
    }

    public FailoverInvoker(int retryCount) {
        this.retryCount = Math.max(retryCount,1);
    }

    @Override
    public RpcResponse invoke(InvokerCallable callable, String serviceName, Registry registry, LoadBalance loadBalance) {
        MyRpcException myRpcException = null;

        for(int i=0; i<retryCount; i++){
            List<ServiceInfo> serviceInfoList = registry.discovery(serviceName);
            logger.debug("serviceInfoList.size={},serviceInfoList={}",serviceInfoList.size(), JsonUtil.obj2Str(serviceInfoList));
            NettyClient nettyClient = InvokerUtil.getTargetClient(serviceInfoList,loadBalance);
            logger.info("ClientDynamicProxy getTargetClient={}", nettyClient);

            try {
                RpcResponse rpcResponse = callable.invoke(nettyClient);
                if(myRpcException != null){
                    // 雖然最終重試成功了,但是之前請求失敗過
                    logger.warn("FailRetryInvoker finally success, but there have been failed providers");
                }
                return rpcResponse;
            }catch (Exception e){
                myRpcException = new MyRpcException(e);

                logger.warn("FailRetryInvoker callable.invoke error",e);
            }
        }

        // 走到這裡說明經過了retryCount次重試依然不成功,myRpcException一定不為null
        throw myRpcException;
    }
}
接入invoker之後的使用者端請求邏輯
/**
 * 使用者端動態代理
 * */
public class ClientDynamicProxy implements InvocationHandler {

    private static final Logger logger = LoggerFactory.getLogger(ClientDynamicProxy.class);

    private final Registry registry;
    private final LoadBalance loadBalance;
    private final Invoker invoker;

    public ClientDynamicProxy(Registry registry, LoadBalance loadBalance, Invoker invoker) {
        this.registry = registry;
        this.loadBalance = loadBalance;
        this.invoker = invoker;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Tuple<Object,Boolean> localMethodResult = processLocalMethod(proxy,method,args);
        if(localMethodResult.getRight()){
            // right為true,代表是本地方法,返回toString等物件自帶方法的執行結果,不發起rpc呼叫
            return localMethodResult.getLeft();
        }

        logger.debug("ClientDynamicProxy before: methodName=" + method.getName());

        String serviceName = method.getDeclaringClass().getName();

        // 構造請求和協定頭
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setInterfaceName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameterClasses(method.getParameterTypes());
        rpcRequest.setParams(args);

        MessageHeader messageHeader = new MessageHeader();
        messageHeader.setMessageFlag(MessageFlagEnums.REQUEST.getCode());
        messageHeader.setTwoWayFlag(false);
        messageHeader.setEventFlag(true);
        messageHeader.setSerializeType(GlobalConfig.messageSerializeType.getCode());
        messageHeader.setResponseStatus((byte)'a');
        messageHeader.setMessageId(rpcRequest.getMessageId());

        logger.debug("ClientDynamicProxy rpcRequest={}", JsonUtil.obj2Str(rpcRequest));

        RpcResponse rpcResponse = this.invoker.invoke((nettyClient)->{
            Channel channel = nettyClient.getChannel();
            // 將netty的非同步轉為同步,參考dubbo DefaultFuture
            DefaultFuture<RpcResponse> newDefaultFuture = DefaultFutureManager.createNewFuture(channel,rpcRequest);

            try {
                nettyClient.send(new MessageProtocol<>(messageHeader,rpcRequest));

                // 呼叫方阻塞在這裡
                return newDefaultFuture.get();
            } catch (Exception e) {
                throw new MyRpcException("InvokerCallable error!",e);
            }
        },serviceName,registry,loadBalance);

        logger.debug("ClientDynamicProxy defaultFuture.get() rpcResponse={}",rpcResponse);

        return processRpcResponse(rpcResponse);
    }

    /**
     * 處理本地方法
     * @return tuple.right 標識是否是本地方法, true是
     * */
    private Tuple<Object,Boolean> processLocalMethod(Object proxy, Method method, Object[] args) throws Exception {
        // 處理toString等物件自帶方法,不發起rpc呼叫
        if (method.getDeclaringClass() == Object.class) {
            return new Tuple<>(method.invoke(proxy, args),true);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return new Tuple<>(proxy.toString(),true);
            } else if ("hashCode".equals(methodName)) {
                return new Tuple<>(proxy.hashCode(),true);
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return new Tuple<>(proxy.equals(args[0]),true);
        }

        // 返回null標識非本地方法,需要進行rpc呼叫
        return new Tuple<>(null,false);
    }

    private Object processRpcResponse(RpcResponse rpcResponse){
        if(rpcResponse.getExceptionValue() == null){
            // 沒有異常,return正常的返回值
            return rpcResponse.getReturnValue();
        }else{
            // 有異常,往外丟擲去
            throw new MyRpcRemotingException(rpcResponse.getExceptionValue());
        }
    }
}

5. 使用者端請求設定超時時間

使用者端發起請求後,可能由於網路原因,可能由於伺服器端負載過大等原因而遲遲無法收到回覆。
出於效能或者自身業務的考慮,使用者端不能無限制的等待下去,因此rpc框架需要能允許使用者端設定請求的超時時間。在一定的時間內如果無法收到響應則需要丟擲超時異常,令呼叫者及時的感知到問題。

在使用者端側DefaultFuture.get方法,指定超時時間是可以做到這一點的。
但其依賴底層作業系統的定時任務機制,雖然超時時間的精度很高(nanos級別),但在高並行場景下效能不如時間輪。
具體原理可以參考我之前的部落格:時間輪TimeWheel工作原理解析

MyRpc參考dubbo,引入時間輪來實現使用者端設定請求超時時間的功能。

public class DefaultFutureManager {

    private static final Logger logger = LoggerFactory.getLogger(DefaultFutureManager.class);

    public static final Map<Long,DefaultFuture> DEFAULT_FUTURE_CACHE = new ConcurrentHashMap<>();
    public static final HashedWheelTimer TIMER = new HashedWheelTimer();

    public static void received(RpcResponse rpcResponse){
        Long messageId = rpcResponse.getMessageId();

        logger.debug("received rpcResponse={},DEFAULT_FUTURE_CACHE={}",rpcResponse,DEFAULT_FUTURE_CACHE);
        DefaultFuture defaultFuture = DEFAULT_FUTURE_CACHE.remove(messageId);

        if(defaultFuture != null){
            logger.debug("remove defaultFuture success");
            if(rpcResponse.getExceptionValue() != null){
                // 例外處理
                defaultFuture.completeExceptionally(rpcResponse.getExceptionValue());
            }else{
                // 正常返回
                defaultFuture.complete(rpcResponse);
            }
        }else{
            // 可能超時了,接到響應前已經remove掉了這個future(超時和實際接到請求都會呼叫received方法)
            logger.debug("remove defaultFuture fail");
        }
    }

    public static DefaultFuture createNewFuture(Channel channel, RpcRequest rpcRequest){
        DefaultFuture defaultFuture = new DefaultFuture(channel,rpcRequest);
        // 增加超時處理的邏輯
        newTimeoutCheck(defaultFuture);

        return defaultFuture;
    }

    public static DefaultFuture getFuture(long messageId){
        return DEFAULT_FUTURE_CACHE.get(messageId);
    }

    /**
     * 增加請求超時的檢查任務
     * */
    public static void newTimeoutCheck(DefaultFuture defaultFuture){
        TimeoutCheckTask timeoutCheckTask = new TimeoutCheckTask(defaultFuture.getMessageId());
        TIMER.newTimeout(timeoutCheckTask, defaultFuture.getTimeout(), TimeUnit.MILLISECONDS);
    }
}
public class TimeoutCheckTask implements TimerTask {

    private final long messageId;

    public TimeoutCheckTask(long messageId) {
        this.messageId = messageId;
    }

    @Override
    public void run(Timeout timeout) {
        DefaultFuture defaultFuture = DefaultFutureManager.getFuture(this.messageId);
        if(defaultFuture == null || defaultFuture.isDone()){
            // 請求已經在超時前返回,處理過了,直接返回即可
            return;
        }

        // 構造超時的響應
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setMessageId(this.messageId);
        rpcResponse.setExceptionValue(new MyRpcTimeoutException(
            "request timeout:" + defaultFuture.getTimeout() + " channel=" + defaultFuture.getChannel()));

        DefaultFutureManager.received(rpcResponse);
    }
}

6. 總結

  • 經過兩個迭代,目前MyRpc已經是一個麻雀雖小五臟俱全的rpc框架了。
    雖然無論在功能上還是在各種細節的處理上都還有很多需要優化的地方,但作為一個demo級別的框架,其沒有過多的抽象封裝,更有利於rpc框架的初學者去理解。
  • 做為Mit6.824課程學習的一部分,rpc的實現到此就暫時告一段落。後續我會繼續分享實現簡易raft kv資料庫的學習心得。
  • 部落格中展示的完整程式碼在我的github上:https://github.com/1399852153/MyRpc (release/lab2分支),內容如有錯誤,還請多多指教。