Dubbo原始碼(七)

2022-08-10 15:03:43

前言

本文基於Dubbo2.6.x版本,中文註釋版原始碼已上傳github:xiaoguyu/dubbo

叢集(cluster)就是一組計算機,它們作為一個總體向用戶提供一組網路資源。這些單個的計算機系統就是叢集的節點(node)。

在Dubbo中,為了避免單點故障,同一個服務允許有多個服務提供者,也允許同時連線多個註冊中心。那麼,服務消費者參照服務時,該請求哪個註冊中心的服務提供者以及呼叫失敗之後該如何處理呢?這些就是Dubbo叢集所做的事。

叢集容錯

在分析叢集原始碼之前,先看看叢集容錯的所有元件,下圖是官方檔案的元件圖

Dubbo 定義了叢集介面 Cluster 以及 Cluster Invoker:

  • Cluster 是介面,其只有一個方法,負責生成Cluster Invoker
  • Cluster Invoker繼承了Invoker介面,是一個 Invoker,是主要邏輯實現的地方

將上圖從中間切分,可將叢集工作過程分為兩個階段,左邊為第一階段。

  1. 第一個階段是在服務消費者初始化期間。

    叢集 Cluster 實現類為服務消費者建立 Cluster Invoker,即圖上的 merge 操作,也就是將多個服務提供者合併為一個 Cluster Invoker

  2. 第二個階段是在服務消費者進行遠端呼叫時。

    步驟大體上就如圖所示:list → route → select → invoke

    • list:從服務目錄拿到 invoker 集合
    • route:通過路由過濾出符合規則的 invoker 集合
    • select:通過負載均衡從 invoker 集合中選擇一個
    • invoke:執行 invoker 的 invoke 方法,進行真正的遠端呼叫

    其中,list、route操作在之前文章講過了,傳送門:《服務目錄》《服務路由》

    select 不是本文重點,後續負載均衡時講解。

以上就是叢集工作的整個流程,這裡並沒有介紹叢集是如何容錯的,也就是 invoke 步驟呼叫失敗的處理。Dubbo提供了多種容錯方式:叢集容錯範例

下面的原始碼我們以預設的 Failover Cluster - 失敗自動切換 進行分析

原始碼分析

Cluster

首先來看看 Cluster 介面,這是一個自適應拓展類,預設實現為FailoverCluster

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }

}

前面講了,Cluster 的作用就是將多個服務提供者合併為一個 Cluster Invoker

多個服務提供者合併也就是 服務目錄(Directory) 中的 invoker 集合。join 方法返回了一個 Cluster Invoker

接下來,我們看看呼叫路徑。 Cluster 介面在多個地方被呼叫,我們看服務消費者初始化期間的呼叫。

// 呼叫路徑如下:
// ReferenceBean#getObject()
// ReferenceConfig#get()
// ReferenceConfig#init()
// ReferenceConfig#createProxy(Map<String, String> map)
// RegistryProtocol#refer(Class<T> type, URL url)
// RegistryProtocol#doRefer(Cluster cluster, Registry registry, Class<T> type, URL url)
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 建立 RegistryDirectory 範例
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 設定註冊中心和協定
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // 生成服務消費者連結
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    // 註冊服務消費者,在 consumers 目錄下新節點
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
        registry.register(registeredConsumerUrl);
        directory.setRegisteredConsumerUrl(registeredConsumerUrl);
    }

    // 訂閱 providers、configurators、routers 等節點資料
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

    // 一個註冊中心可能有多個服務提供者,因此這裡需要將多個服務提供者合併為一個
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

呼叫在倒數第三行程式碼。

如果看過我之前寫的《服務參照》那篇文章,想必對 doRefer 方法不陌生了。在服務目錄訂閱完註冊中心的資料後,就呼叫 join 方法生成 Cluster Invoker

囉嗦多一句:

可以這麼理解,實際負責遠端呼叫的,是服務目錄中的 invoker 集合中的 invoker,而 Cluster Invoker 則對服務目錄中的 invoker 集合進行處理。

Cluster Invoker

預設的 Cluster Invoker 是FailoverClusterInvoker,既然是一個 Invoker,我們就從它的 invoke 方法入手。

AbstractClusterInvoker

invoke 方法在它的父類別AbstractClusterInvoker

public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();
    LoadBalance loadbalance = null;

    // 繫結 attachments 到 invocation 中.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // 列舉 Invoker
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && !invokers.isEmpty()) {
        // 載入 LoadBalance
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 呼叫 doInvoke 進行後續操作
    return doInvoke(invocation, invokers, loadbalance);
}

invoke 方法邏輯也很簡單:

  1. 列舉 invoker
  2. 載入 LoadBalance(自適應拓展類)
  3. 呼叫 doInvoke 進行後續操作

其中列舉 invoker 如下

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    List<Invoker<T>> invokers = directory.list(invocation);
    return invokers;
}

list 方法就是呼叫服務目錄的 list 方法,裡面做了兩件事(結合前面的元件圖):

  • list:從服務目錄拿到 invoker 集合
  • route:通過路由過濾出符合規則的 invoker 集合

FailoverClusterInvoker

doInvoke 方法具體實現在FailoverClusterInvoker中,此 invoker 的容錯方式為失敗自動切換。

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 獲取重試次數
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // 迴圈呼叫,失敗重試
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        if (i > 0) {
            checkWhetherDestroyed();
            // 在進行重試前重新列舉 Invoker,這樣做的好處是,如果某個服務掛了,
            // 通過呼叫 list 可得到最新可用的 Invoker 列表
            copyinvokers = list(invocation);
            // check again
            // 對 copyinvokers 進行判空檢查
            checkInvokers(copyinvokers, invocation);
        }
        // 通過負載均衡選擇 Invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        // 新增到 invoker 到 invoked 列表中
        invoked.add(invoker);
        // 設定 invoked 到 RPC 上下文中
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // 呼叫目標 Invoker 的 invoke 方法
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // 若重試失敗,則丟擲異常
    throw new RpcException(xxx);
}

doInvoke 方法程式碼量不少,但是邏輯簡化之後也很簡單,就是根據重試次數,在 for 迴圈中進行遠端呼叫,成功則返回,失敗就重試。如果重試次數耗盡還無法呼叫成功,則丟擲異常。

從這裡可以知道,Dubbo的預設失敗重試次數是3次。

此方法中我們關注下 select 方法,它負責從 invoker 集合中選出一個 infoker

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
        return null;
    // 獲取呼叫方法名
    String methodName = invocation == null ? "" : invocation.getMethodName();

    // 獲取 sticky 設定,sticky 表示粘滯連線。所謂粘滯連線是指讓服務消費者儘可能的
    // 呼叫同一個服務提供者,除非該提供者掛了再進行切換
    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
    {
        // 檢測 invokers 列表是否包含 stickyInvoker,如果不包含,
        // 說明 stickyInvoker 代表的服務提供者掛了,此時需要將其置空
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        // 在 sticky 為 true,且 stickyInvoker != null 的情況下。如果 selected 包含
        // stickyInvoker,表明 stickyInvoker 對應的服務提供者可能因網路原因未能成功提供服務。
        // 但是該提供者並沒掛,此時 invokers 列表中仍存在該服務提供者對應的 Invoker。
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
            // availablecheck 表示是否開啟了可用性檢查,如果開啟了,則呼叫 stickyInvoker 的
            // isAvailable 方法進行檢查,如果檢查通過,則直接返回 stickyInvoker。
            if (availablecheck && stickyInvoker.isAvailable()) {
                return stickyInvoker;
            }
        }
    }

    // 如果執行緒走到當前程式碼處,說明前面的 stickyInvoker 為空,或者不可用。
    // 此時繼續呼叫 doSelect 選擇 Invoker
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    // 如果 sticky 為 true,則將負載均衡元件選出的 Invoker 賦值給 stickyInvoker
    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

select 方法主要處理對粘滯連線特性的支援。註釋寫的很清楚了。選擇 invoker 的操作在 doSelect 方法

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
        return null;
    if (invokers.size() == 1)
        return invokers.get(0);
    if (loadbalance == null) {
        // 如果 loadbalance 為空,這裡通過 SPI 載入 Loadbalance,預設為 RandomLoadBalance
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    // 通過負載均衡元件選擇 Invoker
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    // 如果 selected 包含負載均衡選擇出的 Invoker,或者該 Invoker 無法經過可用性檢查,此時進行重選
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            // 進行重選
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rinvoker != null) {
                invoker = rinvoker;
            } else {
                // rinvoker 為空,定位 invoker 在 invokers 中的位置
                int index = invokers.indexOf(invoker);
                try {
                    // 獲取 index + 1 位置處的 Invoker,以下程式碼等價於:
                    //     invoker = invokers.get((index + 1) % invokers.size());
                    invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}

這裡通過負載均衡選出 invoker,如果 invoker 在 selected 中(就是在doInvoke方法中呼叫失敗的invoker)或者不可用,則呼叫 reselect 方法進行重選。如果重選還是選不出 invoker,則返回 invoker 集合中的下一個元素。這裡的繁瑣判斷,就是為了儘量保證拿到可用的 invoker

我們繼續看看 reselect 方法

private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
            throws RpcException {

    List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

    // 下面的 if-else 分支邏輯有些冗餘,pull request #2826 對這段程式碼進行了簡化,可以參考一下
    // 根據 availablecheck 進行不同的處理
    if (availablecheck) { // invoker.isAvailable() should be checked
        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                if (selected == null || !selected.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
        }
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
    } else { // do not check invoker.isAvailable()
        for (Invoker<T> invoker : invokers) {
            if (selected == null || !selected.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
    }
    {
        // 若執行緒走到此處,說明 reselectInvokers 集合為空,此時不會呼叫負載均衡元件進行篩選。
        // 這裡從 selected 列表中查詢可用的 Invoker,並將其新增到 reselectInvokers 集合中
        if (selected != null) {
            for (Invoker<T> invoker : selected) {
                if ((invoker.isAvailable()) // available first
                        && !reselectInvokers.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
        }
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
    }
    return null;
}

這個方法可以分成兩部分:

  1. 在非 selected 的 invoker 集合中,呼叫負載均衡選擇一個 invoker
  2. 在步驟1無法選出 invoker 時,在 selected 中選出 invoker

至此,Dubbo的叢集就講完了。負載均衡有空再說。

再論Cluster

前面我們提到 Cluster 介面在多個地方被呼叫,也講了同一個服務有多個服務提供者時的處理。那麼,有多個註冊中心呢,該如何處理?

// 類ReferenceConfig
private T createProxy(Map<String, String> map) {
    ......

    // 本地參照
    if (isJvmRefer) {
        ......
    // 遠端參照
    } else {
        ......

        // 單個註冊中心或服務提供者(服務直連,下同)
        if (urls.size() == 1) {
            // 呼叫 RegistryProtocol 的 refer 構建 Invoker 範例
            invoker = refprotocol.refer(interfaceClass, urls.get(0));

        // 多個註冊中心或多個服務提供者,或者兩者混合
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;

            // 獲取所有的 Invoker
            for (URL url : urls) {
                // 通過 refprotocol 呼叫 refer 構建 Invoker,refprotocol 會在執行時
                // 根據 url 協定頭載入指定的 Protocol 範例,並呼叫範例的 refer 方法
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use AvailableCluster only when register's cluster is available
                // 如果註冊中心連結不為空,則將使用 AvailableCluster
                URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                // 建立 StaticDirectory 範例,並由 Cluster 對多個 Invoker 進行合併
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    // 生成代理類
    return (T) proxyFactory.getProxy(invoker);
}

createProxy 是服務參照時,生成服務代理物件的方法。這裡會判斷,如果有多個註冊中心,會再封裝一層叢集,也就是先選擇註冊中心,再選擇服務提供者。

這裡一般情況 registryURL 不為空,cluster 使用的是AvailableCluster

public class AvailableCluster implements Cluster {
    public static final String NAME = "available";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new AbstractClusterInvoker<T>(directory) {
            @Override
            public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
                for (Invoker<T> invoker : invokers) {
                    if (invoker.isAvailable()) {
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };
    }
}

AvailableCluster的邏輯很簡單,按順序選擇可使用的 invoker (這裡的invoker其實就是每個註冊中心)

總結

本篇文章介紹了Dubbo叢集容錯的整體工作過程和呼叫邏輯。Dubbo提供了多種叢集實現,本文只介紹了Failover Cluster,其餘實現感興趣的可以自行檢視原始碼。


參考資料

Dubbo開發指南