【RocketMQ】RocketMQ 5.0新特性(三)- Controller模式

2023-10-16 12:03:00

在RocketMQ 5.0以前,有兩種叢集部署模式,分別為主從模式(Master-Slave模式)和Dledger模式。

主從模式
主從模式中分為Master和Slave兩個角色,叢集中可以有多個Master節點,一個Master節點可以有多個Slave節點。Master節點負責接收生產者傳送的寫入請求,將訊息寫入CommitLog檔案,Slave節點會與Master節點建立連線,從Master節點同步訊息資料(有同步複製和非同步複製兩種方式)。
消費者可以從Master節點拉取訊息,也可以從Slave節點拉取訊息。

在RocketMQ 4.5版 本之前,如果Master宕機,不支援自動將Slave切換為Master,需要人工介入。

Dledger模式
為了解決主從架構下Slave不能自動切換為Master的問題,4.5版本之後提供了DLedger模式,使用Raft演演算法,如果Master節點出現故障,可以自動從Slave節點中選舉出新的Master進行切換。

存在問題
(1)根據Raft演演算法的多數原則,叢集至少有三個節點以上,在訊息寫入時,也需要大多數的Follower節點響應成功才能認為訊息寫入成功;
(2)Dledger模式下,進行訊息寫入的時候,使用的是openmessaging包中提供的介面,無法利用RocketMQ原生的儲存和複製能力(比如非Dledger模式下使用暫存池方式寫入);
(3)存在兩套紀錄檔複製流程(主從模式下一套、Dledger模式下一套),不統一;
主從同步實現原理

Dledger模式下的紀錄檔複製

Controller模式
為了解決如上問題,RocketMQ 5.0以後推出了Controller模式,它的特點如下:
(1)在主從部署模式下就具有自動切換Master的能力,5.0之前需要使用DLedger才可以;
(2)可以利用RocketMQ原生儲存複製能力,並統一RocketMQ的儲存和複製能力;

RocketMQ 5.0對Broker選主相關的功能進行了抽離,放在Controller中,實現了在主從部署模式下就可以自動切換Master,Controller可以獨立部署也可以嵌入在NameServer中部署。

獨立部署下的Controller:

嵌入NameServer中的部署圖如下:

Controller
也稱為Controller控制器,一般叢集中部署多個Controller,使用Raft演演算法選舉出一個Active DLedger Controller作為主控制器,它主要用來管理一個SyncStateSet集合,
這個集合中儲存的是一組跟上Master進度的Broker節點集合,如果Controller發現某個Master Broker下線時,會從集合中選出新的Master Broker並切換,Controller可以單獨部署可以嵌在NameServer中部署。

SyncStateSet

SyncStateSet中維護了一個Broker副本組集合,包含當前Master Broker和它的Slave Broker,需要注意在集合內的節點都是跟上Master進度的節點,在節更變動時,由Master Broker向Controller控制器發起變更請求,更新Controller中的SyncStateSet資料,在選舉Master的時候,Controller只需從這個列表中選出一個節點成為新的Master即可。

節點變更分為Shrink操作和Expand操作,需要Master Broker發起,它會通過定時任務以及在資料同步過程中判斷是否需要進行Shrink或Expand。

Shrink
Shrink指的是將SyncStateSet副本集合中與Master節點差距過大的副本移除,差距的判斷條件如下:

  1. 節點是否與Master Broker的連線已斷,如果斷開需要將該節點從SyncStateSet移除;
  2. 節點的複製進度是否過大,新增了haMaxTimeSlaveNotCatchup引數,Master Broker會通過定時任務掃描每一個Slave節點的複製資訊,裡面有每個節點上一次跟上Master進度的時間戳lastCaughtUpTimeMs,如果當前時間減去這個lastCaughtUpTimeMs超過了haMaxTimeSlaveNotCatchup的值,會認為該Slave節點的複製進度過後;
haMaxTimeSlaveNotCatchup:表示Slave沒有跟上 Master 的最大時間間隔,若在 SyncStateSet 中的 slave 超過該時間間隔會將其從 SyncStateSet 移除。預設為 15000(15s)。

Expand
如果Master Broker發現某個Slave節點趕上了Master節點的進度,需要將其重新加入到SyncStateSet。

需要注意以上兩個操作,都需要Master Broker向Controller節點傳送通知,請求更新SyncStateSet中的資料。

選舉Master

不管是Controller獨立部署,還是嵌入到NameServer中部署,Controller都會監聽每個Broker的連線,Broker會定期向Controller傳送心跳包,Controller會定時掃描,如果某個Broker心跳包傳送超時,會認為這個Broker已經失效,此時會判斷Broker是否是Master角色,如果是Master角色就需要從該組的SyncStateSet中重新選出一個節點作為Master。

選舉Master的方式比較簡單,從該組的SyncStateSet中,挑選一個心跳包傳送正常的Slave成為新的Master節點即可,並將結果通知到該組所有的Broker,每個Broker也會定時向Controller傳送請求獲取主備資訊。

Broker端設計

主從架構部署模式下,需要設定brokerRole和brokerId,也就是手動分配Master和Slave,在Controller模式下,這兩個引數會失效,不需要再進行設定,角色和ID由Controller來分配。

Controller模式下增加了controllerAddr引數,Broker在啟動時,需要設定這個引數,設定每個controller的地址:

controllerAddr:controller的地址,多個controller中間用分號隔開。例如controllerAddr = 127.0.0.1:9877;127.0.0.1:9878;127.0.0.1:9879

Broker上線

Broker設定了每個Controller的地址,Broker啟動時,會先向Controller註冊,並獲取角色關係和brokerId,通過角色關係可以知道自己是Master還是Slave,之後再向NameServer註冊。

Broker可以通過任意一個Controller獲取Active Controller節點的IP,後臺也會有一個定時任務,定時更新Active Controller節點的IP。

主備關係確定

初始化時,第一個Broker在向Controller註冊的時候,此時並沒有該Broker組的SyncStateSet,所以Active Controller會將第一個向其傳送請求共識的Broker設定為Master,之後該組的其他節點會設定為Slave,Master節點的brokerId為0,
Slave節點從1開始編號,往後遞增。

由於Controller控制每個節點的角色,所以每個Broker也會定時向Controller傳送請求獲取主備資訊,以便在角色發生變化的時候可以及時更新。

紀錄檔複製

  • MasterEpoch(Epoch):Master的任期號,與Term類似,每一任Master都會有一個對應的MasterEpoch任期號,這個任期號的值由Controller控制,單獨遞增;
  • StartOffset:每一任Master除了有一個任期號之外,還會取當選時對應CommitLog檔案中最大的偏移量(MaxPhyOffset),作為本任期期間紀錄檔的起始偏移量,記作StartOffset;
  • EpochFile:用於存放每一任Master對應的紀錄檔起始偏移量(<MasterEpoch, StartOffset> 序列),儲存在 ~/store資料夾下;

當Broker成為Master時,會進行如下操作:

  1. 獲取當前CommitLog檔案中最後一條訊息的偏移量,也就是MaxPhyOffset的值,作為StartOffset;
  2. 將當前任期號MasterEpoch和起始偏移量StartOffset的值持久化到EpochFile檔案中;
  3. 監聽Slave節點的連線;

紀錄檔複製整體流程

Broker在接收Controller指令之後,會根據Controller的選舉結果,轉變對應的角色,分別為Master和Slave。

連線階段
連線階段用於Master節點與Slave節點間建立連線:

  1. Master節點開始監聽連線;
  2. Slave節點請求與Master節點建立連線;

HandShake階段
Master節點與Slave節點連線建立成功之後,進入HandShake階段:

  1. Slave節點向Master節點傳送HandShake包,裡面包含一些狀態資訊及Slave的地址,資料格式如下:

    • Current State:表示當前狀態,當前是HandShake階段,所以表示HandShake;
    • Flags:一些標誌位;
    • SlaveAddressLength:Salve節點的地址長度;
    • SlaveAddress:Slave節點的地址,傳送給Master節點後,在下個階段Master節點會判斷是否需要將Slave節點加入到SyncStateSet中;
  2. Master節點向Slave節點回復HandShake包,Slave節點收到Master節點回復的包後,會使用原生的Epoch+StartOffset與Master傳輸的對比,找到截斷點進行紀錄檔截斷,與Master的紀錄檔保持一致,Master節點回復的HandShake包資料格式如下:

    • Current State:表示當前狀態,當前是HandShake階段;
    • Body Size:儲存Body的長度;
    • Offset:表示當前Master節點的CommitLog最大偏移量;
    • Epoch:表示當前Master節點任期號;
    • Body:Master端記錄的所有任期資訊,是一個集合,所以總大小為EpochEntry大小 * EpochEntry條數;

紀錄檔截斷

  • endOffset:下一任期的StartOffset,如果沒有下一任期,那麼取當前CommitLog的最大偏移量作為endOffset;

Slave中將每一任Epoch對應的<Startoffset,Endoffset>序列儲存在一個TreeMap中(從大到小排序):

TreeMap<Epoch, Pair<startOffset,endOffset>> epochMap;

Slave節點會遍歷所有的任期(從大到小),然後根據任期號Epoch獲取Master節點對應的<startOffset,endOffset>序列進行對比,如果Slave的Epoch與Master一致,並且StartOffset相等,取兩者中較小的那個endOffset作為截斷位點,之後Slave節點修正自己的<epoch,startoffset>資訊,然後進入Transfer階段進行紀錄檔傳輸。如果未找到截斷位點,會一直向後遍歷直到找到。
Slave保證在截斷位點位置之前的紀錄檔與Master一致,之後從截斷位點位置開始從Master複製紀錄檔。

// Slave從大到小遍歷所有的任期
while (iterator.hasNext()) {
    // 任期資訊及對應的<startOffset,endOffset>
    Map.Entry<Epoch, Pair<startOffset,endOffset>> curEntry = iterator.next();
    // 根據Epoch任期號獲取Master節點對應的<startOffset,endOffset>
    Pair<startOffset,endOffset> masterOffset=findMasterOffsetByEpoch(curEntry.getKey());
    // 如果獲取不為空,並且startOffset相等
    if(masterOffset != null && 
            curEntry.getKey().getObejct1() == masterOffset.getObejct1()) {
        // 返回較小的那個endOffset
        truncateOffset = Math.min(curEntry.getKey().getObejct2(), masterOffset.getObejct2());
        break;
   }
}

Transfer階段
在Transfer階段,Master節點會不斷向Slave傳送紀錄檔包,開始進行紀錄檔複製:

  1. Master節點向Slave節點傳送紀錄檔包;
  2. Slave節點收到紀錄檔包之後,會檢測Epoch是否發生變化,然後更新原生的EpochFile,之後向Master節點回復ACK;
  3. Master節點處理Slave節點回復的ACK響應;

參考
RIP-44 Support DLedger Controller

RocketMQ設計思想

RocketMQ官方檔案