本文基於Dubbo2.6.x版本,中文註釋版原始碼已上傳github:xiaoguyu/dubbo
叢集(cluster)就是一組計算機,它們作為一個總體向用戶提供一組網路資源。這些單個的計算機系統就是叢集的節點(node)。
在Dubbo中,為了避免單點故障,同一個服務允許有多個服務提供者,也允許同時連線多個註冊中心。那麼,服務消費者參照服務時,該請求哪個註冊中心的服務提供者以及呼叫失敗之後該如何處理呢?這些就是Dubbo叢集所做的事。
在分析叢集原始碼之前,先看看叢集容錯的所有元件,下圖是官方檔案的元件圖
Dubbo 定義了叢集介面 Cluster 以及 Cluster Invoker:
將上圖從中間切分,可將叢集工作過程分為兩個階段,左邊為第一階段。
第一個階段是在服務消費者初始化期間。
叢集 Cluster 實現類為服務消費者建立 Cluster Invoker,即圖上的 merge 操作,也就是將多個服務提供者合併為一個 Cluster Invoker
第二個階段是在服務消費者進行遠端呼叫時。
步驟大體上就如圖所示:list → route → select → invoke
其中,list、route操作在之前文章講過了,傳送門:《服務目錄》、《服務路由》
select 不是本文重點,後續負載均衡時講解。
以上就是叢集工作的整個流程,這裡並沒有介紹叢集是如何容錯的,也就是 invoke 步驟呼叫失敗的處理。Dubbo提供了多種容錯方式:叢集容錯範例
下面的原始碼我們以預設的 Failover Cluster - 失敗自動切換 進行分析
首先來看看 Cluster 介面,這是一個自適應拓展類,預設實現為FailoverCluster
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
前面講了,Cluster 的作用就是將多個服務提供者合併為一個 Cluster Invoker
多個服務提供者合併也就是 服務目錄(Directory) 中的 invoker 集合。join 方法返回了一個 Cluster Invoker
接下來,我們看看呼叫路徑。 Cluster 介面在多個地方被呼叫,我們看服務消費者初始化期間的呼叫。
// 呼叫路徑如下:
// ReferenceBean#getObject()
// ReferenceConfig#get()
// ReferenceConfig#init()
// ReferenceConfig#createProxy(Map<String, String> map)
// RegistryProtocol#refer(Class<T> type, URL url)
// RegistryProtocol#doRefer(Cluster cluster, Registry registry, Class<T> type, URL url)
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 建立 RegistryDirectory 範例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 設定註冊中心和協定
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服務消費者連結
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 註冊服務消費者,在 consumers 目錄下新節點
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
// 訂閱 providers、configurators、routers 等節點資料
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 一個註冊中心可能有多個服務提供者,因此這裡需要將多個服務提供者合併為一個
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
呼叫在倒數第三行程式碼。
如果看過我之前寫的《服務參照》那篇文章,想必對 doRefer 方法不陌生了。在服務目錄訂閱完註冊中心的資料後,就呼叫 join 方法生成 Cluster Invoker
囉嗦多一句:
可以這麼理解,實際負責遠端呼叫的,是服務目錄中的 invoker 集合中的 invoker,而 Cluster Invoker 則對服務目錄中的 invoker 集合進行處理。
預設的 Cluster Invoker 是FailoverClusterInvoker
,既然是一個 Invoker,我們就從它的 invoke 方法入手。
invoke 方法在它的父類別AbstractClusterInvoker
中
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;
// 繫結 attachments 到 invocation 中.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// 列舉 Invoker
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
// 載入 LoadBalance
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 呼叫 doInvoke 進行後續操作
return doInvoke(invocation, invokers, loadbalance);
}
invoke 方法邏輯也很簡單:
其中列舉 invoker 如下
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
list 方法就是呼叫服務目錄的 list 方法,裡面做了兩件事(結合前面的元件圖):
doInvoke 方法具體實現在FailoverClusterInvoker
中,此 invoker 的容錯方式為失敗自動切換。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 獲取重試次數
int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// 迴圈呼叫,失敗重試
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
// 在進行重試前重新列舉 Invoker,這樣做的好處是,如果某個服務掛了,
// 通過呼叫 list 可得到最新可用的 Invoker 列表
copyinvokers = list(invocation);
// check again
// 對 copyinvokers 進行判空檢查
checkInvokers(copyinvokers, invocation);
}
// 通過負載均衡選擇 Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 新增到 invoker 到 invoked 列表中
invoked.add(invoker);
// 設定 invoked 到 RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// 呼叫目標 Invoker 的 invoke 方法
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 若重試失敗,則丟擲異常
throw new RpcException(xxx);
}
doInvoke 方法程式碼量不少,但是邏輯簡化之後也很簡單,就是根據重試次數,在 for 迴圈中進行遠端呼叫,成功則返回,失敗就重試。如果重試次數耗盡還無法呼叫成功,則丟擲異常。
從這裡可以知道,Dubbo的預設失敗重試次數是3次。
此方法中我們關注下 select 方法,它負責從 invoker 集合中選出一個 infoker
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
// 獲取呼叫方法名
String methodName = invocation == null ? "" : invocation.getMethodName();
// 獲取 sticky 設定,sticky 表示粘滯連線。所謂粘滯連線是指讓服務消費者儘可能的
// 呼叫同一個服務提供者,除非該提供者掛了再進行切換
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
{
// 檢測 invokers 列表是否包含 stickyInvoker,如果不包含,
// 說明 stickyInvoker 代表的服務提供者掛了,此時需要將其置空
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
// 在 sticky 為 true,且 stickyInvoker != null 的情況下。如果 selected 包含
// stickyInvoker,表明 stickyInvoker 對應的服務提供者可能因網路原因未能成功提供服務。
// 但是該提供者並沒掛,此時 invokers 列表中仍存在該服務提供者對應的 Invoker。
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
// availablecheck 表示是否開啟了可用性檢查,如果開啟了,則呼叫 stickyInvoker 的
// isAvailable 方法進行檢查,如果檢查通過,則直接返回 stickyInvoker。
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
}
// 如果執行緒走到當前程式碼處,說明前面的 stickyInvoker 為空,或者不可用。
// 此時繼續呼叫 doSelect 選擇 Invoker
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
// 如果 sticky 為 true,則將負載均衡元件選出的 Invoker 賦值給 stickyInvoker
if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}
select 方法主要處理對粘滯連線特性的支援。註釋寫的很清楚了。選擇 invoker 的操作在 doSelect 方法
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
if (invokers.size() == 1)
return invokers.get(0);
if (loadbalance == null) {
// 如果 loadbalance 為空,這裡通過 SPI 載入 Loadbalance,預設為 RandomLoadBalance
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
// 通過負載均衡元件選擇 Invoker
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
// 如果 selected 包含負載均衡選擇出的 Invoker,或者該 Invoker 無法經過可用性檢查,此時進行重選
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
// 進行重選
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
invoker = rinvoker;
} else {
// rinvoker 為空,定位 invoker 在 invokers 中的位置
int index = invokers.indexOf(invoker);
try {
// 獲取 index + 1 位置處的 Invoker,以下程式碼等價於:
// invoker = invokers.get((index + 1) % invokers.size());
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
這裡通過負載均衡選出 invoker,如果 invoker 在 selected 中(就是在doInvoke方法中呼叫失敗的invoker)或者不可用,則呼叫 reselect 方法進行重選。如果重選還是選不出 invoker,則返回 invoker 集合中的下一個元素。這裡的繁瑣判斷,就是為了儘量保證拿到可用的 invoker
我們繼續看看 reselect 方法
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
throws RpcException {
List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
// 下面的 if-else 分支邏輯有些冗餘,pull request #2826 對這段程式碼進行了簡化,可以參考一下
// 根據 availablecheck 進行不同的處理
if (availablecheck) { // invoker.isAvailable() should be checked
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
} else { // do not check invoker.isAvailable()
for (Invoker<T> invoker : invokers) {
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
{
// 若執行緒走到此處,說明 reselectInvokers 集合為空,此時不會呼叫負載均衡元件進行篩選。
// 這裡從 selected 列表中查詢可用的 Invoker,並將其新增到 reselectInvokers 集合中
if (selected != null) {
for (Invoker<T> invoker : selected) {
if ((invoker.isAvailable()) // available first
&& !reselectInvokers.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
return null;
}
這個方法可以分成兩部分:
至此,Dubbo的叢集就講完了。負載均衡有空再說。
前面我們提到 Cluster 介面在多個地方被呼叫,也講了同一個服務有多個服務提供者時的處理。那麼,有多個註冊中心呢,該如何處理?
// 類ReferenceConfig
private T createProxy(Map<String, String> map) {
......
// 本地參照
if (isJvmRefer) {
......
// 遠端參照
} else {
......
// 單個註冊中心或服務提供者(服務直連,下同)
if (urls.size() == 1) {
// 呼叫 RegistryProtocol 的 refer 構建 Invoker 範例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多個註冊中心或多個服務提供者,或者兩者混合
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 獲取所有的 Invoker
for (URL url : urls) {
// 通過 refprotocol 呼叫 refer 構建 Invoker,refprotocol 會在執行時
// 根據 url 協定頭載入指定的 Protocol 範例,並呼叫範例的 refer 方法
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
// 如果註冊中心連結不為空,則將使用 AvailableCluster
URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 建立 StaticDirectory 範例,並由 Cluster 對多個 Invoker 進行合併
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
// 生成代理類
return (T) proxyFactory.getProxy(invoker);
}
createProxy 是服務參照時,生成服務代理物件的方法。這裡會判斷,如果有多個註冊中心,會再封裝一層叢集,也就是先選擇註冊中心,再選擇服務提供者。
這裡一般情況 registryURL 不為空,cluster 使用的是AvailableCluster
public class AvailableCluster implements Cluster {
public static final String NAME = "available";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new AbstractClusterInvoker<T>(directory) {
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}
};
}
}
AvailableCluster的邏輯很簡單,按順序選擇可使用的 invoker (這裡的invoker其實就是每個註冊中心)
本篇文章介紹了Dubbo叢集容錯的整體工作過程和呼叫邏輯。Dubbo提供了多種叢集實現,本文只介紹了Failover Cluster,其餘實現感興趣的可以自行檢視原始碼。
參考資料