Dubbo原始碼(三)

2022-07-25 18:02:54

前言

本文基於Dubbo2.6.x版本,中文註釋版原始碼已上傳github:xiaoguyu/dubbo

在瞭解了Dubbo SPI後,我們來了解下Dubbo服務匯出的過程。

Dubbo的設定是通過DubboNamespaceHandler讀取解析的,其中會將Dubbo服務提供者封裝成ServiceBean注入Spring容器中。而服務匯出就是在ServiceBean的onApplicationEvent開始的。

想了解DubboNamespaceHandler的工作原理,請自行去了解Spring自定義標籤,本文略。

前置工作

服務匯出的入口方法是 ServiceBean 的 onApplicationEvent。因為程式碼過多,接下來會忽略部分程式碼,提供呼叫鏈條,專注於主要部分。

// ServiceBean#onApplicationEvent(ContextRefreshedEvent event) ->
// ServiceBean#export() -> 
// ServiceConfig#export() ->
// ServiceConfig#doExport() ->
// ServiceConfig#doExportUrls()
private void doExportUrls() {
    // 載入註冊中心連結
    List<URL> registryURLs = loadRegistries(true);
    // 遍歷 protocols,並在每個協定下匯出服務
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

Dubbo是支援多註冊中心多協定的。下面繼續看doExportUrlsFor1Protocol方法。

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    ......

    // 判斷協定名是否為 injvm(本地呼叫)
    if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
        protocolConfig.setRegister(false);
        map.put("notify", "false");
    }

    ......

    String scope = url.getParameter(Constants.SCOPE_KEY);
    // 如果 scope = none,則什麼都不做
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

        // scope != remote,匯出到本地
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // scope != local,匯出到遠端
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    ......

                    // 為服務提供類(ref)生成 Invoker
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    // DelegateProviderMetaDataInvoker 用於持有 Invoker 和 ServiceConfig
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    // 匯出服務,並生成 Exporter
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                ......
            }
        }
    }
    this.urls.add(url);
}

這裡我們只保留主要程式碼,完整的註釋可以去看我fork的註釋版原始碼或者官方開發指南。

這裡主要邏輯有3部分:

  1. 本地服務暴露

    exportLocal(url);

  2. invoker的生成

    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

  3. 匯出服務,並生成 Exporter

    Exporter<?> exporter = protocol.export(wrapperInvoker);

    匯出服務包含了遠端服務暴露和註冊中心處理。

下面我們來一一講解這3部分

Invoker建立過程

在講服務暴露之前,需要先了解下Invoker

官方檔案是這麼說的

Invoker 是實體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它代表一個可執行體,可向它發起 invoke 呼叫,它有可能是一個原生的實現,也可能是一個遠端的實現,也可能一個叢集實現。

Invoker 是由 ProxyFactory 建立而來,ProxyFactory 是方法級別的自適應拓展介面,其生成的自適應拓展類如下:

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {

    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }

    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0, boolean arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0, arg1);
    }

    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}

從上面可以看到,預設的實現類是JavassistProxyFactory,進去看看是如何建立invoker的

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // 為目標類建立 Wrapper
    // 此處是動態生成的Wrapper的實現類,會重寫invokeMethod方法
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    // 建立匿名 Invoker 類物件,並實現 doInvoke 方法。
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

這裡建立的是抽象類AbstractProxyInvoker,使用的是經典的模板模式,具體的邏輯由子類實現,去看看 AbstractProxyInvoker 是怎麼封裝 invoke 方法的

public Result invoke(Invocation invocation) throws RpcException {
    try {
        // 呼叫 doInvoke 執行後續的呼叫,並將呼叫結果封裝到 RpcResult 中
        return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
    } catch (InvocationTargetException e) {
        return new RpcResult(e.getTargetException());
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

別問我wrapper是怎麼生成的,我也看不懂。。。

本地服務暴露

當一個應用既是一個服務的提供者,同時也是這個服務的消費者的時候,可以直接對本機提供的服務發起本地呼叫。一般情況下也用不到,所以不感興趣的可以略過此節。

當不指定範圍為遠端時,Dubbo預設支援本地呼叫的,參見前面的doExportUrlsFor1Protocol方法

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    ......

    String scope = url.getParameter(Constants.SCOPE_KEY);
    // 如果 scope = none,則什麼都不做
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        // scope != remote,匯出到本地
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        .....
    }

}

private void exportLocal(URL url) {
    // 如果 URL 的協定頭等於 injvm,說明已經匯出到本地了,無需再次匯出
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        // 將協定設定為injvm
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(LOCALHOST)
                .setPort(0);
        StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
        // 建立 Invoker,並匯出服務,這裡的 protocol 會在執行時呼叫 InjvmProtocol 的 export 方法
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
    }
}

重點是protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));這句,前面已經講了invoker的生成,略過proxyFactory.getInvoker,直接看protocol.export,因為protocol也是方法級別的自適應拓展,下面照舊放上自適應拓展生成的類

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
}

預設協定使用的是dubbo,但是 exportLocal 方法中,已經將協定轉成LOCAL_PROTOCOL,也就是InjvmProtocol,下面去看看他的 export 方法做了什麼

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

這裡只建立了一個 InjvmExporter,無其他邏輯。具體使用結合服務的引出和呼叫過程才能分析清除,此處不多贅述。

遠端服務暴露

讓我們回到 前置準備 小節的末尾,前面說了 protocol.export(wrapperInvoker) 方法包含了遠端服務暴露和註冊中心處理。引數 wrapperInvoker 中的url的協定是 registry ,所以實際呼叫的是RegistryProtocol的export方法

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // 遠端服務暴露
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    // 獲取註冊中心 URL,以 zookeeper 註冊中心為例,得到的範例 URL 如下:
    // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
    URL registryUrl = getRegistryUrl(originInvoker);

    // 根據 URL 載入 Registry 實現類,比如 ZookeeperRegistry,這裡已經連線上註冊中心,但是還沒有將服務註冊上去
    final Registry registry = getRegistry(originInvoker);
    // 獲取已註冊的服務提供者 URL,比如:
    // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

    // 獲取 register 引數
    boolean register = registeredProviderUrl.getParameter("register", true);

    // 向服務提供者與消費者登入檔中註冊服務提供者
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

    // 根據 register 的值決定是否註冊服務
    if (register) {
        // 向註冊中心註冊服務
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    ......
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

這裡有3個重點:

  1. 遠端服務暴露

    doLocalExport(originInvoker);

  2. 建立註冊中心連線

    getRegistry(originInvoker)

  3. 向註冊中心註冊服務

    register(registryUrl, registeredProviderUrl)

本節重點在遠端服務暴露,所以繼續看doLocalExport方法

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);
    // 存取快取
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                // 建立 Invoker 為委託類物件
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // 呼叫 protocol 的 export 方法匯出服務
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

這裡就是一些快取操作,ExporterChangeableWrapper就是對unexport方法做了一下包裝,邏輯很簡單。我們重點關注protocol.export(invokerDelegete),而protocol是自適應拓展類,根據引數invokerDelegete中的url,實際呼叫的是DubboProtocol#export(invokerDelegete)

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // 獲取服務標識,理解成服務座標也行。由服務組名,服務名,服務版本號以及埠組成。比如:
    // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
    String key = serviceKey(url);
    // 建立 DubboExporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // 將 <key, exporter> 鍵值對放入快取中
    exporterMap.put(key, exporter);
    // 本地存根相關程式碼
    ......
    // 啟動伺服器
    openServer(url);
    // 優化序列化
    optimizeSerialization(url);
    return exporter;
}

Dubbo的網路傳輸層預設使用的是Netty,openServer方法就是啟動netty服務,進去看看

private void openServer(URL url) {
    // 獲取 host:port,並將其作為伺服器範例的 key,用於標識當前的伺服器範例
    String key = url.getAddress();
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            // 建立伺服器範例
            serverMap.put(key, createServer(url));
        } else {
            // 伺服器已建立,則根據 url 中的設定重置伺服器
            server.reset(url);
        }
    }
}

根據ip+埠判斷是否已經建立,已經建立就根據 url 中的設定重置伺服器(例如修改執行緒池設定,這個涉及執行緒派發模型,此處不多贅述)。我們重點關注伺服器範例的建立。

private ExchangeServer createServer(URL url) {
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // 新增心跳檢測設定到 url 中
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    // 獲取 server 引數,預設為 netty
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    // 通過 SPI 檢測是否存在 server 引數所代表的 Transporter 拓展,不存在則丟擲異常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    // 新增編碼解碼器引數
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // 建立 ExchangeServer
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    // 獲取 client 引數,可指定 netty,mina
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        // 獲取所有的 Transporter 實現類名稱集合,比如 supportedTypes = [netty, mina]
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        // 檢測當前 Dubbo 所支援的 Transporter 實現類名稱列表中,
        // 是否包含 client 所表示的 Transporter,若不包含,則丟擲異常
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

這裡我們關注Exchangers.bind(url, requestHandler)方法,其前面邏輯是檢測是否支援server所需的協定,後面的邏輯是檢測是否支援client所需的協定。

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    ......
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 獲取 Exchanger,預設為 HeaderExchanger。
    // 緊接著呼叫 HeaderExchanger 的 bind 方法建立 ExchangeServer 範例
    return getExchanger(url).bind(url, handler);
}

不多bb,直接跳到HeaderExchanger.bind(url, handler)

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    // 建立 HeaderExchangeServer 範例,該方法包含了多個邏輯,分別如下:
    //   1. new HeaderExchangeHandler(handler)
    //   2. new DecodeHandler(new HeaderExchangeHandler(handler))
    //   3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

HeaderExchangeHandler 和 DecodeHandler 都是handler的裝飾類。HeaderExchangeServer內部持有Server,並封裝了心跳的功能。不多贅述,我們的重點在 Transporters.bind ,也就是如何生成Server

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    ......
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handlers 元素數量大於1,則建立 ChannelHandler 分發器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // 獲取自適應 Transporter 範例,並呼叫實體方法
    return getTransporter().bind(url, handler);
}

public static Transporter getTransporter() {
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

getTransporter() 獲取的是自適應拓展類,預設是NettyTransporter, 去看看

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}

這裡就建立了一個NettyServer物件,很明顯,啟動服務的相關邏輯在NettyServer的建構函式中

public class NettyServer extends AbstractServer implements Server {
  // 構造方法
  public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
      super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
  }
}

public abstract class AbstractServer extends AbstractEndpoint implements Server {
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        ......
        try {
            // 呼叫模板方法 doOpen 啟動伺服器
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        ......
    }
}

NettyServer 呼叫的是父類別的構造,而在父類別 AbstractServer 中,又呼叫了子類的 doOpen() ,明顯的模板模式。重新回到 NettyServer 看 doOpen 方法

protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    // 建立 boss 和 worker 執行緒池
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyServerHandler);
                }
            });
    // 繫結到指定的 ip 和埠上
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}

這段是Netty啟動服務的程式碼,就不多解釋了。

至此,遠端服務暴露的過程就分析完了。

註冊中心處理

OK,讓我們把視線放回RegistryProtocolexport方法中,上節說了遠端服務暴露,本節就來說說剩下的建立註冊中心連線以及向註冊中心註冊服務

建立註冊中心連線

建立註冊中心連線,是在getRegistry(originInvoker)方法中

private Registry getRegistry(final Invoker<?> originInvoker) {
    URL registryUrl = getRegistryUrl(originInvoker);
    return registryFactory.getRegistry(registryUrl);
}

registryFactory 是自適應拓展類,根據引數 registryUrl 的協定protocol欄位,可知實際呼叫的是ZookeeperRegistryFactory,getRegistry 方法在ZookeeperRegistryFactory的父類別AbstractRegistryFactory

// AbstractRegistryFactory#getRegistry(URL url)
public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceStringWithoutResolving();
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        // 快取未命中,建立 Registry 範例
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}

這裡就是一些URL的引數處理以及快取操作,主要看createRegistry(url),此方法由子類ZookeeperRegistryFactory實現

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    // zookeeperTransporter 由 SPI 在執行時注入,型別為 ZookeeperTransporter$Adaptive
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}

這裡注意一下,zookeeperTransporter 是由Dubbo SPI機制自動注入的。createRegistry 方法就建立了 ZookeeperRegistry 物件,所以處理邏輯應該就在它的構造方法中。

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    ......
    // 建立 Zookeeper 使用者端,預設為 CuratorZookeeperTransporte
    zkClient = zookeeperTransporter.connect(url);
    // 新增狀態監聽器
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

這裡,我們關注的是 zookeeperTransporter.connect(url),zookeeperTransporter的預設實現類是CuratorZookeeperTransporte,它的 connect 方法就是建立了CuratorZookeeperClient,所以直接去看構造方法

public CuratorZookeeperClient(URL url) {
    super(url);
    try {
        int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);
        // 建立 CuratorFramework 構造器
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(url.getBackupAddress())
                .retryPolicy(new RetryNTimes(1, 1000))
                .connectionTimeoutMs(timeout);
        String authority = url.getAuthority();
        if (authority != null && authority.length() > 0) {
            builder = builder.authorization("digest", authority.getBytes());
        }
        // 構建 CuratorFramework 範例
        client = builder.build();
        // 新增監聽器
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState state) {
                if (state == ConnectionState.LOST) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                } else if (state == ConnectionState.CONNECTED) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                } else if (state == ConnectionState.RECONNECTED) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                }
            }
        });
        // 啟動使用者端
        client.start();
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

這裡就是 Apache Curator 操作 zookeeper 的一些相關操作了,不多贅述。至此,註冊中心建立過程結束,已連線 zookeeper

向註冊中心註冊服務

註冊中心已建立,下一步就是向註冊中心註冊服務。

回到RegistryProtocolexport方法中,register(registryUrl, registeredProviderUrl) 方法就是向註冊中心註冊服務

public void register(URL registryUrl, URL registedProviderUrl) {
    // 建立註冊中心(此時會從快取獲取)
    Registry registry = registryFactory.getRegistry(registryUrl);
    // 將服務註冊到註冊中心
    registry.register(registedProviderUrl);
}

registryFactory.getRegistry(registryUrl) 在上一節講過,已經建立了註冊中心(型別為ZookeeperRegistry),此時會命中快取,直接返回。

ZookeeperRegistry並沒有實現 register 方法,在其父類別FailbackRegistry中。

public void register(URL url) {
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // 模板方法,由子類實現
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // 獲取 check 引數,若 check = true 將會直接丟擲異常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // 記錄註冊失敗的連結
        failedRegistered.add(url);
    }
}

如果註冊失敗,則記錄在failedRegistered中,用於重試。我們重點關注doRegister(url),其實現在子類ZookeeperRegistry

protected void doRegister(URL url) {
    try {
        // 通過 Zookeeper 使用者端建立節點,節點路徑由 toUrlPath 方法生成,路徑格式如下:
        //   /${group}/${serviceInterface}/providers/${url}
        // 比如
        //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

繼續看 zkClient.create 方法

public void create(String path, boolean ephemeral) {
    if (!ephemeral) {
        // 如果要建立的節點型別非臨時節點,那麼這裡要檢測節點是否存在
        if(persistentExistNodePath.contains(path)){
            return;
        }
        if (checkExists(path)) {
            persistentExistNodePath.add(path);
            return;
        }
    }
    int i = path.lastIndexOf('/');
    if (i > 0) {
        // 遞迴建立上一級路徑
        create(path.substring(0, i), false);
    }
    if (ephemeral) {
        createEphemeral(path);
    } else {
        createPersistent(path);
        persistentExistNodePath.add(path);

    }
}

根據 ephemeral 引數判斷,是建立臨時節點還是永久節點。這裡注意一下,path 的切割,是從後往前的。下面放上zookeeper的節點結構(視覺化工具ZooInspector)

到這裡,向註冊中心註冊服務也講完了

總結

在Dubbo中,大量使用了Dubbo SPI機制(自動注入、自適應拓展),且很多地方都使用了模板模式。


參考資料

Dubbo開發指南