Dubbo原始碼閱讀分享系列文章,歡迎大家關注點贊
在整個Duubbo架構中,註冊中心主要完成以下三件事情:
Registry在整個架構中主要是對Consumer 和 Provider 所對應的業務進行解耦,從而提升系統的穩定性。關於為什麼需要註冊中心,大家可以參考一下微服務下的註冊中心如何選擇,在這篇文章中我花了一小節來介紹這個問題。
開始之前,首先來看下關於註冊中心原始碼的專案結構,整個專案由兩塊組成,一個就是核心Api,另外一個就是具體一些中介軟體的實現,看到這個我們可能會有一些想法,這個定然會有一些模板類以及一些工廠設計,能想到這裡,說明你已經具有很好的抽象思維,廢話不多說開始原始碼。
dubbo-registry-api是註冊中心核心物件的抽象以及實現部分,我們首先來看下幾個核心物件設計
Node位於Dubbo的dubbo-common專案下面,在Dubbo中Node這個介面用來抽象節點的概念,Node不僅可以表示Provider和Consumer節點,還可以表示註冊中心節點。Node節點內部定義三個方法:
RegistryService此介面定義註冊中心的功能,定義五個方法:
Registry介面繼承了Node和RegistryService這兩個介面,實現該介面類就是註冊中心介面的節點,該方法內部也提供兩個預設方法reExportRegister方法和reExportUnregister方法,這兩個方法實際呼叫還是RegistryService中的方法。
public interface Registry extends Node, RegistryService {
//呼叫RegistryService的register
default void reExportRegister(URL url) {
register(url);
}
//呼叫RegistryService的unregister
default void reExportUnregister(URL url) {
unregister(url);
}
}
RegistryFactory是 Registry 的工廠類,負責建立 Registry 物件,通過@SPI 註解指定了預設的擴充套件名為 dubbo,@Adaptive註解表示會生成介面卡類並根據 URL 引數中的 protocol 引數值選擇相應的實現。
@SPI("dubbo")
public interface RegistryFactory {
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
下圖是RegistryFactory多種不同的實現,每個 Registry 實現類都有對應的 RegistryFactory 工廠實現,每個 RegistryFactory 工廠實現只負責建立對應的 Registry 物件。
AbstractRegistryFactory 是一個實現了 RegistryFactory 介面的抽象類,內部維護一個Registry的Map集合以及提供銷燬和建立註冊中心方法,針對不同的註冊中心可以有不同的實現。
//鎖
protected static final ReentrantLock LOCK = new ReentrantLock();
//Map
protected static final Map<String, Registry> REGISTRIES = new HashMap<>();
銷燬方法分為兩個,一個全量,一個是單個,單個銷燬在AbstractRegistry中呼叫,引數是註冊範例物件。
//全量銷燬
public static void destroyAll() {
if (!destroyed.compareAndSet(false, true)) {
return;
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
// Lock up the registry shutdown process
LOCK.lock();
try {
for (Registry registry : getRegistries()) {
try {
//一個一個銷燬
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
//清空map快取
REGISTRIES.clear();
} finally {
// Release the lock
LOCK.unlock();
}
}
//單個銷燬
public static void removeDestroyedRegistry(Registry toRm) {
LOCK.lock();
try {
REGISTRIES.entrySet().removeIf(entry -> entry.getValue().equals(toRm));
} finally {
LOCK.unlock();
}
}
getRegistry是對RegistryFactory實現,如果沒有在快取中,則進行建立範例物件createRegistry,createRegistry是抽象方法,為了讓子類重寫該方法,比如說redis實現的註冊中心和zookeeper實現的註冊中心建立方式肯定不同,而他們相同的一些操作都已經在AbstractRegistryFactory中實現,所以只要關注且實現該抽象方法即可。
//抽象的createRegistry方法
protected abstract Registry createRegistry(URL url);
//獲取範例
public Registry getRegistry(URL url) {
Registry defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
if (null != defaultNopRegistry) {
return defaultNopRegistry;
}
//構建key
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
// double check
// fix https://github.com/apache/dubbo/issues/7265.
defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
if (null != defaultNopRegistry) {
return defaultNopRegistry;
}
//獲取範例物件
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//沒有獲取到就建立
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
//放入Map集合中
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
ListenerRegistryWrapper是對RegistryFactory的擴充套件,建立Registry時候會包裝一個ListenerRegistryWrapper物件,內部維護一個監聽器RegistryServiceListener,當註冊、取消註冊、訂閱以及取消訂閱的時候,會傳送通知。
AbstractRegistry該抽象類是對Registry介面的實現,實現了Registry介面中的註冊、訂閱、查詢、通知等方法,但是註冊、訂閱、查詢、通知等方法只是簡單地把URL加入對應的集合,沒有具體的註冊或訂閱邏輯。此外該類還實現了快取機制,只不過,它的快取有兩份,一份在記憶體,一份在磁碟。
//原生的Properties檔案快取,在記憶體中 與file是同步的
private final Properties properties = new Properties();
//該單執行緒池負責講Provider的全量資料同步到properties欄位和快取檔案中,
//如果syncSaveFile設定為false,就由該執行緒池非同步完成檔案寫入
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
//是否非同步寫入
private boolean syncSaveFile;
//註冊資料的版本號 防止舊資料覆蓋新資料
private final AtomicLong lastCacheChanged = new AtomicLong();
//儲存Properties失敗以後異常重試次數
private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger();
//已經註冊服務的URL集合,註冊的URL不僅僅可以是服務提供者的,也可以是服務消費者的
private final Set<URL> registered = new ConcurrentHashSet<>();
//消費者Url訂閱的監聽器集合
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
//費者被通知的服務URL集合,最外部URL的key是消費者的URL,value是一個map集合,裡面的map中的key為分類名,value是該類下的服務url集合
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
//註冊中心URL
private URL registryUrl;
//本地磁碟Properties檔案
private File file;
當 Provider 端暴露的 URL 發生變化時,ZooKeeper 等服務發現元件會通知 Consumer 端的 Registry 元件,Registry 元件會呼叫 notify() 方法,被通知的 Consumer 能匹配到所有 Provider 的 URL 列表並寫入 properties 集合以及本地檔案中。
protected void notify(List<URL> urls) {
if (CollectionUtils.isEmpty(urls)) {
return;
}
//遍歷訂閱消費者URL的監聽器集合,通知他們
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
//匹配
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
//遍歷所有監聽器
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
//通知監聽器,URL變化結果
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
/**
* Notify changes from the Provider side.
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
//引數校驗
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
//按照url中key為category對應的值進行分類,如果沒有該值,就找key為providers的值進行分類
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
//分類結果放入result
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
//處理通知監聽器URL變化結果
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
//把分類標實和分類後的列表放入notified的value中
categoryNotified.put(category, categoryList);
//呼叫NotifyListener監聽器
listener.notify(categoryList);
//單個Url變更,並將更改資訊同步至記憶體快取和磁碟快取中
saveProperties(url);
}
}
private void saveProperties(URL url) {
if (file == null) {
return;
}
try {
StringBuilder buf = new StringBuilder();
//從通知列表中取出資訊
Map<String, List<URL>> categoryNotified = notified.get(url);
//以空格為間隔拼接
if (categoryNotified != null) {
for (List<URL> us : categoryNotified.values()) {
for (URL u : us) {
if (buf.length() > 0) {
buf.append(URL_SEPARATOR);
}
buf.append(u.toFullString());
}
}
}
//推播url至記憶體快取
properties.setProperty(url.getServiceKey(), buf.toString());
//增加版本號
long version = lastCacheChanged.incrementAndGet();
if (syncSaveFile) {
//如果磁碟檔案未被加鎖,將記憶體快取同步至磁碟快取
doSaveProperties(version);
} else {
//如果被加鎖了,使用新的執行緒去執行,當前執行緒返回
registryCacheExecutor.execute(new SaveProperties(version));
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
註冊中心有兩份快取,一份在記憶體,一份在磁碟。該方法的作用是將記憶體中的快取資料儲存在磁碟檔案中,該方法有錯誤重試,最大重試次數是3,重試採用另一個執行緒去執行重試,不是當前執行緒。本地快取設計相當於是一種容錯機制,當網路抖動等原因而導致訂閱失敗時,Consumer端的Registry就可以通過getCacheUrls()方法獲取本地快取,從而得到最近註冊的服務提供者。
//將記憶體中的檔案寫到磁碟上
public void doSaveProperties(long version) {
//版本號判斷 防止重複寫
if (version < lastCacheChanged.get()) {
return;
}
//判斷磁碟檔案是否為空
if (file == null) {
return;
}
// Save
try {
//lock檔案,用於加鎖操作
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
//RandomAccessFile提供對檔案的讀寫操作
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
FileChannel channel = raf.getChannel()) {
//獲取鎖
FileLock lock = channel.tryLock();
if (lock == null) {
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
}
// Save
try {
if (!file.exists()) {
file.createNewFile();
}
try (FileOutputStream outputFile = new FileOutputStream(file)) {
//從記憶體快取中獲取資料 寫入檔案
properties.store(outputFile, "Dubbo Registry Cache");
}
} finally {
lock.release();
}
}
} catch (Throwable e) {
//發生異常時,重試次數+1
savePropertiesRetryTimes.incrementAndGet();
//重試次數大於丟擲異常
if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) {
logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e);
savePropertiesRetryTimes.set(0);
return;
}
//再次對比版本資訊,如果版本已過期,返回不再處理
if (version < lastCacheChanged.get()) {
savePropertiesRetryTimes.set(0);
return;
} else {
//重試執行緒
registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
}
logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e);
}
}
//磁碟中檔案載入到記憶體中
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
try {
in = new FileInputStream(file);
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry cache file " + file + ", data: " + properties);
}
} catch (Throwable e) {
logger.warn("Failed to load registry cache file " + file, e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
AbstractRegistry 實現了 Registry 介面,當新節點註冊進來時候registry() 方法,會將當前節點要註冊的 URL快取到 registered集合,當節點下線時候, unregistry() 方法會從 registered 集合刪除指定的 URL。當消費者新增加一個訂閱的時候,subscribe() 方法會將當前節點作為 Consumer 的 URL 以及相關的 NotifyListener 記錄到 subscribed 集合,當消費者取消一個訂閱的時候,unsubscribe() 方法會將當前節點的 URL 以及關聯的 NotifyListener 從 subscribed 集合刪除。這四個方法相對比較簡單,這裡不做展示,此處設計為抽象類,當子類重寫的時候可以對其進行增強。
當因為網路問題與註冊中心斷開連線之後,會進行重連,重新連線成功之後,會呼叫 recover() 方法將 registered 集合中的全部 URL 重新執行register() 方法,恢復註冊資料。同樣,recover() 方法也會將 subscribed 集合中的 URL 重新執行subscribe() 方法,恢復訂閱監聽器。 當前節點下線的時候,destroy() 方法會呼叫 unregister() 方法和 unsubscribe() 方法將當前節點註冊的 URL 以及訂閱的監聽全部清理掉,此外還會銷燬本範例。
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
Set<URL> destroyRegistered = new HashSet<>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<>(destroyRegistered)) {
if (url.getParameter(DYNAMIC_KEY, true)) {
try {
//取消註冊
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
//取消訂閱
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
//移除註冊中心
AbstractRegistryFactory.removeDestroyedRegistry(this);
}
關於重試機制,Dubbo將重試機制放在了FailbackRegistry類中,FailbackRegistry 設計思想,重寫了 AbstractRegistry 中 register()/unregister()、subscribe()/unsubscribe() 以及 notify() 這五個核心方法,結合時間輪,實現失敗重試機制。此外,還新增了四個未實現的抽象模板方法,由其繼承者去實現,這裡也就是典型的模板類的設計。
//註冊失敗的URL Key是註冊失敗的 URL,Value 是對應的重試任務
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
//取消註冊失敗URL
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
//訂閱失敗的URL
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
//取消訂閱失敗的URL
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
//重試的時間間隔
private final int retryPeriod;
//用於定時執行失敗的時間輪
private final HashedWheelTimer retryTimer;
構造方法首先會呼叫父類別的構造方法完成本地快取相關的初始化操作,然後根據傳入URL引數中獲取重試操作的時間間隔來初始化 retryPeriod 欄位,最後初始化 HashedWheelTimer時間輪。
public FailbackRegistry(URL url) {
//呼叫
super(url);
this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
register方法重寫了父類別的註冊方法,首先呼叫父類別的register將url加入對應的容器,然後從failedRegistered 和failedUnregistered 兩個容器中移除失敗URL,然後執行doRegister方法,doRegister是抽象方法,具體的實現交給其繼承者,如果註冊失敗丟擲異常,會將URL加入failedRegistered 容器中。
@Override
public void register(URL url) {
if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
return;
}
//執行父類別的方法加入到容器中
super.register(url);
//移除註冊失敗
removeFailedRegistered(url);
//移除取消註冊失敗
removeFailedUnregistered(url);
try {
//抽象方法交給具體子類去實現
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
//註冊發生異常將註冊失敗放入到註冊失敗的容器中
addFailedRegistered(url);
}
}
接下來我們看下新增重試任務的方法addFailedRegistered,該方法相對比較簡單,核心就是將失敗的任務放到容器中,然後將失敗的任務加入時間輪等待執行。
private void addFailedRegistered(URL url) {
//判斷容器中是是否存在任務
FailedRegisteredTask oldOne = failedRegistered.get(url);
if (oldOne != null) {
return;
}
//將任務新增容器中
FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
oldOne = failedRegistered.putIfAbsent(url, newTask);
if (oldOne == null) {
// never has a retry task. then start a new task for retry.
//將任務提交到時間輪中 等待retryPeriod秒後執行
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
}
}
對於其他unregister()、subscribe()、unsubscribe() 都與register()類似這裡就不做過多介紹,簡單看下提供幾個抽象的方法。
public abstract void doRegister(URL url);
public abstract void doUnregister(URL url);
public abstract void doSubscribe(URL url, NotifyListener listener);
public abstract void doUnsubscribe(URL url, NotifyListener listener);
addFailedRegistered方法中建立FailedRegisteredTask以及其他重試任務,都是繼承AbstractRetryTask,接下來我們要來關於AbstractRetryTask的設計和實現。 在AbstractRetryTask抽象類中,有一個核心run方法實現已經一個抽象方法,該抽象方法也是模板類作用。
@Override
public void run(Timeout timeout) throws Exception {
//檢查定義任務狀態以及時間輪狀態
if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
// other thread cancel this timeout or stop the timer.
return;
}
//檢查重試次數
if (times > retryTimes) {
// reach the most times of retry.
logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times.");
return;
}
if (logger.isInfoEnabled()) {
logger.info(taskName + " : " + url);
}
try {
//執行重試
doRetry(url, registry, timeout);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t);
// reput this task when catch exception.
//執行異常 則重新等待
reput(timeout, retryPeriod);
}
}
protected void reput(Timeout timeout, long tick) {
//邊界值檢查
if (timeout == null) {
throw new IllegalArgumentException();
}
//檢查定時任務
Timer timer = timeout.timer();
if (timer.isStop() || timeout.isCancelled() || isCancel()) {
return;
}
//遞增times
times++;
//新增下次定時任務
timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}
protected abstract void doRetry(URL url, FailbackRegistry registry, Timeout timeout);
接下來我們看下FailedRegisteredTask對AbstractRetryTask�的實現,子類doRetry方法會執行關聯Registry的doRegister() 方法,完成與服務發現元件互動。如果註冊成功,則會呼叫removeFailedRegisteredTask()方法將當前關聯的 URL 以及當前重試任務從 failedRegistered集合中刪除。如果註冊失敗,則會丟擲異常,執行AbstractRetryTask的reput()方法重試。
@Override
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
// 重新註冊
registry.doRegister(url);
// 刪除註冊任務
registry.removeFailedRegisteredTask(url);
}
下一篇會簡單介紹一下時間輪、ZK對註冊中心的實現以及在,歡迎大家點點關注,點點贊!