MQ系列1:訊息中介軟體執行原理
MQ系列2:訊息中介軟體的技術選型
MQ系列3:RocketMQ 架構分析
上一節的 MQ系列3:RocketMQ 架構分析,我們大致介紹了 RocketMQ的基本元件構成,包括 NameServer、Broker、Producer以及Consumer四部分。
NameServer,指的是服務可以根據給定的名字來進行資源或物件的地址定位,並獲取有關的屬性資訊。在Rocket中也一樣,NameServer是 RocketMQ 的服務註冊中心(類似於 Kafka 叢集 後面的 Zookeeper 叢集一樣, 對叢集後設資料進行管理),根據後設資料(ip、port和router資訊)來唯一定位服務。RocketMQ 需要先啟動 NameServer ,再啟動 Rocket 中的 Broker。
註冊發生在Broker啟動之後,啟動後快速與NameServer建立長連線,並每30s對NameService傳送一次心跳包,Broker會將自己的IP Address、Port、Router 等資訊隨著心跳一併註冊到 NameServer中。
這裡的RouterInfo 主要指Broker下包含哪些Topic資訊,這種對映關係方便後面訊息的生產和消費的時候進行定址。
註冊使用到的核心資料結構如下:
HashMap<String BrokerName, BrokerData> brokerAddrTable
欄位 | 型別 | 說明 |
---|---|---|
cluster | String | 所屬的叢集名稱 |
broker | String | broker的名稱 |
brokerAddress | HashMap | Broker的IP地址列表,包含一個Master IP地址列表 和 多個Slave IP地址列表 |
" Broker-A":{
"cluster":"Broker-Cluster",
"brokerName":"Broker-A",
"cluster":{ // 1主2從
"0":"192.168.0.1:1234",
"1":"192.168.0.2:1234",
"2":"192.168.0.3:1234"
}
}
當你對你的Broker中的Topic資訊進行更新了(增、刪、改)怎麼辦,你才需要重新將資訊註冊到NameServer中。
使用到的核心資料結構如下:
HashMap<String topic, List<QueueData>> topicQueueTable
欄位 | 型別 | 說明 |
---|---|---|
brokerName | String | broker名稱 |
readQueueNums | Long | 讀Queue的數量 |
writeQueueNums | Long | 寫Queue的數量 |
perm | Integer | 許可權 PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0 |
topicSyncFlag | Long | 同步的位置標識 |
{
"topic-test":[ // topic名稱,注意下面會用到
{
"brokerName":"Broker-A",
"readQueueNums":37,
"writeQueueNums":37,
"perm":6, // 讀寫許可權
"topicSynFlag":12
},
{
"brokerName":"Broker-B",
"readQueueNums":37,
"writeQueueNums":37,
"perm":6, // 讀寫許可權
"topicSynFlag":12
}
]
}
參考RocketMQ原始碼如下,這邊加了註釋,方便理解:
/**
* 建立或者更新 MessageQueue 的資料
* @param brokerName
* @param topicConfig
*/
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName); // broker 名稱
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); // 讀Queue的數量
queueData.setReadQueueNums(topicConfig.getReadQueueNums()); // 寫Queue的數量
queueData.setPerm(topicConfig.getPerm()); // 許可權: PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataList) { // 新增
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
} else { // 更新
boolean addNewOne = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
if (qd.getBrokerName().equals(brokerName)) {
if (qd.equals(queueData)) {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
queueData);
it.remove(); // 先刪除
}
}
}
if (addNewOne) {
queueDataList.add(queueData); // 再新增
}
}
}
如果Broker掛掉,那麼再被訊息的生產者和消費者使用就會有問題了。這時候需要對已經宕掉的Broker進行清理,確保NamServer中註冊的Broker服務資訊都是Alive的。它的做法是這樣的:
brokerLiveTable
表的 lastUpdateTimestamp
欄位中。brokerLiveTable
表lastUpdateTimestamp
欄位,如果時間戳與當前時間相隔超過 120s(即兩分鐘),則認為 Broker 已經宕了,並會將broker清除出NameServer的登入檔。使用到的核心資料結構如下:
HashMap<String BrokerAddr, BrokerLiveInfo> brokerLiveTable
欄位 | 型別 | 說明 |
---|---|---|
lastUpdateTimestamp | Long | 最後一次收到心跳包的時間戳 |
dataVersion | DataVersion | 資料版本號物件 |
channel | Channel | netty的Channel,IO資料互動媒介 |
haServerAddr | String | master地址,初次請求的時候值為空,slave向NameServer註冊之後返回 |
上面的步驟都完成之後,NameServer這個 "中央大腦" 正式開始投入使用。這時候 ,訊息的生產和消費具體是怎麼做的呢?
hello-brand
到 topic (topic-test
) 中topic-test
的 topic 存在於多個 broker中,所以需要如下幾個步驟,才能找到具體的地址:
topic-test
查詢 topicQueueTable
, 選擇一個並獲取它的broker資訊(包含brokerName)brokerAddressTable
獲取具體的Broker IP地址(一般包含1個Master和n個Slave的IP地址)
上述的流程圖比較清晰的描述如下運轉流程:
brokerLiveTable
表 , 如果檢測到某個Broker 宕機(因為使用心跳機制, 如果檢測超120s(兩分鐘)無上報心跳),則從路由登入檔中將其移除。topicQueueTable
獲得broker名稱,通過broker名稱查詢brokerAddressTable
獲取具體的Broker IP地址),然後根據負載均衡演演算法從列表中選擇1臺Broker ,建立連線通道,進行訊息傳送。