NameServer是一個註冊中心,提供服務註冊和服務發現的功能。NameServer可以叢集部署,叢集中每個節點都是對等的關係(沒有像ZooKeeper那樣在叢集中選舉出一個Master節點),節點之間互不通訊。
服務註冊
Broker啟動的時候會向所有的NameServer節點進行註冊,注意這裡是向叢集中所有的NameServer節點註冊,而不是隻向其中的某些節點註冊,因為NameServer每個節點都是對等的,所以Broker需要向每一個節點進行註冊,這樣每一個節點都會有一份Broker的註冊資訊。
服務發現
Broker向NameServer註冊以後,生產者Producer和消費者Consumer就可以從NameServer中獲取所有註冊的Broker資訊,並從中選取Broker進行訊息的傳送和消費。
以生產者為例,在NameServer叢集部署模式下,生產者會從多個NameServer中隨機選取一個進行通訊,從中拉取所有Broker的註冊資訊,並將拉取到的資訊進行快取,生產者知道了Broker的資訊後,就可以得知Topic的分佈情況,然後選取一個訊息佇列,與其所在的Broker通訊進行訊息的傳送。如果通訊的Nameservre宕機,消費者會輪詢選擇下一個NameServer。
為什麼需要NameServer?
在使用RocketMQ的時候,為了提升效能以及應對高並行的情況,一般都會使用多個Broker進行叢集部署,假設沒有註冊中心,對於Broker來說,如果想獲取到叢集中所有的Broker資訊(生產者和消費者需要通過某個Broker獲取整個叢集的資訊,從而得到Topic的分佈情況),每個Broker都需要與其他Broker通訊來交換資訊,以此來得到叢集內所有Broker的資訊,在Broker數量比較大的情況下,會造成非常大的通訊壓力。
為什麼不使用zookeeper這樣的分散式協調元件?
首先zookeeper的實現複雜,引入zookeeper會增加系統的複雜度,並且zookeeper在CAP中選擇了CP,也就是一致性和分割區容錯性,從而犧牲了可用性,為了保持資料的一致性會在一段時間內會不可用。
而NameServer在實現上簡單,RocketMQ的設計者也許認為對於一個訊息佇列的註冊中心來說,一致性與可用性相比,可用性更重要一些,至於一致性可以通過其他方式來解決。
假如選擇了CP的ZooKeeper,先不考慮其他原因,在ZooKeeper不可用的時候,如果有消費者或生產者剛好需要從NameServer拉取資訊,由於服務不可用,導致生產者和消費者無法進行訊息的生產和傳送,在高並行或者資料量比較大的情況下,大量的訊息無法傳送/無法消費影響是極大的,而如果選擇AP,即便資料暫時處於不一致的狀態,在心跳機制的作用下也可以保證資料的最終一致性,所以RocketMQ選擇了自己實現註冊中心,簡單並且輕量。
舉個例子,假如叢集中有三個Broker(分別為 A、B、C),向三臺NameServer進行了註冊(也分別為A、B、C),消費者從NameServer中獲取到了三個Broker的資訊,如果此時BrokerA需要停止服務,分別通知三臺NameServer需要下線,從NameServer中剔除該Broker的資訊,由於網路或者其他原因,NameServer A和B收到了下線的請求,NameServer C並未收到,此時就處於資料不一致的狀態,如果某個消費者是與NameServer C進行通訊,會認為Broker還處於可用的狀態:
對於這種情況,首先NameServer與Broker之間會有一個心跳機制,NameServer定時檢測在某個時間範圍內是否收到了Broker傳送的心跳請求,如果未收到,會認為該Broker不可用,將其剔除(在下面會講到),所以對於NameServer來說,儘管資料會暫時處於不一致的狀態,但是可以保證過一段時間之後恢復資料的一致性,也就是最終一致性。
對於消費者來說,既然可以從NameServer C中獲取到Broker A的資訊,那麼消費者就認為Broker A可用,如果傳送的訊息所在的訊息佇列在Broker A中,就會與Broker A通訊進行傳送,但實際上Broker A實際上是不可用的,訊息會傳送失敗,所以RocketMQ設計了訊息重試機制以及故障延遲機制。
Broker啟動後會開啟定時向NameServer進行註冊(傳送心跳包)的任務,傳送心跳包的時間間隔可以在組態檔中進行設定,但是最長不能超過10s,也就是說Broker最長10秒鐘會向Nameserver傳送一次心跳包。
NameServer收到Broker的註冊請求(心跳包)後,會判斷Broker之前是否已經註冊過,如果未註冊過將其加入到註冊的Broker集合brokerAddrTable
中,同時也會記錄收到註冊請求的時間,將其加入到brokerLiveTable
中,裡面記錄了NameServer收到每個Broker傳送心跳包的時間,在進行心跳檢測的時候根據這個時間戳來判斷是否在規定時間內未收到該Broker傳送的心跳包。
讀寫鎖
由於NameServer可能同時收到多個Broker的註冊以及生產者或者消費者的拉取請求,為了保證資料的一致性(因為有讀寫請求同時發生或者寫與寫請求同時發生),在處理相關請求的時候需要加鎖,為了提高效能,使用了ReadWriteLock讀寫鎖,處理註冊請求時會先新增寫鎖,處理拉取請求時新增讀鎖,這樣如果某一時刻都是讀的請求可以同時進行,互不影響,如果有寫請求,其他請求就需要等鎖釋放才可以進行往下進行。如果不使用讀寫鎖,直接對所有的請求加鎖,會影響效能,實際上讀與讀之間並不需要加鎖。
Nameserver在啟動的時候會開啟一個用於心跳檢測的定時任務(每10s執行一次),定時掃描處於不活躍狀態的Broker,如果在規定時間內未收到某個Broker的心跳包,會認為此Broker不可用,需要將其進行剔除。
上面說到brokerLiveTable
儲存了當前NameServer收到的心跳資料,裡面記錄了每一個Broker最近進行註冊/傳送心跳的時間戳,所以只需遍歷brokerLiveTable,獲取每一個Broker最近一次傳送心跳的時間進行判斷,如果上一次傳送心跳的時間 + 過期時間(120s) 小於 當前時間,也就是超過120s沒有收到某個Broker的心跳包,則認為此Broker已下線,將Broker移除。
正常下線
當Broker下線的時候會向NameServer發起取消註冊的請求,NameServer收到請求後會將Broker剔除。
異常下線
如果Broker異常宕機,或者傳送給NameServer的取消註冊請求由於某些原因並未傳送成功,NameServer可能並未感知到Broker的下線,由於心跳機制定時檢測的功能,會在一段時間後發現未收到Broker的心跳請求,主動將Broker剔除。
生產者和消費者都會定時從NameServer中更新Broker的註冊資訊,預設是30s進行一次更新:
public class MQClientInstance {
private void startScheduledTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 更新路由資訊 MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
}
}
對應的相關原始碼可參考:
【RocketMQ】【原始碼】NameServer的啟動
【RocketMQ】【原始碼】Broker服務註冊