在網路請求時,總會有各種異常情況出現,我們需要提前處理這種情況。在完善的rpc元件dubbo中,自然是不會少了這一層東西的。我們只需要通過一些簡單的設定就可以達到超時限制的作用了。
dubbo的設計理念是,使用者端控制優先,伺服器端控制兜底。
要想實現超時,一般有兩個思路。一個是使用者端自行設定一個超時限制,達到超時時間還未返回,則丟擲異常,不再等待結果。二是通過在超時後,將連線斷開,從而使資料請求中斷,最終也是以丟擲異常的方式返回的。
當然,超時有兩種情況,一種是自己主動的超時,另一種是被別人關掉連線發生的超時(需主動主傳送超時訊息)。一般我們認為主動設定的超時是可控的,被動的超時將是一個不可逾越的鴻溝,如果必須需要更長的時間才能拿到結果時,此種超時將限制我們,我們只能另謀出路了,比如呼叫的非同步化。
一般地,要想實現超時,我們也有兩種方式:一種是呼叫別人提供的api,其中包含了超時設定,此時僅需簡單設定即可;另一種是我們自行實現的超時,比如原本只有一個無限介面,我們要實現超時,必須將其非同步化,通過額外的執行緒來進行超時的檢測和控制。
那麼,dubbo又是怎樣實現超時的呢?
我們前面說過,dubbo中consumer端可以設定超時,伺服器端也可以提供超時設定。那麼,會不會是使用者端和伺服器端都要實現超時機制呢?不管怎麼樣,使用者端是一定要做的。所以,我們先來看看使用者端實現超時的機制。
首先,dubbo的調置超時方式,按照其整體架構設計理念,都有幾個作用域:應用級 -> 介面級 -> 方法級。 consumer端 -> provider端。
// 消費者端特定方法的設定 <dubbo:consumer interface="com.alibaba.xxx.XxxService" > <dubbo:method name="findPerson" timeout="1000" /> </dubbo:consumer> // 消費者端特定介面的設定 <dubbo:consumer interface="com.alibaba.xxx.XxxService" timeout="200" /> // 提供者端特定方法的設定 <dubbo:service interface="com.alibaba.xxx.XxxService" > <dubbo:method name="findPerson" timeout="1000" /> </dubbo:service> // 提供者端特定介面的設定 <dubbo:service interface="com.alibaba.xxx.XxxService" timeout="200" />
當然了,上面這種是使用xml進行設定的,你還可以使用properties檔案進行設定,也可以使用java程式碼直接進行設定。
這些引數設定好後,在呼叫rpc時,進行想入相應的Invoket,進行讀取引數,使用。
// org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke @Override public Result invoke(Invocation invocation) throws RpcException { // 同步和非同步,底層都是非同步請求,僅做上層封裝 Result asyncResult = invoker.invoke(invocation); try { // 同步請求時,在內部等待 if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { /** * NOTICE! * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop. */ asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (ExecutionException e) { Throwable t = e.getCause(); // 超時返回,給出詳細堆疊 if (t instanceof TimeoutException) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } else if (t instanceof RemotingException) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } else { throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } catch (Throwable e) { throw new RpcException(e.getMessage(), e); } return asyncResult; } // org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke @Override public Result invoke(Invocation inv) throws RpcException { // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed if (destroyed.get()) { logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer"); } RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); if (CollectionUtils.isNotEmptyMap(attachment)) { invocation.addObjectAttachmentsIfAbsent(attachment); } Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments(); if (CollectionUtils.isNotEmptyMap(contextAttachments)) { /** * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here, * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information). */ invocation.addObjectAttachments(contextAttachments); } invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation)); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION)); if (serializationId != null) { invocation.put(SERIALIZATION_ID_KEY, serializationId); } AsyncRpcResult asyncResult; try { // 呼叫遠端方法 asyncResult = (AsyncRpcResult) doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception Throwable te = e.getTargetException(); if (te == null) { asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation); } } catch (RpcException e) { if (e.isBiz()) { asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { throw e; } } catch (Throwable e) { asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture())); return asyncResult; } // org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 獲取超時設定 int timeout = calculateTimeout(invocation, methodName); invocation.put(TIMEOUT_KEY, timeout); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { // 響應結果回撥,使用執行緒池接收 ExecutorService executor = getCallbackExecutor(getUrl(), inv); // 向伺服器端傳送請求,並返回 future 作為結果接收器 CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj); // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } // 超時設定讀取,多種方式,多種優先順序 private int calculateTimeout(Invocation invocation, String methodName) { // timeout-countdown, 需要傳導到伺服器端的超時控制 Object countdown = RpcContext.getContext().get(TIME_COUNTDOWN_KEY); // 預設1s超時 int timeout = DEFAULT_TIMEOUT; if (countdown == null) { timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getContext(), DEFAULT_TIMEOUT); if (getUrl().getParameter(ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) { invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout); // pass timeout to remote server } } else { TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown; timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS); invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);// pass timeout to remote server } return timeout; } // org.apache.dubbo.rpc.support.RpcUtils#getTimeout public static long getTimeout(URL url, String methodName, RpcContext context, long defaultTimeout) { long timeout = defaultTimeout; // 先方法,後介面 // 事實上,所有介面級的變數在註冊的時候已經作用到了方法級上了,所以只需讀取方法上的引數即可 Object genericTimeout = context.getObjectAttachment(TIMEOUT_KEY); if (genericTimeout != null) { timeout = convertToNumber(genericTimeout, defaultTimeout); } else if (url != null) { timeout = url.getMethodPositiveParameter(methodName, TIMEOUT_KEY, defaultTimeout); } return timeout; } // org.apache.dubbo.common.URL#getMethodPositiveParameter(java.lang.String, java.lang.String, long) public long getMethodPositiveParameter(String method, String key, long defaultValue) { if (defaultValue <= 0) { throw new IllegalArgumentException("defaultValue <= 0"); } long value = getMethodParameter(method, key, defaultValue); return value <= 0 ? defaultValue : value; } public long getMethodPositiveParameter(String method, String key, long defaultValue) { if (defaultValue <= 0) { throw new IllegalArgumentException("defaultValue <= 0"); } long value = getMethodParameter(method, key, defaultValue); return value <= 0 ? defaultValue : value; } // org.apache.dubbo.common.URL#getMethodParameter(java.lang.String, java.lang.String, long) public long getMethodParameter(String method, String key, long defaultValue) { Number n = getCachedNumber(method, key); if (n != null) { return n.longValue(); } String value = getMethodParameter(method, key); if (StringUtils.isEmpty(value)) { return defaultValue; } long l = Long.parseLong(value); updateCachedNumber(method, key, l); return l; } // org.apache.dubbo.rpc.protocol.AbstractInvoker#getCallbackExecutor protected ExecutorService getCallbackExecutor(URL url, Invocation inv) { ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url); if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) { // 同步請求使用少量的共用執行緒池,實際上是做進一步封裝處理 return new ThreadlessExecutor(sharedExecutor); } else { // 非同步呼叫則直接使用共用執行緒池,不受其他節點控制 return sharedExecutor; } }
從上面可以看出,dubbo的超時機制是通過非同步執行緒future的方式實現的,其中,同步呼叫的超時設定,底層也是用非同步實現。這樣既簡化了底層實現,也對外提供了很好的易用性。因為底層都是通過netty或nio實現網路通訊,而這種實現一般又是select-poll 模型或者 epoll 模型,反正也必須要用非同步處理,所以不管如何也是跑不掉這個實現。只要實現好一個底層非同步通知,全部基石就都好了。而上層,則只需關注是使用者實現,還是框架實現了。
上面的實現,我們並沒有看到具體是如何實現超時的,畢竟我們只是看到了表面現象,即只是設定了一個 timeout引數,而已。更深層次的實現,請繼續。也就是說dubbo是在做請求的同時,做了超時的設定工作。
// org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request @Override public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); // 裡面包含了一個超時任務 timeTask DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; } // org.apache.dubbo.remoting.exchange.support.DefaultFuture#newFuture /** * init a DefaultFuture * 1.init a DefaultFuture * 2.timeout check * * @param channel channel * @param request the request * @param timeout timeout * @return a new DefaultFuture */ public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) { final DefaultFuture future = new DefaultFuture(channel, request, timeout); future.setExecutor(executor); // ThreadlessExecutor needs to hold the waiting future in case of circuit return. if (executor instanceof ThreadlessExecutor) { ((ThreadlessExecutor) executor).setWaitingFuture(future); } // timeout check timeoutCheck(future); return future; } // org.apache.dubbo.remoting.exchange.support.DefaultFuture#timeoutCheck /** * check time out of the future */ private static void timeoutCheck(DefaultFuture future) { TimeoutCheckTask task = new TimeoutCheckTask(future.getId()); // 新增一個定時器 future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); } // org.apache.dubbo.common.timer.HashedWheelTimer#newTimeout @Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; } // org.apache.dubbo.remoting.exchange.support.DefaultFuture.TimeoutCheckTask#TimeoutCheckTask TimeoutCheckTask(Long requestID) { this.requestID = requestID; } @Override public void run(Timeout timeout) { DefaultFuture future = DefaultFuture.getFuture(requestID); if (future == null || future.isDone()) { return; } if (future.getExecutor() != null) { future.getExecutor().execute(() -> notifyTimeout(future)); } else { notifyTimeout(future); } } // org.apache.dubbo.remoting.exchange.support.DefaultFuture.TimeoutCheckTask#notifyTimeout private void notifyTimeout(DefaultFuture future) { // create exception response. Response timeoutResponse = new Response(future.getId()); // set timeout status. timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response. DefaultFuture.received(future.getChannel(), timeoutResponse, true); } // org.apache.dubbo.remoting.exchange.support.DefaultFuture#received public static void received(Channel channel, Response response, boolean timeout) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { Timeout t = future.timeoutCheckTask; if (!timeout) { // decrease Time t.cancel(); } future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response status is " + response.getStatus() + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result."); } } finally { CHANNELS.remove(response.getId()); } } // 丟擲異常訊息 private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // 封裝為 TimeoutException this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } // the result is returning, but the caller thread may still waiting // to avoid endless waiting for whatever reason, notify caller thread to return. if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; if (threadlessExecutor.isWaiting()) { threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" + " which is not an expected state, interrupt the thread manually by returning an exception.")); } } }
總體來說就是,在提交伺服器端的查詢請求時,會開啟定時任務,檢查超時。如果定時任務到期,還未收到結果則會觸發超時通知。如果使用者端還未成功傳送資料,則認為是使用者端自己超時。如果已經將資料傳送出去,則認為暗伺服器端超時。這相當於是一個看門狗的形式處理了,就是說,不管伺服器端和使用者端本身如何,總能被這東西給發現,所以這種超時控制是精確的。
當然,除了看門狗的監控,還有的情況是需要應用自己去主動發現的。至少,它不能一直讓看門狗起作用吧。
非同步結果的處理有兩種入口方式:一是後臺執行緒處理好之後,自行將結果放置到合適的地方;二是主執行緒主動查詢結果,如果沒有完成就等待,直到完成或超時返回;dubbo是在傳送請求時,設定一個定時器,檢查是否超時,到超時時間就傳送一個超時事件。並結束任務。
同步和非同步的結果處理方式如下:
// org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received @Override public void received(Channel channel, Object message) throws RemotingException { // 根據requestId, 取出之前設定的executor, 提交給業務執行緒池呼叫 ExecutorService executor = getPreferredExecutorService(message); try { // 將訊息封裝成 ChannelEventRunnable, 交由後續處理 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if(message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } /** * Currently, this method is mainly customized to facilitate the thread model on consumer side. * 1. Use ThreadlessExecutor, aka., delegate callback directly to the thread initiating the call. * 2. Use shared executor to execute the callback. * * @param msg * @return */ public ExecutorService getPreferredExecutorService(Object msg) { if (msg instanceof Response) { Response response = (Response) msg; DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId()); // a typical scenario is the response returned after timeout, the timeout response may has completed the future if (responseFuture == null) { return getSharedExecutorService(); } else { // 取出之前設定的executor ExecutorService executor = responseFuture.getExecutor(); if (executor == null || executor.isShutdown()) { executor = getSharedExecutorService(); } return executor; } } else { return getSharedExecutorService(); } } // 這是同步呼叫時使用到的執行緒池 ThreadlessExecutor, 接收到資料後不會立即處理 /** * If the calling thread is still waiting for a callback task, add the task into the blocking queue to wait for schedule. * Otherwise, submit to shared callback executor directly. * * @param runnable */ @Override public void execute(Runnable runnable) { runnable = new RunnableWrapper(runnable); synchronized (lock) { if (!waiting) { sharedExecutor.execute(runnable); } // 只要使用者端的還沒有觸發結果檢查,那麼將放入佇列中,即不會主動進行通知結果 else { queue.add(runnable); } } }
可以看到同步和非同步的處理方式區別在於使用不同的執行緒池實現,非同步是直接執行,而同步則做了一次包裝,這也為它自定義更合適的處理方式打下了基礎。
同步處理時,在上層介面呼叫也是無感的。但是底層都被包裝成了非同步呼叫,所以會在上層api中主動進行結果的等待處理。當然,既然是同步處理,它自然是不會主動設定一個較小的超時的,而是用了一個 Integer.MAX_VALUE 的超時設定,真正的超時是由非同步結果處理中丟擲。
// asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); // org.apache.dubbo.rpc.AsyncRpcResult#get(long, java.util.concurrent.TimeUnit) @Override public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; threadlessExecutor.waitAndDrain(); } // 最終直接從指定位置獲取結果即可 return responseFuture.get(timeout, unit); } // org.apache.dubbo.common.threadpool.ThreadlessExecutor#waitAndDrain() /** * Waits until there is a task, executes the task and all queued tasks (if there're any). The task is either a normal * response or a timeout response. */ public void waitAndDrain() throws InterruptedException { /** * Usually, {@link #waitAndDrain()} will only get called once. It blocks for the response for the first time, * once the response (the task) reached and being executed waitAndDrain will return, the whole request process * then finishes. Subsequent calls on {@link #waitAndDrain()} (if there're any) should return immediately. * * There's no need to worry that {@link #finished} is not thread-safe. Checking and updating of * 'finished' only appear in waitAndDrain, since waitAndDrain is binding to one RPC call (one thread), the call * of it is totally sequential. */ if (finished) { return; } Runnable runnable; try { // 如果伺服器端沒有響應,這裡是會一直阻塞,因此也達到了同步等待的效果 runnable = queue.take(); }catch (InterruptedException e){ waiting = false; throw e; } // 當拿到結果之後,再執行後續的任務,一般沒啥事了,主要就是將結果放置到合適的位置,以後後續可取 synchronized (lock) { waiting = false; runnable.run(); } runnable = queue.poll(); while (runnable != null) { runnable.run(); runnable = queue.poll(); } // mark the status of ThreadlessExecutor as finished. finished = true; }
同步只是表象,非同步才是核心。
非同步執行時,使用的就是 ThreadPoolExecutor, 直接進行execute, 即提交到執行緒池立即執行。即都是統一用共用執行緒池進行處理,這樣做的好處是,不需要等待使用者端呼叫結果,而是主動將結果放置到future的result位置,只需等待處理即可。
// org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run @Override public void run() { if (state == ChannelState.RECEIVED) { try { // 直接進入到netty 管道出入站流程,並最終如前面將結果設定到指定位置 handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break; case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } }
非同步處理沒啥特別的,直接交由netty的pipeline機制完全處理即可。
NettypClient, 與伺服器端互動的入口。主要用於開啟網路連線,設定各種處理器,總體來說就是netty的程式設計模型。感興趣的自行翻閱。
// org.apache.dubbo.remoting.transport.netty.NettyClient#NettyClient public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { super(url, wrapChannelHandler(url, handler)); } @Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); bootstrap = new ClientBootstrap(CHANNEL_FACTORY); // config // @see org.jboss.netty.channel.socket.SocketChannelConfig bootstrap.setOption("keepAlive", true); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("connectTimeoutMillis", getConnectTimeout()); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); } 比較簡單,主要就是通過 NettyHandler 進入資料處理。當然,編解碼是少不了的。 @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.received(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } @Override public void received(Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } // 響應伺服器端結果的處理方式 else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } // org.apache.dubbo.remoting.transport.netty4.NettyClientHandler#channelRead @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); handler.received(channel, msg); } // org.apache.dubbo.remoting.transport.AbstractPeer#received @Override public void received(Channel ch, Object msg) throws RemotingException { if (closed) { return; } handler.received(ch, msg); }
前面多次提到響應結束後,結果將會被放到合適的位置,我們就簡單看下它到底是怎麼放置的呢?其實就是 CompletableFuture 的complete方法。
// 主動置位結果 private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { // 放置結果後結束 this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } // the result is returning, but the caller thread may still waiting // to avoid endless waiting for whatever reason, notify caller thread to return. if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; if (threadlessExecutor.isWaiting()) { threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" + " which is not an expected state, interrupt the thread manually by returning an exception.")); } } }
可以看出,同步和非同步呼叫的區別主要是執行緒池的處理,以級後續事件的觸發時機不同。同步呼叫在框架層面的假設是,傳送訊息之後,很快就會進行get() 操作,所以此時只需將就緒事件放入佇列即可。而非同步呼叫則可能沒有後續的使用者驅動,所以不能有卡點的出現,所以直接執行相應的結果通知,將結果放置到正確的位置。至於使用者端來取或不來取,整體都不景程。
超時機制,是通過一個定時器,到點檢查,檢查到即超時。如果結果先出來,那麼,主動將定時器取消,一切正常。因為定時器是另外的執行緒池進行處理,不受當前處理執行緒的影響,所以可以很好地控制超時。不管是使用者端超時,還是伺服器端超時,都一概處理。
最後,再說下超時時的訊息描述資訊,因為這可能給我排查問題帶來極大的便利。
判斷是使用者端超時還是伺服器端超時,是通過是否將訊息傳送出去為準的。實際上,它並不能區分出到底是使用者端傳送得晚了,還是伺服器端真的處理慢了。也就是說,當用戶端自己慢的時候,它很可能認為是伺服器端超時了。而且,使用者端是假設伺服器端一傳送加響應訊息,使用者端就立即能收到結果,然後就以當時時間來判定伺服器端的處理時間。然而這樣的判斷方式,在使用者端自身壓力很大的情況下,仍然是有失偏頗的。程式碼描述如下:
// 錯誤資訊詳細描述 // org.apache.dubbo.remoting.exchange.support.DefaultFuture#getTimeoutMessage private String getTimeoutMessage(boolean scan) { long nowTimestamp = System.currentTimeMillis(); return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side") + (scan ? " by scan timer" : "") + ". start time: " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(start))) + ", end time: " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(nowTimestamp))) + "," + (sent > 0 ? " client elapsed: " + (sent - start) + " ms, server elapsed: " + (nowTimestamp - sent) : " elapsed: " + (nowTimestamp - start)) + " ms, timeout: " + timeout + " ms, request: " + (logger.isDebugEnabled() ? request : getRequestWithoutData()) + ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress(); }
通過上一節,我們可以看到使用者端的超時機制比較簡單,但是實際上也是非常完善的。那麼,對於伺服器端是否也有同樣的一套東西呢?事實上,要控制伺服器端的超時,難度要比使用者端大:一是因為伺服器端作為服務提供者,應該是要保證服務正常處理,而不是邊處理邊檢查是否超時;二是伺服器端如果發現了超時,應該怎麼對使用者端說呢?丟擲異常或者不返回訊息?使用者端因為是終端,他可以忽略結果即可,但伺服器端這樣做卻是不太合適的。另外,伺服器端計算超時的方式是不完善的,因為超時一般是針對使用者端而言,因為整體鏈路除了伺服器端的處理時間,還有網路傳輸、處理時間,使用者端自行處理的時間等等,所以伺服器端的超時標準不太可靠。
當網路資料就緒之後,會將資料提交到業務執行緒池進行處理,也就是說io執行緒和業務執行緒是分離的。這是一般的處理方式,避免阻塞io執行緒,也方便擴充套件業務執行緒。我們理解,其實要做超時,在這個地方是比較合適的。
// 伺服器端訊息接入 // org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { // 交由對應的執行緒池非同步處理, 狀態為 RECEIVED // 此處其實可能存在阻塞等待問題 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if(message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } // org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received @Override public void received(Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { // handleRequest handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } // org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); // 發生異常情況時,會被取消執行 if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) { msg = null; } else if (data instanceof Throwable) { msg = StringUtils.toString((Throwable) data); } else { msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); channel.send(res); return; } // find handler by message class. Object msg = req.getData(); try { CompletionStage<Object> future = handler.reply(channel, msg); // 非同步等待結果響應回撥 future.whenComplete((appResult, t) -> { try { // 沒有異常,就是正常 if (t == null) { res.setStatus(Response.OK); res.setResult(appResult); } else { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } channel.send(res); } catch (RemotingException e) { // 在使用者端關閉連線時,傳送訊息將會失敗 logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } } // org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke @Override public Result invoke(Invocation invocation) throws RpcException { try { // 呼叫正常的rpc方法 Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); CompletableFuture<Object> future = wrapWithFuture(value); CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> { AppResponse result = new AppResponse(invocation); if (t != null) { if (t instanceof CompletionException) { result.setException(t.getCause()); } else { result.setException(t); } } else { result.setValue(obj); } return result; }); // 包裝返回結果 return new AsyncRpcResult(appResponseFuture, invocation); } catch (InvocationTargetException e) { if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) { logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e); } return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } }
server端確實也使用了一個獨立的執行緒池來處理業務,但是並沒有看到相應的外圍超時處理。這是比較疑惑的,因為它已經錯過了最佳判斷超時的時機了。那麼,是否伺服器端就不能提供超時功能了呢?
server端僅在特殊情況下才會處理超時。它是在 TimeoutFilter 做的簡單處理,僅將結果清空,然後正常返回了。
// org.apache.dubbo.rpc.protocol.FilterNode#invoke @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { asyncResult = filter.invoke(next, invocation); } catch (Exception e) { if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); try { Filter.Listener listener = listenableFilter.listener(invocation); if (listener != null) { listener.onError(e, invoker, invocation); } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; listener.onError(e, invoker, invocation); } throw e; } finally { } return asyncResult.whenCompleteWithContext((r, t) -> { if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); Filter.Listener listener = listenableFilter.listener(invocation); try { if (listener != null) { if (t == null) { listener.onResponse(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; if (t == null) { listener.onResponse(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } }); } // org.apache.dubbo.rpc.filter.ContextFilter#invoke @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { Map<String, Object> attachments = invocation.getObjectAttachments(); if (attachments != null) { Map<String, Object> newAttach = new HashMap<>(attachments.size()); for (Map.Entry<String, Object> entry : attachments.entrySet()) { String key = entry.getKey(); if (!UNLOADING_KEYS.contains(key)) { newAttach.put(key, entry.getValue()); } } attachments = newAttach; } RpcContext context = RpcContext.getContext(); context.setInvoker(invoker) .setInvocation(invocation) // .setAttachments(attachments) // merged from dubbox .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()); String remoteApplication = (String) invocation.getAttachment(REMOTE_APPLICATION_KEY); if (StringUtils.isNotEmpty(remoteApplication)) { context.setRemoteApplicationName(remoteApplication); } else { context.setRemoteApplicationName((String) context.getAttachment(REMOTE_APPLICATION_KEY)); } // 此處為伺服器端的超時實現,通過 _TO:xx 設定,由使用者端傳導到伺服器端進行控制,當超時時,結果將被清空 // 即此處的超時是偽超時,使用者端實現的超時才是真實的 long timeout = RpcUtils.getTimeout(invocation, -1); if (timeout != -1) { context.set(TIME_COUNTDOWN_KEY, TimeoutCountDown.newCountDown(timeout, TimeUnit.MILLISECONDS)); } // merged from dubbox // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol) if (attachments != null) { if (context.getObjectAttachments() != null) { context.getObjectAttachments().putAll(attachments); } else { context.setObjectAttachments(attachments); } } if (invocation instanceof RpcInvocation) { ((RpcInvocation) invocation).setInvoker(invoker); } try { context.clearAfterEachInvoke(false); return invoker.invoke(invocation); } finally { context.clearAfterEachInvoke(true); // IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread. RpcContext.removeContext(true); RpcContext.removeServerContext(); } } // org.apache.dubbo.rpc.filter.TimeoutFilter#invoke @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { return invoker.invoke(invocation); } // TimeoutFilter @Override public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) { // "timeout-countdown" Object obj = RpcContext.getContext().get(TIME_COUNTDOWN_KEY); if (obj != null) { // 超時後,將結果清空 TimeoutCountDown countDown = (TimeoutCountDown) obj; if (countDown.isExpired()) { ((AppResponse) appResponse).clear(); // clear response in case of timeout. if (logger.isWarnEnabled()) { logger.warn("invoke timed out. method: " + invocation.getMethodName() + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() + ", invoke elapsed " + countDown.elapsedMillis() + " ms."); } } } }
伺服器端的超時控制,並非像使用者端那樣,可以直接斷開服務,或者丟棄連線。而是需要謹慎處理,此處為清空結果。這也許不是大家想要的超時。
伺服器端作為提供者,會將自己所有的必要的服務註冊到註冊中心,所以在在啟動時會使用netty服務架構,開啟網路埠。這個過程是在匯出第一個service的時候進行的。感興趣的自行翻閱。
// 匯出服務時會開啟遠端連線,對外提供埠服務 // org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs, int protocolConfigNum) { String name = protocolConfig.getName(); if (StringUtils.isEmpty(name)) { name = DUBBO; } ... Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); exporters.add(exporter); ... this.urls.add(url); } // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } } openServer(url); optimizeSerialization(url); return exporter; } private void openServer(URL url) { // 一個ip:port, 對應一個server // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { ProtocolServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override server.reset(url); } } } // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer private ProtocolServer createServer(URL url) { url = URLBuilder.from(url) // send readonly event when server closes, it's enabled by default .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // enable heartbeat by default .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); } ExchangeServer server; try { // 通過 header=org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger 進行查詢合適的網路傳輸元件 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return new DubboProtocolServer(server); } // 最終在 HeeaderExchanger 裡面載入transporter // org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind @Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } // org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...) public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } // 預設為取 netty 的設定 // @SPI("netty") // public interface Transporter // 而netty的設定是: netty4=org.apache.dubbo.remoting.transport.netty4.NettyTransporter // netty=org.apache.dubbo.remoting.transport.netty4.NettyTransporter return getTransporter().bind(url, handler); } // org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind @Override public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException { return new NettyServer(url, handler); } // org.apache.dubbo.remoting.transport.netty4.NettyServer#NettyServer public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants. // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url)); } // org.apache.dubbo.remoting.transport.AbstractServer#AbstractServer public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = ANYHOST_VALUE; } bindAddress = new InetSocketAddress(bindIp, bindPort); this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS); try { doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } executor = executorRepository.createExecutorIfAbsent(url); } // org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen /** * Init and start netty server * * @throws Throwable */ @Override protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap(); bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss"); workerGroup = NettyEventLoopFactory.eventLoopGroup( getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), "NettyServerWorker"); // 伺服器端功能接入處理器 final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE); bootstrap.group(bossGroup, workerGroup) .channel(NettyEventLoopFactory.serverSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_KEEPALIVE, keepalive) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler)); } // 編解碼, 正式處理器 // pipeline, encoder -> handler 出站, decoder -> handler 入站 ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); } // 處理各種網路事件的分發 // org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run @Override public void run() { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break; case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } } // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv); return result.thenApply(Function.identity()); } ... // org.apache.dubbo.rpc.protocol.FilterNode#invoke @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { asyncResult = filter.invoke(next, invocation); } catch (Exception e) { if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); try { Filter.Listener listener = listenableFilter.listener(invocation); if (listener != null) { listener.onError(e, invoker, invocation); } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; listener.onError(e, invoker, invocation); } throw e; } finally { } // 結果回撥通知, 用於監控、超時處理 之類的擴充套件點 return asyncResult.whenCompleteWithContext((r, t) -> { if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = ((ListenableFilter) filter); Filter.Listener listener = listenableFilter.listener(invocation); try { if (listener != null) { if (t == null) { listener.onResponse(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Filter.Listener) { Filter.Listener listener = (Filter.Listener) filter; if (t == null) { listener.onResponse(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } }); } // org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke @Override public Result invoke(Invocation invocation) throws RpcException { try { // 通過代理,呼叫使用者的rpc實現 // JavaassistProxyFactory Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); // 使用future 封裝返回 CompletableFuture<Object> future = wrapWithFuture(value); CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> { AppResponse result = new AppResponse(invocation); if (t != null) { if (t instanceof CompletionException) { result.setException(t.getCause()); } else { result.setException(t); } } else { result.setValue(obj); } return result; }); return new AsyncRpcResult(appResponseFuture, invocation); } catch (InvocationTargetException e) { if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) { logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e); } return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } }
雖然本節是講server端的超時控制的,但是很明顯這方便的講述也很少,原因是本來就沒打算在server端實現超時。我們要做的,也許只是驗證一下而已。
dubbo中的超時設定,可以在伺服器端、消費端,而且官方建議是設定在伺服器端,使用者端做特殊處理即可。原因是伺服器端更清楚介面的效能情況。這是完全理解的。但它會給人一種感覺,好像是真的伺服器端真的實現了超時處理。然而實際情況卻是,它僅將該引數傳導到使用者端,然後由使用者端來控制了。這倒是和直覺不太一樣,但是誰能說他的直覺就是對的呢。
其實要想實現真正的伺服器端超時,也是可以的。同樣,它也需要藉助一些額外的執行緒池。比如,在接收其實要想實現真正的伺服器端超時,也是可以的。同樣,它也需要藉助一些額外的執行緒池。比如,在接收完資料之後,需要新增到業務執行緒池中進行處理,此時在提交之前寫入一個開始時間,然後線上程池真正處理的時候,與當前時間運算,超時後就不再進行後續的計算邏輯,而是直接響應使用者端超時。這個思路很簡單,但至於為什麼沒有被實現,也許會有其他的考量,我們只討論區域性思路了。
注:本篇使用dubbo版本為 2.7.0