Dubbo原始碼(四)

2022-07-27 15:00:33

前言

本文基於Dubbo2.6.x版本,中文註釋版原始碼已上傳github:xiaoguyu/dubbo

上一篇文章,講了Dubbo的服務匯出:

Dubbo原始碼(三) - 服務匯出(生產者)

本文,咱們來聊聊Dubbo的服務參照。

本文案例來自Dubbo官方Demo,路徑為:

dubbo/dubbo-demo/dubbo-demo-consumer/

服務參照原理

Dubbo服務參照物件的生成,是在ReferenceBean#getObject()方法中

其生成時機有兩個:

  1. 餓漢式

    ReferenceBean物件繼承了InitializingBean介面

    public void afterPropertiesSet() throws Exception {
        ......
        Boolean b = isInit();
        if (b == null && getConsumer() != null) {
            b = getConsumer().isInit();
        }
        if (b != null && b.booleanValue()) {
            getObject();
        }
    }
    

    從程式碼可以看出,需要開啟init屬性

  2. 懶漢式

    因為ReferenceBean繼承了FactoryBean介面,當服務被注入到其他類中時,Spring會呼叫getObject方法

而服務的呼叫方式分三種:

  1. 本地參照
  2. 直連方式參照
  3. 註冊中心參照

不管是哪種參照方式,最後都會得到一個 Invoker 範例。

我們再次看看Invoker的官方解釋:

Invoker 是實體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它代表一個可執行體,可向它發起 invoke 呼叫,它有可能是一個原生的實現,也可能是一個遠端的實現,也可能一個叢集實現。

在Dubbo中,Invoker是多重套娃的(可以理解為裝飾器模式或者包裝增強類),通過一層層的包裝,使普通的Invoker具備了負載均衡、叢集的功能。

最後,為服務介面(本文為DemoService)生成代理物件,Invoker#invoke(Invocation invocation)實現服務的呼叫。

本文不討論直連方式參照,也不討論負載均衡、叢集等功能(後續再開坑說)。

建立代理物件

夢的開始,ReferenceBean#getObject()

public Object getObject() throws Exception {
    return get();
}

public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    if (ref == null) {
        // init 方法主要用於處理設定,以及呼叫 createProxy 生成代理類
        init();
    }
    return ref;
}

很明顯,在 init 方法中生成了ref

private void init() {
    // 省略大堆的檢查以及引數處理
    ......
    //attributes are stored by system context.
    // 儲存 attributes 到系統上下文中
    StaticContext.getSystemContext().putAll(attributes);
    // 建立代理類
    ref = createProxy(map);
    // 根據服務名,ReferenceConfig,代理類構建 ConsumerModel,
    // 並將 ConsumerModel 存入到 ApplicationModel 中
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

直接看 createProxy(map)

private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer;
    // isJvmRefer 的賦值處理
    ......

    // 本地參照
    if (isJvmRefer) {
        // 生成invoker
        ......
    // 遠端參照
    } else {
        // 生成invoker、合併invoker
        ......
    }

    // invoker 可用性檢查
    ......
    // 生成代理類
    return (T) proxyFactory.getProxy(invoker);
}

這個方法主要做了兩件事

  1. 建立以及合併Invoker
  2. 生成代理物件

這裡先略過invoker的處理,先看看代理物件的生成。

proxyFactory 是自適應拓展類,預設實現是JavassistProxyFactory,getProxy 方法在其父類別AbstractProxyFactory

// 這是AbstractProxyFactory類的方法
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    return getProxy(invoker, false);
}

public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
    Class<?>[] interfaces = null;
    // 獲取介面列表
    String config = invoker.getUrl().getParameter("interfaces");
    if (config != null && config.length() > 0) {
        // 切分介面列表
        String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
        if (types != null && types.length > 0) {
            interfaces = new Class<?>[types.length + 2];
            // 設定服務介面類和 EchoService.class 到 interfaces 中
            interfaces[0] = invoker.getInterface();
            interfaces[1] = EchoService.class;
            for (int i = 0; i < types.length; i++) {
                // 載入介面類
                interfaces[i + 2] = ReflectUtils.forName(types[i]);
            }
        }
    }
    if (interfaces == null) {
        interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
    }

    // 為 http 和 hessian 協定提供泛化呼叫支援,參考 pull request #1827
    if (!invoker.getInterface().equals(GenericService.class) && generic) {
        int len = interfaces.length;
        Class<?>[] temp = interfaces;
        // 建立新的 interfaces 陣列
        interfaces = new Class<?>[len + 1];
        System.arraycopy(temp, 0, interfaces, 0, len);
        // 設定 GenericService.class 到陣列中
        interfaces[len] = GenericService.class;
    }

    // 呼叫過載方法
    return getProxy(invoker, interfaces);
}

這裡的大段邏輯都是在處理interfaces引數,此時interfaces的值為{ DemoService.class, EchoService.class }

繼續看子類JavassistProxyFactory實現的 getProxy(invoker, interfaces) 方法

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    // return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    // 原始碼是上面那行,我們將上面的程式碼改下面的格式
    Proxy proxy = Proxy.getProxy(interfaces);
    return (T) proxy.newInstance(new InvokerInvocationHandler(invoker));
}

注意:我下面開始講的 proxy 不是平時理解的代理物件,你可以理解為一個生成代理物件的 builder

此方法做了兩件事:

  1. 生成proxy物件
  2. 呼叫 proxy 的newInstance方法生成實際的代理物件

這裡,我就不講 Proxy.getProxy 的原始碼了,感興趣的朋友自行了解。簡單講下里面做了什麼:

  1. 構建服務介面(本文為DemoService)的代理類的位元組碼物件,其生成的位元組碼物件如下:

    這裡簡化了下程式碼,實際上還實現了EchoService介面

    package org.apache.dubbo.common.bytecode;
    
    public class proxy0 implements org.apache.dubbo.demo.DemoService {
    
        public static java.lang.reflect.Method[] methods;
    
        private java.lang.reflect.InvocationHandler handler;
    
        public proxy0() {
        }
    
        public proxy0(java.lang.reflect.InvocationHandler arg0) {
            handler = $1;
        }
    
        public java.lang.String sayHello(java.lang.String arg0) {
            Object[] args = new Object[1];
            args[0] = ($w) $1;
            Object ret = handler.invoke(this, methods[0], args);
            return (java.lang.String) ret;
        }
    }
    
  2. 構建生成服務介面代理物件的builder

    package com.alibaba.dubbo.common.bytecode;
    
    public class Proxy0 extends com.alibaba.dubbo.common.bytecode.Proxy {
    
        public Proxy0() {
        }
    
        public Object newInstance(java.lang.reflect.InvocationHandler h){
            return new com.alibaba.dubbo.common.bytecode.proxy0($1);
        }
    }
    

注意一下:一個是proxy0,另一個是Proxy0,包名不同,類名的p子也有大小寫的區別,別搞混了

再對照之前的 getProxy 方法

Proxy.getProxy(interfaces) 生成的是 Proxy0(大寫的P)

proxy.newInstance(new InvokerInvocationHandler(invoker)) 生成的是 proxy0(小寫的p)

至此,Dubbo服務參照物件已生成,可以看到,生成的參照物件結構也很簡單,主要是依賴 invoker 物件完成介面呼叫的,下面就去看看 invoker 的生成。

建立Invoker

讓我們的視線重新回到createProxy方法中

private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer;
    // isJvmRefer 的賦值處理
    ......

    // 本地參照
    if (isJvmRefer) {
        // 生成本地參照 URL,協定為 injvm
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
        // 呼叫 refer 方法構建 InjvmInvoker 範例
        invoker = refprotocol.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    // 遠端參照
    } else {
        // url 不為空,表明使用者可能想進行對等呼叫
        if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
            ......
        } else { // assemble URL from register center's configuration
            // 載入註冊中心 url
            ......
        }

        // 單個註冊中心或服務提供者(服務直連,下同)
        if (urls.size() == 1) {
            // 呼叫 RegistryProtocol 的 refer 構建 Invoker 範例
            invoker = refprotocol.refer(interfaceClass, urls.get(0));

        // 多個註冊中心或多個服務提供者,或者兩者混合
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;

            // 獲取所有的 Invoker
            for (URL url : urls) {
                // 通過 refprotocol 呼叫 refer 構建 Invoker,refprotocol 會在執行時
                // 根據 url 協定頭載入指定的 Protocol 範例,並呼叫範例的 refer 方法
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // 如果註冊中心連結不為空,則將使用 AvailableCluster
                URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                // 建立 StaticDirectory 範例,並由 Cluster 對多個 Invoker 進行合併
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    // invoker 可用性檢查
    ......
    // 生成代理類
    return (T) proxyFactory.getProxy(invoker);
}

本地參照

if (isJvmRefer) {
    // 生成本地參照 URL,協定為 injvm
    URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
    // 呼叫 refer 方法構建 InjvmInvoker 範例
    invoker = refprotocol.refer(interfaceClass, url);
    if (logger.isInfoEnabled()) {
        logger.info("Using injvm service " + interfaceClass.getName());
    }
}

refprotocol 是自適應拓展,根據URL中的協定,確定實現類是InjvmProtocol

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}

其 refer 方法也很簡單,就生成了 InjvmInvoker 物件並返回。其實這裡搭配服務呼叫過程才容易理解(也就是InjvmInvoker#doInvoke(Invocation invocation方法),但本文是將服務參照過程,所以不展開。

遠端參照

遠端參照區分單註冊中心或單服務提供者和多註冊中心或多服務提供者,此處我們以單註冊中心或單服務提供者舉例,主要邏輯是下面這段

// 單個註冊中心或服務提供者(服務直連,下同)
if (urls.size() == 1) {
    // 呼叫 RegistryProtocol 的 refer 構建 Invoker 範例
    invoker = refprotocol.refer(interfaceClass, urls.get(0));
}

refprotocol 是自適應拓展類,根據 url 中的協定引數,其實現類為RegistryProtocol

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // 獲取註冊中心範例
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    // 將 url 查詢字串轉為 Map
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    // 獲取 group 設定
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                || "*".equals(group)) {
            // 通過 SPI 載入 MergeableCluster 範例,並呼叫 doRefer 繼續執行服務參照邏輯
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // 呼叫 doRefer 繼續執行服務參照邏輯
    return doRefer(cluster, registry, type, url);
}

獲取註冊中心範例的過程,就是建立 zookeeper 連線,我在上一篇Dubbo服務匯出博文中講過了,請自行查詢。

我們繼續關注主要方法doRefer(cluster, registry, type, url)

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 建立 RegistryDirectory 範例
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 設定註冊中心和協定
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // 生成服務消費者連結
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    // 註冊服務消費者,在 consumers 目錄下新節點
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
        registry.register(registeredConsumerUrl);
        directory.setRegisteredConsumerUrl(registeredConsumerUrl);
    }

    // 訂閱 providers、configurators、routers 等節點資料
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

    // 一個註冊中心可能有多個服務提供者,因此這裡需要將多個服務提供者合併為一個
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

此方法主要做了4個操作:

  1. 建立一個 RegistryDirectory 範例,這是一個服務目錄物件。

    服務目錄中儲存了一些和服務提供者有關的資訊,通過服務目錄,服務消費者可獲取到服務提供者的資訊,比如 ip、埠、服務協定等。通過這些資訊,服務消費者就可通過 Netty 等使用者端進行遠端呼叫。

  2. 向註冊中心進行註冊

  3. 訂閱 providers、configurators、routers 等節點下的資料

  4. 生成invoker

    cluster.join(directory) 預設實現類是FailoverCluster,這個是叢集處理,後續文章再討論。

討論了這麼久,還沒看到如何連線暴露出來的遠端服務。

其實,連線遠端服務的操作,就是在訂閱 providers 節點資料時完成的

連線遠端服務

這裡,就不細說訂閱 providers 之後的各種處理,直接快進到遠端服務的連線。下面放上訂閱節點資料到啟動遠端連線的呼叫路徑

別問為什麼是DubboProtocol,因為服務匯出時,也就會zookeeper的providers節點中註冊的url,就是Dubbo協定

下面來看看DubboProtocol的 refer 方法

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // 序列化優化處理
    optimizeSerialization(url);
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

此方法建立了 DubboInvoker 並返回,但是 DubboInvoker 的構造方法沒啥好說的,就是一些類變數的賦值。我們主要關注 getClients ,其返回的是使用者端範例

private ExchangeClient[] getClients(URL url) {
    // whether to share connection
    // 是否共用連線
    boolean service_share_connect = false;
    // 獲取連線數,預設為0,表示未設定
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // 如果未設定 connections,則共用連線
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            // 獲取共用使用者端
            clients[i] = getSharedClient(url);
        } else {
            // 初始化新的使用者端
            clients[i] = initClient(url);
        }
    }
    return clients;
}

connections 的預設值為0,也就是 service_share_connect 為 true ,進入 getSharedClient(url) 方法

private ExchangeClient getSharedClient(URL url) {
    String key = url.getAddress();
    // 獲取帶有「參照計數」功能的 ExchangeClient
    ReferenceCountExchangeClient client = referenceClientMap.get(key);
    if (client != null) {
        if (!client.isClosed()) {
            // 增加參照計數
            client.incrementAndGetCount();
            return client;
        } else {
            referenceClientMap.remove(key);
        }
    }

    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        if (referenceClientMap.containsKey(key)) {
            return referenceClientMap.get(key);
        }

        // 建立 ExchangeClient 使用者端
        ExchangeClient exchangeClient = initClient(url);
        // 將 ExchangeClient 範例傳給 ReferenceCountExchangeClient,這裡使用了裝飾模式
        client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        locks.remove(key);
        return client;
    }
}

此處就是一些參照計數和快取操作,主要關注 ExchangeClient 的建立

private ExchangeClient initClient(URL url) {
    // 獲取使用者端型別,預設為 netty
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
    ......

    ExchangeClient client;
    try {
        // 獲取 lazy 設定,並根據設定值決定建立的使用者端型別
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            // 建立懶載入 ExchangeClient 範例
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // 建立普通 ExchangeClient 範例
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
    return client;
}

我們這裡不討論懶載入的情況。有見到了熟悉的 Exchangers, 在服務匯出的時候,呼叫的是Exchangers.bind 方法,服務參照這裡用的是 Exchangers.connect

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 獲取 Exchanger 範例,預設為 HeaderExchangeClient
    return getExchanger(url).connect(url, handler);
}

這裡 getExchanger(url) 返回的是 HeaderExchangeClient,直接進去看 connect 方法

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 這裡包含了多個呼叫,分別如下:
    // 1. 建立 HeaderExchangeHandler 物件
    // 2. 建立 DecodeHandler 物件
    // 3. 通過 Transporters 構建 Client 範例
    // 4. 建立 HeaderExchangeClient 物件
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

HeaderExchangeClient內部持有 client ,並封裝了心跳的功能。我們重點在 Transporters.connect ,也就是Dubbo的網路傳輸層是如何連線的

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handler 數量大於1,則建立一個 ChannelHandler 分發器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // 獲取 Transporter 自適應拓展類,並呼叫 connect 方法生成 Client 範例
    return getTransporter().connect(url, handler);
}

getTransporter() 獲取的是Transporter的自適應拓展類,預設是NettyTransporter

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyClient(url, listener);
}

NettyTransporter的 connect 方法就建立了一個 NettyClient 物件,所以啟動連線的相關邏輯在其建構函式中

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    super(url, wrapChannelHandler(url, handler));
}

// NettyClient的父類別AbstractClient
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    ......

    try {
        doOpen();
    } catch (Throwable t) {
        ......
    }
    try {
        connect();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
        }
    } catch (RemotingException t) {
        ......
    }

    ......
}

這裡又是使用模板方法,doOpen() 和 connect() 的具體實現在子類NettyClient中,其作用就是建立對遠端服務的連線。這部分屬於Netty的API呼叫,就不做具體描述了。

總結

本文講述了Dubbo服務匯出的過程,也就是建立服務介面代理物件的過程。其中服務呼叫、叢集、負載均衡等部分並未描述,可以期待後續文章。