Netty 學習(十):ChannelPipeline原始碼說明

2022-10-11 21:12:20

Netty 學習(十):ChannelPipeline原始碼說明

作者: Grey

原文地址:

部落格園:Netty 學習(十):ChannelPipeline原始碼說明

CSDN:Netty 學習(十):ChannelPipeline原始碼說明

ChannelPipeline可以看作一條流水線,原料(位元組流)進來,經過加工,形成一個個Java物件,然後基於這些物件進行處理,最後輸出二進位制位元組流。

ChannelPipeline 在建立 NioSocketChannel 的時候建立, 其預設實現是 DefaultChannelPipeline

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

ChannelPipeline 中儲存了 Channel 的參照,且其中每個節點都是一個 ChannelHandlerContext 物件。每個 ChannelHandlerContext 節點都儲存了執行器(即:ChannelHandler)。

ChannelPipeline裡有兩種不同的節點,一種是 ChannelInboundHandler,處理 inbound 事件(例如:讀取資料流),還有一種是 ChannelOutboundHandler,處理 Outbound 事件,比如呼叫writeAndFlush()類方法時,就會呼叫該 handler。

新增 handler 的邏輯如下

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 檢查是否有重複的 handler
            checkMultiplicity(handler);
            // 建立 節點
            newCtx = newContext(group, filterName(name, handler), handler);
            // 新增節點
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        // 回撥使用者方法
        callHandlerAdded0(newCtx);
        return this;
    }

如上程式碼,整個新增過程見註釋說明,其實主要就是四步:

第一步:檢查是否有重複的 handler,核心邏輯見

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                // 非共用的且新增過,就丟擲異常,反之,如果一個 handler 支援共用,就可以無限次被新增到 ChannelPipeline 中
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

第二步:建立節點,即把 handler 包裹成 ChannelHandlerContext,核心邏輯如下

    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() {
            return new WeakHashMap<Class<?>, String>();
        }
    };
    private String generateName(ChannelHandler handler) {
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();
        String name = cache.get(handlerType);
        if (name == null) {
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }

        // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
        // any name conflicts.  Note that we don't cache the names generated here.
        if (context0(name) != null) {
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }

注:Netty 使用 FastThreadLocal 變數來快取 Handler 的類和名稱的對映關係,在生成 name 的時候,首先看快取中有沒有生成過預設 name,如果沒有生成,就呼叫generateName0()生成預設名稱,加入快取。

第三步:把 ChannelHandlerContext 作為節點新增到 pipeline 中,核心程式碼如下

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

其本質就是一個雙向連結串列的插入節點過程,而且,ChannelPipeline 刪除 ChannelHandler 的方法,本質就是把這個雙向連結串列的某個節點刪掉!

第四步:回撥使用者方法,核心程式碼如下

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.callHandlerAdded();
        } catch (Throwable t) {
            boolean removed = false;
            try {
                atomicRemoveFromHandlerList(ctx);
                ctx.callHandlerRemoved();
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }
    final void callHandlerRemoved() throws Exception {
        try {
            // Only call handlerRemoved(...) if we called handlerAdded(...) before.
            if (handlerState == ADD_COMPLETE) {
                handler().handlerRemoved(this);
            }
        } finally {
            // Mark the handler as removed in any case.
            setRemoved();
        }
    }

其中ctx.callHandlerAdded();就是回撥使用者的handlerAdded方法,然後通過 CAS 方式修改節點的狀態為 REMOVE_COMPLETE (說明該節點已經被移除),或者 ADD_COMPLETE (新增完成)。

完整程式碼見:hello-netty

本文所有圖例見:processon: Netty學習筆記

更多內容見:Netty專欄

參考資料

跟閃電俠學 Netty:Netty 即時聊天實戰與底層原理

深度解析Netty原始碼