MQ系列4:NameServer 原理解析

2022-08-29 18:01:37

MQ系列1:訊息中介軟體執行原理
MQ系列2:訊息中介軟體的技術選型
MQ系列3:RocketMQ 架構分析

1 關於NameServer

上一節的 MQ系列3:RocketMQ 架構分析,我們大致介紹了 RocketMQ的基本元件構成,包括 NameServer、Broker、Producer以及Consumer四部分。
NameServer,指的是服務可以根據給定的名字來進行資源或物件的地址定位,並獲取有關的屬性資訊。在Rocket中也一樣,NameServer是 RocketMQ 的服務註冊中心(類似於 Kafka 叢集 後面的 Zookeeper 叢集一樣, 對叢集後設資料進行管理),根據後設資料(ip、port和router資訊)來唯一定位服務。RocketMQ 需要先啟動 NameServer ,再啟動 Rocket 中的 Broker。

2 NameServer執行流程

2.1 註冊

註冊發生在Broker啟動之後,啟動後快速與NameServer建立長連線,並每30s對NameService傳送一次心跳包,Broker會將自己的IP Address、Port、Router 等資訊隨著心跳一併註冊到 NameServer中。

這裡的RouterInfo 主要指Broker下包含哪些Topic資訊,這種對映關係方便後面訊息的生產和消費的時候進行定址。

註冊使用到的核心資料結構如下:
HashMap<String BrokerName, BrokerData> brokerAddrTable

  • HashMap 的 Key 是 Broker 的名稱,儲存了一個Broker服務所對應的屬性資訊。
  • Value 是個物件,資料結構如下:
欄位 型別 說明
cluster String 所屬的叢集名稱
broker String broker的名稱
brokerAddress HashMap Broker的IP地址列表,包含一個Master IP地址列表 和 多個Slave IP地址列表
" Broker-A":{
	"cluster":"Broker-Cluster",
	"brokerName":"Broker-A",
	"cluster":{  // 1主2從
	   "0":"192.168.0.1:1234",
	   "1":"192.168.0.2:1234",
	   "2":"192.168.0.3:1234"
	}
}

2.2 註冊資訊更新

當你對你的Broker中的Topic資訊進行更新了(增、刪、改)怎麼辦,你才需要重新將資訊註冊到NameServer中。

  • 如果你建立了新的 Topic,Broker會向 NameServer 傳送註冊資訊,接收到資訊後會對每個Master 角色的Broker ,建立一個新的QueueData物件。
  • 如果你修改了Topic,則NameServer 會先把舊的 QueueData 刪除,在加一個新的 QueueData。
  • 如果你刪除了Topic,則NameServer 會將對應的 QueueData 刪除。

使用到的核心資料結構如下:
HashMap<String topic, List<QueueData>> topicQueueTable

  • HashMap 的 Key 是 Topic 的名稱,裡面儲存了Topic的所有屬性資訊。
  • Value 是個列表,列表的資料型別是 QueueData,列表的length就是Topic中的 Master角色的 Broker 個數。
  • QueueData的結構如下
欄位 型別 說明
brokerName String broker名稱
readQueueNums Long 讀Queue的數量
writeQueueNums Long 寫Queue的數量
perm Integer 許可權 PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0
topicSyncFlag Long 同步的位置標識
{
  "topic-test":[ // topic名稱,注意下面會用到
   { 
    "brokerName":"Broker-A",
	"readQueueNums":37,
	"writeQueueNums":37,
	"perm":6,  // 讀寫許可權
	"topicSynFlag":12
   },
   { 
    "brokerName":"Broker-B",
	"readQueueNums":37,
	"writeQueueNums":37,
	"perm":6,  // 讀寫許可權
	"topicSynFlag":12
   } 
 ]
}

參考RocketMQ原始碼如下,這邊加了註釋,方便理解:

    /**
     * 建立或者更新 MessageQueue 的資料
     * @param brokerName
     * @param topicConfig
     */
    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName); // broker 名稱
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());   // 讀Queue的數量
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());  // 寫Queue的數量
        queueData.setPerm(topicConfig.getPerm());  // 許可權: PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
        if (null == queueDataList) {  // 新增
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
        } else {   // 更新
            boolean addNewOne = true;   
            Iterator<QueueData> it = queueDataList.iterator();
            while (it.hasNext()) {
                QueueData qd = it.next();
                if (qd.getBrokerName().equals(brokerName)) {
                    if (qd.equals(queueData)) {
                        addNewOne = false;
                    } else {
                        log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                                queueData);
                        it.remove();   // 先刪除
                    }
                }
            }
            if (addNewOne) {
                queueDataList.add(queueData);   // 再新增
            }
        }
    }

2.3 異常清理

如果Broker掛掉,那麼再被訊息的生產者和消費者使用就會有問題了。這時候需要對已經宕掉的Broker進行清理,確保NamServer中註冊的Broker服務資訊都是Alive的。它的做法是這樣的:

  • 前面我們說了,Broker每30s對NameService傳送一次心跳包給NameServer
  • NameServer接收到心跳包的時候,會將當前時間戳更新到 brokerLiveTable 表的 lastUpdateTimestamp 欄位中。
  • NameServer中會啟動一個定時任務
  • 每10s(記住這邊掃描是10s間隔,與上面心跳包區分開)掃描 一下 brokerLiveTable
  • 檢查lastUpdateTimestamp欄位,如果時間戳與當前時間相隔超過 120s(即兩分鐘),則認為 Broker 已經宕了,並會將broker清除出NameServer的登入檔。

使用到的核心資料結構如下:
HashMap<String BrokerAddr, BrokerLiveInfo> brokerLiveTable

  • HashMap 的 Key 是 Broker伺服器的地址資訊(IP+Port),裡面儲存了該Broker伺服器的基本資訊。
  • Value 是個物件,結構如下:
欄位 型別 說明
lastUpdateTimestamp Long 最後一次收到心跳包的時間戳
dataVersion DataVersion 資料版本號物件
channel Channel netty的Channel,IO資料互動媒介
haServerAddr String master地址,初次請求的時候值為空,slave向NameServer註冊之後返回

2.4 訊息生產和消費

上面的步驟都完成之後,NameServer這個 "中央大腦" 正式開始投入使用。這時候 ,訊息的生產和消費具體是怎麼做的呢?

  • Producer 或者 Consumer 啟動之後會和 NameServer 建立長連線
  • 定時(預設為每30s)從 NameServer 獲取Routers資訊,並將路由資訊儲存至Producer或者Consumer的本地。
  • Producer傳送一條訊息 hello-brand 到 topic (topic-test) 中
  • 因為名稱為 topic-test 的 topic 存在於多個 broker中,所以需要如下幾個步驟,才能找到具體的地址:
    • 先 根據 topic 名稱 topic-test 查詢 topicQueueTable , 選擇一個並獲取它的broker資訊(包含brokerName)
    • 再根據已經獲取到的brokerName 查詢 brokerAddressTable 獲取具體的Broker IP地址(一般包含1個Master和n個Slave的IP地址)
    • 拿到IP地址之後,生產者與broker建立連線,並行送訊息
    • 消費者同理

3 總結


上述的流程圖比較清晰的描述如下運轉流程:

  • NameServer 作為整個 RocketMQ 的「中央大腦」 ,負責對叢集後設資料進行管理,所以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。
  • Broker 啟動後,與 NameServer 保持長連線,每 30s 傳送一次傳送心跳包,來確保Broker是否存活。並將 Broker 資訊 ( IP+、埠等資訊)以及Broker中儲存的Topic資訊上報。註冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的對映關係。
  • NameServer有個定時任務,每10s掃描下brokerLiveTable表 , 如果檢測到某個Broker 宕機(因為使用心跳機制, 如果檢測超120s(兩分鐘)無上報心跳),則從路由登入檔中將其移除。
  • 生產者在傳送某個主題的訊息之前先從 NamerServer 獲取 Broker 伺服器地址列表(通過topic名稱查詢topicQueueTable獲得broker名稱,通過broker名稱查詢brokerAddressTable獲取具體的Broker IP地址),然後根據負載均衡演演算法從列表中選擇1臺Broker ,建立連線通道,進行訊息傳送。
  • 消費者在訂閱某個topic的訊息之前從 NamerServer 獲取 Broker 伺服器地址列表(同上),包括關聯的全部Topic佇列資訊。進而獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連線通道,開始消費資料。
  • 生產者和消費者預設每30s 從 NamerServer 獲取 Broker 伺服器地址列表,以及關聯的所有Topic佇列資訊,更新到Client本地。

參考:
https://zhuanlan.zhihu.com/p/388807516