本文主要解讀dubbo消費者是如何參照伺服器端介面的,是如何像本地呼叫一樣呼叫遠端服務的。
並試著從設計者的角度思考,為何這樣設計。
@Component public class DubboConsumer { @Reference(check = false) private HelloService helloService; public String sayHello(String name) { return helloService.sayHello(name); } }
我們都知道在Spring容器啟動的時候,會載入所有類為BeanDefinition,然後在調getBean方法的時候,會對類進行範例化,並注入依賴。
使用@EnableDubbo表示要啟動dubbo,會註冊幾個post processor,其中包括ReferenceAnnotationBeanPostProcessor,處理具有@Reference註解的屬性,即在Spring bean範例化時,會注入通過dubbo方式生成的伺服器端參照。
很多人看到這張圖,一臉懵逼,我也不例外,經過七七四十九天的磨練,終於搞清了咋回事,我將一點點拆解,分享給你。
首先是消費者獲得伺服器端介面參照,核心入口是ReferenceConfig,整體可以劃分為這7步:
1.調ReferenceConfig的get方法,獲取伺服器端介面範例;
2.調Protocol的refer方法,得到調伺服器端介面使用的invoker
3.拿到invoker後,封裝在RegistryDirectory中,並通過RegistryProtocol完成對提供者以及相關設定的訂閱
4.實際invoker是通過DubboProtocol建立的DubboInvoker,其包含了消費者到提供者的連線使用者端
6.真實的使用者端的建立,是在Transporters類中發生的,會調Transporter(預設實現類為NettyTransporter)的connect方法建立NettyClient
7.根據獲取到的invoker,通過反射得到伺服器端介面的範例,從而可以像調本地方法一樣呼叫遠端
其中在程式碼呼叫關係上看,第2步是在第3步之後,這裡這樣劃分是為了方便敘述,讓官網的架構圖更容易理解。
沿著呼叫鏈路,我們可以暫時不關係第3步,專注於invoker的生成,呼叫序列圖如下:
上圖描述了獲取invoker過程中最關鍵的呼叫序列,包括使用者端建立、伺服器端介面資訊封裝(invoker)、代理類生成。
我們知道使用者端想要調伺服器端,首先需要知道服務的ip和埠號等資訊,那在dubbo消費端啟動的時候,我怎麼知道伺服器端介面的資訊呢?
最先想到的方式就是在建立invoker的入口,將伺服器端資訊傳進去,在new NettyClient時,建立與伺服器端的TCP連線。
那麼是否可以將提供者的介面資訊放在某個地方,在建立invoker的時候,從那裡獲取到介面資訊,大家自然想到的是設定中心。
但設定中心需要人工設定值,然後推播到消費端,當介面非常多時,將無法維護。
所以引入了註冊中心,消費端與伺服器端都可以與註冊中心進行互動。
伺服器端可以將介面資訊自動暴露到註冊中心,消費者可以從註冊中心獲取到介面資訊。
又有一個問題,如果消費者參照的介面發生變動,比如新增了一臺提供者,或者伺服器端宕機了,消費者如何能夠實時得感知到並及時做出調整呢?
這就需要消費者能夠監聽註冊中心,註冊中心發生變更,及時通知消費者。
最終消費者、服務提供者和註冊中心的關係如下圖:
也就要考慮我們的第3步了,這時呼叫序列圖變為:
紅色線框部分即為與註冊中心相關的呼叫,三個核心類:
RegistryProtocol:處理註冊中心相關的Protocol實現,如獲取註冊中心範例,關聯註冊中心與invoker
RegistryDirectory:是一個目錄實現類,顧名思義,它持有Invoker列表,同時還有到路由、負載均衡等
另外,RegistryDirectory實現了NotifyListener,在註冊中心資訊發生變更的時候,會調notify方法,更新RegistryDirectory中的invoker列表,從而實現了消費端對伺服器端介面的動態同步。
該類在dubbo-spring中,程式碼與註釋如下:
//ReferenceAnnotationBeanPostProcessor. @Override protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType, InjectionMetadata.InjectedElement injectedElement) throws Exception { /** * The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext} 獲取@Service註解的服務bean name,即被參照的bean */ String referencedBeanName = buildReferencedBeanName(attributes, injectedType); /** * The name of bean that is declared by {@link Reference @Reference} annotation injection 獲取@Reference註解的服務參照bean name,即提供者服務bean name */ String referenceBeanName = getReferenceBeanName(attributes, injectedType); //獲取ReferenceBean範例,ReferenceConfig的範例 ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType); //註冊ReferenceBean到Spring容器中 registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType); cacheInjectedReferenceBean(referenceBean, injectedElement); //獲取並建立提供者服務介面的代理類,即使用者最終得到的範例,通過該範例完成RPC透明化呼叫 return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType); } private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) { //如果參照的服務介面在本地,則直接使用本地Spring容器中的服務範例 if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType}, wrapInvocationHandler(referenceBeanName, referenceBean)); } else { // ReferenceBean should be initialized and get immediately //獲取遠端服務介面範例 return referenceBean.get(); } }
接下來,我們按之前的呼叫序列圖,一步步往下看。
原始碼非常長(包含URL的構建等),這裡做了精簡,只保留了關鍵部分,如下:
//ReferenceConfig public synchronized T get() { if (ref == null) { init(); } return ref; } public synchronized void init() { //1.引數準備和處理 //2.建立代理 ref = createProxy(map); } private T createProxy(Map<String, String> map) { //獲取註冊中心 ConfigValidationUtils.loadRegistries(this, false); //1.根據註冊中心構造註冊URL,由此去構建invoker。存在多個註冊中心時,會通過CLUSTER進行包裝 //此處,REF_PROTOCOL範例為RegistryProtocol,通過RegistryProtocol獲取invoker invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); //2.建立遠端invoker的代理類,便於消費者無侵入使用,預設PROXY_FACTORY=JavassistProxyFactory return (T) PROXY_FACTORY.getProxy(invoker); }
其中,程式碼中涉及到一些ReferenceConfig關鍵屬性,前兩個是通過SPI獲取的Protocol和ProxyFactory,ref為介面的最終代理範例,invoker為參照服務的封裝,如下:
/** * Protocol的自適應類,與URL相關,協定型別為registry(如registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=dubbo-sample),對應的類為RegistryProtocol;協定型別為dubbo,對應的類為DubboProtocol; * 同時為Protocol範例自動包裝兩個類,ProtocolFilterWrapper和ProtocolListenerWrapper */ private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); /** * ProxyFactory自適應類,預設實現為JavassistProxyFactory */ private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); /** * The interface proxy reference */ private transient volatile T ref; /** * The invoker of the reference service */ private transient volatile Invoker<?> invoker;
在REF_PROTOCOL通過自適應獲取的時候,會封裝幾個關鍵包裝類,分別是ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper:
public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; //ProtocolFilterWrapper為SPI Protocol的包裝類 public ProtocolFilterWrapper(Protocol protocol) { if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { //註冊URL不需要增加過濾器 if (UrlUtils.isRegistry(url)) { return protocol.refer(type, url); } //其他invoker需要通過filter進行包裝,實現過濾功能 return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER); } private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); //對filters迴圈遍歷,構建被filter修飾的Invoker鏈,真實的invoker在連結串列尾部 } }
監聽包裝類ProtocolListenerWrapper:對非註冊invoker註冊監聽器
public class ProtocolListenerWrapper implements Protocol { @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (UrlUtils.isRegistry(url)) { return protocol.refer(type, url); } //為普通的invoker增加監聽,從InvokerListener介面看,只有參照invoker和destroy時會觸發listener return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, INVOKER_LISTENER_KEY))); } }
Qos包裝類QosProtocolWrapper:該類只對註冊URL時生效
public class QosProtocolWrapper implements Protocol { @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { //只有對註冊URL,才開啟QOS if (UrlUtils.isRegistry(url)) { startQosServer(url); return protocol.refer(type, url); } return protocol.refer(type, url); } }
用張圖總結上邊的過程:
RegistryProtocol作為註冊中心與invoker之間的溝通橋樑,程式碼如下:
public class RegistryProtocol implements Protocol { @Override @SuppressWarnings("unchecked") public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = getRegistryUrl(url); //1.根據URL獲取註冊中心,此處會建立註冊中心使用者端,並連線註冊中心。如果之前已經建立過,則直接返回快取值 //此處預設使用ZookeeperRegistry,具體建立過程,後續再說,現在不用管 Registry registry = registryFactory.getRegistry(url); //如果獲取的是註冊服務對應的invoker,則直接通過代理工廠生成代理物件 if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } //2.判斷設定group,使用mergeable的cluster,可以暫時不關心 // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } //3.調doRefer獲取invoker return doRefer(cluster, registry, type, url); } }
最終會調doRefer方法,如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { //1.構造RegistryDirectory,其對註冊中心、路由、設定、負載均衡、invoker列表等資訊進行封裝 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); //2.構造消費者需要訂閱的URL,用於後續訂閱zk中設定、路由、provider等 URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { //3.獲取並設定消費者的URL directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); //3.1將消費者URL註冊到註冊中心,如果沒有consumer節點,則建立 registry.register(directory.getRegisteredConsumerUrl()); } //4.構建路由鏈 directory.buildRouterChain(subscribeUrl); //5 訂閱註冊中心的 provider、設定、路由等節點,當發生變動時,即時更新invoker資訊 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); //6 cluster對目錄進行封裝,暴露給使用者只有一個invoker,在實際呼叫時,通過路由、負載均衡等,傳送請求到某一個invoker Invoker invoker = cluster.join(directory); return invoker; }
RegistryDirectory實現監聽介面,同時持有註冊中心和invoker。在變動的情況下,會更新對應的invoker列表。
接下來,我們會考到會調到RegistryDirectory的notify介面。
首先看上邊程式碼的第5步,通過RegistryDirectory實現消費者對註冊中心的訂閱,程式碼如下:
//RegistryDirectory public void subscribe(URL url) { setConsumerUrl(url); CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); serviceConfigurationListener = new ReferenceConfigurationListener(this, url); //註冊中心發起訂閱,註冊中心為ZookeeperRegistry,會調其父類別FailbackRegistry.subscribe,其中包含了失敗重試的策略 registry.subscribe(url, this); }
其中引數this是NotifyListener,即RegistryDirectory本身。
//FailbackRegistry @Override public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); //1 從失敗訂閱列表中刪除對應的訂閱請求,取消定時重試 removeFailedSubscribed(url, listener); try { //2 Sending a subscription request to the server side,調ZookeeperRegistry實現訂閱 doSubscribe(url, listener); } catch (Exception e) { //3 Record a failed registration request to a failed list, retry regularly 失敗後加入失敗訂閱列表進行重試 addFailedSubscribed(url, listener); } }
訂閱邏輯發生在doSubscribe中,由ZookeeperRegistry範例進行的實現。程式碼如下:
//ZookeeperRegistry @Override public void doSubscribe(final URL url, final NotifyListener listener) { try { if (ANY_VALUE.equals(url.getServiceInterface())) { //省略 } else { List<URL> urls = new ArrayList<>(); //1 遍歷provider、設定、路由node,並註冊監聽到這些節點上,當節點發生變化,會呼叫ZookeeperRegistry的notify介面 for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { //2 建立zk監聽器 listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))); zkListener = listeners.get(listener); } //3 在zk建立provider、設定、路由對應的路徑 zkClient.create(path, false); //4 增加監聽器 List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } //5 通知監聽器,根據urls變化更新伺服器端invoker列表,在初次啟動時,構建invoker notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
我們看到,在ZookeeperRegistry的訂閱方法中,其持有到zk的使用者端,可以建立相應的節點,並實現監聽。
同時,for迴圈是對provider、設定、路由進行建立於訂閱,即完成消費者對提供者服務介面、設定、路由規則的訂閱,從而可以實現對提供者變化時得到通知,設定貨路由發生變化時也能得到通知。
第5步,notify方法,會調父類別FailbackRegistry的notify方法,如下:
//FailbackRegistry @Override protected void notify(URL url, NotifyListener listener, List<URL> urls) { try { //1 通知 doNotify(url, listener, urls); } catch (Exception t) { //2 Record a failed registration request to a failed list, retry regularly 通知失敗後,加入失敗通知列表,用於重試 addFailedNotified(url, listener, urls); logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } }
FailbackRegistry再調父類別AbstractRegistry的notify方法,裡邊會呼叫監聽器即RegistryDirectory,進行更新或構建invoker,程式碼如下:
//AbstractRegistry /** * Notify changes from the Provider side. * * @param url consumer side url * @param listener listener * @param urls provider latest urls 最新的伺服器端URLs(包括provider、設定、路由的URL) */ protected void notify(URL url, NotifyListener listener, List<URL> urls) { //1 keep every provider's category. 按類別劃分URL Map<String, List<URL>> result = new HashMap<>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY); List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); categoryList.add(u); } } if (result.size() == 0) { return; } //2 根據urls,通知變更所有invoker、設定等資訊 Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>()); for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); //3 通知監聽器RegistryDirectory,更新其中設定、路由、provider、invoker等資訊 listener.notify(categoryList); saveProperties(url); } }
第3步,調監聽器RegistryDirectory,更新伺服器端資訊
//RegistryDirectory @Override public synchronized void notify(List<URL> urls) { //1 按類別(provider、設定、路由)劃分需要更新的urls Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(url -> { if (UrlUtils.isConfigurator(url)) { return CONFIGURATORS_CATEGORY; } else if (UrlUtils.isRoute(url)) { return ROUTERS_CATEGORY; } else if (UrlUtils.isProvider(url)) { return PROVIDERS_CATEGORY; } return ""; })); //2 更新設定資訊 List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); //3 更新路由資訊 List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this::addRouters); //4 獲取最新的 providers URL List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); //5 更新消費端對應的invoker列表 refreshOverrideAndInvoker(providerURLs); }
第5步,通過refreshOverrideAndInvoker更新invoker列表
private void refreshOverrideAndInvoker(List<URL> urls) { overrideDirectoryUrl(); //更新invokers refreshInvoker(urls); }
繼續往後走,就是具體更新邏輯
private void refreshInvoker(List<URL> invokerUrls) { //invokerUrls為空,表示更新設定或路由 Assert.notNull(invokerUrls, "invokerUrls should not be null"); //1 如果只有一個invokerUrl,同時協定為empty,一般表示介面沒有可用提供者,會登出所有invoker if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // Forbid to access this.invokers = Collections.emptyList(); routerChain.setInvokers(this.invokers); destroyAllInvokers(); // Close all invokers } else { //2 有可用的提供者,更新invoker的快取urlInvokerMap this.forbidden = false; // Allow to access Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls == Collections.<URL>emptyList()) { invokerUrls = new ArrayList<>(); } //3。如果invokerUrls為空,則繼續使用快取的invokerUrls。否則使用最新的 if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<>(); this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison } if (invokerUrls.isEmpty()) { return; } //4 轉換新的invokerUrls為invoker Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); // pre-route and build cache, notice that route cache should build on original Invoker list. // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. routerChain.setInvokers(newInvokers); this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; //5 替換最新的invoker this.urlInvokerMap = newUrlInvokerMap; try { //6 銷燬不用的invoker destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }
調toInvokers方法,將URL轉換為invoker,並快取起來
private Map<String, Invoker<T>> toInvokers(List<URL> urls) { for (URL providerUrl : urls) { // 構造InvokerDelegate,其中protocol.refer返回的為原invoker invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); newUrlInvokerMap.put(key, invoker); } }
同時這些invoker列表儲存在RegistryDirectory中,可以實時監聽註冊中心的變更。
在toInvokers方法中,會呼叫DubboProtocol的refer方法實現服務提供者URL到invoker的轉變,具體見後邊文章。
官網的解釋:
The delegate class, which is mainly used to store the URL address sent by the registry,and can be reassembled on the basis of providerURL queryMap overrideMap for re-refer.
其實就是對invoker和伺服器端URL的封裝,便於後續使用。
具體如何使用的,我們後邊再說。
在呼叫DubboProtocol的refer方法的過程中,也還會呼叫ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper這三個包裝類,實現對invoker的攔截與監聽。
首先會調到父類別AbstractProtocol的refer方法,如下
//AbstractProtocol @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { //構造非同步實現同步的invoker return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); }
原因是,dubbo呼叫底層是基於netty進行的,是非同步的過程,AsyncToSyncInvoker可以實現同步的呼叫,具體細節後邊再說,暫時可以不管,只知道進行封裝就行了。
千呼萬喚始出來,終於找到URL轉換為invoker的根了,我們看到invoker實際就是DubboInvoker的範例
//DubboProtocol @Override public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // create rpc invoker.建立真實的rpc invoker,其中包含使用者端的建立 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
到此,我們追蹤到invoker它祖先DubboInvoker,在ReferenceConfig拿到的invoker是DubboInvoker經過層層包裝的結果。
用張圖總結上邊的過程:
getClients(url)根據URL獲取使用者端,建立消費者與提供者之間的TCP連線。
private ExchangeClient[] getClients(URL url) { // whether to share connection boolean useShareConnect = false; int connections = url.getParameter(CONNECTIONS_KEY, 0); List<ReferenceCountExchangeClient> shareClients = null; // if not configured, connection is shared, otherwise, one connection for one service //1 預設使用共用的1個使用者端 if (connections == 0) { useShareConnect = true; String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null); //2 預設保持消費者與提供者之間只有一個TCP連線 connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr); //3 獲取共用使用者端 shareClients = getSharedClient(url, connections); } //4 構造ExchangeClient陣列,若不共用,需要建立多個消費者與提供者之間的TCP連線 ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (useShareConnect) { clients[i] = shareClients.get(i); } else { clients[i] = initClient(url); } } return clients; }
共用使用者端是如何實現的呢?
DubboProtocol持有使用者端快取referenceClientMap,key為伺服器端host:port,value為ExchangeClient的封裝類ReferenceCountExchangeClient列表,其中包含參照計數。
/** * <host:port,Exchanger> */ private final Map<String, List<ReferenceCountExchangeClient>> referenceClientMap = new ConcurrentHashMap<>(); private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) { //1 共用的client,key為提供者ip和埠號 String key = url.getAddress(); List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key); if (checkClientCanUse(clients)) { batchClientRefIncr(clients); return clients; } locks.putIfAbsent(key, new Object()); synchronized (locks.get(key)) { clients = referenceClientMap.get(key); // dubbo check if (checkClientCanUse(clients)) { batchClientRefIncr(clients); return clients; } // connectNum must be greater than or equal to 1 connectNum = Math.max(connectNum, 1); // If the clients is empty, then the first initialization is if (CollectionUtils.isEmpty(clients)) { //2 構建參照計數的ExchangeClient clients = buildReferenceCountExchangeClientList(url, connectNum); referenceClientMap.put(key, clients); } else { for (int i = 0; i < clients.size(); i++) { ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i); // If there is a client in the list that is no longer available, create a new one to replace him. if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) { clients.set(i, buildReferenceCountExchangeClient(url)); continue; } //參照計數增1 referenceCountExchangeClient.incrementAndGetCount(); } } /** * I understand that the purpose of the remove operation here is to avoid the expired url key * always occupying this memory space. */ locks.remove(key); return clients; } }
在buildReferenceCountExchangeClientList方法中,會呼叫initClient方法,建立使用者端。
/** * Create new connection * * @param url */ private ExchangeClient initClient(URL url) { // client type setting. //1 client型別,預設為netty String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT)); //2 編碼方式為DubboCodec url = url.addParameter(CODEC_KEY, DubboCodec.NAME); // enable heartbeat by default //3 增加心跳檢測,預設事件間隔為1分鐘 url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)); ExchangeClient client; try { // connection should be lazy if (url.getParameter(LAZY_CONNECT_KEY, false)) { //4 構造懶連線,在使用的時候才會真正建立伺服器端連線 client = new LazyConnectExchangeClient(url, requestHandler); } else { //5 預設直接建立連線 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }
接下來,我們看看使用者端是如何建立的。
上邊所有的程式碼都是在協定層(Protocol),接下來主要聚焦在交換層(Exchanger)。
使用者端建立是通過工具類Exchangers進行建立,通過URL獲取Exchanger(預設實現為HeaderExchanger)
public class Exchangers { public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); //先獲取Exchanger,預設為HeaderExchanger return getExchanger(url).connect(url, handler); } }
從程式碼可以看出,HeaderExchanger為消費端和伺服器端建立的關鍵類,其建立client和server,分別為HeaderExchangeClient和HeaderExchangeServer。
public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; @Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { //1 Transporters.connect建立連線,返回Client; //2 構造HeaderExchangeClient,其中包含HeaderExchangeChannel,用於傳送請求,並且失敗重試 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } @Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
又出現一個問題,為什麼通過Transporter connect獲取到的client,要經過HeaderExchangeClient封裝呢?
到這個地方,我們來到了傳輸層(Transporter),即用於傳送和接受資料的地方。
工具類Transporters,建立使用者端連線。
public class Transporters { public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } //獲取Transporter,預設為NettyTransporter return getTransporter().connect(url, handler); } }
Transporter的預設實現類為NettyTransporter,可以構建NettyClient和NettyServer,他們是資料傳送和接受的執行實體,程式碼如下:
public class NettyTransporter implements Transporter { public static final String NAME = "netty"; @Override public RemotingServer bind(URL url, ChannelHandler listener) throws RemotingException { //構建NettyServer return new NettyServer(url, listener); } @Override public Client connect(URL url, ChannelHandler listener) throws RemotingException { //構建NettyClient return new NettyClient(url, listener); } }
可以看到,底層的使用者端是NettyClient,它持有URL和一系列ChannelHandler。
public class NettyClient extends AbstractClient { public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants. // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler super(url, wrapChannelHandler(url, handler)); } }
NettyClient範例化主要通過父類別來完成的,在調父類別之前,通過wrapChannelHandler給ChannelHandler封裝了2個Handler,分別是MultiMessageHandler和HeartbeatHandler,用於多訊息處理和心跳檢測處理。
接下來看下父類別進行了什麼操作。
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); initExecutor(url); try { //1 開啟client doOpen(); } catch (Throwable t) { close(); } try { //2 client發起連線 connect(); } catch (RemotingException t) { close(); } catch (Throwable t) { close(); } }
可這是一個模板方法,doOpen和connect藉助子類,即NettyClient完成的,我們再看看具體是怎麼完成使用者端建立的。
//NettyClient @Override protected void doOpen() throws Throwable { //NettyClientHandler 為dubbo主要處理器 final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); bootstrap = new Bootstrap(); bootstrap.group(nioEventLoopGroup) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) .channel(NioSocketChannel.class); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout())); bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler)); } NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) //空閒處理器,用於心跳檢測 .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)) .addLast("handler", nettyClientHandler); String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST); if(socksProxyHost != null) { int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT)); Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort)); ch.pipeline().addFirst(socks5ProxyHandler); } } }); }
ChannelPipeline中設定了編解碼器、空閒處理器、處理dubbo訊息的NettyClientHandler。
doOpen程式碼只是初始化好了Bootstrap,連線發生在doConnect方法中,如下:
//NettyClient @Override protected void doConnect() throws Throwable { long start = System.currentTimeMillis(); //與netty伺服器端連線,閉關獲取ChannelFuture ChannelFuture future = bootstrap.connect(getConnectAddress()); boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS); if (ret && future.isSuccess()) { Channel newChannel = future.channel(); Channel oldChannel = NettyClient.this.channel; oldChannel.close(); NettyClient.this.channel = newChannel; } }
將建立好的使用者端,封裝到Protocol層獲取到的invoker,在消費者發起呼叫的時候,直接可以請求到伺服器端。
用張圖總結上邊Exchanger和Transport兩層的過程:
通過Protocol、Exchange、Transport三層的支撐下,完成了最開始圖中的1-6步,獲得到了代表伺服器端的invoker。
為了減少dubbo框架對使用者的程式碼侵入,還需要對伺服器端介面進行代理,
生成伺服器端介面代理,主要涉及ReferenceConfig中createProxy的第二步getProxy。
return (T) PROXY_FACTORY.getProxy(invoker);
經過StubProxyFactoryWrapper包裝類,最終呼叫到預設實現JavassistProxyFactory,其通過反射獲取伺服器端介面的實現。程式碼如下:
public class JavassistProxyFactory extends AbstractProxyFactory { @Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { //通過反射建立invoker的代理,處理器為InvokerInvocationHandler return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
該類包含兩個方法,getProxy和getInvoker,前者用於消費端參照獲取代理類,後者用於伺服器端暴露服務時獲取對應的invoker。
public class InvokerInvocationHandler implements InvocationHandler { private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class); private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 0) { if ("toString".equals(methodName)) { return invoker.toString(); } else if ("$destroy".equals(methodName)) { invoker.destroy(); return null; } else if ("hashCode".equals(methodName)) { return invoker.hashCode(); } } else if (parameterTypes.length == 1 && "equals".equals(methodName)) { return invoker.equals(args[0]); } RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args); rpcInvocation.setTargetServiceUniqueName(invoker.getUrl().getServiceKey()); return invoker.invoke(rpcInvocation).recreate(); } }
其中invoke方法,就是使用者調介面時,會被代理到該方法上。
我們看到服務介面等資訊被封裝到RpcInvocation中,通過持有的invoker進行呼叫。
呼叫關係如下圖:
public interface HelloService { String sayHello(String name); }
服務提供者
@Service public class HelloServiceImpl implements HelloService { @Override public String sayHello(String name) { return "hello:"+name; } }
dubbo-provider.properties設定:
dubbo.application.name=dubbo-annotation-provider dubbo.protocol.name=dubbo dubbo.protocol.port=20885
啟動類:
public class DubboProviderMain { public static void main(String[] args) throws Exception { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class); context.start(); System.in.read(); } @Configuration @EnableDubbo(scanBasePackages = "com.exm.service.impl") @PropertySource("classpath:/dubbo-provider.properties") static class ProviderConfiguration { @Bean public RegistryConfig registryConfig() { RegistryConfig registryConfig = new RegistryConfig(); registryConfig.setAddress("zookeeper://127.0.0.1:2181?timeout=10000"); return registryConfig; } } }
服務消費者
@Component public class DubboConsumer { @Reference(check = false) private HelloService helloService; public String sayHello(String name) { return helloService.sayHello(name); } }
設定:
dubbo.application.name=dubbo-annotation-consumer dubbo.registry.address=zookeeper://127.0.0.1:2181
啟動類:
public class DubboConsumerMain { public static void main(String[] args) throws IOException, InterruptedException { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class); context.start(); DubboConsumer service = context.getBean(DubboConsumer.class); while (true) { System.in.read(); try { String hello = service.sayHello("world"); System.out.println("result :" + hello); } catch (Exception e) { e.printStackTrace(); } } } @Configuration @PropertySource("classpath:/dubbo-consumer.properties") @ComponentScan("com.exm.bean") @EnableDubbo static class ConsumerConfiguration { } }
跟蹤程式碼,將流轉的URL記錄如下,可以參考著閱讀原始碼
//註冊協定的URL
RegistryProtocol#Invoker<T> refer(Class<T> type, URL url) type:interface com.exm.service.HelloService url:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-annotation-consumer&dubbo=2.0.2&pid=44159&refer=application%3Ddubbo-annotation-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26init%3Dfalse%26interface%3Dcom.exm.service.HelloService%26methods%3DsayHello%26pid%3D44159%26register.ip%3D192.168.1.65%26release%3D2.7.5%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1654352381559®istry=zookeeper&release=2.7.5×tamp=1654352409705
//costumer的URL RegistryDirectory#void subscribe(URL url) ZookeeperRegistry#void doSubscribe(final URL url, final NotifyListener listener) url:consumer://192.168.1.65/com.exm.service.HelloService?application=dubbo-annotation-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false×tamp=1654352381559
//provider、設定、路由對應的URL FailbackRegistry#void notify(URL url, NotifyListener listener, List<URL> urls) url:consumer://192.168.1.65/com.exm.service.HelloService?application=dubbo-annotation-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false×tamp=1654352381559 urls: 0: dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44153&release=2.7.5&side=provider×tamp=1654352300763 1: empty://192.168.1.65/com.exm.service.HelloService?application=dubbo-annotation-consumer&category=configurators&check=false&dubbo=2.0.2&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false×tamp=1654352381559 2: empty://192.168.1.65/com.exm.service.HelloService?application=dubbo-annotation-consumer&category=routers&check=false&dubbo=2.0.2&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44159&release=2.7.5&side=consumer&sticky=false×tamp=1654352381559 RegistryDirectory#void refreshInvoker(List<URL> invokerUrls) invokerUrls:dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44153&release=2.7.5&side=provider×tamp=1654352300763 InvokerDelegate#new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl) //url表示消費端,要建立伺服器端的連線 url:dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-consumer&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44159®ister.ip=192.168.1.65&release=2.7.5&remote.application=dubbo-annotation-provider&side=consumer&sticky=false×tamp=1654352300763 providerUrl:dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44153&release=2.7.5&side=provider×tamp=1654352300763 DubboProtocol#Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) DubboProtocol#ExchangeClient initClient(URL url) url:dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-consumer&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44259®ister.ip=192.168.1.65&release=2.7.5&remote.application=dubbo-annotation-provider&side=consumer&sticky=false×tamp=1654352300763 Exchangers#ExchangeClient connect(URL url, ExchangeHandler handler) NettyTransporter#Client connect(URL url, ChannelHandler listener) AbstractClient#AbstractClient(URL url, ChannelHandler handler) url:dubbo://192.168.1.65:20885/com.exm.service.HelloService?anyhost=true&application=dubbo-annotation-consumer&check=false&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&init=false&interface=com.exm.service.HelloService&methods=sayHello&pid=44259®ister.ip=192.168.1.65&release=2.7.5&remote.application=dubbo-annotation-provider&side=consumer&sticky=false×tamp=1654352300763
到此為止,將dubbo時如何為消費端創造遠端參照範例的(invoker+代理),可能依然有講述不清晰的地方,請大家指出來,一塊研讀學習。