Dubbo-聊聊通訊模組設計

2022-11-07 12:01:35

前言

Dubbo原始碼閱讀分享系列文章,歡迎大家關注點贊

SPI實現部分

  1. Dubbo-SPI機制
  2. Dubbo-Adaptive實現原理
  3. Dubbo-Activate實現原理
  4. Dubbo SPI-Wrapper

註冊中心

  1. Dubbo-聊聊註冊中心的設計

通訊模組介紹

Dubbo通訊模組主要的目的就是解決使用者端以伺服器端通訊的問題,核心程式碼都在dubbo-remoting模組,該模組提供了多種使用者端和伺服器端通訊的功能。Dubbo的通訊主要包括是三部分:Exchange、Transport和Serialize,對於序列化部分的設計在單獨的模組中,我們再單獨聊,這篇文章主要聊Exchange、Transport設計。 image.png 對於Dubbo來說沒有自己的網路框架,使用現有第三方類庫,因此需要設計一套標準API來相容多種不同的通訊框架,dubbo-remoting 模組的結構就是目前Dubbo相容的所有的通訊框架。 image.png 在整體模組設計上,dubbo-remoting-api是其他模組上層抽象,其他子模組都是依賴第三方NIO庫實現 dubbo-remoting-api模組的。因此我們想要了解清楚dubbo-remoting設計必須要理解dubbo-remoting-api的設計。 image.png 對於dubbo-remoting-api大致可以分為四類,

  1. 核心API設計,主要是包括埠、編碼、解碼等等核心介面的抽象;
  2. buffer,主要是定義緩衝區相關的介面、抽象類以及實現類;
  3. exchange,抽Request和Response概念抽象以及擴充套件;
  4. transport,網路傳輸層的抽象,但它只負責訊息的傳輸;

原始碼分析

核心API設計

Endpoint

Endpoint被翻譯端點,這裡可以理解為通訊中對IP和Port的抽象,Client和Server端共同的抽象,兩個端通過Endpoint建立TCP連線,進行通訊。 image.png 對於該Endpoint介面定義了三類方法:

  1. get類方法,主要獲取Endpoint的本地地址、關聯的URL資訊以及底層Channel關聯的ChannelHandle,也就是獲取建立連線需要的屬性;
  2. send方法主要負責傳送資料;
  3. close類方法,主要是用來關閉連線;
Channel

Channel可以理解為Client和Server端連線的通道,是NIO框架設計中不可缺少的概念,Channel繼承Endpoint,因此擁有Endpoint的能力,對於Channel來說,可以給自身設計一些額外屬性。 image.png

ChannelHandler

ChannelHandler可以理解為Channel的處理器,ChannelHandler 可以處理Channel的連線建立以及連線斷開事件,還可以處理讀取到的資料、傳送的資料以及捕獲到的異常。 image.png

Codec2

Codec2實現編碼和解碼,實現位元組與訊息體之間的轉換,類似Netty中編碼和解碼。此外,Codec2介面被@SPI 介面修飾了,說明該介面是一個擴充套件介面,同時encode方法和 decode方法都被@Adaptive註解修飾,因此也會生成介面卡類,可以根據URL中的codec值確定具體的擴充套件實現類,這裡就體現SPI和URL靈活設定的特性。

@SPI
public interface Codec2 {

    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;


    enum DecodeResult {
        NEED_MORE_INPUT, SKIP_SOME_INPUT
    }

}

此外還存在DecodeResult的列舉,該列舉是處理粘包和拆包使用的。

Client

Client繼承了Endpoint、Channel等相關的介面,因此對於Client也具備收發訊息能力,Client只可以關聯一個 Channel。 image.png

RemotingServer

Server與Client不太一樣地方就是可以接收多個Client發起的Channel連線,因此RemotingServer介面中存在獲取多個Channel列表的介面。 image.png

Transporter

Transporter介面是Dubbo在Client和Server上又封裝的一層,我們可以看到改介面被@SPI以及@Adaptive註解修飾,因此這個是個可延伸的介面,預設使用Netty的擴充套件,@Adaptive表示可以動態生成該適配的類,根據設定的值確定具體實現的類。

@SPI("netty")
public interface Transporter {

    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

Transporter的實現類有主要有以下幾種,每個對應的具體的NIO的實現都在其各自的包中,這樣可以通過靈活設定來進行切換不同的實現。 image.png 為了驗證是否正確,我們簡單再來看一下RemotingServer的實現,RemotingServer的實現中,包含每個具體NIO框架的實現,因此這裡更加印證Transporter的的抽象,讓我們可以通過Dubbo SPI修改具體Transporter擴充套件實現,從而切換到不同的Client和 RemotingServer實現,從而達到NIO庫切換,這裡我們無需修改任何程式碼,真正的做到開放-閉合的原則。 image.png

Transporters

Transporters該類是一種門面模式的設計,主要是解決和多個不同子模組直接進行互動的問題,通過該類設計,將公共的行為Transporter物件的建立以及ChannelHandler的處理,大家可以直接依賴Transporters類,這部分呼叫是在Dubbo協定初始化時候發起的,這部分我們到時候在細講,這個章節暫時先不講解。 image.png 但是這裡需要在這個看一下關於ChannelHandler的處理,此處傳入了多個ChannelHandler,將多個ChannelHandler包裝成為ChannelHandlerDispatcher,ChannelHandlerDispatcher實現ChannelHandler,內部維護了一個 CopyOnWriteArraySet,對外提供操作ChannelHandler方法,此處主要是為了引出後續Handler的處理流程,後續一層處理模型的源頭都在這裡。 image.png image.png

到這裡我們大概對Dubbo的通訊模型有了一個輪廓,我們來進行一個簡單的總結,可以參考下圖: image.png

  1. 上層通過會Transporters獲取到具體的Transporter擴充套件實現,然後通過Transporter獲取Client和 RemotingServer實現;
  2. Client與RemotingServer都是通過Channel進行互動,Channel使用ChannelHandler進行資料傳輸,此外通過Codec2進行編解碼;
Buffer設計
image.png
image.png
介面設計

ChannelBuffer的設計類似於Netty的Buffer的設計,大致可以分為五類,對於具體的實現我們在後面AbstractChannelBuffer等實現類裡面進行講解。 image.png 接下來我們來看一下ChannelBufferFactory,該介面都是用來建立ChannelBuffer的,並且每個具體的實現都是單例的,可以理解為一個簡單工廠的設計,可以有不同型別的ChannelBuffer的實現。 image.png

AbstractChannelBuffer

AbstractChannelBuffer維護兩類索引,一類用於讀寫,另外一類用於讀寫標記; image.png 關於讀寫類索引就是記錄當前讀到什麼位置以及寫到什麼位置了,標記類索引就是為了做資料備份和回滾使用,為了對緩衝區重複利用。該類的方法都主要是利用四個屬性來操作,用來檢測是否有資料可讀或者還是否有空間可寫等方法,做一些前置條件的校驗以及索引的設定,具體的實現都是需要子類來實現。

    @Override
    public void readBytes(byte[] dst, int dstIndex, int length) {
        //檢查位置是否足夠
        checkReadableBytes(length);
        //此處可以理解為將readerIndex後移length個位元組讀取到dst陣列中
        //也就是陣列dst的dstIndex~dstIndex+length位置
        getBytes(readerIndex, dst, dstIndex, length);
        //readerIndex後移length個位元組
        readerIndex += length;
    }
    @Override
    public void readBytes(byte[] dst, int dstIndex, int length) {
        //檢查位置是否足夠
        checkReadableBytes(length);
        //此處可以理解為將readerIndex後移length個位元組讀取到dst陣列中
        //也就是陣列dst的dstIndex~dstIndex+length位置
        getBytes(readerIndex, dst, dstIndex, length);
        //readerIndex後移length個位元組
        readerIndex += length;
    }
    @Override
    public void writeBytes(byte[] src, int srcIndex, int length) {
        //將src陣列中srcIndex~srcIndex+length位置的資料寫到當前的buffer中
        setBytes(writerIndex, src, srcIndex, length);
        //將當前的writerIndex後移length
        writerIndex += length;
    }
HeapChannelBuffer

HeapChannelBuffer是ChannelBuffer的一種具體的實現,該類是基於位元組陣列的ChannelBuffer實現,通過byte[]陣列來進行資料的儲存,setBytes和getBytes通過System.arraycopy來進行對陣列的操作。

    //此緩衝區包裝的基礎堆位元組陣列
    protected final byte[] array;
    
    @Override
    public void getBytes(int index, byte[] dst, int dstIndex, int length) {
        System.arraycopy(array, index, dst, dstIndex, length);
    }

    @Override
    public void setBytes(int index, byte[] src, int srcIndex, int length) {
        System.arraycopy(src, srcIndex, array, index, length);
    }

對於HeapChannelBuffer的具體的工廠的實現是HeapChannelBufferFactory,該工廠是一個單例模式,HeapChannelBufferFactory通過ChannelBuffers工具類建立固定容量的HeapChannelBuffer,此外也可以通過拷貝的形式建立HeapChannelBuffer。

    @Override
    public ChannelBufferFactory factory() {
        return HeapChannelBufferFactory.getInstance();
    }

image.png image.png

DynamicChannelBuffer

DynamicChannelBuffer可以理解為一個擴充套件類,也就是對裝飾者模式,就是對ChannelBuffer的增加強,增加動態擴容的能力,關於該類預設的實現HeapChannelBufferFactory,我可以通過指定HeapChannelBufferFactory為對應的實現新增動態擴容的能力。

    //具體的ChannelBufferFactory的實現
    private final ChannelBufferFactory factory;

    //需要擴容的buffer
    private ChannelBuffer buffer;

    public DynamicChannelBuffer(int estimatedLength) {
        //預設實現
        this(estimatedLength, HeapChannelBufferFactory.getInstance());
    }

    //指定具體的實現
    public DynamicChannelBuffer(int estimatedLength, ChannelBufferFactory factory) {
        if (estimatedLength < 0) {
            throw new IllegalArgumentException("estimatedLength: " + estimatedLength);
        }
        if (factory == null) {
            throw new NullPointerException("factory");
        }
        this.factory = factory;
        buffer = factory.getBuffer(estimatedLength);
    }

關於如何實現ChannelBuffer的動態擴容,看懂Java ArryList擴容的,我相信一定能理解,也就是我們要控制寫入時候的判斷寫入的空間是否足夠就可以了。DynamicChannelBuffer通過ensureWritableBytes方法來實現擴容,我們來看下他是如何做的:

    @Override
    public void ensureWritableBytes(int minWritableBytes) {
        //如果寫入位元組數小於等於可寫的位元組數
        if (minWritableBytes <= writableBytes()) {
            return;
        }
        //新增容量
        int newCapacity;
        //快取區位元組數為0
        if (capacity() == 0) {
            //設定為1
            newCapacity = 1;
        } else {
            //新增容量為緩衝區位元組數
            newCapacity = capacity();
        }
        //最小新增容量 = 當前寫入位元組數的索引+最小寫入的位元組數
        int minNewCapacity = writerIndex() + minWritableBytes;
        //如果新增容量小於最小新增容量
        while (newCapacity < minNewCapacity) {
            //新增容量左移1位,加倍
            newCapacity <<= 1;
        }
        //通過工廠類建立該容量
        ChannelBuffer newBuffer = factory().getBuffer(newCapacity);
        //從buffer中讀取資料到newBuffer中
        newBuffer.writeBytes(buffer, 0, writerIndex());
        //替換原來的快取區
        buffer = newBuffer;
    }
ByteBufferBackedChannelBuffer

ByteBufferBackedChannelBuffer該類是基於Java NIO的ByteBuffer實現的ChannelBuffer,都是通過操作ByteBuffer的API進行實現,這裡我們就不展開了。

    //NIO ByteBuffer
    private final ByteBuffer buffer;

    //初始化容量
    private final int capacity;

    public ByteBufferBackedChannelBuffer(ByteBuffer buffer) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }

        this.buffer = buffer.slice();
        capacity = buffer.remaining();
        writerIndex(capacity);
    }

    public ByteBufferBackedChannelBuffer(ByteBufferBackedChannelBuffer buffer) {
        this.buffer = buffer.buffer;
        capacity = buffer.capacity;
        setIndex(buffer.readerIndex(), buffer.writerIndex());
    }
ChannelBufferInputStream

image.png ChannelBufferInputStream該類實現InputStream輸入流的的方法,內部維護了ChannelBuffer、startIndex以及endIndex,該方法內部都是讀取ChannelBuffer中的資料,startIndex和endIndex控制讀取資料位置,這樣就完成 InputStream的擴充套件實現。

    //ChannelBuffer
    private final ChannelBuffer buffer;
    //開始位置
    private final int startIndex;
    //結束位置
    private final int endIndex;

    @Override
    public int read() throws IOException {
        if (!buffer.readable()) {
            return -1;
        }
        return buffer.readByte() & 0xff;
    }
ChannelBufferOutputStream

image.png ChannelBufferOutputStream該類實現OutputStream輸出流,內部維護了ChannelBuffer、startIndex,該方法內部都是寫入到ChannelBuffer中,startIndex是標記開始寫入位置。 Buffer的整體的設計到此就介紹完成,通過ChannelBufferOutputStream、ChannelBufferInputStream控制資料的輸入輸出,內部通過ChannelBuffer儲存資料,ChannelBuffer可以根據需要進行不同的實現。

Transport設計

Transport在核心API中介紹上層存取都是通過該介面存取的,接下來我們就來探祕下Transport層都做了哪些事情。

AbstractPeer

image.png AbstractPeer該抽象類可以理解為伺服器概念,繼承了Endpoint、ChannelHandler介面,內部有四個核心的屬性,URL代表自身服務的地址,closing、closed表示當前伺服器狀態,handler就是ChannelHandler,AbstractPeer內部實現了都是委託給ChannelHandler,這是一種典型的裝飾器設計模式。

    //ChannelHandler
    private final ChannelHandler handler;

    //自身地址
    private volatile URL url;

    //伺服器狀態
    private volatile boolean closing;

    private volatile boolean closed;

    public AbstractPeer(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }
AbstractEndpoint

image.png AbstractEndpoint繼承AbstractPeer,可以理解為埠的抽象,內部增加Codec2和connectTimeout兩個屬性,在AbstractEndpoint在初始化的時候會將這兩個欄位初始化。

    private Codec2 codec;

    private int connectTimeout;

    public AbstractEndpoint(URL url, ChannelHandler handler) {
        //呼叫父類別
        super(url, handler);
        //根據URL中的codec引數值,確定此處具體的Codec2實現類
        this.codec = getChannelCodec(url);
        //設定connectTimeout
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }
    protected static Codec2 getChannelCodec(URL url) {
        //獲取URL協定
        String codecName = url.getProtocol();
        //判斷有沒有該擴充套件名
        if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
            //通過ExtensionLoader載入具體實現類
            return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
        } else {
            //沒有匹配到從擴充套件類進行載入
            return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
                    .getExtension(codecName))
;
        }
    }

此外該介面實現Resetable介面,該介面內部只有一個reset方法,該方法通過獲取URL引數資訊,重置了connectTimeout的資訊以及Codec2的資訊。

AbstractServer

image.png AbstractServer是對伺服器端的抽象,該抽象類實現AbstractEndpoint和RemotingServer,該抽象類內部有五個核心屬性,localAddress、bindAddress這兩個屬性都是在URL引數中獲取,表示Server原生的地址以及繫結的地址,預設兩個值是一致的,accepts表示是Server最大的連線次數,預設是0,表述沒有限制,executorRepository、executor執行緒池相關的屬性,executorRepository負責管理執行緒池,executor表示當前服務管理的執行緒池。

    //當前服務關聯的執行緒池
    ExecutorService executor;
    //本機地址
    private InetSocketAddress localAddress;
    //繫結地址
    private InetSocketAddress bindAddress;
    //最大連線數
    private int accepts;
    //管理執行緒池
    private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

AbstractServer初始化也就是在建構函式中完成初始化的,然後通過呼叫其抽象方法doOpen實現啟動伺服器。

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        //呼叫父類別
        super(url, handler);
        //從URL獲取本地地址
        localAddress = getUrl().toInetSocketAddress();

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        //繫結地址
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        //連線數
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        try {
            //呼叫該抽象方法啟動服務
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null"Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //建立該服務對應的執行緒池
        executor = executorRepository.createExecutorIfAbsent(url);
    }
AbstractClient

AbstractClient是對使用者端的抽象,同樣它的繼承和AbstractServer也一樣,只是在實現不同而已,接下來我們來看看AbstractClient的實現,該類內部有4個關鍵的欄位,對於executor和executorRepository這兩個欄位與AbstractServer功能類似,這裡重點來介紹connectLock和needReconnect,connectLock是當用戶端進行連線、斷開、重連等操作時,需要獲取該鎖進行同步操作,needReconnect 在使用者端傳送資料之前,會檢查使用者端的連線是否斷開,如果斷開了,則會根據needReconnect欄位,決定是否重連。 image.png AbstractClient整體的初始化是在建構函式實現的,我們可以看到AbstractClient 定義了 doOpen、doClose、doConnect和doDisConnect四個抽象方法給子類實現,整體的設計與AbstractServer類似。

    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        //呼叫父類別構造方法
        super(url, handler);
        //從URL獲取是否重連欄位 預設是
        needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
        //初始化Executor
        initExecutor(url);

        try {
            //初始化具體的底層實現client
            doOpen();
        } catch (Throwable t) {
            //關閉
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }

        try {
            //建立連線
            connect();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
            }
        } catch (RemotingException t) {
            if (url.getParameter(Constants.CHECK_KEY, true)) {
                close();
                throw t;
            } else {
                logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
            }
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }
    }
AbstractChannel

image.png AbstractChannel的設計也是類似模板類的設計,對於不同的NIO框架來說有不同的Channel的實現,因此對於Dubbo來說也必須去抽象該實現,具體的不同交由子類進行實現,子類做對映。該類內部只有有一個Send方法,為了判斷當前的連線是否還在,沒有實現具體的傳送訊息。

Netty4
NettyTransporter

NettyTransporter實現Transporter,當SPI機制觸發的時候會自動載入實現NettyServer、NettyClient初始化建立。 image.png

NettyServerimage.png

接下來我們來看下Netty4中關於doOpen方法的實現,此處就是Netty Server啟動的核心,也是Dubbo網路通訊的伺服器端能力的提供者,就是Dubbo和Netty結合的核心。

    protected void doOpen() throws Throwable {
        //建立ServerBootstrap
        bootstrap = new ServerBootstrap();

        //建立boss EventLoopGroup
        bossGroup = NettyEventLoopFactory.eventLoopGroup(1"NettyServerBoss");
        //建立worker EventLoopGroup
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");
        //建立一個Netty的ChannelHandler
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        //此處的Channel是Dubbo的Channel
        channels = nettyServerHandler.getChannels();
        //對談保持
        boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);

        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        //連線空閒超時時間
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        //建立Netty實現的decoder和encoder
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                            //如果設定HTTPS 要實現SslHandler
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                //心跳檢查
                                .addLast("server-idle-handler"new IdleStateHandler(00, idleTimeout, MILLISECONDS))
                                //註冊nettyServerHandler
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        //等待繫結完成
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

此處與Netty啟動不同的地方在於替換了Channel的實現為Dubbo實現,然後通過doOpen完成Server啟動,大家也可以藉助下圖來進行理解: image.png

NettyCodecAdapter

NettyCodecAdapter該類是對編解碼的實現,主要是將Netty規則替換為為Dubbo的規則,該類內部有5個核心的屬性,其中encoder和decoder是NettyCodecAdapter內部類,

    //Netty Channel 編碼
    private final ChannelHandler encoder = new InternalEncoder();

    //Netty Channel 解碼
    private final ChannelHandler decoder = new InternalDecoder();

    //Dubbo 的編解碼
    private final Codec2 codec;

    //URL引數
    private final URL url;

    //Dubbo ChannelHandler
    private final org.apache.dubbo.remoting.ChannelHandler handler;

encoder和decoder是對Netty中的ByteToMessageDecoder和MessageToByteEncoder的實現,也正是此處的實現將真正的編碼委託給Codec2進行實現,

    private class InternalEncoder extends MessageToByteEncoder {

        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            //Netty對ChannelBuffer的實現 操作位元組陣列
            //將Netty  ByteBuf 包裝為 Dubbo  ChannelBuffer
            ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
            //獲取關聯的Channel
            Channel ch = ctx.channel();
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            //codec進行編碼
            codec.encode(channel, buffer, msg);
        }
    }

    private class InternalDecoder extends ByteToMessageDecoder {

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
            //將Netty  ByteBuf 包裝為 Dubbo  ChannelBuffer
            ChannelBuffer message = new NettyBackedChannelBuffer(input);
            //獲取關聯的Channel
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

            // decode object.
            do {
                //記錄當前讀到的位置
                int saveReaderIndex = message.readerIndex();
                //codec進行解碼
                Object msg = codec.decode(channel, message);
                //判斷訊息長度是否足夠
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    //重置讀取的位置
                    message.readerIndex(saveReaderIndex);
                    break;
                } else {
                    //邊界值判斷
                    if (saveReaderIndex == message.readerIndex()) {
                        throw new IOException("Decode without read data.");
                    }
                    //將訊息傳遞給其他Handler
                    if (msg != null) {
                        out.add(msg);
                    }
                }
            } while (message.readable());
        }
    }
NettyServerHandler

NettyServerHandler該類繼承了Netty的ChannelDuplexHandler,該類具備同時處理Inbound和Outbound的能力,我們來看下整體的繼承結構,整體的繼承結構確實也是一樣的。 image.png 該類內部主要有3個核心欄位,這裡相對比較重要的是channels和handler, image.png channels欄位快取當前所有Server建立的Channel,所有的建立、斷開連線的時候都會操作channels該物件,handler在內部所有的實現都是通過Dubbo ChannelHandler,這樣就完成對Netty的替換;程式碼如下:

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        if (channel != null) {
            //新建的連結 增加快取
            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
        }
        //使用Dubbo ChannelHandler建立連線
        handler.connected(channel);

        if (logger.isInfoEnabled()) {
            logger.info("The connection of " + channel.getRemoteAddress() + " -> " + channel.getLocalAddress() + " is established.");
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            //關閉連線 移除快取
            channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()));
            //關閉釋放Dubbo ChannelHandler
            handler.disconnected(channel);
        } finally {
            //NettyChannel也同時移除
            NettyChannel.removeChannel(ctx.channel());
        }

        if (logger.isInfoEnabled()) {
            logger.info("The connection of " + channel.getRemoteAddress() + " -> " + channel.getLocalAddress() + " is disconnected.");
        }
    }

在NettyServer建立的時候,有下圖程式碼,這裡的this指的就是NettyServer,在NettyServerHandler裡面第二個引數是ChannelHandler,同時NettyServer又繼承了實現ChannelHandler的AbstractPeer,因此NettyServerHandler在建立的時候就會將所有資料委託給ChannelHandler進行處理,此處體現多型的魅力。 image.png image.png 到此相信你也對Netty Server以及Dubbo Server設計有了一個深入的瞭解,可以參考下圖,上層是對Client、Channel等能力的抽象,這些抽象能力抽象介面實現,這樣子該抽象方法子類又可以有不同的實現,這樣子就完成上層能力的建設,下層又可以根據自身特點完成自己編解碼以及服務的實現,做到了靈活多變,自由擴充套件。 image.png

NettyClient

image.png NettyClient實現與NettyServer類似,都是初始化自身服務,這裡我們來看下實現;

  @Override
    protected void doOpen() throws Throwable {
        //建立NettyClientHandler 做法與Server類似
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(EVENT_LOOP_GROUP)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(socketChannelClass());
        //設定超時時間
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                //設定心跳的間隔
                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

                if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                    ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
                }

                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        //解密編碼
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        //設定心跳
                        .addLast("client-idle-handler"new IdleStateHandler(heartbeatInterval, 00, MILLISECONDS))
                        //註冊nettyClientHandler
                        .addLast("handler", nettyClientHandler);
                //如果需要Socks5Proxy,需要新增Socks5ProxyHandler(略
                String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                if(socksProxyHost != null) {
                    int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                    Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                    ch.pipeline().addFirst(socks5ProxyHandler);
                }
            }
        });
    }

形成通訊的通道的圖也是類似: image.png 對於NettyClientHandler實現整體上與NettyServerHandler的設計思路類似,這裡就不進行介紹了, image.png

NettyChannel

NettyChannel是對AbstractChannel一種實現,有四個欄位,

    //快取Netty Channel 和 Dubbo Channel的對應關係
    private static final ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP = new ConcurrentHashMap<Channel, NettyChannel>();
    //Netty Channel
    private final Channel channel;
    //Channnel附加的屬性快取
    private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
    //標識當前channel是否可用
    private final AtomicBoolean active = new AtomicBoolean(false);
    //炒作Channel也會操作快取的內容
    static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        }
        NettyChannel ret = CHANNEL_MAP.get(ch);
        if (ret == null) {
            NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
            if (ch.isActive()) {
                nettyChannel.markActive(true);
                ret = CHANNEL_MAP.putIfAbsent(ch, nettyChannel);
            }
            if (ret == null) {
                ret = nettyChannel;
            }
        }
        return ret;
    }

    /**
     * Remove the inactive channel.
     *
     * @param ch netty channel
     */

    static void removeChannelIfDisconnected(Channel ch) {
        if (ch != null && !ch.isActive()) {
            NettyChannel nettyChannel = CHANNEL_MAP.remove(ch);
            if (nettyChannel != null) {
                nettyChannel.markActive(false);
            }
        }
    }

接下來就是核心send方法的實現,此處會關聯Netty Channel,將資料傳送出去,此處就是子類具體的實現。

    public void send(Object message, boolean sent) throws RemotingException {
        //呼叫父類別 判斷連線是否可用
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            //netty channel 傳送資料
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                //等待傳送結束
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            //判斷是否異常
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            //異常斷開netty連線 移除快取關係
            removeChannelIfDisconnected(channel);
            throw new RemotingException(this"Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }
        if (!success) {
            throw new RemotingException(this"Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }

未完待續

歡迎大家點點關注,點點贊!