在RocketMQ 5.0以前,有兩種叢集部署模式,分別為主從模式(Master-Slave模式)和Dledger模式。
主從模式
主從模式中分為Master和Slave兩個角色,叢集中可以有多個Master節點,一個Master節點可以有多個Slave節點。Master節點負責接收生產者傳送的寫入請求,將訊息寫入CommitLog檔案,Slave節點會與Master節點建立連線,從Master節點同步訊息資料(有同步複製和非同步複製兩種方式)。
消費者可以從Master節點拉取訊息,也可以從Slave節點拉取訊息。
在RocketMQ 4.5版 本之前,如果Master宕機,不支援自動將Slave切換為Master,需要人工介入。
Dledger模式
為了解決主從架構下Slave不能自動切換為Master的問題,4.5版本之後提供了DLedger模式,使用Raft演演算法,如果Master節點出現故障,可以自動從Slave節點中選舉出新的Master進行切換。
存在問題
(1)根據Raft演演算法的多數原則,叢集至少有三個節點以上,在訊息寫入時,也需要大多數的Follower節點響應成功才能認為訊息寫入成功;
(2)Dledger模式下,進行訊息寫入的時候,使用的是openmessaging包中提供的介面,無法利用RocketMQ原生的儲存和複製能力(比如非Dledger模式下使用暫存池方式寫入);
(3)存在兩套紀錄檔複製流程(主從模式下一套、Dledger模式下一套),不統一;
主從同步實現原理
Controller模式
為了解決如上問題,RocketMQ 5.0以後推出了Controller模式,它的特點如下:
(1)在主從部署模式下就具有自動切換Master的能力,5.0之前需要使用DLedger才可以;
(2)可以利用RocketMQ原生儲存複製能力,並統一RocketMQ的儲存和複製能力;
RocketMQ 5.0對Broker選主相關的功能進行了抽離,放在Controller中,實現了在主從部署模式下就可以自動切換Master,Controller可以獨立部署也可以嵌入在NameServer中部署。
獨立部署下的Controller:
嵌入NameServer中的部署圖如下:
Controller
也稱為Controller控制器,一般叢集中部署多個Controller,使用Raft演演算法選舉出一個Active DLedger Controller作為主控制器,它主要用來管理一個SyncStateSet集合,
這個集合中儲存的是一組跟上Master進度的Broker節點集合,如果Controller發現某個Master Broker下線時,會從集合中選出新的Master Broker並切換,Controller可以單獨部署可以嵌在NameServer中部署。
SyncStateSet
SyncStateSet中維護了一個Broker副本組集合,包含當前Master Broker和它的Slave Broker,需要注意在集合內的節點都是跟上Master進度的節點,在節更變動時,由Master Broker向Controller控制器發起變更請求,更新Controller中的SyncStateSet資料,在選舉Master的時候,Controller只需從這個列表中選出一個節點成為新的Master即可。
節點變更分為Shrink操作和Expand操作,需要Master Broker發起,它會通過定時任務以及在資料同步過程中判斷是否需要進行Shrink或Expand。
Shrink
Shrink指的是將SyncStateSet副本集合中與Master節點差距過大的副本移除,差距的判斷條件如下:
haMaxTimeSlaveNotCatchup:表示Slave沒有跟上 Master 的最大時間間隔,若在 SyncStateSet 中的 slave 超過該時間間隔會將其從 SyncStateSet 移除。預設為 15000(15s)。
Expand
如果Master Broker發現某個Slave節點趕上了Master節點的進度,需要將其重新加入到SyncStateSet。
需要注意以上兩個操作,都需要Master Broker向Controller節點傳送通知,請求更新SyncStateSet中的資料。
不管是Controller獨立部署,還是嵌入到NameServer中部署,Controller都會監聽每個Broker的連線,Broker會定期向Controller傳送心跳包,Controller會定時掃描,如果某個Broker心跳包傳送超時,會認為這個Broker已經失效,此時會判斷Broker是否是Master角色,如果是Master角色就需要從該組的SyncStateSet中重新選出一個節點作為Master。
選舉Master的方式比較簡單,從該組的SyncStateSet中,挑選一個心跳包傳送正常的Slave成為新的Master節點即可,並將結果通知到該組所有的Broker,每個Broker也會定時向Controller傳送請求獲取主備資訊。
主從架構部署模式下,需要設定brokerRole和brokerId,也就是手動分配Master和Slave,在Controller模式下,這兩個引數會失效,不需要再進行設定,角色和ID由Controller來分配。
Controller模式下增加了controllerAddr引數,Broker在啟動時,需要設定這個引數,設定每個controller的地址:
controllerAddr:controller的地址,多個controller中間用分號隔開。例如controllerAddr = 127.0.0.1:9877;127.0.0.1:9878;127.0.0.1:9879
Broker設定了每個Controller的地址,Broker啟動時,會先向Controller註冊,並獲取角色關係和brokerId,通過角色關係可以知道自己是Master還是Slave,之後再向NameServer註冊。
Broker可以通過任意一個Controller獲取Active Controller節點的IP,後臺也會有一個定時任務,定時更新Active Controller節點的IP。
初始化時,第一個Broker在向Controller註冊的時候,此時並沒有該Broker組的SyncStateSet,所以Active Controller會將第一個向其傳送請求共識的Broker設定為Master,之後該組的其他節點會設定為Slave,Master節點的brokerId為0,
Slave節點從1開始編號,往後遞增。
由於Controller控制每個節點的角色,所以每個Broker也會定時向Controller傳送請求獲取主備資訊,以便在角色發生變化的時候可以及時更新。
當Broker成為Master時,會進行如下操作:
Broker在接收Controller指令之後,會根據Controller的選舉結果,轉變對應的角色,分別為Master和Slave。
連線階段
連線階段用於Master節點與Slave節點間建立連線:
HandShake階段
Master節點與Slave節點連線建立成功之後,進入HandShake階段:
Slave節點向Master節點傳送HandShake包,裡面包含一些狀態資訊及Slave的地址,資料格式如下:
Master節點向Slave節點回復HandShake包,Slave節點收到Master節點回復的包後,會使用原生的Epoch+StartOffset與Master傳輸的對比,找到截斷點進行紀錄檔截斷,與Master的紀錄檔保持一致,Master節點回復的HandShake包資料格式如下:
Slave中將每一任Epoch對應的<Startoffset,Endoffset>序列儲存在一個TreeMap中(從大到小排序):
TreeMap<Epoch, Pair<startOffset,endOffset>> epochMap;
Slave節點會遍歷所有的任期(從大到小),然後根據任期號Epoch獲取Master節點對應的<startOffset,endOffset>序列進行對比,如果Slave的Epoch與Master一致,並且StartOffset相等,取兩者中較小的那個endOffset作為截斷位點,之後Slave節點修正自己的<epoch,startoffset>資訊,然後進入Transfer階段進行紀錄檔傳輸。如果未找到截斷位點,會一直向後遍歷直到找到。
Slave保證在截斷位點位置之前的紀錄檔與Master一致,之後從截斷位點位置開始從Master複製紀錄檔。
// Slave從大到小遍歷所有的任期
while (iterator.hasNext()) {
// 任期資訊及對應的<startOffset,endOffset>
Map.Entry<Epoch, Pair<startOffset,endOffset>> curEntry = iterator.next();
// 根據Epoch任期號獲取Master節點對應的<startOffset,endOffset>
Pair<startOffset,endOffset> masterOffset=findMasterOffsetByEpoch(curEntry.getKey());
// 如果獲取不為空,並且startOffset相等
if(masterOffset != null &&
curEntry.getKey().getObejct1() == masterOffset.getObejct1()) {
// 返回較小的那個endOffset
truncateOffset = Math.min(curEntry.getKey().getObejct2(), masterOffset.getObejct2());
break;
}
}
Transfer階段
在Transfer階段,Master節點會不斷向Slave傳送紀錄檔包,開始進行紀錄檔複製:
參考
RIP-44 Support DLedger Controller