Dubbo原始碼閱讀分享系列文章,歡迎大家關注點贊
Dubbo的RPC其實是對Protocol的封裝,整體的結構與Remoting類似,dubbo-rpc-api是對具體協定、服務暴露、服務參照、代理等的抽象,是整個RPC中的核心,其他模組是對該層具體的實現,每個模組都是Dubbo具體支援的協定。
dubbo-rpc-api整體模組如圖所示,整體介面包括了filter、listener、protocol、proxy、support以及核心API,接下來我們先來看下核心的介面介紹。
開始之前我們來先來回顧一下之前介紹RPC請求的過程,
這裡為什麼要回顧整個過程,這樣後面介紹抽象的介面的時候大家會更更容易理解為什麼這麼抽象。
Invoker介面內部有三個方法,分別是getInterface、invoke、destroyAll,getInterface該方法主要是獲取服務介面相關的資訊,invoke主要是發起一次呼叫以及相應資訊,destroyAll主要用於銷燬呼叫請求。
public interface Invoker<T> extends Node {
//獲取服務介面
Class<T> getInterface();
//發起呼叫
Result invoke(Invocation invocation) throws RpcException;
//銷燬呼叫連線
default void destroyAll() {
destroy();
}
}
Invocation是invoke的引數,內部抽象了RPC呼叫的目標服務、方法資訊、相關引數資訊、具體的引數值以及一些附加資訊。
public interface Invocation {
//呼叫Service的唯一標識
String getTargetServiceUniqueName();
String getProtocolServiceKey();
//呼叫的方法名稱
String getMethodName();
//服務名稱
String getServiceName();
//引數型別集合
Class<?>[] getParameterTypes();
//引數簽名集合
default String[] getCompatibleParamSignatures() {
return Stream.of(getParameterTypes())
.map(Class::getName)
.toArray(String[]::new);
}
//呼叫具體的引數值
Object[] getArguments();
//呼叫關聯的Invoker物件
Map<String, String> getAttachments();
@Experimental("Experiment api for supporting Object transmission")
Map<String, Object> getObjectAttachments();
void setAttachment(String key, String value);
@Experimental("Experiment api for supporting Object transmission")
void setAttachment(String key, Object value);
@Experimental("Experiment api for supporting Object transmission")
void setObjectAttachment(String key, Object value);
void setAttachmentIfAbsent(String key, String value);
@Experimental("Experiment api for supporting Object transmission")
void setAttachmentIfAbsent(String key, Object value);
@Experimental("Experiment api for supporting Object transmission")
void setObjectAttachmentIfAbsent(String key, Object value);
/**
* get attachment by key.
*
* @return attachment value.
* @serial
*/
String getAttachment(String key);
@Experimental("Experiment api for supporting Object transmission")
Object getObjectAttachment(String key);
/**
* get attachment by key with default value.
*
* @return attachment value.
* @serial
*/
String getAttachment(String key, String defaultValue);
@Experimental("Experiment api for supporting Object transmission")
Object getObjectAttachment(String key, Object defaultValue);
/**
* get the invoker in current context.
*
* @return invoker.
* @transient
*/
Invoker<?> getInvoker();
//Invoker物件可以設定一些KV屬性,這些屬性並不會傳遞給Provider
Object put(Object key, Object value);
Object get(Object key);
Map<Object, Object> getAttributes();
}
Result介面是Invoker.invoke方法的返回值,該返回值包含了被呼叫方返回值(或是異常)以及附加資訊,我們也可以新增回撥方法,在 RPC 呼叫方法結束時會觸發這些回撥。
public interface Result extends Serializable {
//呼叫的返回值
Object getValue();
void setValue(Object value);
//例外處理方法
Throwable getException();
void setException(Throwable t);
boolean hasException();
//複合操作,如果呼叫發生異常,則直接丟擲異常,如果沒有異常,則返回結果
Object recreate() throws Throwable;
//攜帶附加資訊
Map<String, String> getAttachments();
@Experimental("Experiment api for supporting Object transmission")
Map<String, Object> getObjectAttachments();
void addAttachments(Map<String, String> map);
@Experimental("Experiment api for supporting Object transmission")
void addObjectAttachments(Map<String, Object> map);
void setAttachments(Map<String, String> map);
@Experimental("Experiment api for supporting Object transmission")
void setObjectAttachments(Map<String, Object> map);
String getAttachment(String key);
@Experimental("Experiment api for supporting Object transmission")
Object getObjectAttachment(String key);
String getAttachment(String key, String defaultValue);
@Experimental("Experiment api for supporting Object transmission")
Object getObjectAttachment(String key, Object defaultValue);
void setAttachment(String key, String value);
@Experimental("Experiment api for supporting Object transmission")
void setAttachment(String key, Object value);
@Experimental("Experiment api for supporting Object transmission")
void setObjectAttachment(String key, Object value);
//新增回撥 當RPC呼叫完成時,會觸發回撥
Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn);
<U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn);
//阻塞執行緒,等待此次RPC呼叫完成
Result get() throws InterruptedException, ExecutionException;
Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Exporter暴露Invoker的實現,就是讓Provider能夠根據請求的各種資訊,找到對應的Invoker的實現。
public interface Exporter<T> {
//獲取Invoker物件
Invoker<T> getInvoker();
//取消Invoker物件
void unexport();
}
Protocol介面主要有三個核心方法export、refer以及destroy,export主要是將Invoker服務暴露出去,refer參照一個服務將Invoker物件返回,destroy主要是銷燬Invoker,釋放Protocol對底層的佔用。Protocol介面的實現中,export方法並不是簡單地將Invoker物件包裝成Exporter物件返回,其中還涉及代理物件的建立、底層Server的啟動等操作;refer方法除了根據傳入的type型別以及URL引數查詢Invoker之外,還涉及相關Client的建立等操作。 此外該介面被SPI修飾,export和refer被Adaptive修飾,因此對於Protocol可以動態選擇實現,此外Dubbo也提供多種Protocol實現。
@SPI("dubbo")
public interface Protocol {
//預設埠
int getDefaultPort();
//將一個Invoker暴露,該方法必須是冪等的
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
//參照一個Invoker,返回一個Invoker物件
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
//銷燬export方法以及refer方法使用到的Invoker物件,釋放當前Protocol物件底層佔用的資源
void destroy();
//返回當前Protocol底層的全部ProtocolServer
default List<ProtocolServer> getServers() {
return Collections.emptyList();
}
}
Filter介面用來攔截Dubbo請求,定義了一個invoke方法將請求傳遞給後續的Invoker進行處理。
@SPI
public interface Filter {
//將請求傳給後續的Invoker處理
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
//監聽響應以及異常
interface Listener {
void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
}
ProxyFactory介面主要的功能是用來建立代理物件,此外ProxyFactory也是一個擴充套件介面,getProxy方法為Invoker建立代理物件,getInvoker方法將代理物件轉為Invoker物件,預設採用javassist生成代理物件,Dubbo還提供很多種實現,可以通過SPI設定進行自定義。
@SPI("javassist")
public interface ProxyFactory {
//將Invoker物件轉為代理物件
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
//將proxy物件轉為Invoker
@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
歡迎大家點點關注,點點贊!