Dubbo原始碼閱讀分享系列文章,歡迎大家關注點贊
在介紹RPC核心介面的時候我們說過Protocol核心作用是將Invoker服務暴露出去以及參照服務將Invoker物件返回,因此我們就從Protocol開始說起。下圖是Protocol的整個繼承結構,從前面我們介紹的一些經驗來看,我們先來看一下AbstractProtocol這個抽象介面。 關於AbstractProtocol該介面沒有直接實現export和refer方法,該介面主要實現destroy方法以及提供一些公共欄位以及公共能力,首先我們看下核心欄位,核心欄位主要有三個exporterMap、serverMap以及invokers,exporterMap儲存服務集合,serverMap儲存ProtocolServer範例,invokers儲存參照服務的集合。
//儲存暴露除去的服務
protected final DelegateExporterMap exporterMap = new DelegateExporterMap();
//ProtocolServer所有範例
protected final Map<String, ProtocolServer> serverMap = new ConcurrentHashMap<>();
//服務參照的集合
protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();
這裡和介紹一下exporterMap結構,exporterMap是一個Map結構,Key是通過ProtocolUtils.serviceKey方法構建的唯一key, Exporter也就是我們需要暴露除去服務。關於Key構建是可以理解為一個四層Map,第一層按照group分組,group就是URL中設定的內容,通常可以理解為機房、區域等等;剩下的層在GroupServiceKeyCache中,分別按照 serviceName、serviceVersion、port 進行分組,key最終的結構是serviceGroup/serviceName:serviceVersion:port
private String createServiceKey(String serviceName, String serviceVersion, int port) {
StringBuilder buf = new StringBuilder();
if (StringUtils.isNotEmpty(serviceGroup)) {
buf.append(serviceGroup).append('/');
}
buf.append(serviceName);
if (StringUtils.isNotEmpty(serviceVersion) && !"0.0.0".equals(serviceVersion) && !"*".equals(serviceVersion)) {
buf.append(':').append(serviceVersion);
}
buf.append(':').append(port);
return buf.toString();
}
serverMap儲存所有的ProtocolServer,也就是伺服器端,Key是host和port組成的字串,從URL中獲取,ProtocolServer就是對RemotingServer的簡單封裝,serverMap的填充發生在具體的實現。
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
//雙重鎖定
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
invokers主要用於儲存被參照的集合,
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
AbstractProtocol唯一實現的方法就是destory方法,首先會遍歷Invokers集合,銷燬全部的服務參照,然後遍歷全部的exporterMap集合,銷燬釋出出去的服務。
public void destroy() {
for (Invoker<?> invoker : invokers) {
if (invoker != null) {
//移除所有的參照
invokers.remove(invoker);
try {
if (logger.isInfoEnabled()) {
logger.info("Destroy reference: " + invoker.getUrl());
}
invoker.destroy();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
for (Map.Entry<String, Exporter<?>> item : exporterMap.getExporterMap().entrySet()) {
//銷燬釋出出去的服務
if (exporterMap.removeExportMap(item.getKey(), item.getValue())) {
try {
if (logger.isInfoEnabled()) {
logger.info("Unexport service: " + item.getValue().getInvoker().getUrl());
}
item.getValue().unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
}
再開始介紹DubboProtocol之前我們來聊下看原始碼的另外一個方式,該方式也就是通過單元測試,對於像Dubbo這種優秀的框架,自身的單元測試的覆蓋率是比較高的,此外在一些我們疑惑的地方,我們就可以使用單元測試來解決下疑惑,該種方式非常便捷,接下來我們會使用下該方法。 首先我們來看下export方法實現,該方法核心主要就是2個方法:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
//建立Service key
String key = serviceKey(url);
//將invoker轉化為DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
//記錄到exporterMap
exporterMap.addExportMap(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
//啟動ProtocolServer
openServer(url);
//序列化優化處理 該方法就是提前將被序列化的類載入到Dubbo中
optimizeSerialization(url);
return exporter;
}
DubboExporter該類會將invoker進行封裝,首先我們來看一下Exporter整體的繼承結構,如下圖: 在DubboExporter建立時候呼叫父類別AbstractExporter的建構函式,
public DubboExporter(Invoker<T> invoker, String key, DelegateExporterMap delegateExporterMap) {
super(invoker);
this.key = key;
this.delegateExporterMap = delegateExporterMap;
}
在AbstractExporter中存在兩個欄位invoker和unexported,unexported表示服務是否被銷燬,此外該類也對Exporter介面進行實現,在銷燬Invoker物件的時候會判斷服務的狀態,然後在呼叫destroy進行銷燬,afterUnExport方法會執行子類具體的實現,在DubboExporter是移除exporterMap中的快取的物件。
private final Invoker<T> invoker;
private volatile boolean unexported = false;
@Override
public Invoker<T> getInvoker() {
return invoker;
}
@Override
final public void unexport() {
if (unexported) {
return;
}
unexported = true;
getInvoker().destroy();
afterUnExport();
}
openServer方法是我們關鍵方法,該方法會將下層的Exchange、Transport層的方法進行呼叫,並最終建立NettyServer,此處我們也會使用偵錯的方式來搞清楚整個呼叫過程,openServer方法首先判斷是否是伺服器端,然後判斷服務是否建立,沒有則建立ProtocolServer,否則進行服務重置更新。createServer的時候通過Exchangers門面模式建立,最終封裝成為DubboProtocolServer。
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//判斷是否為伺服器端
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
//雙重鎖定
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
//ReadOnly請求是否阻塞等待
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
//心跳間隔
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
//Codec2擴充套件實現
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
//獲取伺服器端實現 預設是netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
//檢查伺服器端擴充套件實現是否支援
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
//通過Exchangers門面類建立ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
//檢測使用者端服務實現是否支援
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
//將ExchangeServer包裝為DubboProtocolServer
return new DubboProtocolServer(server);
}
在前面我們講過Transport的設計,對於Exchange是Transport的上層,也就是和Protocol進行互動的,今天我們就從這裡來分析Exchange以及Transport呼叫的整個過程,這樣大家就更加理解了Dubbo服務暴露的整個過程, 接下來呼叫鏈比較長,我們直接通過單元測試來梳理清楚整個呼叫鏈,我們先來檢視下export被呼叫的地方,如下圖,我們可以看到該方法被很多地方呼叫,應為我們是在DubboProtocol類下的方法,因此我們直接使用DubboProtocolTest類下的單元測試就可以。 DubboProtocolTest類下面有很多單測的方法如下圖,從名字我們我就可以看出和我們相關應該就是testDemoProtocol和testGetDubboProtocol,這兩個方法我們看斷言上面來說的話testGetDubboProtocol方法最符合我們的使用,因此我們使用該單元測試。
@Test
public void testGetDubboProtocol(){
DemoService service = new DemoServiceImpl();
int port = NetUtils.getAvailablePort();
protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("dubbo://127.0.0.1:" + port + "/" + DemoService.class.getName())));
Assertions.assertTrue(DubboProtocol.getDubboProtocol().getServers().size() > 0);
}
我們直接將斷點放到createServer方法內部,我們可以看到構建URL為,Transporter使用的NettyTransporter,編解碼器預設採用DubboCodec。 接下來我們斷點放入到Exchangers類的bind方法中,該類採用SPI載入Exchanger,通過偵錯我們可以發現,最終是採用的是HeaderExchanger, 在HeaderExchanger類中建立HeaderExchangeServer,HeaderExchangeServer該類會建立心跳檢測服務,伺服器端初始化核心的程式碼在Transporters中,getTransporter方法採用SPI的自適應拓展類,在執行時動態選擇NettyTransporter作為實現,
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
接下來我們看一下NettyTransporter類,在該類中直接建立NettyServer;
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new NettyClient(url, handler);
}
}
在NettyServer呼叫父類別的AbstractServer,這部分內容我們在通訊模組中已經講過,這裡我們就是要將這部分呼叫的串聯起來;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
在AbstractServer中,會呼叫NettyServer的doOpen方法,用來完成NettyServer的啟動;
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
//呼叫父類別
super(url, handler);
//從URL獲取本地地址
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
//繫結地址
bindAddress = new InetSocketAddress(bindIp, bindPort);
//連線數
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//建立該服務對應的執行緒池
executor = executorRepository.createExecutorIfAbsent(url);
}
NettyServer的啟動就是Netty的常規的使用,啟動過程中要注意下NettyServerHandler,關於該Handler作用就是當服務消費者呼叫服務提供者的服務時,提供者用來處理各個訊息事件,在整一套的呼叫鏈上會形成下圖的結構,關於這部分內容我們使用一個章節來詳細介紹一下,至此就完成整個伺服器端的啟動,最後就會包裝成為DubboProtocolServer。
protected void doOpen() throws Throwable {
//建立ServerBootstrap
bootstrap = new ServerBootstrap();
//建立boss EventLoopGroup
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
//建立worker EventLoopGroup
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
//建立一個Netty的ChannelHandler
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
//此處的Channel是Dubbo的Channel
channels = nettyServerHandler.getChannels();
//對談保持
boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_KEEPALIVE, keepalive)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
//連線空閒超時時間
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
//建立Netty實現的decoder和encoder
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
//如果設定HTTPS 要實現SslHandler
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
//心跳檢查
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
//註冊nettyServerHandler
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
//等待繫結完成
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
歡迎大家點點關注,點點贊!