dubbo的消費者是怎麼獲取提供者服務介面參照的?

2022-06-16 18:01:18

本文主要解讀dubbo消費者是如何參照伺服器端介面的,是如何像本地呼叫一樣呼叫遠端服務的。

並試著從設計者的角度思考,為何這樣設計。

@Component
public class DubboConsumer {

    @Reference(check = false)
    private HelloService helloService;

    public String sayHello(String name) {
        return helloService.sayHello(name);
    }
}

拿這個實際的例子來說,DubboConsumer在建立範例的時候,是如何注入HelloService依賴的,因為HelloService實現類在遠端的另一臺伺服器上。

我們都知道在Spring容器啟動的時候,會載入所有類為BeanDefinition,然後在調getBean方法的時候,會對類進行範例化,並注入依賴。

使用@EnableDubbo表示要啟動dubbo,會註冊幾個post processor,其中包括ReferenceAnnotationBeanPostProcessor,處理具有@Reference註解的屬性,即在Spring bean範例化時,會注入通過dubbo方式生成的伺服器端參照。

1.先整體看看dubbo的參照

(1)從整體架構上看消費端參照建立的過程

 很多人看到這張圖,一臉懵逼,我也不例外,經過七七四十九天的磨練,終於搞清了咋回事,我將一點點拆解,分享給你。

首先是消費者獲得伺服器端介面參照,核心入口是ReferenceConfig,整體可以劃分為這7步:

  • 1.調ReferenceConfig的get方法,獲取伺服器端介面範例;

  • 2.調Protocol的refer方法,得到調伺服器端介面使用的invoker

  • 3.拿到invoker後,封裝在RegistryDirectory中,並通過RegistryProtocol完成對提供者以及相關設定的訂閱

  • 4.實際invoker是通過DubboProtocol建立的DubboInvoker,其包含了消費者到提供者的連線使用者端

  • 5.使用者端的建立入口為Exchangers類,呼叫Exchanger(預設實現類為HeaderExchanger)的connect方法獲取連線伺服器端的使用者端

  • 6.真實的使用者端的建立,是在Transporters類中發生的,會調Transporter(預設實現類為NettyTransporter)的connect方法建立NettyClient

  • 7.根據獲取到的invoker,通過反射得到伺服器端介面的範例,從而可以像調本地方法一樣呼叫遠端

其中在程式碼呼叫關係上看,第2步是在第3步之後,這裡這樣劃分是為了方便敘述,讓官網的架構圖更容易理解。

沿著呼叫鏈路,我們可以暫時不關係第3步,專注於invoker的生成,呼叫序列圖如下:

 上圖描述了獲取invoker過程中最關鍵的呼叫序列,包括使用者端建立、伺服器端介面資訊封裝(invoker)、代理類生成。

(2)看看註冊中心

我們知道使用者端想要調伺服器端,首先需要知道服務的ip和埠號等資訊,那在dubbo消費端啟動的時候,我怎麼知道伺服器端介面的資訊呢?

最先想到的方式就是在建立invoker的入口,將伺服器端資訊傳進去,在new NettyClient時,建立與伺服器端的TCP連線。

但這種方式對使用者特別不方便,同時對程式碼也有侵入性。

那麼是否可以將提供者的介面資訊放在某個地方,在建立invoker的時候,從那裡獲取到介面資訊,大家自然想到的是設定中心。

但設定中心需要人工設定值,然後推播到消費端,當介面非常多時,將無法維護。

所以引入了註冊中心,消費端與伺服器端都可以與註冊中心進行互動。

伺服器端可以將介面資訊自動暴露到註冊中心,消費者可以從註冊中心獲取到介面資訊。

 

又有一個問題,如果消費者參照的介面發生變動,比如新增了一臺提供者,或者伺服器端宕機了,消費者如何能夠實時得感知到並及時做出調整呢?

這就需要消費者能夠監聽註冊中心,註冊中心發生變更,及時通知消費者

最終消費者、服務提供者和註冊中心的關係如下圖:

 

 也就要考慮我們的第3步了,這時呼叫序列圖變為:

 

紅色線框部分即為與註冊中心相關的呼叫,三個核心類:

RegistryProtocol:處理註冊中心相關的Protocol實現,如獲取註冊中心範例,關聯註冊中心與invoker

ZookeeperRegistry:代表Zookeeper為註冊中心的實體,封裝了與Zookeeper互動操作,如訂閱、監聽等

RegistryDirectory:是一個目錄實現類,顧名思義,它持有Invoker列表,同時還有到路由、負載均衡等

另外,RegistryDirectory實現了NotifyListener,在註冊中心資訊發生變更的時候,會調notify方法,更新RegistryDirectory中的invoker列表,從而實現了消費端對伺服器端介面的動態同步。

 

  

2.對原始碼庖丁解牛

dubbo消費端注入提供者服務參照,可以認為從ReferenceAnnotationBeanPostProcessor.doGetInjectedBean開始,

該類在dubbo-spring中,程式碼與註釋如下:

//ReferenceAnnotationBeanPostProcessor.
@Override
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                   InjectionMetadata.InjectedElement injectedElement) throws Exception {
    /**
     * The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext}
     獲取@Service註解的服務bean name,即被參照的bean
     */
    String referencedBeanName = buildReferencedBeanName(attributes, injectedType);

    /**
     * The name of bean that is declared by {@link Reference @Reference} annotation injection
     獲取@Reference註解的服務參照bean name,即提供者服務bean name
     */
    String referenceBeanName = getReferenceBeanName(attributes, injectedType);

      //獲取ReferenceBean範例,ReferenceConfig的範例
    ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
      //註冊ReferenceBean到Spring容器中
    registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType);
    
    cacheInjectedReferenceBean(referenceBean, injectedElement);
        //獲取並建立提供者服務介面的代理類,即使用者最終得到的範例,通過該範例完成RPC透明化呼叫
    return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
}

private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
              //如果參照的服務介面在本地,則直接使用本地Spring容器中的服務範例
        if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
            return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
                    wrapInvocationHandler(referenceBeanName, referenceBean));
        } else { // ReferenceBean should be initialized and get immediately
            //獲取遠端服務介面範例
            return referenceBean.get();
        }
    }

 接下來,我們按之前的呼叫序列圖,一步步往下看。

 

(1)Reference的get

原始碼非常長(包含URL的構建等),這裡做了精簡,只保留了關鍵部分,如下:

//ReferenceConfig
public synchronized T get() {
    if (ref == null) {
        init();
    }
    return ref;
}

public synchronized void init() {
  //1.引數準備和處理
  //2.建立代理
  ref = createProxy(map);
}

private T createProxy(Map<String, String> map) {
  //獲取註冊中心
  ConfigValidationUtils.loadRegistries(this, false);
  
  //1.根據註冊中心構造註冊URL,由此去構建invoker。存在多個註冊中心時,會通過CLUSTER進行包裝
  //此處,REF_PROTOCOL範例為RegistryProtocol,通過RegistryProtocol獲取invoker
  invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
  
  //2.建立遠端invoker的代理類,便於消費者無侵入使用,預設PROXY_FACTORY=JavassistProxyFactory
  return (T) PROXY_FACTORY.getProxy(invoker);
}

 其中,程式碼中涉及到一些ReferenceConfig關鍵屬性,前兩個是通過SPI獲取的Protocol和ProxyFactory,ref為介面的最終代理範例,invoker為參照服務的封裝,如下

/**
 * Protocol的自適應類,與URL相關,協定型別為registry(如registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=dubbo-sample),對應的類為RegistryProtocol;協定型別為dubbo,對應的類為DubboProtocol;
 * 同時為Protocol範例自動包裝兩個類,ProtocolFilterWrapper和ProtocolListenerWrapper
 */
    private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

/**
     * ProxyFactory自適應類,預設實現為JavassistProxyFactory
     */
    private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

    /**
     * The interface proxy reference
     */
    private transient volatile T ref;

    /**
     * The invoker of the reference service
     */
    private transient volatile Invoker<?> invoker;

 REF_PROTOCOL通過自適應獲取的時候,會封裝幾個關鍵包裝類,分別是ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper:

 

過濾器包裝類ProtocolFilterWrapper:對非註冊invoker增加過濾器

public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;
        //ProtocolFilterWrapper為SPI Protocol的包裝類
    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }
  
      @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
          //註冊URL不需要增加過濾器
        if (UrlUtils.isRegistry(url)) {
            return protocol.refer(type, url);
        }
          //其他invoker需要通過filter進行包裝,實現過濾功能
        return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
    }
  
  private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
                //對filters迴圈遍歷,構建被filter修飾的Invoker鏈,真實的invoker在連結串列尾部
  }
}

 

監聽包裝類ProtocolListenerWrapper:對非註冊invoker註冊監聽器

public class ProtocolListenerWrapper implements Protocol {
  @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (UrlUtils.isRegistry(url)) {
            return protocol.refer(type, url);
        }
          //為普通的invoker增加監聽,從InvokerListener介面看,只有參照invoker和destroy時會觸發listener
        return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                .getActivateExtension(url, INVOKER_LISTENER_KEY)));
    }
}

 

Qos包裝類QosProtocolWrapper:該類只對註冊URL時生效

public class QosProtocolWrapper implements Protocol {
  @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
          //只有對註冊URL,才開啟QOS
        if (UrlUtils.isRegistry(url)) {
            startQosServer(url);
            return protocol.refer(type, url);
        }
        return protocol.refer(type, url);
    }
}

 用張圖總結上邊的過程:

 

2)RegistryProtocol的refer

RegistryProtocol作為註冊中心與invoker之間的溝通橋樑,程式碼如下:

public class RegistryProtocol implements Protocol {
      @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = getRegistryUrl(url);
          //1.根據URL獲取註冊中心,此處會建立註冊中心使用者端,並連線註冊中心。如果之前已經建立過,則直接返回快取值
          //此處預設使用ZookeeperRegistry,具體建立過程,後續再說,現在不用管
        Registry registry = registryFactory.getRegistry(url);
          //如果獲取的是註冊服務對應的invoker,則直接通過代理工廠生成代理物件
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
                
          //2.判斷設定group,使用mergeable的cluster,可以暫時不關心
        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
          //3.調doRefer獲取invoker
        return doRefer(cluster, registry, type, url);
    }
}

 最終會調doRefer方法,如下:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
          //1.構造RegistryDirectory,其對註冊中心、路由、設定、負載均衡、invoker列表等資訊進行封裝
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        //2.構造消費者需要訂閱的URL,用於後續訂閱zk中設定、路由、provider等
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            //3.獲取並設定消費者的URL
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            //3.1將消費者URL註冊到註冊中心,如果沒有consumer節點,則建立
            registry.register(directory.getRegisteredConsumerUrl());
        }
        //4.構建路由鏈
        directory.buildRouterChain(subscribeUrl);
        //5 訂閱註冊中心的 provider、設定、路由等節點,當發生變動時,即時更新invoker資訊
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

        //6 cluster對目錄進行封裝,暴露給使用者只有一個invoker,在實際呼叫時,通過路由、負載均衡等,傳送請求到某一個invoker
        Invoker invoker = cluster.join(directory);
        return invoker;
    }

 在doRefer方法中,我們看到構建了RegistryDirectory(每個提供者的服務介面都對應一個RegistryDirectory),並通過RegistryDirectory關聯了註冊中心與invoker;同時完成消費者在註冊中心的註冊,以及對註冊中心的訂閱。

可能有人疑惑,invoker在哪,沒看到怎麼就去訂閱了呢?

這個地方應該是設計者為了對程式碼複用進行的設計,因為在應用執行過程中,需要監聽註冊中心,判斷提供者是否有變動。

這也是為什麼RegistryDirectory實現監聽介面,同時持有註冊中心和invoker。在變動的情況下,會更新對應的invoker列表。

接下來,我們會考到會調到RegistryDirectory的notify介面。

 

首先看上邊程式碼的第5步,通過RegistryDirectory實現消費者對註冊中心的訂閱,程式碼如下:

//RegistryDirectory
public void subscribe(URL url) {
    setConsumerUrl(url);
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
    //註冊中心發起訂閱,註冊中心為ZookeeperRegistry,會調其父類別FailbackRegistry.subscribe,其中包含了失敗重試的策略
    registry.subscribe(url, this);
}

 其中registry為ZookeeperRegistry範例,其父類別為FailbackRegistry,此處會調膚類的訂閱方法。FailbackRegistry在註冊中心範例的基礎上,增加了失敗重試的功能。

其中引數this是NotifyListener,即RegistryDirectory本身。

//FailbackRegistry
@Override
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    //1 從失敗訂閱列表中刪除對應的訂閱請求,取消定時重試
    removeFailedSubscribed(url, listener);
    try {
        //2 Sending a subscription request to the server side,調ZookeeperRegistry實現訂閱
        doSubscribe(url, listener);
    } catch (Exception e) {
       
        //3 Record a failed registration request to a failed list, retry regularly 失敗後加入失敗訂閱列表進行重試
        addFailedSubscribed(url, listener);
    }
}

 訂閱邏輯發生在doSubscribe中,由ZookeeperRegistry範例進行的實現。程式碼如下:

//ZookeeperRegistry
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            //省略
        } else {
            List<URL> urls = new ArrayList<>();
            //1 遍歷provider、設定、路由node,並註冊監聽到這些節點上,當節點發生變化,會呼叫ZookeeperRegistry的notify介面
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    //2 建立zk監聽器
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkListener = listeners.get(listener);
                }
                //3 在zk建立provider、設定、路由對應的路徑
                zkClient.create(path, false);
                //4 增加監聽器
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            //5 通知監聽器,根據urls變化更新伺服器端invoker列表,在初次啟動時,構建invoker
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

 我們看到,在ZookeeperRegistry的訂閱方法中,其持有到zk的使用者端,可以建立相應的節點,並實現監聽。

同時,for迴圈是對provider、設定、路由進行建立於訂閱,即完成消費者對提供者服務介面、設定、路由規則的訂閱,從而可以實現對提供者變化時得到通知,設定貨路由發生變化時也能得到通知。

 

第5步,notify方法,會調父類別FailbackRegistry的notify方法,如下:

//FailbackRegistry
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    try {
        //1 通知
        doNotify(url, listener, urls);
    } catch (Exception t) {
        //2 Record a failed registration request to a failed list, retry regularly 通知失敗後,加入失敗通知列表,用於重試
        addFailedNotified(url, listener, urls);
        logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    }
}

 FailbackRegistry再調父類別AbstractRegistry的notify方法,裡邊會呼叫監聽器即RegistryDirectory,進行更新或構建invoker,程式碼如下:

//AbstractRegistry
/**
 * Notify changes from the Provider side.
 *
 * @param url      consumer side url
 * @param listener listener
 * @param urls     provider latest urls  最新的伺服器端URLs(包括provider、設定、路由的URL)
 */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    //1 keep every provider's category. 按類別劃分URL
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    //2 根據urls,通知變更所有invoker、設定等資訊
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        //3 通知監聽器RegistryDirectory,更新其中設定、路由、provider、invoker等資訊
        listener.notify(categoryList);
        saveProperties(url);
    }
}

 第3步,調監聽器RegistryDirectory,更新伺服器端資訊

//RegistryDirectory
@Override
public synchronized void notify(List<URL> urls) {
    //1 按類別(provider、設定、路由)劃分需要更新的urls
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(url -> {
                if (UrlUtils.isConfigurator(url)) {
                    return CONFIGURATORS_CATEGORY;
                } else if (UrlUtils.isRoute(url)) {
                    return ROUTERS_CATEGORY;
                } else if (UrlUtils.isProvider(url)) {
                    return PROVIDERS_CATEGORY;
                }
                return "";
            }));

    //2 更新設定資訊
    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    //3 更新路由資訊
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    //4 獲取最新的 providers URL
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    
    //5 更新消費端對應的invoker列表
    refreshOverrideAndInvoker(providerURLs);
}

 第5步,通過refreshOverrideAndInvoker更新invoker列表

private void refreshOverrideAndInvoker(List<URL> urls) {
        overrideDirectoryUrl();
        //更新invokers
        refreshInvoker(urls);
}

 繼續往後走,就是具體更新邏輯

private void refreshInvoker(List<URL> invokerUrls) {
    //invokerUrls為空,表示更新設定或路由
    Assert.notNull(invokerUrls, "invokerUrls should not be null");
    //1 如果只有一個invokerUrl,同時協定為empty,一般表示介面沒有可用提供者,會登出所有invoker
    if (invokerUrls.size() == 1
            && invokerUrls.get(0) != null
            && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // Forbid to access
        this.invokers = Collections.emptyList();
        routerChain.setInvokers(this.invokers);
        destroyAllInvokers(); // Close all invokers
    } else {
        //2 有可用的提供者,更新invoker的快取urlInvokerMap
        this.forbidden = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls == Collections.<URL>emptyList()) {
            invokerUrls = new ArrayList<>();
        }
        //3。如果invokerUrls為空,則繼續使用快取的invokerUrls。否則使用最新的
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<>();
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        //4 轉換新的invokerUrls為invoker
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

        List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        // pre-route and build cache, notice that route cache should build on original Invoker list.
        // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
        routerChain.setInvokers(newInvokers);
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
          //5 替換最新的invoker
        this.urlInvokerMap = newUrlInvokerMap;

        try {
            //6 銷燬不用的invoker
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

 調toInvokers方法,將URL轉換為invoker,並快取起來

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
  for (URL providerUrl : urls) {
    // 構造InvokerDelegate,其中protocol.refer返回的為原invoker
    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
    newUrlInvokerMap.put(key, invoker);
  }
}

 到此為止,dubbo完成了從註冊中心獲取伺服器端介面資訊,並將其轉換為invoker列表;

同時這些invoker列表儲存在RegistryDirectory中,可以實時監聽註冊中心的變更。

在toInvokers方法中,會呼叫DubboProtocol的refer方法實現服務提供者URL到invoker的轉變,具體見後邊文章。

 

這裡有個問題,為什麼要封裝invoker到在InvokerDelegate中呢?

官網的解釋:

The delegate class, which is mainly used to store the URL address sent by the registry,and can be reassembled on the basis of providerURL queryMap overrideMap for re-refer.

 其實就是對invoker和伺服器端URL的封裝,便於後續使用。

具體如何使用的,我們後邊再說。

用張圖總結上邊的過程:起於RegistryProtocol,終於RegistryDirectory。

 

(3)DubboProtocol的refer

在呼叫DubboProtocol的refer方法的過程中,也還會呼叫ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper這三個包裝類,實現對invoker的攔截與監聽。

首先會調到父類別AbstractProtocol的refer方法,如下

//AbstractProtocol
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    //構造非同步實現同步的invoker
    return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}

 我們看到,會將DubboProtocol建立的invoker封裝為AsyncToSyncInvoker,為何要進行這樣封裝呢?

原因是,dubbo呼叫底層是基於netty進行的,是非同步的過程,AsyncToSyncInvoker可以實現同步的呼叫,具體細節後邊再說,暫時可以不管,只知道進行封裝就行了。

 

千呼萬喚始出來,終於找到URL轉換為invoker的根了,我們看到invoker實際就是DubboInvoker的範例

//DubboProtocol
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.建立真實的rpc invoker,其中包含使用者端的建立
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

 到此,我們追蹤到invoker它祖先DubboInvoker,在ReferenceConfig拿到的invoker是DubboInvoker經過層層包裝的結果。

也正是由於這些包裝,我們可以對invoker進行一些客製化化和擴充套件性的控制。

用張圖總結上邊的過程:

 

(4) 建立Client

getClients(url)根據URL獲取使用者端,建立消費者與提供者之間的TCP連線。

預設情況下,對伺服器端是共用的,即一個消費者與提供者之間保持一個TCP連線。

private ExchangeClient[] getClients(URL url) {
    // whether to share connection

    boolean useShareConnect = false;

    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    List<ReferenceCountExchangeClient> shareClients = null;
    // if not configured, connection is shared, otherwise, one connection for one service
    //1 預設使用共用的1個使用者端
    if (connections == 0) {
        useShareConnect = true;
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        //2 預設保持消費者與提供者之間只有一個TCP連線
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
        //3 獲取共用使用者端
        shareClients = getSharedClient(url, connections);
    }

    //4 構造ExchangeClient陣列,若不共用,需要建立多個消費者與提供者之間的TCP連線
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (useShareConnect) {
            clients[i] = shareClients.get(i);

        } else {
            clients[i] = initClient(url);
        }
    }

    return clients;
}

 

共用使用者端是如何實現的呢?

DubboProtocol持有使用者端快取referenceClientMap,key為伺服器端host:port,value為ExchangeClient的封裝類ReferenceCountExchangeClient列表,其中包含參照計數。

/**
     * <host:port,Exchanger>
     */
private final Map<String, List<ReferenceCountExchangeClient>> referenceClientMap = new ConcurrentHashMap<>();

private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
    //1 共用的client,key為提供者ip和埠號
    String key = url.getAddress();
    List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);

    if (checkClientCanUse(clients)) {
        batchClientRefIncr(clients);
        return clients;
    }

    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        clients = referenceClientMap.get(key);
        // dubbo check
        if (checkClientCanUse(clients)) {
            batchClientRefIncr(clients);
            return clients;
        }

        // connectNum must be greater than or equal to 1
        connectNum = Math.max(connectNum, 1);

        // If the clients is empty, then the first initialization is
        if (CollectionUtils.isEmpty(clients)) {
            //2 構建參照計數的ExchangeClient
            clients = buildReferenceCountExchangeClientList(url, connectNum);
            referenceClientMap.put(key, clients);

        } else {
            for (int i = 0; i < clients.size(); i++) {
                ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                // If there is a client in the list that is no longer available, create a new one to replace him.
                if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                    clients.set(i, buildReferenceCountExchangeClient(url));
                    continue;
                }
                //參照計數增1
                referenceCountExchangeClient.incrementAndGetCount();
            }
        }

        /**
         * I understand that the purpose of the remove operation here is to avoid the expired url key
         * always occupying this memory space.
         */
        locks.remove(key);

        return clients;
    }
}

 在buildReferenceCountExchangeClientList方法中,會呼叫initClient方法,建立使用者端。

/**
 * Create new connection
 *
 * @param url
 */
private ExchangeClient initClient(URL url) {

    // client type setting.
    //1 client型別,預設為netty
    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
    //2 編碼方式為DubboCodec
    url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    //3 增加心跳檢測,預設事件間隔為1分鐘
    url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

    ExchangeClient client;
    try {
        // connection should be lazy
        if (url.getParameter(LAZY_CONNECT_KEY, false)) {
            //4 構造懶連線,在使用的時候才會真正建立伺服器端連線
            client = new LazyConnectExchangeClient(url, requestHandler);

        } else {
            //5 預設直接建立連線
            client = Exchangers.connect(url, requestHandler);
        }

    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }

    return client;
}

 接下來,我們看看使用者端是如何建立的。

 

(5)ExchangeClient的connect

上邊所有的程式碼都是在協定層(Protocol),接下來主要聚焦在交換層(Exchanger)。

使用者端建立是通過工具類Exchangers進行建立,通過URL獲取Exchanger(預設實現為HeaderExchanger)

public class Exchangers {
  public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        //先獲取Exchanger,預設為HeaderExchanger
        return getExchanger(url).connect(url, handler);
    }
}

 從程式碼可以看出,HeaderExchanger為消費端和伺服器端建立的關鍵類,其建立client和server,分別為HeaderExchangeClient和HeaderExchangeServer。

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        //1 Transporters.connect建立連線,返回Client;
        //2 構造HeaderExchangeClient,其中包含HeaderExchangeChannel,用於傳送請求,並且失敗重試
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

又出現一個問題,為什麼通過Transporter connect獲取到的client,要經過HeaderExchangeClient封裝呢?

按官網的描述,是為上層的呼叫構建request- response的語意,相當於對netty client的封裝。除此之外,HeaderExchangeClient中還有重新建立連線的功能,具體後邊有時間再說怎麼進行重新連線的。

 

(6)Transporter的connect

到這個地方,我們來到了傳輸層(Transporter),即用於傳送和接受資料的地方。

工具類Transporters,建立使用者端連線。

public class Transporters {
  public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        //獲取Transporter,預設為NettyTransporter
        return getTransporter().connect(url, handler);
    }
}

Transporter的預設實現類為NettyTransporter,可以構建NettyClient和NettyServer,他們是資料傳送和接受的執行實體,程式碼如下:

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public RemotingServer bind(URL url, ChannelHandler listener) throws RemotingException {
        //構建NettyServer
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        //構建NettyClient
        return new NettyClient(url, listener);
    }

}

 可以看到,底層的使用者端是NettyClient,它持有URL和一系列ChannelHandler。

我們接下來看看使用者端是怎麼進行範例化的。

public class NettyClient extends AbstractClient {
  public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
        // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
        super(url, wrapChannelHandler(url, handler));
    }
}

NettyClient範例化主要通過父類別來完成的,在調父類別之前,通過wrapChannelHandler給ChannelHandler封裝了2個Handler,分別是MultiMessageHandler和HeartbeatHandler,用於多訊息處理和心跳檢測處理。

接下來看下父類別進行了什麼操作。

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);

    needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

    initExecutor(url);

    try {
        //1 開啟client
        doOpen();
    } catch (Throwable t) {
        close();
    }
    try {
        //2 client發起連線
        connect();
    } catch (RemotingException t) {
        close();
    } catch (Throwable t) {
        close();
    }
}

 可這是一個模板方法,doOpen和connect藉助子類,即NettyClient完成的,我們再看看具體是怎麼完成使用者端建立的。

//NettyClient
@Override
protected void doOpen() throws Throwable {
    //NettyClientHandler 為dubbo主要處理器
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            .channel(NioSocketChannel.class);

    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
    bootstrap.handler(new ChannelInitializer() {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

            if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
            }

            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    //空閒處理器,用於心跳檢測
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);

            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
            if(socksProxyHost != null) {
                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}

上邊的程式碼是不是非常熟悉,這就是netty建立使用者端的標準程式碼,如果不熟悉也沒關係,以後有機會再分享下netty。

ChannelPipeline中設定了編解碼器、空閒處理器、處理dubbo訊息的NettyClientHandler。

doOpen程式碼只是初始化好了Bootstrap,連線發生在doConnect方法中,如下:

//NettyClient
@Override
protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    //與netty伺服器端連線,閉關獲取ChannelFuture
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);

    if (ret && future.isSuccess()) {
       Channel newChannel = future.channel();
       Channel oldChannel = NettyClient.this.channel;
       oldChannel.close();
       NettyClient.this.channel = newChannel;
    }
}

至此,我們瞭解了dubbo消費者是如何通過Exchanger和Transport,利用底層netty建立使用者端連線的。

將建立好的使用者端,封裝到Protocol層獲取到的invoker,在消費者發起呼叫的時候,直接可以請求到伺服器端。

用張圖總結上邊Exchanger和Transport兩層的過程:

 

 

通過Protocol、Exchange、Transport三層的支撐下,完成了最開始圖中的1-6步,獲得到了代表伺服器端的invoker。

為了減少dubbo框架對使用者的程式碼侵入,還需要對伺服器端介面進行代理,

這樣真正做到消費者如同呼叫本地一樣,呼叫遠端服務,接下來我們看看是如何代理的。

 

(7)ProxyFactory的getProxy

生成伺服器端介面代理,主要涉及ReferenceConfig中createProxy的第二步getProxy。

return (T) PROXY_FACTORY.getProxy(invoker);

 經過StubProxyFactoryWrapper包裝類,最終呼叫到預設實現JavassistProxyFactory,其通過反射獲取伺服器端介面的實現。程式碼如下:

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        //通過反射建立invoker的代理,處理器為InvokerInvocationHandler
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

該類包含兩個方法,getProxy和getInvoker,前者用於消費端參照獲取代理類,後者用於伺服器端暴露服務時獲取對應的invoker。

此處關注getProxy方法,通過Proxy.getProxy反射獲取代理,並且InvokerInvocationHandler為代理處理器。程式碼如下:

public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        rpcInvocation.setTargetServiceUniqueName(invoker.getUrl().getServiceKey());

        return invoker.invoke(rpcInvocation).recreate();
    }
}

 其中invoke方法,就是使用者調介面時,會被代理到該方法上。

我們看到服務介面等資訊被封裝到RpcInvocation中,通過持有的invoker進行呼叫。

呼叫關係如下圖:

 

 

3.用一個例子,追蹤資料的流轉

舉一個簡單的例子,定義介面

public interface HelloService {
    String sayHello(String name);
}

服務提供者

@Service
public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello(String name) {
        return "hello:"+name;
    }
}

 dubbo-provider.properties設定:

dubbo.application.name=dubbo-annotation-provider
dubbo.protocol.name=dubbo
dubbo.protocol.port=20885

啟動類:

public class DubboProviderMain {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class);
        context.start();

        System.in.read();
    }

    @Configuration
    @EnableDubbo(scanBasePackages = "com.exm.service.impl")
    @PropertySource("classpath:/dubbo-provider.properties")
    static class ProviderConfiguration {
        @Bean
        public RegistryConfig registryConfig() {
            RegistryConfig registryConfig = new RegistryConfig();
            registryConfig.setAddress("zookeeper://127.0.0.1:2181?timeout=10000");
            return registryConfig;
        }
    }
}

 

服務消費者

@Component
public class DubboConsumer {

    @Reference(check = false)
    private HelloService helloService;

    public String sayHello(String name) {
        return helloService.sayHello(name);
    }
}

 設定:

dubbo.application.name=dubbo-annotation-consumer
dubbo.registry.address=zookeeper://127.0.0.1:2181

 啟動類:

public class DubboConsumerMain {
    public static void main(String[] args) throws IOException, InterruptedException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
        context.start();
        DubboConsumer service = context.getBean(DubboConsumer.class);
        while (true) {
            System.in.read();
            try {
                String hello = service.sayHello("world");
                System.out.println("result :" + hello);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Configuration
    @PropertySource("classpath:/dubbo-consumer.properties")
    @ComponentScan("com.exm.bean")
    @EnableDubbo
    static class ConsumerConfiguration {

    }
}

 

跟蹤程式碼,將流轉的URL記錄如下,可以參考著閱讀原始碼

//註冊協定的URL
RegistryProtocol#Invoker<T> refer(Class<T> type, URL url) type:interface com.exm.service.HelloService url:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-annotation-consumer&dubbo=2.0.2&pid=44159&refer=application%3Ddubbo-annotation-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26init%3Dfalse%26interface%3Dcom.exm.service.HelloService%26methods%3DsayHello%26pid%3D44159%26register.ip%3D192.168.1.65%26release%3D2.7.5%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1654352381559&registry=zookeeper&release=2.7.5&timestamp=1654352409705

//costumer的URL RegistryDirectory#void subscribe(URL url) ZookeeperRegistry#void doSubscribe(final URL url, final NotifyListener listener) url:consumer://192.168.1.65/com.exm.service.HelloService?application=dubbo-annotation-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false&timestamp=1654352381559

//provider、設定、路由對應的URL FailbackRegistry#void notify(URL url, NotifyListener listener, List<URL> urls) url:consumer://192.168.1.65/com.exm.service.HelloService?application=dubbo-annotation-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false&timestamp=1654352381559 urls: 0: dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44153&release=2.7.5&side=provider&timestamp=1654352300763 1: empty://192.168.1.65/com.exm.service.HelloService?application=dubbo-annotation-consumer&category=configurators&check=false&dubbo=2.0.2&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false&timestamp=1654352381559 2: empty://192.168.1.65/com.exm.service.HelloService?application=dubbo-annotation-consumer&category=routers&check=false&dubbo=2.0.2&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false&timestamp=1654352381559 RegistryDirectory#void refreshInvoker(List<URL> invokerUrls) invokerUrls:dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44153&release=2.7.5&side=provider&timestamp=1654352300763 InvokerDelegate#new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl) //url表示消費端,要建立伺服器端的連線 url:dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-consumer&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44159&register.ip=192.168.1.65&release=2.7.5&remote.application=dubbo-annotation-provider&side=consumer&sticky=false&timestamp=1654352300763 providerUrl:dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44153&release=2.7.5&side=provider&timestamp=1654352300763 DubboProtocol#Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) DubboProtocol#ExchangeClient initClient(URL url) url:dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-consumer&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44259&register.ip=192.168.1.65&release=2.7.5&remote.application=dubbo-annotation-provider&side=consumer&sticky=false&timestamp=1654352300763 Exchangers#ExchangeClient connect(URL url, ExchangeHandler handler) NettyTransporter#Client connect(URL url, ChannelHandler listener) AbstractClient#AbstractClient(URL url, ChannelHandler handler) url:dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-consumer&check=false&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44259&register.ip=192.168.1.65&release=2.7.5&remote.application=dubbo-annotation-provider&side=consumer&sticky=false&timestamp=1654352300763

 

到此為止,將dubbo時如何為消費端創造遠端參照範例的(invoker+代理),可能依然有講述不清晰的地方,請大家指出來,一塊研讀學習。