Dubbo原始碼(九)

2022-09-01 12:04:55

1. 前言

本文基於Dubbo2.6.x版本,中文註釋版原始碼已上傳github:xiaoguyu/dubbo

原始碼分析均基於官方Demo,路徑:dubbo/dubbo-demo

如果沒有看過之前Dubbo系列的文章,建議先去看看。因為服務呼叫過程涉及範圍較廣,需要那些前置知識。

Dubbo 服務呼叫過程比較複雜,包含眾多步驟,比如傳送請求、編解碼、服務降級、過濾器鏈處理、序列化、執行緒派發以及響應請求等步驟。限於篇幅原因,本篇文章無法對所有的步驟一一進行分析。後續挖坑再說吧。本篇文章將會重點分析請求的傳送與接收、執行緒派發以及響應的傳送與接收等過程。

2. 原始碼分析

先了解下 Dubbo 服務呼叫過程(圖片來自官方檔案)

首先服務消費者通過代理物件 Proxy 發起遠端呼叫,接著通過網路使用者端 Client 將編碼後的請求傳送給服務提供方的網路層上,也就是 Server。Server 在收到請求後,首先要做的事情是對封包進行解碼。然後將解碼後的請求傳送至分發器 Dispatcher,再由分發器將請求派發到指定的執行緒池上,最後由執行緒池呼叫具體的服務。這就是一個遠端呼叫請求的傳送與接收過程。至於響應的傳送與接收過程,這張圖中沒有表現出來。

2.1 服務呼叫方式

Dubbo 支援同步和非同步兩種呼叫方式,其中非同步呼叫還可細分為「有返回值」的非同步呼叫和「無返回值」的非同步呼叫。所謂「無返回值」非同步呼叫是指服務消費方只管呼叫,但不關心呼叫結果,此時 Dubbo 會直接返回一個空的 RpcResult。Dubbo 預設使用同步呼叫方式。

2.1.1 非同步呼叫案例

當有返回值非同步和無返回值非同步同時存在,無返回值非同步優先:

  • 有返回值非同步呼叫

    修改設定,將引數async設定為 true

    <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
        <dubbo:method name="sayHello" async="true" />
    </dubbo:reference>
    

    程式碼使用如下

    String hello = demoService.sayHello("world");// 返回值為null,要注意
    Future<String> future = RpcContext.getContext().getFuture();
    ... // 業務執行緒可以開始做其他事情
    result = future.get();
    
  • 無返回值非同步呼叫

    修改設定,將引數return設定為 false

    <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
        <dubbo:method name="sayHello" return="false" />
    </dubbo:reference>
    

    程式碼使用

    String hello = demoService.sayHello("world");// 返回值為null,要注意
    Future<String> future = RpcContext.getContext().getFuture();// future 為 null
    

下面,我們開始進入原始碼分析。

2.1.2 InvokerInvocationHandler

當我們通過Spring注入服務介面時,實際上注入的是服務介面的實現類,這個實現類由Dubbo框架生成。請看 服務參照#建立代理物件

package org.apache.dubbo.common.bytecode;

public class proxy0 implements org.apache.dubbo.demo.DemoService {

    public static java.lang.reflect.Method[] methods;

    private java.lang.reflect.InvocationHandler handler;

    public proxy0() {
    }

    public proxy0(java.lang.reflect.InvocationHandler arg0) {
        handler = 1;
    }

    public java.lang.String sayHello(java.lang.String arg0) {
        Object[] args = new Object[1];
        args[0] = (w) $1;
        Object ret = handler.invoke(this, methods[0], args);
        return (java.lang.String) ret;
    }
}

也就是呼叫 demoService.sayHello 時,實際上是呼叫 handler.invoke ,而這個 handler 就是InvokerInvocationHandler

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();

        // 攔截定義在 Object 類中的方法(未被子類重寫),比如 wait/notify
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        // 如果 toString、hashCode 和 equals 等方法被子類重寫了,這裡也直接呼叫
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // 將 method 和 args 封裝到 RpcInvocation 中,並執行後續的呼叫
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

invoke 方法判斷如果是 java 內建的一下方法,則直接呼叫,不走 dubbo 的邏輯。所以我們關注的是 invoker.invoke() 。類變數 invoker 實際上是 FailoverClusterInvoker, 但是又被 MockClusterInvoker包裝了一層。這個 FailoverClusterInvoker 是由FailoverCluster生成的,請看 服務參照#遠端參照 。而 MockClusterInvoker 是由MockClusterWrapper生成,其基於Dubbo的SPI機制,將 FailoverCluster 又包裝了一遍。MockClusterInvoker內部封裝了服務降級邏輯。以後再開坑聊。

我們在 Dubbo叢集 文章中講過FailoverClusterInvoker,所以直接快進到DubboInvoker#doInvoke()方法。此時是不是一臉懵逼,為啥從 FailoverClusterInvoker 一下子就到了 DubboInvoker ,我們先來看看呼叫棧

我們把視角拉回FailoverClusterInvoker#doInvoke,看看通過負載均衡選出的 invoker

從圖片可以看到,最外層的invoker是一個內部類,是 服務目錄通過訂閱註冊中心 生成的

invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);

而 protocol 實際是DubboProtocol,所以 protocol.refer(serviceType, url) 生成的是DubboInvoker,至於為啥呼叫鏈這麼長,是因為ProtocolFilterWrapper,這個類增加了對Dubbo過濾器的支援。這是一個 protocol 的包裝類,它包裝了DubboProtocol#refer() ,我們取看看 ProtocolFilterWrapper的原始碼

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    // 建立invoker鏈條
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    // 獲取過濾器
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            // 對invoker進行封裝,責任鏈模式
            last = new Invoker<T>() {
                ......

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

            };
        }
    }
    return last;
}

buildInvokerChain 方法將 invoker 轉換成責任鏈的形式,獲取的 filters 為 {ConsumerContextFilter,FutureFilter,MonitorFilter},和圖片中的呼叫棧就對應上了。

那麼還剩下ListenerInvokerWrapper,這是一個 Invoker 包裝類,由 ProtocolListenerWrapper 生成。

public class ProtocolListenerWrapper implements Protocol {

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        // 封裝了Invoker監聽器
        return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
    }

}

public class ListenerInvokerWrapper<T> implements Invoker<T> {

    public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners) {
        if (invoker == null) {
            throw new IllegalArgumentException("invoker == null");
        }
        this.invoker = invoker;
        this.listeners = listeners;
        if (listeners != null && !listeners.isEmpty()) {
            for (InvokerListener listener : listeners) {
                if (listener != null) {
                    try {
                        listener.referred(invoker);
                    } catch (Throwable t) {
                        logger.error(t.getMessage(), t);
                    }
                }
            }
        }
    }

    @Override
    public void destroy() {
        try {
            invoker.destroy();
        } finally {
            if (listeners != null && !listeners.isEmpty()) {
                for (InvokerListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.destroyed(invoker);
                        } catch (Throwable t) {
                            logger.error(t.getMessage(), t);
                        }
                    }
                }
            }
        }
    }

}

總結一下:

ProtocolFilterWrapper是 Invoker 過濾器的支援,dubbo的過濾器用的也是責任鏈模式ListenerInvokerWrapper是 Invoker 監聽器的支援

2.1.3 DubboInvoker

上面囉嗦了很多,終於回到主線 DubboInvoker 。它繼承自 AbstractInvoker ,invoke 方法在抽象父類別中

public abstract class AbstractInvoker<T> implements Invoker<T> {
    @Override
    public Result invoke(Invocation inv) throws RpcException {
        ...

        RpcInvocation invocation = (RpcInvocation) inv;
        // 設定 Invoker
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            // 設定 attachment
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            // 新增 contextAttachments 到 RpcInvocation#attachment 變數中
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            // 設定非同步資訊到 RpcInvocation#attachment 中
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        // 新增呼叫id
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION));
        if (serializationId != null) {
            invocation.put(SERIALIZATION_ID_KEY, serializationId);
        }

        try {
            // 抽象方法,由子類實現
            return doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            ...
        } catch (RpcException e) {
            ...
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }
}

invoke 方法中,主要用於新增資訊到 RpcInvocation#attachment 中,給後續的邏輯使用。重點是 doInvoke 方法,這是一個抽象方法,由子類 DubboInvoker 實現。

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    // 設定 path 和 version 到 attachment 中
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        // 從 clients 陣列中獲取 ExchangeClient
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 獲取非同步設定
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // isOneway 為 true,表示「單向」通訊
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 非同步無返回值
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            // 傳送請求
            currentClient.send(inv, isSent);
            // 設定上下文中的 future 欄位為 null
            RpcContext.getContext().setFuture(null);
            // 返回一個空的 RpcResult
            return new RpcResult();
        // 非同步有返回值
        } else if (isAsync) {
            // 傳送請求,並得到一個 ResponseFuture 範例
            ResponseFuture future = currentClient.request(inv, timeout);
            // 設定 future 到上下文中
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            // 暫時返回一個空結果
            return new RpcResult();
        // 同步呼叫
        } else {
            RpcContext.getContext().setFuture(null);
            // 傳送請求,得到一個 ResponseFuture 範例,並呼叫該範例的 get 方法進行等待
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

doInvoke 方法主要是對同步和非同步呼叫的邏輯處理。可以看到,在有返回值的情況下,同步和非同步都是通過 currentClient.request 來傳送請求。區別在於,同步呼叫會使用 ResponseFuture#get 方法阻塞,知道請求完成,得到返回值。而非同步是將 ResponseFuture 放到上下文物件中,返回空結果。

FutureAdapter 是一個介面卡,它實現了 jdk 內建的 Future 介面,將 ResponseFuture 轉換成 Future 的用法,更貼合用戶習慣。這裡我們的重點是ResponseFuture是如何支援非同步呼叫的,這個介面的預設實現是DefaultFuture

public class DefaultFuture implements ResponseFuture {
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();    

    // 構造方法
    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        // 獲取請求 id,這個 id 很重要,後面還會見到
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 儲存 <requestId, DefaultFuture> 對映關係到 FUTURES 中
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

    // 阻塞等待並獲取請求結果
    @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        // 檢測服務提供方是否成功返回了呼叫結果
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                // 迴圈檢測服務提供方是否成功返回了呼叫結果
                while (!isDone()) {
                    // 如果呼叫結果尚未返回,這裡等待一段時間,預設1000毫秒
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果呼叫結果成功返回,或等待超時,此時跳出 while 迴圈,執行後續的邏輯
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 如果呼叫結果仍未返回,則丟擲超時異常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    @Override
    public boolean isDone() {
        // 通過檢測 response 欄位為空與否,判斷是否收到了呼叫結果
        return response != null;
    }

    // 當請求有響應時,呼叫此方法
    public static void received(Channel channel, Response response) {
        try {
            // 根據呼叫編號從 FUTURES 集合中查詢指定的 DefaultFuture 物件
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                // 這是請求超時,但是結果返回了的警告
                logger.warn("...");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // 儲存響應物件
            response = res;
            if (done != null) {
                // 喚醒使用者執行緒
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }

}

上面對DefaultFuture做了部分程式碼精簡。get 方法阻塞等待返回值。而 received 方法則是在請求有相應時,儲存響應物件並喚醒 get 方法中的迴圈。這裡是很典型的 future 結構的寫法,有疑問的同學可以去了解下 Java 的並行知識。

2.2 服務消費方傳送請求

上節講了 Dubbo 的同步、非同步呼叫方式。本節來講講有返回值的情況下,Dubbo 消費方是如何傳送請求的。

我們把實現拉回 DubboInvoker#doInvoke 方法中,其有返回值的請求方法為 currentClient.request(inv, timeout),currentClient 為ReferenceCountExchangeClient,我們看下面這張呼叫棧圖

  • ReferenceCountExchangeClient:為 ExchangeClient 新增參照計數功能
  • HeaderExchangeClient:內部持有 client ,並封裝了心跳的功能

從 DubboInvoker 到 HeaderExchangeChannel,在 服務參照 文章就講過了,這裡不再贅述。下面直接看HeaderExchangeChannel 中的 request 方法

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    // 建立 Request 物件
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    // 設定雙向通訊標誌為 true
    req.setTwoWay(true);
    // 這裡的 request 變數型別為 RpcInvocation
    req.setData(request);

    // 建立 DefaultFuture 物件
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        // 呼叫 NettyClient 的 send 方法傳送請求(在父類別AbstractPeer中)
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    // 返回 DefaultFuture 物件
    return future;
}

從上面的方法可以看到,將請求資料封裝成 Request 物件,傳遞給 DefaultFuture,再傳送出去。Request 在構造方法中會建立請求id,用於在接收到響應時,確定是哪個請求的響應。繼續看請求的傳送方法 channel.send(req),channel 是 NettyClient,結合類圖看呼叫路徑

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    @Override
    public void send(Object message) throws RemotingException {
        send(message, url.getParameter(Constants.SENT_KEY, false));
    }
}

public abstract class AbstractClient extends AbstractEndpoint implements Client {
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        // 獲取 Channel,getChannel 是一個抽象方法,具體由子類實現
        Channel channel = getChannel();

        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        // 繼續向下呼叫
        channel.send(message, sent);
    }
}

這裡就兩個重點,獲取 channel 和 使用 channel 繼續往下呼叫。先看看如何獲取 channel

public class NettyClient extends AbstractClient {
    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isActive()) {
            return null;
        }
        // 獲取一個 NettyChannel 型別物件
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
}

final class NettyChannel extends AbstractChannel {
    // 私有構造方法
    private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        }
        this.channel = channel;
    }

    static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        }

        // 嘗試從集合中獲取 NettyChannel 範例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 如果 ret = null,則建立一個新的 NettyChannel 範例
            NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
            if (ch.isActive()) {
                ret = channelMap.putIfAbsent(ch, nettyChannel);
            }
            if (ret == null) {
                ret = nettyChannel;
            }
        }
        return ret;
    }
}

獲取 channel 的邏輯很簡單,從快取獲取 NettyChannel,沒有則建立。下面繼續看 channel.send(message, sent)

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 傳送訊息(包含請求和響應訊息)
        ChannelFuture future = channel.writeAndFlush(message);
        // sent 的值源於 <dubbo:method sent="true/false" /> 中 sent 的設定值,有兩種設定值:
        //   1. true: 等待訊息發出,訊息傳送失敗將丟擲異常
        //   2. false: 不等待訊息發出,將訊息放入 IO 佇列,即刻返回
        // 預設情況下 sent = false;
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待訊息發出,若在規定時間沒能發出,success 會被置為 false
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    // 若 success 為 false,這裡丟擲異常
    if (!success) {
        throw new RemotingException(this, "...");
    }
}

至此,請求資料的傳送過程就結束了。涉及 Netty 的傳送編解碼處理過程,感興趣的可以從 NettyClient#doOpen方法入手,這裡鑑於篇幅,就不寫了。

2.2.1 呼叫路徑

下面我們來總結一下消費端呼叫傳送請求過程的呼叫棧(以 DemoService 為例)

proxy0#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多個 Filter 呼叫
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)

2.3 服務提供方接收請求

預設情況下 Dubbo 使用 Netty 作為底層的通訊框架,從 NettyServer#doOpen 方法知道,接收請求的入口在 NettyServerHandler#channelRead,這裡已經是解碼之後得到的資料。然後資料依次經過 MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler 。至於為什麼是這幾個類以及順序,可以去看 NettyServer 的構造方法。下面我們首先看呼叫棧

public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

public class ChannelHandlers {
    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
}

public class ChannelHandlers {
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

MultiMessageHandler、HeartbeatHandler 直接通過構造方法建立,而 AllChannelHandler 則由 Dispatcher 的預設自適應拓展類 AllDispatcher 建立。

2.3.1 執行緒派發模型

剛才講到了 Dispatcher,這是一個執行緒派發器。讓我們回顧一下 Dubbo 服務呼叫過程圖(圖片來自官方檔案)

Dubbo 將底層通訊框架中接收請求的執行緒稱為 IO 執行緒。如果一些事件處理邏輯可以很快執行完,此時直接在 IO 執行緒上執行該段邏輯即可。但如果事件的處理邏輯比較耗時,比如該段邏輯會發起資料庫查詢或者 HTTP 請求。此時我們就不應該讓事件處理邏輯在 IO 執行緒上執行,而是應該派發到執行緒池中去執行。原因也很簡單,IO 執行緒主要用於接收請求,如果 IO 執行緒被佔滿,將導致它不能接收新的請求。PS:像不像Netty的主從模型,萬物殊途同歸啊。

Dispatcher 真實的職責建立具有執行緒派發能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身並不具備執行緒派發能力。Dubbo 支援 5 種不同的執行緒派發策略

策略 用途
all 所有訊息都派發到執行緒池,包括請求,響應,連線事件,斷開事件等
direct 所有訊息都不派發到執行緒池,全部在 IO 執行緒上直接執行
message 只有請求和響應訊息派發到執行緒池,其它訊息均在 IO 執行緒上執行
execution 只有請求訊息派發到執行緒池,不含響應。其它訊息均在 IO 執行緒上執行
connection 在 IO 執行緒上,將連線斷開事件放入佇列,有序逐個執行,其它訊息派發到執行緒池

下面我們看看預設的 AllChannelHandler

public class AllChannelHandler extends WrappedChannelHandler {
    /** 處理請求和響應訊息,這裡的 message 變數型別可能是 Request,也可能是 Response */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 獲取執行緒池,由自適應拓展生成,預設由 FixedThreadPool 生成
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將請求和響應訊息派發到執行緒池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
        	if(message instanceof Request && t instanceof RejectedExecutionException){
        		Request request = (Request)message;
            // 如果通訊方式為雙向通訊,此時將 Server side ... threadpool is exhausted
            // 錯誤資訊封裝到 Response 中,並返回給服務消費方。
        		if(request.isTwoWay()){
        			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
        			Response response = new Response(request.getId(), request.getVersion());
        			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
        			response.setErrorMessage(msg);
              // 返回包含錯誤資訊的 Response 物件
        			channel.send(response);
        			return;
        		}
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

請求物件會被封裝 ChannelEventRunnable 中,也就是 ChannelEventRunnable#run 方法才是實際處理請求的地方。

2.3.2 呼叫服務

public class ChannelEventRunnable implements Runnable {
    @Override
    public void run() {
        // 檢測通道狀態,對於請求或響應訊息,此時 state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
                // 將 channel 和 message 傳給 ChannelHandler 物件,進行後續的呼叫
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("...", e);
            }
        // 其他訊息型別通過 switch 進行處理
        } else {
            switch (state) {
            case CONNECTED:
                ...
            case DISCONNECTED:
                ...
            case SENT:
                ...
            case CAUGHT:
                ...
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }
    }
}

ChannelEventRunnable 依然不進行呼叫邏輯,只是根據通道的狀態將請求轉發。可以注意一下,這裡特意對 RECEIVED 狀態用了 if 判斷,然後其它狀態使用 switch 來判斷,是因為絕大部分的請求都是 RECEIVED 型別。

這裡的 handler 是 DecodeHandler,這是一個解碼處理器。也許你會以為,這個是不是和 InternalDecoder衝突了?既然解碼操作已經在 IO 執行緒(也就是 Netty 的 WorkerGroup)中處理了,為什麼到 Dubbo 執行緒池中,還要再處理一次?這取決於 decode.in.io 引數,允許將部分解碼工作交由 Dubbo 執行緒池中完成。下面我們略過 DecodeHandler,快進到 HeaderExchangeHandler 中

public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 處理請求物件
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    // 處理事件
                    handlerEvent(channel, request);
                // 處理普通的請求
                } else {
                    // 雙向通訊
                    if (request.isTwoWay()) {
                        // 向後呼叫服務,並得到呼叫結果
                        Response response = handleRequest(exchangeChannel, request);
                        // 將呼叫結果返回給服務消費端
                        channel.send(response);
                    } else {
                        // 如果是單向通訊,僅向後呼叫指定服務即可,無需返回撥用結果
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            // 處理響應物件,服務消費方會執行此處邏輯
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // telnet 相關
                ...
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    // 處理請求
    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // 檢測請求是否合法,不合法則返回狀態碼為 BAD_REQUEST 的響應
        if (req.isBroken()) {
            ...
            return res;
        }
        // 獲取 data 欄位值,也就是 RpcInvocation 物件
        Object msg = req.getData();
        try {
            // handle data.
            // 繼續向下呼叫
            Object result = handler.reply(channel, msg);
            // 設定 OK 狀態碼
            res.setStatus(Response.OK);
            // 設定呼叫結果
            res.setResult(result);
        } catch (Throwable e) {
            // 若呼叫過程出現異常,則設定 SERVICE_ERROR,表示伺服器端異常
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }
}

處理過程註釋中已經寫了。通過 handleRequest 方法處理請求得到返回值,並通過 channel.send 將結果返回給消費者。(碎碎念:這個 channel 和 Netty 的是真的像)

handleRequest 方法中主要是對 Response 物件的處理,我們繼續跟進呼叫過程 handler.reply(channel, msg),這個 handler 是 DubboProtocol的類變數requestHandler

public class DubboProtocol extends AbstractProtocol {

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // 獲取 Invoker 範例
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                // 回撥相關
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    ...
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // 通過 Invoker 呼叫具體的服務
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "...");
        }

        ...
    };

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        ...
        // 計算 service key,格式為 groupName/serviceName:serviceVersion:port。比如:
        //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

        // 從 exporterMap 查詢與 serviceKey 相對應的 DubboExporter 物件,
        // 服務匯出過程中會將 <serviceKey, DubboExporter> 對映關係儲存到 exporterMap 集合中
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);

        // 獲取 Invoker 物件,並返回
        return exporter.getInvoker();
    }
}

reply 方法先是獲取 Invoker 範例,然後通過 Invoker 呼叫具體的服務。想了解 Invoker 的建立以及如何放入到 exporterMap 中的,可以看以前寫過的 服務匯出 文章。下面這段在 服務匯出 文章中均有提過,不想看的可以直接跳到本節末尾看呼叫路徑。

invoke 方法定義在 AbstractProxyInvoker 中

public abstract class AbstractProxyInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            // 呼叫 doInvoke 執行後續的呼叫,並將呼叫結果封裝到 RpcResult 中,並
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method ...");
        }
    }
    
    protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}

doInvoke 是一個抽象方法,這個需要由具體的 Invoker 範例實現。Invoker 範例是在執行時通過 JavassistProxyFactory 建立的

public class JavassistProxyFactory extends AbstractProxyFactory {
    
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 建立匿名類物件
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // 呼叫 invokeMethod 方法進行後續的呼叫
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。Dubbo 會在執行時通過 Javassist 框架為 Wrapper 生成實現類,並實現 invokeMethod 方法,該方法最終會根據呼叫資訊呼叫具體的服務。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。

public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    // 省略其他方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 型別轉換
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // 根據方法名呼叫指定的方法
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}

至此,伺服器端呼叫服務的過程就講完了。

2.3.3 呼叫路徑

下面我們來總結一下伺服器端呼叫服務過程的呼叫棧(以 DemoService 為例)

// 這是IO執行緒的呼叫過程
NettyServerHandler#channelRead(ChannelHandlerContext, Object)
  —> AbstractPeer#received(Channel, Object)
    —> MultiMessageHandler#received(Channel, Object)
      —> HeartbeatHandler#received(Channel, Object)
        —> AllChannelHandler#received(Channel, Object)
// 這是轉發到執行緒池之後的呼叫過程
ChannelEventRunnable#run()
  —> DecodeHandler#received(Channel, Object)
    —> HeaderExchangeHandler#received(Channel, Object)
      —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
        —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
          —> Filter#invoke(Invoker, Invocation)
            —> AbstractProxyInvoker#invoke(Invocation)
              —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                —> DemoServiceImpl#sayHello(String)

2.4 服務提供方返回撥用結果

在 2.3.2 節中講了,呼叫結果會封裝在 Response 物件中,並由NettyChannel 的 send 方法將 Response 物件返回。詳情請看 HeaderExchangeHandler。至於返回 Response 過程中的編碼過程,我們省略。

2.5 服務消費方接收呼叫結果

消費者接收響應資料的處理過程中,從 NettyHandler (消費者是 NettyClientHandler,生產者是 NettyServerHandler,不過他們的 channelRead 方法一模一樣) 到 AllChannelHandler 的處理過程與服務提供方接收請求(2.3節)的處理過程一致,就不重複分析了。所以本節重點在 Dubbo如何將呼叫結果傳遞給使用者執行緒。

2.5.1 向用戶執行緒傳遞呼叫結果

我們直接快進到 HeaderExchangeHandler 的 received 方法中(呼叫路徑請看 2.3.2 節末尾)

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 處理請求物件
            if (message instanceof Request) {
                ...
            // 處理響應物件,服務消費方會執行此處邏輯
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                ...
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
}

可以看到,是呼叫 DefaultFuture#receive 方法處理的,DefaultFuture 物件我們在 2.1.3 節有講到,繼續追蹤程式碼

public class DefaultFuture implements ResponseFuture {

    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

    public static void received(Channel channel, Response response) {
        try {
            // 根據呼叫編號從 FUTURES 集合中查詢指定的 DefaultFuture 物件
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                // 這是請求超時,但是結果返回了的警告
                logger.warn("...");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // 儲存響應物件
            response = res;
            if (done != null) {
                // 喚醒使用者執行緒
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}

在一次呼叫過程中,請求和相應的編號是一致的,所以可以根據呼叫編號從 FUTURES 中得到發起請求時建立的 DefaultFuture 。DefaultFuture.get 方法阻塞等待響應結果,而 DefaultFuture#received 是得到響應結果之後喚醒使用者執行緒(也就是 get 方法中的迴圈)。這兩個方法結合起來看就明白了。

3. 總結

沒啥好總結的,Dubbo 系列就寫完了。閱讀優秀框架的原始碼從大的方面可以學習其思想以及架構,小的方面就是一個個小功能的寫法,比如負載均衡演演算法、DefaultFuture、SPI 等等。

PS:總感覺 Dubbo 和 Netty 的執行緒模型殊途同歸


參考資料

Dubbo開發指南