Dubbo-服務暴露

2022-12-07 09:00:28

前言

Dubbo原始碼閱讀分享系列文章,歡迎大家關注點贊

SPI實現部分

  1. Dubbo-SPI機制
  2. Dubbo-Adaptive實現原理
  3. Dubbo-Activate實現原理
  4. Dubbo SPI-Wrapper

註冊中心

  1. Dubbo-聊聊註冊中心的設計
  2. Dubbo-時間輪設計

通訊

  1. Dubbo-聊聊通訊模組設計

RPC

  1. 聊聊Dubbo協定

AbstractProtocol

在介紹RPC核心介面的時候我們說過Protocol核心作用是將Invoker服務暴露出去以及參照服務將Invoker物件返回,因此我們就從Protocol開始說起。下圖是Protocol的整個繼承結構,從前面我們介紹的一些經驗來看,我們先來看一下AbstractProtocol這個抽象介面。 image.png 關於AbstractProtocol該介面沒有直接實現export和refer方法,該介面主要實現destroy方法以及提供一些公共欄位以及公共能力,首先我們看下核心欄位,核心欄位主要有三個exporterMap、serverMap以及invokers,exporterMap儲存服務集合,serverMap儲存ProtocolServer範例,invokers儲存參照服務的集合。

//儲存暴露除去的服務
protected final DelegateExporterMap exporterMap = new DelegateExporterMap();

//ProtocolServer所有範例
protected final Map<String, ProtocolServer> serverMap = new ConcurrentHashMap<>();

//服務參照的集合
protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();

這裡和介紹一下exporterMap結構,exporterMap是一個Map結構,Key是通過ProtocolUtils.serviceKey方法構建的唯一key, Exporter也就是我們需要暴露除去服務。關於Key構建是可以理解為一個四層Map,第一層按照group分組,group就是URL中設定的內容,通常可以理解為機房、區域等等;剩下的層在GroupServiceKeyCache中,分別按照 serviceName、serviceVersion、port 進行分組,key最終的結構是serviceGroup/serviceName:serviceVersion:port image.png

private String createServiceKey(String serviceName, String serviceVersion, int port) {
  StringBuilder buf = new StringBuilder();
  if (StringUtils.isNotEmpty(serviceGroup)) {
    buf.append(serviceGroup).append('/');
  }

  buf.append(serviceName);
  if (StringUtils.isNotEmpty(serviceVersion) && !"0.0.0".equals(serviceVersion) && !"*".equals(serviceVersion)) {
    buf.append(':').append(serviceVersion);
  }
  buf.append(':').append(port);
  return buf.toString();
}

serverMap儲存所有的ProtocolServer,也就是伺服器端,Key是host和port組成的字串,從URL中獲取,ProtocolServer就是對RemotingServer的簡單封裝,serverMap的填充發生在具體的實現。

private void openServer(URL url) {
  // find server.
  String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
  ProtocolServer server = serverMap.get(key);
  //雙重鎖定
  if (server == null) {
    synchronized (this) {
      server = serverMap.get(key);
      if (server == null) {
        serverMap.put(key, createServer(url));
      }
    }
  } else {
    // server supports reset, use together with override
    server.reset(url);
  }
}
}

invokers主要用於儲存被參照的集合,

public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
  optimizeSerialization(url);

// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);

return invoker;
}

AbstractProtocol唯一實現的方法就是destory方法,首先會遍歷Invokers集合,銷燬全部的服務參照,然後遍歷全部的exporterMap集合,銷燬釋出出去的服務。

public void destroy() {
  for (Invoker<?> invoker : invokers) {
    if (invoker != null) {
      //移除所有的參照
      invokers.remove(invoker);
      try {
        if (logger.isInfoEnabled()) {
          logger.info("Destroy reference: " + invoker.getUrl());
        }
        invoker.destroy();
      } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
      }
    }
  }
  for (Map.Entry<String, Exporter<?>> item : exporterMap.getExporterMap().entrySet()) {
    //銷燬釋出出去的服務
    if (exporterMap.removeExportMap(item.getKey(), item.getValue())) {
      try {
        if (logger.isInfoEnabled()) {
          logger.info("Unexport service: " + item.getValue().getInvoker().getUrl());
        }
        item.getValue().unexport();
      } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
      }
    }
  }
}

DubboProtocol

再開始介紹DubboProtocol之前我們來聊下看原始碼的另外一個方式,該方式也就是通過單元測試,對於像Dubbo這種優秀的框架,自身的單元測試的覆蓋率是比較高的,此外在一些我們疑惑的地方,我們就可以使用單元測試來解決下疑惑,該種方式非常便捷,接下來我們會使用下該方法。 首先我們來看下export方法實現,該方法核心主要就是2個方法:

  1. 將invoker轉化為DubboExporter,放入exporterMap快取;
  2. 啟動ProtocolServer;
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  URL url = invoker.getUrl();

  //建立Service key
  String key = serviceKey(url);
  //將invoker轉化為DubboExporter
  DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
  //記錄到exporterMap
  exporterMap.addExportMap(key, exporter);

  //export an stub service for dispatching event
  Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
  Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
  if (isStubSupportEvent && !isCallbackservice) {
    String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
    if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
      if (logger.isWarnEnabled()) {
        logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                                              "], has set stubproxy support event ,but no stub methods founded."));
      }

    }
  }

  //啟動ProtocolServer
  openServer(url);
  //序列化優化處理  該方法就是提前將被序列化的類載入到Dubbo中
  optimizeSerialization(url);

  return exporter;
}

DubboExporter

DubboExporter該類會將invoker進行封裝,首先我們來看一下Exporter整體的繼承結構,如下圖: image.png 在DubboExporter建立時候呼叫父類別AbstractExporter的建構函式,

public DubboExporter(Invoker<T> invoker, String key, DelegateExporterMap delegateExporterMap) {
  super(invoker);
  this.key = key;
  this.delegateExporterMap = delegateExporterMap;
}

在AbstractExporter中存在兩個欄位invoker和unexported,unexported表示服務是否被銷燬,此外該類也對Exporter介面進行實現,在銷燬Invoker物件的時候會判斷服務的狀態,然後在呼叫destroy進行銷燬,afterUnExport方法會執行子類具體的實現,在DubboExporter是移除exporterMap中的快取的物件。

private final Invoker<T> invoker;

private volatile boolean unexported = false;

@Override
public Invoker<T> getInvoker() {
  return invoker;
}

@Override
final public void unexport() {
  if (unexported) {
    return;
  }
  unexported = true;
  getInvoker().destroy();
  afterUnExport();
}

伺服器端初始化

openServer方法是我們關鍵方法,該方法會將下層的Exchange、Transport層的方法進行呼叫,並最終建立NettyServer,此處我們也會使用偵錯的方式來搞清楚整個呼叫過程,openServer方法首先判斷是否是伺服器端,然後判斷服務是否建立,沒有則建立ProtocolServer,否則進行服務重置更新。createServer的時候通過Exchangers門面模式建立,最終封裝成為DubboProtocolServer。

private void openServer(URL url) {
  // find server.
  String key = url.getAddress();
//判斷是否為伺服器端
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
  ProtocolServer server = serverMap.get(key);
  //雙重鎖定
  if (server == null) {
    synchronized (this) {
      server = serverMap.get(key);
      if (server == null) {
        serverMap.put(key, createServer(url));
      }
    }
  } else {
    // server supports reset, use together with override
    server.reset(url);
  }
}
}

private ProtocolServer createServer(URL url) {
  url = URLBuilder.from(url)
    //ReadOnly請求是否阻塞等待
    .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
    //心跳間隔
    .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
    //Codec2擴充套件實現
    .addParameter(CODEC_KEY, DubboCodec.NAME)
    .build();
  //獲取伺服器端實現  預設是netty
  String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
  //檢查伺服器端擴充套件實現是否支援
  if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
    throw new RpcException("Unsupported server type: " + str + ", url: " + url);
  }

  ExchangeServer server;
  try {
    //通過Exchangers門面類建立ExchangeServer
    server = Exchangers.bind(url, requestHandler);
  } catch (RemotingException e) {
    throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
  }
  //檢測使用者端服務實現是否支援
  str = url.getParameter(CLIENT_KEY);
  if (str != null && str.length() > 0) {
    Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
    if (!supportedTypes.contains(str)) {
      throw new RpcException("Unsupported client type: " + str);
    }
  }
  //將ExchangeServer包裝為DubboProtocolServer
  return new DubboProtocolServer(server);
}

在前面我們講過Transport的設計,對於Exchange是Transport的上層,也就是和Protocol進行互動的,今天我們就從這裡來分析Exchange以及Transport呼叫的整個過程,這樣大家就更加理解了Dubbo服務暴露的整個過程, image.png 接下來呼叫鏈比較長,我們直接通過單元測試來梳理清楚整個呼叫鏈,我們先來檢視下export被呼叫的地方,如下圖,我們可以看到該方法被很多地方呼叫,應為我們是在DubboProtocol類下的方法,因此我們直接使用DubboProtocolTest類下的單元測試就可以。 image.png DubboProtocolTest類下面有很多單測的方法如下圖,從名字我們我就可以看出和我們相關應該就是testDemoProtocol和testGetDubboProtocol,這兩個方法我們看斷言上面來說的話testGetDubboProtocol方法最符合我們的使用,因此我們使用該單元測試。 image.png

@Test
public void testGetDubboProtocol(){
  DemoService service = new DemoServiceImpl();
  int port = NetUtils.getAvailablePort();
  protocol.export(proxy.getInvoker(service, DemoService.classURL.valueOf("dubbo://127.0.0.1:" + port + "/" + DemoService.class.getName())));
                                   Assertions.assertTrue(DubboProtocol.getDubboProtocol().getServers().size() > 0);
}

我們直接將斷點放到createServer方法內部,我們可以看到構建URL為,Transporter使用的NettyTransporter,編解碼器預設採用DubboCodec。 image.png 接下來我們斷點放入到Exchangers類的bind方法中,該類採用SPI載入Exchanger,通過偵錯我們可以發現,最終是採用的是HeaderExchanger, image.png 在HeaderExchanger類中建立HeaderExchangeServer,HeaderExchangeServer該類會建立心跳檢測服務,伺服器端初始化核心的程式碼在Transporters中,getTransporter方法採用SPI的自適應拓展類,在執行時動態選擇NettyTransporter作為實現,

public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
  if (url == null) {
    throw new IllegalArgumentException("url == null");
  }
  if (handlers == null || handlers.length == 0) {
    throw new IllegalArgumentException("handlers == null");
  }
  ChannelHandler handler;
  if (handlers.length == 1) {
    handler = handlers[0];
  } else {
    handler = new ChannelHandlerDispatcher(handlers);
  }
  return getTransporter().bind(url, handler);
}
public static Transporter getTransporter() {
  return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

接下來我們看一下NettyTransporter類,在該類中直接建立NettyServer;

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }

}

在NettyServer呼叫父類別的AbstractServer,這部分內容我們在通訊模組中已經講過,這裡我們就是要將這部分呼叫的串聯起來;

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
  // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
  // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
  super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}

在AbstractServer中,會呼叫NettyServer的doOpen方法,用來完成NettyServer的啟動;

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
  //呼叫父類別
  super(url, handler);
  //從URL獲取本地地址
  localAddress = getUrl().toInetSocketAddress();

  String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
  int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
  if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
    bindIp = ANYHOST_VALUE;
  }
  //繫結地址
  bindAddress = new InetSocketAddress(bindIp, bindPort);
  //連線數
  this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
  try {
    doOpen();
    if (logger.isInfoEnabled()) {
      logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
    }
  } catch (Throwable t) {
    throw new RemotingException(url.toInetSocketAddress(), null"Failed to bind " + getClass().getSimpleName()
                                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
  }
  //建立該服務對應的執行緒池
  executor = executorRepository.createExecutorIfAbsent(url);
}

NettyServer的啟動就是Netty的常規的使用,啟動過程中要注意下NettyServerHandler,關於該Handler作用就是當服務消費者呼叫服務提供者的服務時,提供者用來處理各個訊息事件,在整一套的呼叫鏈上會形成下圖的結構,關於這部分內容我們使用一個章節來詳細介紹一下,至此就完成整個伺服器端的啟動,最後就會包裝成為DubboProtocolServer。

protected void doOpen() throws Throwable {
  //建立ServerBootstrap
  bootstrap = new ServerBootstrap();

//建立boss EventLoopGroup
bossGroup = NettyEventLoopFactory.eventLoopGroup(1"NettyServerBoss");
//建立worker EventLoopGroup
workerGroup = NettyEventLoopFactory.eventLoopGroup(
  getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
  "NettyServerWorker");
//建立一個Netty的ChannelHandler
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
//此處的Channel是Dubbo的Channel
channels = nettyServerHandler.getChannels();
//對談保持
boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);

bootstrap.group(bossGroup, workerGroup)
  .channel(NettyEventLoopFactory.serverSocketChannelClass())
  .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
  .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
  .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
  .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
  .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
      // FIXME: should we use getTimeout()?
      //連線空閒超時時間
      int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
      //建立Netty實現的decoder和encoder
      NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
      if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
        //如果設定HTTPS 要實現SslHandler
        ch.pipeline().addLast("negotiation",
                              SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
      }
      ch.pipeline()
        .addLast("decoder", adapter.getDecoder())
        .addLast("encoder", adapter.getEncoder())
        //心跳檢查
        .addLast("server-idle-handler"new IdleStateHandler(00, idleTimeout, MILLISECONDS))
        //註冊nettyServerHandler
        .addLast("handler", nettyServerHandler);
    }
  });
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
//等待繫結完成
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();

}
image.png
image.png

結束

歡迎大家點點關注,點點贊!