本文基於Dubbo2.6.x版本,中文註釋版原始碼已上傳github:xiaoguyu/dubbo
原始碼分析均基於官方Demo,路徑:dubbo/dubbo-demo
如果沒有看過之前Dubbo系列的文章,建議先去看看。因為服務呼叫過程涉及範圍較廣,需要那些前置知識。
Dubbo 服務呼叫過程比較複雜,包含眾多步驟,比如傳送請求、編解碼、服務降級、過濾器鏈處理、序列化、執行緒派發以及響應請求等步驟。限於篇幅原因,本篇文章無法對所有的步驟一一進行分析。後續挖坑再說吧。本篇文章將會重點分析請求的傳送與接收、執行緒派發以及響應的傳送與接收等過程。
先了解下 Dubbo 服務呼叫過程(圖片來自官方檔案)
首先服務消費者通過代理物件 Proxy 發起遠端呼叫,接著通過網路使用者端 Client 將編碼後的請求傳送給服務提供方的網路層上,也就是 Server。Server 在收到請求後,首先要做的事情是對封包進行解碼。然後將解碼後的請求傳送至分發器 Dispatcher,再由分發器將請求派發到指定的執行緒池上,最後由執行緒池呼叫具體的服務。這就是一個遠端呼叫請求的傳送與接收過程。至於響應的傳送與接收過程,這張圖中沒有表現出來。
Dubbo 支援同步和非同步兩種呼叫方式,其中非同步呼叫還可細分為「有返回值」的非同步呼叫和「無返回值」的非同步呼叫。所謂「無返回值」非同步呼叫是指服務消費方只管呼叫,但不關心呼叫結果,此時 Dubbo 會直接返回一個空的 RpcResult。Dubbo 預設使用同步呼叫方式。
當有返回值非同步和無返回值非同步同時存在,無返回值非同步優先:
有返回值非同步呼叫
修改設定,將引數async
設定為 true
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
<dubbo:method name="sayHello" async="true" />
</dubbo:reference>
程式碼使用如下
String hello = demoService.sayHello("world");// 返回值為null,要注意
Future<String> future = RpcContext.getContext().getFuture();
... // 業務執行緒可以開始做其他事情
result = future.get();
無返回值非同步呼叫
修改設定,將引數return
設定為 false
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
<dubbo:method name="sayHello" return="false" />
</dubbo:reference>
程式碼使用
String hello = demoService.sayHello("world");// 返回值為null,要注意
Future<String> future = RpcContext.getContext().getFuture();// future 為 null
下面,我們開始進入原始碼分析。
當我們通過Spring注入服務介面時,實際上注入的是服務介面的實現類,這個實現類由Dubbo框架生成。請看 服務參照#建立代理物件
package org.apache.dubbo.common.bytecode;
public class proxy0 implements org.apache.dubbo.demo.DemoService {
public static java.lang.reflect.Method[] methods;
private java.lang.reflect.InvocationHandler handler;
public proxy0() {
}
public proxy0(java.lang.reflect.InvocationHandler arg0) {
handler = 1;
}
public java.lang.String sayHello(java.lang.String arg0) {
Object[] args = new Object[1];
args[0] = (w) $1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String) ret;
}
}
也就是呼叫 demoService.sayHello 時,實際上是呼叫 handler.invoke ,而這個 handler 就是InvokerInvocationHandler
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 攔截定義在 Object 類中的方法(未被子類重寫),比如 wait/notify
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
// 如果 toString、hashCode 和 equals 等方法被子類重寫了,這裡也直接呼叫
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// 將 method 和 args 封裝到 RpcInvocation 中,並執行後續的呼叫
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
invoke 方法判斷如果是 java 內建的一下方法,則直接呼叫,不走 dubbo 的邏輯。所以我們關注的是 invoker.invoke() 。類變數 invoker 實際上是 FailoverClusterInvoker
, 但是又被 MockClusterInvoker
包裝了一層。這個 FailoverClusterInvoker 是由FailoverCluster
生成的,請看 服務參照#遠端參照 。而 MockClusterInvoker 是由MockClusterWrapper
生成,其基於Dubbo的SPI機制,將 FailoverCluster 又包裝了一遍。MockClusterInvoker
內部封裝了服務降級邏輯。以後再開坑聊。
我們在 Dubbo叢集 文章中講過FailoverClusterInvoker
,所以直接快進到DubboInvoker#doInvoke()
方法。此時是不是一臉懵逼,為啥從 FailoverClusterInvoker 一下子就到了 DubboInvoker ,我們先來看看呼叫棧
我們把視角拉回FailoverClusterInvoker#doInvoke
,看看通過負載均衡選出的 invoker
從圖片可以看到,最外層的invoker是一個內部類,是 服務目錄通過訂閱註冊中心 生成的
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
而 protocol 實際是DubboProtocol
,所以 protocol.refer(serviceType, url) 生成的是DubboInvoker
,至於為啥呼叫鏈這麼長,是因為ProtocolFilterWrapper
,這個類增加了對Dubbo過濾器的支援。這是一個 protocol 的包裝類,它包裝了DubboProtocol#refer() ,我們取看看 ProtocolFilterWrapper
的原始碼
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
// 建立invoker鏈條
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 獲取過濾器
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
// 對invoker進行封裝,責任鏈模式
last = new Invoker<T>() {
......
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
};
}
}
return last;
}
buildInvokerChain 方法將 invoker 轉換成責任鏈的形式,獲取的 filters 為 {ConsumerContextFilter,FutureFilter,MonitorFilter},和圖片中的呼叫棧就對應上了。
那麼還剩下ListenerInvokerWrapper
,這是一個 Invoker 包裝類,由 ProtocolListenerWrapper
生成。
public class ProtocolListenerWrapper implements Protocol {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
// 封裝了Invoker監聽器
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}
}
public class ListenerInvokerWrapper<T> implements Invoker<T> {
public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners) {
if (invoker == null) {
throw new IllegalArgumentException("invoker == null");
}
this.invoker = invoker;
this.listeners = listeners;
if (listeners != null && !listeners.isEmpty()) {
for (InvokerListener listener : listeners) {
if (listener != null) {
try {
listener.referred(invoker);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
}
}
@Override
public void destroy() {
try {
invoker.destroy();
} finally {
if (listeners != null && !listeners.isEmpty()) {
for (InvokerListener listener : listeners) {
if (listener != null) {
try {
listener.destroyed(invoker);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
}
}
}
}
總結一下:
ProtocolFilterWrapper
是 Invoker 過濾器的支援,dubbo的過濾器用的也是責任鏈模式ListenerInvokerWrapper
是 Invoker 監聽器的支援
上面囉嗦了很多,終於回到主線 DubboInvoker 。它繼承自 AbstractInvoker ,invoke 方法在抽象父類別中
public abstract class AbstractInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation inv) throws RpcException {
...
RpcInvocation invocation = (RpcInvocation) inv;
// 設定 Invoker
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
// 設定 attachment
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
// 新增 contextAttachments 到 RpcInvocation#attachment 變數中
invocation.addAttachments(contextAttachments);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
// 設定非同步資訊到 RpcInvocation#attachment 中
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
// 新增呼叫id
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION));
if (serializationId != null) {
invocation.put(SERIALIZATION_ID_KEY, serializationId);
}
try {
// 抽象方法,由子類實現
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
...
} catch (RpcException e) {
...
} catch (Throwable e) {
return new RpcResult(e);
}
}
}
invoke 方法中,主要用於新增資訊到 RpcInvocation#attachment 中,給後續的邏輯使用。重點是 doInvoke 方法,這是一個抽象方法,由子類 DubboInvoker 實現。
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
// 設定 path 和 version 到 attachment 中
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
// 從 clients 陣列中獲取 ExchangeClient
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 獲取非同步設定
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// isOneway 為 true,表示「單向」通訊
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 非同步無返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// 傳送請求
currentClient.send(inv, isSent);
// 設定上下文中的 future 欄位為 null
RpcContext.getContext().setFuture(null);
// 返回一個空的 RpcResult
return new RpcResult();
// 非同步有返回值
} else if (isAsync) {
// 傳送請求,並得到一個 ResponseFuture 範例
ResponseFuture future = currentClient.request(inv, timeout);
// 設定 future 到上下文中
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
// 暫時返回一個空結果
return new RpcResult();
// 同步呼叫
} else {
RpcContext.getContext().setFuture(null);
// 傳送請求,得到一個 ResponseFuture 範例,並呼叫該範例的 get 方法進行等待
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
doInvoke 方法主要是對同步和非同步呼叫的邏輯處理。可以看到,在有返回值的情況下,同步和非同步都是通過 currentClient.request 來傳送請求。區別在於,同步呼叫會使用 ResponseFuture#get 方法阻塞,知道請求完成,得到返回值。而非同步是將 ResponseFuture 放到上下文物件中,返回空結果。
FutureAdapter 是一個介面卡,它實現了 jdk 內建的 Future 介面,將 ResponseFuture 轉換成 Future 的用法,更貼合用戶習慣。這裡我們的重點是ResponseFuture
是如何支援非同步呼叫的,這個介面的預設實現是DefaultFuture
public class DefaultFuture implements ResponseFuture {
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
// 構造方法
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
// 獲取請求 id,這個 id 很重要,後面還會見到
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 儲存 <requestId, DefaultFuture> 對映關係到 FUTURES 中
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
// 阻塞等待並獲取請求結果
@Override
public Object get() throws RemotingException {
return get(timeout);
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
// 檢測服務提供方是否成功返回了呼叫結果
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
// 迴圈檢測服務提供方是否成功返回了呼叫結果
while (!isDone()) {
// 如果呼叫結果尚未返回,這裡等待一段時間,預設1000毫秒
done.await(timeout, TimeUnit.MILLISECONDS);
// 如果呼叫結果成功返回,或等待超時,此時跳出 while 迴圈,執行後續的邏輯
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// 如果呼叫結果仍未返回,則丟擲超時異常
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
@Override
public boolean isDone() {
// 通過檢測 response 欄位為空與否,判斷是否收到了呼叫結果
return response != null;
}
// 當請求有響應時,呼叫此方法
public static void received(Channel channel, Response response) {
try {
// 根據呼叫編號從 FUTURES 集合中查詢指定的 DefaultFuture 物件
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
// 這是請求超時,但是結果返回了的警告
logger.warn("...");
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
// 儲存響應物件
response = res;
if (done != null) {
// 喚醒使用者執行緒
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
}
上面對DefaultFuture
做了部分程式碼精簡。get 方法阻塞等待返回值。而 received 方法則是在請求有相應時,儲存響應物件並喚醒 get 方法中的迴圈。這裡是很典型的 future 結構的寫法,有疑問的同學可以去了解下 Java 的並行知識。
上節講了 Dubbo 的同步、非同步呼叫方式。本節來講講有返回值的情況下,Dubbo 消費方是如何傳送請求的。
我們把實現拉回 DubboInvoker#doInvoke 方法中,其有返回值的請求方法為 currentClient.request(inv, timeout),currentClient 為ReferenceCountExchangeClient
,我們看下面這張呼叫棧圖
從 DubboInvoker 到 HeaderExchangeChannel,在 服務參照 文章就講過了,這裡不再贅述。下面直接看HeaderExchangeChannel 中的 request 方法
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
// 建立 Request 物件
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
// 設定雙向通訊標誌為 true
req.setTwoWay(true);
// 這裡的 request 變數型別為 RpcInvocation
req.setData(request);
// 建立 DefaultFuture 物件
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
// 呼叫 NettyClient 的 send 方法傳送請求(在父類別AbstractPeer中)
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
// 返回 DefaultFuture 物件
return future;
}
從上面的方法可以看到,將請求資料封裝成 Request 物件,傳遞給 DefaultFuture,再傳送出去。Request 在構造方法中會建立請求id,用於在接收到響應時,確定是哪個請求的響應。繼續看請求的傳送方法 channel.send(req),channel 是 NettyClient,結合類圖看呼叫路徑
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
@Override
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
}
public abstract class AbstractClient extends AbstractEndpoint implements Client {
public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()) {
connect();
}
// 獲取 Channel,getChannel 是一個抽象方法,具體由子類實現
Channel channel = getChannel();
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
// 繼續向下呼叫
channel.send(message, sent);
}
}
這裡就兩個重點,獲取 channel 和 使用 channel 繼續往下呼叫。先看看如何獲取 channel
public class NettyClient extends AbstractClient {
@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null || !c.isActive()) {
return null;
}
// 獲取一個 NettyChannel 型別物件
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
}
final class NettyChannel extends AbstractChannel {
// 私有構造方法
private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
}
static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
// 嘗試從集合中獲取 NettyChannel 範例
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
// 如果 ret = null,則建立一個新的 NettyChannel 範例
NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
if (ch.isActive()) {
ret = channelMap.putIfAbsent(ch, nettyChannel);
}
if (ret == null) {
ret = nettyChannel;
}
}
return ret;
}
}
獲取 channel 的邏輯很簡單,從快取獲取 NettyChannel,沒有則建立。下面繼續看 channel.send(message, sent)
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// 傳送訊息(包含請求和響應訊息)
ChannelFuture future = channel.writeAndFlush(message);
// sent 的值源於 <dubbo:method sent="true/false" /> 中 sent 的設定值,有兩種設定值:
// 1. true: 等待訊息發出,訊息傳送失敗將丟擲異常
// 2. false: 不等待訊息發出,將訊息放入 IO 佇列,即刻返回
// 預設情況下 sent = false;
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 等待訊息發出,若在規定時間沒能發出,success 會被置為 false
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
// 若 success 為 false,這裡丟擲異常
if (!success) {
throw new RemotingException(this, "...");
}
}
至此,請求資料的傳送過程就結束了。涉及 Netty 的傳送編解碼處理過程,感興趣的可以從 NettyClient#doOpen
方法入手,這裡鑑於篇幅,就不寫了。
下面我們來總結一下消費端呼叫傳送請求過程的呼叫棧(以 DemoService 為例)
proxy0#sayHello(String)
—> InvokerInvocationHandler#invoke(Object, Method, Object[])
—> MockClusterInvoker#invoke(Invocation)
—> AbstractClusterInvoker#invoke(Invocation)
—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
—> Filter#invoke(Invoker, Invocation) // 包含多個 Filter 呼叫
—> ListenerInvokerWrapper#invoke(Invocation)
—> AbstractInvoker#invoke(Invocation)
—> DubboInvoker#doInvoke(Invocation)
—> ReferenceCountExchangeClient#request(Object, int)
—> HeaderExchangeClient#request(Object, int)
—> HeaderExchangeChannel#request(Object, int)
—> AbstractPeer#send(Object)
—> AbstractClient#send(Object, boolean)
—> NettyChannel#send(Object, boolean)
—> NioClientSocketChannel#write(Object)
預設情況下 Dubbo 使用 Netty 作為底層的通訊框架,從 NettyServer#doOpen 方法知道,接收請求的入口在 NettyServerHandler#channelRead,這裡已經是解碼之後得到的資料。然後資料依次經過 MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler 。至於為什麼是這幾個類以及順序,可以去看 NettyServer 的構造方法。下面我們首先看呼叫棧
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}
public class ChannelHandlers {
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
}
public class ChannelHandlers {
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
MultiMessageHandler、HeartbeatHandler 直接通過構造方法建立,而 AllChannelHandler 則由 Dispatcher 的預設自適應拓展類 AllDispatcher 建立。
剛才講到了 Dispatcher,這是一個執行緒派發器。讓我們回顧一下 Dubbo 服務呼叫過程圖(圖片來自官方檔案)
Dubbo 將底層通訊框架中接收請求的執行緒稱為 IO 執行緒。如果一些事件處理邏輯可以很快執行完,此時直接在 IO 執行緒上執行該段邏輯即可。但如果事件的處理邏輯比較耗時,比如該段邏輯會發起資料庫查詢或者 HTTP 請求。此時我們就不應該讓事件處理邏輯在 IO 執行緒上執行,而是應該派發到執行緒池中去執行。原因也很簡單,IO 執行緒主要用於接收請求,如果 IO 執行緒被佔滿,將導致它不能接收新的請求。PS:像不像Netty的主從模型,萬物殊途同歸啊。
Dispatcher 真實的職責建立具有執行緒派發能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身並不具備執行緒派發能力。Dubbo 支援 5 種不同的執行緒派發策略
策略 | 用途 |
---|---|
all | 所有訊息都派發到執行緒池,包括請求,響應,連線事件,斷開事件等 |
direct | 所有訊息都不派發到執行緒池,全部在 IO 執行緒上直接執行 |
message | 只有請求和響應訊息派發到執行緒池,其它訊息均在 IO 執行緒上執行 |
execution | 只有請求訊息派發到執行緒池,不含響應。其它訊息均在 IO 執行緒上執行 |
connection | 在 IO 執行緒上,將連線斷開事件放入佇列,有序逐個執行,其它訊息派發到執行緒池 |
下面我們看看預設的 AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {
/** 處理請求和響應訊息,這裡的 message 變數型別可能是 Request,也可能是 Response */
@Override
public void received(Channel channel, Object message) throws RemotingException {
// 獲取執行緒池,由自適應拓展生成,預設由 FixedThreadPool 生成
ExecutorService cexecutor = getExecutorService();
try {
// 將請求和響應訊息派發到執行緒池中處理
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
// 如果通訊方式為雙向通訊,此時將 Server side ... threadpool is exhausted
// 錯誤資訊封裝到 Response 中,並返回給服務消費方。
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
// 返回包含錯誤資訊的 Response 物件
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
請求物件會被封裝 ChannelEventRunnable 中,也就是 ChannelEventRunnable#run 方法才是實際處理請求的地方。
public class ChannelEventRunnable implements Runnable {
@Override
public void run() {
// 檢測通道狀態,對於請求或響應訊息,此時 state = RECEIVED
if (state == ChannelState.RECEIVED) {
try {
// 將 channel 和 message 傳給 ChannelHandler 物件,進行後續的呼叫
handler.received(channel, message);
} catch (Exception e) {
logger.warn("...", e);
}
// 其他訊息型別通過 switch 進行處理
} else {
switch (state) {
case CONNECTED:
...
case DISCONNECTED:
...
case SENT:
...
case CAUGHT:
...
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
}
ChannelEventRunnable 依然不進行呼叫邏輯,只是根據通道的狀態將請求轉發。可以注意一下,這裡特意對 RECEIVED 狀態用了 if 判斷,然後其它狀態使用 switch 來判斷,是因為絕大部分的請求都是 RECEIVED 型別。
這裡的 handler 是 DecodeHandler
,這是一個解碼處理器。也許你會以為,這個是不是和 InternalDecoder
衝突了?既然解碼操作已經在 IO 執行緒(也就是 Netty 的 WorkerGroup)中處理了,為什麼到 Dubbo 執行緒池中,還要再處理一次?這取決於 decode.in.io 引數,允許將部分解碼工作交由 Dubbo 執行緒池中完成。下面我們略過 DecodeHandler,快進到 HeaderExchangeHandler 中
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
// 處理請求物件
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
// 處理事件
handlerEvent(channel, request);
// 處理普通的請求
} else {
// 雙向通訊
if (request.isTwoWay()) {
// 向後呼叫服務,並得到呼叫結果
Response response = handleRequest(exchangeChannel, request);
// 將呼叫結果返回給服務消費端
channel.send(response);
} else {
// 如果是單向通訊,僅向後呼叫指定服務即可,無需返回撥用結果
handler.received(exchangeChannel, request.getData());
}
}
// 處理響應物件,服務消費方會執行此處邏輯
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// telnet 相關
...
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
// 處理請求
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
// 檢測請求是否合法,不合法則返回狀態碼為 BAD_REQUEST 的響應
if (req.isBroken()) {
...
return res;
}
// 獲取 data 欄位值,也就是 RpcInvocation 物件
Object msg = req.getData();
try {
// handle data.
// 繼續向下呼叫
Object result = handler.reply(channel, msg);
// 設定 OK 狀態碼
res.setStatus(Response.OK);
// 設定呼叫結果
res.setResult(result);
} catch (Throwable e) {
// 若呼叫過程出現異常,則設定 SERVICE_ERROR,表示伺服器端異常
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
}
處理過程註釋中已經寫了。通過 handleRequest 方法處理請求得到返回值,並通過 channel.send 將結果返回給消費者。(碎碎念:這個 channel 和 Netty 的是真的像)
handleRequest 方法中主要是對 Response 物件的處理,我們繼續跟進呼叫過程 handler.reply(channel, msg),這個 handler 是 DubboProtocol
的類變數requestHandler
public class DubboProtocol extends AbstractProtocol {
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
// 獲取 Invoker 範例
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
// 回撥相關
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
...
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 通過 Invoker 呼叫具體的服務
return invoker.invoke(inv);
}
throw new RemotingException(channel, "...");
}
...
};
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
...
// 計算 service key,格式為 groupName/serviceName:serviceVersion:port。比如:
// dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
// 從 exporterMap 查詢與 serviceKey 相對應的 DubboExporter 物件,
// 服務匯出過程中會將 <serviceKey, DubboExporter> 對映關係儲存到 exporterMap 集合中
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null)
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
// 獲取 Invoker 物件,並返回
return exporter.getInvoker();
}
}
reply 方法先是獲取 Invoker 範例,然後通過 Invoker 呼叫具體的服務。想了解 Invoker 的建立以及如何放入到 exporterMap 中的,可以看以前寫過的 服務匯出 文章。下面這段在 服務匯出 文章中均有提過,不想看的可以直接跳到本節末尾看呼叫路徑。
invoke 方法定義在 AbstractProxyInvoker 中
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
// 呼叫 doInvoke 執行後續的呼叫,並將呼叫結果封裝到 RpcResult 中,並
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method ...");
}
}
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}
doInvoke 是一個抽象方法,這個需要由具體的 Invoker 範例實現。Invoker 範例是在執行時通過 JavassistProxyFactory 建立的
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 建立匿名類物件
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 呼叫 invokeMethod 方法進行後續的呼叫
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。Dubbo 會在執行時通過 Javassist 框架為 Wrapper 生成實現類,並實現 invokeMethod 方法,該方法最終會根據呼叫資訊呼叫具體的服務。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
// 省略其他方法
public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
DemoService demoService;
try {
// 型別轉換
demoService = (DemoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
// 根據方法名呼叫指定的方法
if ("sayHello".equals(string) && arrclass.length == 1) {
return demoService.sayHello((String)arrobject[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
}
}
至此,伺服器端呼叫服務的過程就講完了。
下面我們來總結一下伺服器端呼叫服務過程的呼叫棧(以 DemoService 為例)
// 這是IO執行緒的呼叫過程
NettyServerHandler#channelRead(ChannelHandlerContext, Object)
—> AbstractPeer#received(Channel, Object)
—> MultiMessageHandler#received(Channel, Object)
—> HeartbeatHandler#received(Channel, Object)
—> AllChannelHandler#received(Channel, Object)
// 這是轉發到執行緒池之後的呼叫過程
ChannelEventRunnable#run()
—> DecodeHandler#received(Channel, Object)
—> HeaderExchangeHandler#received(Channel, Object)
—> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
—> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
—> Filter#invoke(Invoker, Invocation)
—> AbstractProxyInvoker#invoke(Invocation)
—> Wrapper0#invokeMethod(Object, String, Class[], Object[])
—> DemoServiceImpl#sayHello(String)
在 2.3.2 節中講了,呼叫結果會封裝在 Response 物件中,並由NettyChannel 的 send 方法將 Response 物件返回。詳情請看 HeaderExchangeHandler
。至於返回 Response 過程中的編碼過程,我們省略。
消費者接收響應資料的處理過程中,從 NettyHandler (消費者是 NettyClientHandler,生產者是 NettyServerHandler,不過他們的 channelRead 方法一模一樣) 到 AllChannelHandler 的處理過程與服務提供方接收請求(2.3節)的處理過程一致,就不重複分析了。所以本節重點在 Dubbo如何將呼叫結果傳遞給使用者執行緒。
我們直接快進到 HeaderExchangeHandler 的 received 方法中(呼叫路徑請看 2.3.2 節末尾)
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
// 處理請求物件
if (message instanceof Request) {
...
// 處理響應物件,服務消費方會執行此處邏輯
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
...
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
}
可以看到,是呼叫 DefaultFuture#receive 方法處理的,DefaultFuture 物件我們在 2.1.3 節有講到,繼續追蹤程式碼
public class DefaultFuture implements ResponseFuture {
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
public static void received(Channel channel, Response response) {
try {
// 根據呼叫編號從 FUTURES 集合中查詢指定的 DefaultFuture 物件
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
// 這是請求超時,但是結果返回了的警告
logger.warn("...");
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
// 儲存響應物件
response = res;
if (done != null) {
// 喚醒使用者執行緒
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
}
在一次呼叫過程中,請求和相應的編號是一致的,所以可以根據呼叫編號從 FUTURES 中得到發起請求時建立的 DefaultFuture 。DefaultFuture.get 方法阻塞等待響應結果,而 DefaultFuture#received 是得到響應結果之後喚醒使用者執行緒(也就是 get 方法中的迴圈)。這兩個方法結合起來看就明白了。
沒啥好總結的,Dubbo 系列就寫完了。閱讀優秀框架的原始碼從大的方面可以學習其思想以及架構,小的方面就是一個個小功能的寫法,比如負載均衡演演算法、DefaultFuture、SPI 等等。
PS:總感覺 Dubbo 和 Netty 的執行緒模型殊途同歸
參考資料