手寫raft(三) 實現紀錄檔壓縮

2023-09-01 06:01:04

手寫raft(三) 實現紀錄檔壓縮

在上一篇部落格中MyRaft實現了紀錄檔複製功能,按照計劃接下來需要實現紀錄檔壓縮。

1. 什麼是raft紀錄檔壓縮?

我們知道raft協定是基於紀錄檔複製的協定,紀錄檔資料是raft的核心。但隨著raft叢集的持續工作,raft的紀錄檔檔案將會維護越來越多的紀錄檔,而這會帶來以下幾個問題。

  1. 紀錄檔檔案過大會佔用所在機器過多的本地磁碟空間。
  2. 對於新加入叢集的follower,leader與該follower之間完成紀錄檔同步會非常緩慢。
  3. 對於自身不進行持久化的狀態機,raft節點重啟後回放紀錄檔也會非常緩慢。

考慮到絕大多數的狀態機中儲存的資料並不都是新增,而更多的是對已有資料的更新,則狀態機中所儲存的資料量通常會遠小於raft紀錄檔的總大小。例如K/V資料庫,對相同key的N次操作(整體更新操作),只有最後一次操作是實際有效的,而在此之前的針對該key的raft紀錄檔其實已經沒有儲存的必要了。
因此raft的作者在論文的紀錄檔壓縮一節中提到了幾種紀錄檔壓縮的演演算法(基於快照的、基於LSM樹的),raft選擇了更容易理解和實現的、基於狀態機快照的演演算法作為紀錄檔壓縮的基礎。

2. MyRaft紀錄檔壓縮實現原始碼解析

raft紀錄檔壓縮實現中有以下幾個關鍵點:

  • raft的各個節點可以按照某種策略獨立的生成快照(比如定期檢測紀錄檔檔案大小是否超過閾值),快照的主要內容是狀態機當前瞬間所維護的所有資料的快照。
    MyRaft的狀態機是一個純記憶體的K/V資料庫,所以快照就是記憶體中對應Map資料序列化後的內容。
  • 當前狀態機中的快照實際上等同於所有已提交紀錄檔的順序執行的最終結果,快照檔案生成後會將所有已提交的紀錄檔全部刪除以達成壓縮的目的。
    而在處理appendEntries時,leader需要在引數中設定當前傳輸紀錄檔的前一條紀錄檔的index和term值,如果此時leader前一條紀錄檔恰好是已提交的並且被壓縮到快照裡而被刪除了,則獲取不到這個值了。
    相對應的,follower也可能出現類似的情況,即當前所有紀錄檔都是已提交的並且由於紀錄檔壓縮被刪除了,進行prevIndex/prevTerm校驗時,也需要這個資料。
    因此,最終的快照中包含了最後一條已提交紀錄檔的index和term值這一關鍵的後設資料。
  • 在leader和紀錄檔進度較慢的follower進行通訊時,如果follower所需要的紀錄檔是很早的,而leader這邊對應index的紀錄檔又被快照壓縮而刪除了,沒法通過appendEntries進行同步。
    raft對此新增加了一個rpc介面installSnapshot專門用於解決這個問題。在leader發現follower所需的紀錄檔已經被自己壓縮到快照裡時,則會通過installSnapshot將自己完整的快照直接複製給follower。
    由於快照可能很大,所以installSnapshot一次只會傳輸少量的紀錄檔,通過多次的互動後完成整個快照的安裝。當follower側完成了快照同步後,後續所需要同步的紀錄檔就都是leader紀錄檔檔案中還保留的,後續的紀錄檔接著使用appendEntries同步即可。

下面開始結合原始碼分析MyRaft的紀錄檔壓縮功能

2.1 紀錄檔快照模組

  • raft快照資料由RaftSnapshot物件承載,除了二進位制的狀態機狀態資料外,還包括了快照最後一條紀錄檔的index和term的值。
  • MyRaft關於快照資料讀寫的邏輯集中維護在SnapshotModule中,簡單起見使用一把全域性的讀寫鎖來防止並行而不過多的考慮效能。
    在SnapshotModule中通過引入臨時檔案的方式來解決新快照檔案在生成過程中可能突然宕機的問題。
/**
 * raft快照物件
 * */
public class RaftSnapshot {

    /**
     * 快照所包含的最後一條log的索引編號
     * */
    private long lastIncludedIndex;

    /**
     * 快照所包含的最後一條log的任期編號
     * */
    private int lastIncludedTerm;

    /**
     * 快照資料
     * (注意:暫不考慮快照過大導致byte陣列放不下的情況)
     * */
    private byte[] snapshotData = new byte[0];
}

public class SnapshotModule {
    private static final Logger logger = LoggerFactory.getLogger(SnapshotModule.class);

    private final RaftServer currentServer;

    private final File snapshotFile;

    private final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();
    private final ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();

    private static final String snapshotFileName = "snapshot.txt";
    private static final String snapshotTempFileName = "snapshot-temp.txt";

    /**
     * 存放快照實際資料的偏移量(lastIncludedIndex + lastIncludedTerm 共兩個欄位後存放快照)
     * */
    private static final int actualDataOffset = 4 + 8;

    public SnapshotModule(RaftServer currentServer) {
        this.currentServer = currentServer;

        // 保證目錄是存在的
        String snapshotFileDir = getSnapshotFileDir();
        new File(snapshotFileDir).mkdirs();

        snapshotFile = new File(snapshotFileDir + File.separator + snapshotFileName);

        File snapshotTempFile = new File(snapshotFileDir + File.separator + snapshotTempFileName);

        if(!snapshotFile.exists() && snapshotTempFile.exists()){
            // 快照檔案不存在,但是快照的臨時檔案存在。說明在寫完臨時檔案並重新命名之前宕機了(臨時檔案是最新的完整快照)

            // 將tempFile重新命名為快照檔案
            snapshotTempFile.renameTo(snapshotFile);

            logger.info("snapshot-temp file rename to snapshot file success!");
        }
    }

    /**
     * 持久化一個新的快照檔案
     * (暫不考慮快照太大的問題)
     * */
    public void persistentNewSnapshotFile(RaftSnapshot raftSnapshot){
        logger.info("do persistentNewSnapshotFile raftSnapshot={}",raftSnapshot);
        writeLock.lock();

        try {
            String userPath = getSnapshotFileDir();

            // 新的檔名是tempFile
            String newSnapshotFilePath = userPath + File.separator + snapshotTempFileName;
            logger.info("do persistentNewSnapshotFile newSnapshotFilePath={}", newSnapshotFilePath);

            File snapshotTempFile = new File(newSnapshotFilePath);
            try (RandomAccessFile newSnapshotFile = new RandomAccessFile(snapshotTempFile, "rw")) {
                MyRaftFileUtil.createFile(snapshotTempFile);

                newSnapshotFile.writeInt(raftSnapshot.getLastIncludedTerm());
                newSnapshotFile.writeLong(raftSnapshot.getLastIncludedIndex());
                newSnapshotFile.write(raftSnapshot.getSnapshotData());

                logger.info("do persistentNewSnapshotFile success! raftSnapshot={}", raftSnapshot);
            } catch (IOException e) {
                throw new MyRaftException("persistentNewSnapshotFile error", e);
            }

            // 先刪掉原來的快照檔案,然後把臨時檔案重名名為快照檔案(delete後、重新命名前可能宕機,但是沒關係,重啟後構造方法裡做了對應處理)
            snapshotFile.delete();
            snapshotTempFile.renameTo(snapshotFile);
        }finally {
            writeLock.unlock();
        }
    }

    /**
     * 安裝快照
     * */
    public void appendInstallSnapshot(InstallSnapshotRpcParam installSnapshotRpcParam){
        logger.info("do appendInstallSnapshot installSnapshotRpcParam={}",installSnapshotRpcParam);
        writeLock.lock();

        String userPath = getSnapshotFileDir();

        // 新的檔名是tempFile
        String newSnapshotFilePath = userPath + File.separator + snapshotTempFileName;
        logger.info("do appendInstallSnapshot newSnapshotFilePath={}", newSnapshotFilePath);

        File snapshotTempFile = new File(newSnapshotFilePath);
        try (RandomAccessFile newSnapshotFile = new RandomAccessFile(snapshotTempFile, "rw")) {
            MyRaftFileUtil.createFile(snapshotTempFile);

            if(installSnapshotRpcParam.getOffset() == 0){
                newSnapshotFile.setLength(0);
            }

            newSnapshotFile.seek(0);
            newSnapshotFile.writeInt(installSnapshotRpcParam.getLastIncludedTerm());
            newSnapshotFile.writeLong(installSnapshotRpcParam.getLastIncludedIndex());

            // 檔案指標偏移,找到實際應該寫入快照資料的地方
            newSnapshotFile.seek(actualDataOffset + installSnapshotRpcParam.getOffset());
            // 寫入快照資料
            newSnapshotFile.write(installSnapshotRpcParam.getData());

            logger.info("do appendInstallSnapshot success! installSnapshotRpcParam={}", installSnapshotRpcParam);
        } catch (IOException e) {
            throw new MyRaftException("appendInstallSnapshot error", e);
        } finally {
            writeLock.unlock();
        }

        if(installSnapshotRpcParam.isDone()) {
            writeLock.lock();
            try {
                // 先刪掉原來的快照檔案,然後把臨時檔案重名名為快照檔案(delete後、重新命名前可能宕機,但是沒關係,重啟後構造方法裡做了對應處理)
                snapshotFile.delete();
                snapshotTempFile.renameTo(snapshotFile);
            } finally {
                writeLock.unlock();
            }
        }
    }

    /**
     * 沒有實際快照資料,只有後設資料
     * */
    public RaftSnapshot readSnapshotMetaData(){
        if(this.snapshotFile.length() == 0){
            return null;
        }

        readLock.lock();

        try(RandomAccessFile latestSnapshotRaFile = new RandomAccessFile(this.snapshotFile, "r")) {
            RaftSnapshot raftSnapshot = new RaftSnapshot();
            raftSnapshot.setLastIncludedTerm(latestSnapshotRaFile.readInt());
            raftSnapshot.setLastIncludedIndex(latestSnapshotRaFile.readLong());

            return raftSnapshot;
        } catch (IOException e) {
            throw new MyRaftException("readSnapshotNoData error",e);
        } finally {
            readLock.unlock();
        }
    }

    public RaftSnapshot readSnapshot(){
        logger.info("do readSnapshot");

        if(this.snapshotFile.length() == 0){
            logger.info("snapshotFile is empty!");
            return null;
        }

        readLock.lock();

        try(RandomAccessFile latestSnapshotRaFile = new RandomAccessFile(this.snapshotFile, "r")) {
            logger.info("do readSnapshot");

            RaftSnapshot latestSnapshot = new RaftSnapshot();
            latestSnapshot.setLastIncludedTerm(latestSnapshotRaFile.readInt());
            latestSnapshot.setLastIncludedIndex(latestSnapshotRaFile.readLong());

            // 讀取snapshot的實際資料(簡單起見,暫不考慮快照太大記憶體溢位的問題,可以優化為按照偏移量多次讀取)
            byte[] snapshotData = new byte[(int) (this.snapshotFile.length() - actualDataOffset)];
            latestSnapshotRaFile.read(snapshotData);
            latestSnapshot.setSnapshotData(snapshotData);

            logger.info("readSnapshot success! readSnapshot={}",latestSnapshot);
            return latestSnapshot;
        } catch (IOException e) {
            throw new MyRaftException("readSnapshot error",e);
        } finally {
            readLock.unlock();
        }
    }

    private String getSnapshotFileDir(){
        return System.getProperty("user.dir")
            + File.separator + currentServer.getServerId()
            + File.separator + "snapshot";
    }
}

2.2 紀錄檔壓縮定時任務

  • 相比lab2,lab3中MyRaft的紀錄檔模組新增加了一個定時任務,用於檢查當前紀錄檔檔案的大小是否超過了指定的閾值,如果超過了閾值則會觸發生成新紀錄檔快照的邏輯。
    和快照模組類似,考慮到紀錄檔檔案壓縮時可能宕機的問題,同樣採用引入臨時檔案的方法解決。
  • 生成快照的邏輯裡先將新的快照通過SnapshotModule持久化,然後將當前已提交的紀錄檔從紀錄檔檔案中刪除掉。
    紀錄檔檔案是從前寫到後的,直接操作原紀錄檔檔案會比較麻煩和危險。因此MyRaft將所有未提交的紀錄檔寫入一個新的臨時紀錄檔檔案後,再通過一次檔名的切換實現對已提交紀錄檔的刪除。
   /**
   * 構建快照的檢查
   * */
    private void buildSnapshotCheck() {
        try {
            if(!readLock.tryLock(1,TimeUnit.SECONDS)){
                logger.info("buildSnapshotCheck lock fail, quick return!");
                return;
            }
        } catch (InterruptedException e) {
            throw new MyRaftException("buildSnapshotCheck tryLock error!",e);
        }

        try {
            long logFileLength = this.logFile.length();
            long logFileThreshold = currentServer.getRaftConfig().getLogFileThreshold();
            if (logFileLength < logFileThreshold) {
                logger.info("logFileLength not reach threshold, do nothing. logFileLength={},threshold={}", logFileLength, logFileThreshold);
                return;
            }

            logger.info("logFileLength already reach threshold, start buildSnapshot! logFileLength={},threshold={}", logFileLength, logFileThreshold);

            byte[] snapshot = currentServer.getKvReplicationStateMachine().buildSnapshot();
            LogEntry lastCommittedLogEntry = readLocalLog(this.lastCommittedIndex);

            RaftSnapshot raftSnapshot = new RaftSnapshot();
            raftSnapshot.setLastIncludedTerm(lastCommittedLogEntry.getLogTerm());
            raftSnapshot.setLastIncludedIndex(lastCommittedLogEntry.getLogIndex());
            raftSnapshot.setSnapshotData(snapshot);

            // 持久化最新的一份快照
            currentServer.getSnapshotModule().persistentNewSnapshotFile(raftSnapshot);
        }finally {
            readLock.unlock();
        }

        try {
            buildNewLogFileRemoveCommittedLog();
        } catch (IOException e) {
            logger.error("buildNewLogFileRemoveCommittedLog error",e);
        }
    }
   /**
     * 構建一個刪除了已提交紀錄檔的新紀錄檔檔案(紀錄檔壓縮到快照裡了)
     * */
    private void buildNewLogFileRemoveCommittedLog() throws IOException {
        long lastCommitted = getLastCommittedIndex();
        long lastIndex = getLastIndex();

        // 暫不考慮讀取太多紀錄檔造成記憶體溢位的問題
        List<LocalLogEntry> logEntryList;
        if(lastCommitted == lastIndex){
            // (lastCommitted == lastIndex) 所有紀錄檔都提交了,建立一個空的新紀錄檔檔案
            logEntryList = new ArrayList<>();
        }else{
            // 還有紀錄檔沒提交,把沒提交的記錄到新的紀錄檔檔案中
            logEntryList = readLocalLog(lastCommitted+1,lastIndex);
        }

        File tempLogFile = new File(getLogFileDir() + File.separator + logTempFileName);
        MyRaftFileUtil.createFile(tempLogFile);
        try(RandomAccessFile randomAccessTempLogFile = new RandomAccessFile(tempLogFile,"rw")) {

            long currentOffset = 0;
            for (LogEntry logEntry : logEntryList) {
                // 寫入紀錄檔
                writeLog(randomAccessTempLogFile, logEntry, currentOffset);

                currentOffset = randomAccessTempLogFile.getFilePointer();
            }

            this.currentOffset = currentOffset;
        }

        File tempLogMeteDataFile = new File(getLogFileDir() + File.separator + logMetaDataTempFileName);
        MyRaftFileUtil.createFile(tempLogMeteDataFile);

        // 臨時的紀錄檔後設資料檔案寫入資料
        refreshMetadata(tempLogMeteDataFile,currentOffset);

        writeLock.lock();
        try{
            // 先刪掉原來的紀錄檔檔案,然後把臨時檔案重名名為紀錄檔檔案(delete後、重新命名前可能宕機,但是沒關係,重啟後構造方法裡做了對應處理)
            this.logFile.delete();
            boolean renameLogFileResult = tempLogFile.renameTo(this.logFile);
            if(!renameLogFileResult){
                logger.error("renameLogFile error!");
            }

            // 先刪掉原來的紀錄檔後設資料檔案,然後把臨時檔案重名名為紀錄檔後設資料檔案(delete後、重新命名前可能宕機,但是沒關係,重啟後構造方法裡做了對應處理)
            this.logMetaDataFile.delete();
            boolean renameTempLogMeteDataFileResult = tempLogMeteDataFile.renameTo(this.logMetaDataFile);
            if(!renameTempLogMeteDataFileResult){
                logger.error("renameTempLogMeteDataFile error!");
            }
        }finally {
            writeLock.unlock();
        }
    }

2.3 installSnapshot與紀錄檔複製時互動的改造

appendEntries紀錄檔同步邏輯leader側的拓展

相比lab2,在引入了快照壓縮功能後,leader側的紀錄檔複製邏輯需要進行一點小小的拓展。
即當要向follower同步某一條紀錄檔時,對應紀錄檔可能已經被壓縮掉了,因此此時需要改為使用installSnapshotRpc來完成快照的安裝。

   /**
     * leader向叢集廣播,令follower複製新的紀錄檔條目
     * */
    public List<AppendEntriesRpcResult> replicationLogEntry(LogEntry lastEntry) {
        List<RaftService> otherNodeInCluster = currentServer.getOtherNodeInCluster();

        List<Future<AppendEntriesRpcResult>> futureList = new ArrayList<>(otherNodeInCluster.size());

        for(RaftService node : otherNodeInCluster){
            // 並行傳送rpc,要求follower複製紀錄檔
            Future<AppendEntriesRpcResult> future = this.rpcThreadPool.submit(()->{
                logger.info("replicationLogEntry start!");

                long nextIndex = this.currentServer.getNextIndexMap().get(node);

                AppendEntriesRpcResult finallyResult = null;

                // If last log index ≥ nextIndex for a follower: send AppendEntries RPC with log entries starting at nextIndex
                while(lastEntry.getLogIndex() >= nextIndex){
                    AppendEntriesRpcParam appendEntriesRpcParam = new AppendEntriesRpcParam();
                    appendEntriesRpcParam.setLeaderId(currentServer.getServerId());
                    appendEntriesRpcParam.setTerm(currentServer.getCurrentTerm());
                    appendEntriesRpcParam.setLeaderCommit(this.lastCommittedIndex);

                    int appendLogEntryBatchNum = this.currentServer.getRaftConfig().getAppendLogEntryBatchNum();

                    // 要傳送的紀錄檔最大index值
                    // (追進度的時候,就是nextIndex開始批次傳送appendLogEntryBatchNum-1條(左閉右閉區間);如果進度差不多那就是以lastEntry.index為界限全部傳送出去)
                    long logIndexEnd = Math.min(nextIndex+(appendLogEntryBatchNum-1), lastEntry.getLogIndex());
                    // 讀取出[nextIndex-1,logIndexEnd]的紀錄檔(左閉右閉區間),-1往前一位是為了讀取出preLog的資訊
                    List<LocalLogEntry> localLogEntryList = this.readLocalLog(nextIndex-1,logIndexEnd);

                    logger.info("replicationLogEntry doing! nextIndex={},logIndexEnd={},LocalLogEntryList={}",
                        nextIndex,logIndexEnd,JsonUtil.obj2Str(localLogEntryList));

                    List<LogEntry> logEntryList = localLogEntryList.stream()
                        .map(LogEntry::toLogEntry)
                        .collect(Collectors.toList());

                    // 索引區間大小
                    long indexRange = (logIndexEnd - nextIndex + 1);

                    // 假設索引區間大小為N,可能有三種情況
                    // 1. 查出N條紀錄檔,所需要的紀錄檔全都在本地紀錄檔檔案裡沒有被壓縮
                    // 2. 查出1至N-1條紀錄檔,部分紀錄檔被壓縮到快照裡 or 就是隻有那麼多紀錄檔(一次批次查5條,但當前總共只寫入了3條)
                    // 3. 查出0條紀錄檔,需要的紀錄檔全部被壓縮了(因為是先落盤再廣播,如果既沒有紀錄檔也沒有快照那就是有bug)
                    if(logEntryList.size() == indexRange+1){
                        // 查出了區間內的所有紀錄檔(case 1)

                        logger.info("find log size match!");
                        // preLog
                        LogEntry preLogEntry = logEntryList.get(0);
                        // 實際需要傳輸的log
                        List<LogEntry> needAppendLogList = logEntryList.subList(1,logEntryList.size());
                        appendEntriesRpcParam.setEntries(needAppendLogList);
                        appendEntriesRpcParam.setPrevLogIndex(preLogEntry.getLogIndex());
                        appendEntriesRpcParam.setPrevLogTerm(preLogEntry.getLogTerm());
                    }else if(logEntryList.size() > 0 && logEntryList.size() <= indexRange){
                        // 查出了部分紀錄檔(case 2)
                        // 新增紀錄檔壓縮功能後,查出來的資料個數小於指定的區間不一定就是查到第一條資料,還有可能是紀錄檔被壓縮了

                        logger.info("find log size not match!");

                        RaftSnapshot readSnapshotNoData = currentServer.getSnapshotModule().readSnapshotMetaData();
                        if(readSnapshotNoData != null){
                            logger.info("has snapshot! readSnapshotNoData={}",readSnapshotNoData);

                            // 存在快照,使用快照裡儲存的上一條紀錄檔資訊
                            appendEntriesRpcParam.setPrevLogIndex(readSnapshotNoData.getLastIncludedIndex());
                            appendEntriesRpcParam.setPrevLogTerm(readSnapshotNoData.getLastIncludedTerm());
                        }else{
                            logger.info("no snapshot! prevLogIndex=-1, prevLogTerm=-1");

                            // 沒有快照,說明恰好傳送第一條紀錄檔記錄(比如appendLogEntryBatchNum=5,但一共只有3條紀錄檔全查出來了)
                            // 第一條記錄的prev的index和term都是-1
                            appendEntriesRpcParam.setPrevLogIndex(-1);
                            appendEntriesRpcParam.setPrevLogTerm(-1);
                        }

                        appendEntriesRpcParam.setEntries(logEntryList);
                    } else if(logEntryList.isEmpty()){
                        // 紀錄檔壓縮把要同步的紀錄檔刪除掉了,只能使用installSnapshotRpc了(case 3)
                        logger.info("can not find and log entry,maybe delete for log compress");
                        // 快照壓縮導致leader更早的index紀錄檔已經不存在了

                        // 應該改為使用installSnapshot來同步進度
                        RaftSnapshot raftSnapshot = currentServer.getSnapshotModule().readSnapshot();
                        doInstallSnapshotRpc(node,raftSnapshot,currentServer);

                        // 走到這裡,一般是成功的完成了快照的安裝。目標follower目前已經有了包括lastIncludedIndex以及之前的所有紀錄檔
                        // 如果是因為成為follower快速返回,則再回圈一次就結束了
                        nextIndex = raftSnapshot.getLastIncludedIndex() + 1;
                        continue;
                    } else{
                        // 走到這裡不符合預期,紀錄檔模組有bug
                        throw new MyRaftException("replicationLogEntry logEntryList size error!" +
                            " nextIndex=" + nextIndex + " logEntryList.size=" + logEntryList.size());
                    }

                    logger.info("leader do appendEntries start, node={}, appendEntriesRpcParam={}",node,appendEntriesRpcParam);
                    AppendEntriesRpcResult appendEntriesRpcResult = node.appendEntries(appendEntriesRpcParam);
                    logger.info("leader do appendEntries end, node={}, appendEntriesRpcResult={}",node,appendEntriesRpcResult);

                    finallyResult = appendEntriesRpcResult;
                    // 收到更高任期的處理
                    boolean beFollower = currentServer.processCommunicationHigherTerm(appendEntriesRpcResult.getTerm());
                    if(beFollower){
                        return appendEntriesRpcResult;
                    }

                    if(appendEntriesRpcResult.isSuccess()){
                        logger.info("appendEntriesRpcResult is success, node={}",node);

                        // If successful: update nextIndex and matchIndex for follower (§5.3)

                        // 同步成功了,nextIndex遞增一位
                        this.currentServer.getNextIndexMap().put(node,nextIndex+1);
                        this.currentServer.getMatchIndexMap().put(node,nextIndex);

                        nextIndex++;
                    }else{
                        // 因為紀錄檔對不上導致一致性檢查沒通過,同步沒成功,nextIndex往後退一位

                        logger.info("appendEntriesRpcResult is false, node={}",node);

                        // If AppendEntries fails because of log inconsistency: decrement nextIndex and retry (§5.3)
                        nextIndex--;
                        this.currentServer.getNextIndexMap().put(node,nextIndex);
                    }
                }

                if(finallyResult == null){
                    // 說明有bug
                    throw new MyRaftException("replicationLogEntry finallyResult is null!");
                }

                logger.info("finallyResult={},node={}",node,finallyResult);

                return finallyResult;
            });

            futureList.add(future);
        }

        // 獲得結果
        List<AppendEntriesRpcResult> appendEntriesRpcResultList = CommonUtil.concurrentGetRpcFutureResult(
                "do appendEntries", futureList,
                this.rpcThreadPool,2, TimeUnit.SECONDS);

        logger.info("leader replicationLogEntry appendEntriesRpcResultList={}",appendEntriesRpcResultList);

        return appendEntriesRpcResultList;
    }
appendEntries紀錄檔同步邏輯follower側的拓展

前面提到,follower側在進行紀錄檔一致性校驗時,也可能出現恰好前一條紀錄檔被壓縮到快照裡的情況。
因此需要在當前紀錄檔不存在時,嘗試通過SnapshotModule讀取快照資料中的前一條紀錄檔資訊來進行比對。

    public AppendEntriesRpcResult appendEntries(AppendEntriesRpcParam appendEntriesRpcParam) {
        if(appendEntriesRpcParam.getTerm() < this.raftServerMetaDataPersistentModule.getCurrentTerm()){
            // Reply false if term < currentTerm (§5.1)
            // 拒絕處理任期低於自己的老leader的請求

            logger.info("doAppendEntries term < currentTerm");
            return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(),false);
        }

        if(appendEntriesRpcParam.getTerm() >= this.raftServerMetaDataPersistentModule.getCurrentTerm()){
            // appendEntries請求中任期值如果大於自己,說明已經有一個更新的leader了,自己轉為follower,並且以對方更大的任期為準
            this.serverStatusEnum = ServerStatusEnum.FOLLOWER;
            this.currentLeader = appendEntriesRpcParam.getLeaderId();
            this.raftServerMetaDataPersistentModule.setCurrentTerm(appendEntriesRpcParam.getTerm());
        }

        if(appendEntriesRpcParam.getEntries() == null || appendEntriesRpcParam.getEntries().isEmpty()){
            // 來自leader的心跳處理,清理掉之前選舉的votedFor
            this.cleanVotedFor();
            // entries為空,說明是心跳請求,重新整理一下最近收到心跳的時間
            raftLeaderElectionModule.refreshLastHeartbeatTime();

            long currentLastCommittedIndex = logModule.getLastCommittedIndex();
            logger.debug("doAppendEntries heartbeat leaderCommit={},currentLastCommittedIndex={}",
                appendEntriesRpcParam.getLeaderCommit(),currentLastCommittedIndex);

            if(appendEntriesRpcParam.getLeaderCommit() > currentLastCommittedIndex) {
                // 心跳處理裡,如果leader當前已提交的紀錄檔進度超過了當前節點的進度,令當前節點狀態機也跟上
                // 如果leaderCommit >= logModule.getLastIndex(),說明當前節點的紀錄檔進度不足,但可以把目前已有的紀錄檔都提交給狀態機去執行
                // 如果leaderCommit < logModule.getLastIndex(),說明當前節點進度比較快,有一些紀錄檔是leader已複製但還沒提交的,把leader已提交的那一部分作用到狀態機就行
                long minNeedCommittedIndex = Math.min(appendEntriesRpcParam.getLeaderCommit(), logModule.getLastIndex());
                pushStatemachineApply(minNeedCommittedIndex);
            }

            // 心跳請求,直接返回
            return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(),true);
        }

        // logEntries不為空,是真實的紀錄檔複製rpc

        logger.info("do real log append! appendEntriesRpcParam={}",appendEntriesRpcParam);
        // AppendEntry可靠性校驗,如果prevLogIndex和prevLogTerm不匹配,則需要返回false,讓leader發更早的紀錄檔過來
        {
            LogEntry localPrevLogEntry = logModule.readLocalLog(appendEntriesRpcParam.getPrevLogIndex());
            if(localPrevLogEntry == null){
                // 沒有查到prevLogIndex對應的紀錄檔,分兩種情況
                RaftSnapshot raftSnapshot = snapshotModule.readSnapshotMetaData();
                localPrevLogEntry = new LogEntry();
                if(raftSnapshot == null){
                    // 當前節點紀錄檔條目為空,又沒有快照,說明完全沒有紀錄檔(預設任期為-1,這個是約定)
                    localPrevLogEntry.setLogIndex(-1);
                    localPrevLogEntry.setLogTerm(-1);
                }else{
                    // 紀錄檔裡沒有,但是有快照(把快照裡記錄的最後一條紀錄檔資訊與leader的引數比對)
                    localPrevLogEntry.setLogIndex(raftSnapshot.getLastIncludedIndex());
                    localPrevLogEntry.setLogTerm(raftSnapshot.getLastIncludedTerm());
                }
            }

            if (localPrevLogEntry.getLogTerm() != appendEntriesRpcParam.getPrevLogTerm()) {
                //  Reply false if log doesn’t contain an entry at prevLogIndex
                //  whose term matches prevLogTerm (§5.3)
                //  本地紀錄檔和引數中的PrevLogIndex和PrevLogTerm對不上(對應紀錄檔不存在,或者任期對不上)
                logger.info("doAppendEntries localPrevLogEntry not match, localLogEntry={}",localPrevLogEntry);

                return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(),false);
            }
        }

        // 走到這裡說明找到了最新的一條匹配的記錄
        logger.info("doAppendEntries localEntry is match");

        List<LogEntry> newLogEntryList = appendEntriesRpcParam.getEntries();

        // 1. Append any new entries not already in the log
        // 2. If an existing entry conflicts with a new one (same index but different terms),
        //    delete the existing entry and all that follow it (§5.3)
        // 新紀錄檔的複製操作(直接整個覆蓋掉prevLogIndex之後的所有紀錄檔,以leader發過來的紀錄檔為準)
        logModule.writeLocalLog(newLogEntryList, appendEntriesRpcParam.getPrevLogIndex());

        // If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
        if(appendEntriesRpcParam.getLeaderCommit() > logModule.getLastCommittedIndex()){
            // 如果leaderCommit更大,說明當前節點的同步進度慢於leader,以新的entry裡的index為準(更高的index還沒有在本地儲存(因為上面的appendEntry有效性檢查))
            // 如果index of last new entry更大,說明當前節點的同步進度是和leader相匹配的,commitIndex以leader最新提交的為準

            LogEntry lastNewEntry = newLogEntryList.get(newLogEntryList.size()-1);
            long lastCommittedIndex = Math.min(appendEntriesRpcParam.getLeaderCommit(), lastNewEntry.getLogIndex());
            pushStatemachineApply(lastCommittedIndex);
        }

        // 返回成功
        return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(), true);
    }
leader側快照安裝installSnapshot邏輯
  • MyRaft中,leader側安裝快照的方法實現的比較簡單,未考慮快照可能很大的情況,所以直接一股腦將整個快照檔案全部讀取到記憶體中來了(在向多個follower並行安裝快照時會佔用很多的記憶體,待優化)。
  • 在將快照讀取到記憶體中後,通過一個迴圈將快照資料按照設定的block大小逐步的傳送給follower。在傳送完最後一個block資料後,rpc請求引數的done屬性會被設定為true標識為同步完成。
    public static void doInstallSnapshotRpc(RaftService targetNode, RaftSnapshot raftSnapshot, RaftServer currentServer){
        int installSnapshotBlockSize = currentServer.getRaftConfig().getInstallSnapshotBlockSize();
        byte[] completeSnapshotData = raftSnapshot.getSnapshotData();

        int currentOffset = 0;
        while(true){
            InstallSnapshotRpcParam installSnapshotRpcParam = new InstallSnapshotRpcParam();
            installSnapshotRpcParam.setLastIncludedIndex(raftSnapshot.getLastIncludedIndex());
            installSnapshotRpcParam.setTerm(currentServer.getCurrentTerm());
            installSnapshotRpcParam.setLeaderId(currentServer.getServerId());
            installSnapshotRpcParam.setLastIncludedTerm(raftSnapshot.getLastIncludedTerm());
            installSnapshotRpcParam.setOffset(currentOffset);

            // 填充每次傳輸的資料塊
            int blockSize = Math.min(installSnapshotBlockSize,completeSnapshotData.length-currentOffset);
            byte[] block = new byte[blockSize];
            System.arraycopy(completeSnapshotData,currentOffset,block,0,blockSize);
            installSnapshotRpcParam.setData(block);

            currentOffset += installSnapshotBlockSize;
            if(currentOffset >= completeSnapshotData.length){
                installSnapshotRpcParam.setDone(true);
            }else{
                installSnapshotRpcParam.setDone(false);
            }

            InstallSnapshotRpcResult installSnapshotRpcResult = targetNode.installSnapshot(installSnapshotRpcParam);

            boolean beFollower = currentServer.processCommunicationHigherTerm(installSnapshotRpcResult.getTerm());
            if(beFollower){
                // 傳輸過程中發現自己已經不再是leader了,快速結束
                logger.info("doInstallSnapshotRpc beFollower quick return!");
                return;
            }

            if(installSnapshotRpcParam.isDone()){
                // 快照整體安裝完畢
                logger.info("doInstallSnapshotRpc isDone!");
                return;
            }
        }
    }
follower側處理installSnapshot邏輯
  • follower側處理快照安裝rpc的邏輯中,除了必要的對引數term大小的檢查,就是簡單的通過SnapshotModule完成快照的安裝工作。
  • 在快照整體成功安裝完成後,通過LogModule.compressLogBySnapshot方法將所有已提交的紀錄檔全都刪除掉,並將之前安裝好的快照整體作用到follower自己原生的狀態機中。
    public InstallSnapshotRpcResult installSnapshot(InstallSnapshotRpcParam installSnapshotRpcParam) {
        logger.info("installSnapshot start! serverId={},installSnapshotRpcParam={}",this.serverId,installSnapshotRpcParam);

        if(installSnapshotRpcParam.getTerm() < this.raftServerMetaDataPersistentModule.getCurrentTerm()){
            // Reply immediately if term < currentTerm
            // 拒絕處理任期低於自己的老leader的請求

            logger.info("installSnapshot term < currentTerm");
            return new InstallSnapshotRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm());
        }

        // 安裝快照
        this.snapshotModule.appendInstallSnapshot(installSnapshotRpcParam);

        // 快照已經完全安裝好了
        if(installSnapshotRpcParam.isDone()){
            // discard any existing or partial snapshot with a smaller index
            // 快照整體安裝完畢,清理掉index小於等於快照中lastIncludedIndex的所有紀錄檔(紀錄檔壓縮)
            logModule.compressLogBySnapshot(installSnapshotRpcParam);

            // Reset state machine using snapshot contents (and load snapshot’s cluster configuration)
            // follower的狀態機重新安裝快照
            RaftSnapshot raftSnapshot = this.snapshotModule.readSnapshot();
            kvReplicationStateMachine.installSnapshot(raftSnapshot.getSnapshotData());
        }

        logger.info("installSnapshot end! serverId={}",this.serverId);

        return new InstallSnapshotRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm());
    }
    /**
     * discard any existing or partial snapshot with a smaller index
     * 快照整體安裝完畢,清理掉index小於等於快照中lastIncludedIndex的所有紀錄檔
     * */
    public void compressLogBySnapshot(InstallSnapshotRpcParam installSnapshotRpcParam){
        this.lastCommittedIndex = installSnapshotRpcParam.getLastIncludedIndex();
        if(this.lastIndex < this.lastCommittedIndex){
            this.lastIndex = this.lastCommittedIndex;
        }

        try {
            buildNewLogFileRemoveCommittedLog();
        } catch (IOException e) {
            throw new MyRaftException("compressLogBySnapshot error",e);
        }
    }

3. MyRaft紀錄檔壓縮測試case

和lab2中一樣,通過啟動一個raft叢集並觸發幾個case可以驗證MyRaft紀錄檔壓縮功能的正確性。

  1. 啟動5個節點,關閉快照壓縮功能,正常的進行寫入操作。
    => 狀態機/紀錄檔檔案的資料是否符合預期
  2. 啟動5個節點,開啟快照壓縮功能,正常的進行寫入操作,當紀錄檔檔案超過閾值觸發紀錄檔壓縮後。
    => 狀態機/紀錄檔檔案/快照檔案的資料是否符合預期
  3. 啟動5個節點,開啟快照壓縮功能,正常的進行寫入操作,主動關閉一個節點。再進行一些寫入,令leader生成最新的快照,然後讓宕機的節點回到叢集,看leader是否通過快照安裝來完成快照/紀錄檔的同步。
    => 狀態機/紀錄檔檔案/快照檔案的資料是否符合預期

總結

  • 作為手寫raft系列部落格的第三篇,也是目前計劃中的最後一篇部落格(叢集動態變更功能暫不實現)。部落格中介紹了Raft的紀錄檔壓縮功能以及從原始碼層面上分析MyRaft實現紀錄檔壓縮功能的細節。
  • raft是一個相對複雜的演演算法,因此MyRaft在功能實現上為了追求實現的簡單性,捨棄了很多效能方面的優化(比如一把全域性大鎖防並行,快照資料完整放到記憶體中處理等等)。
    同時分散式系統中處處有並行、機器也時刻可能宕機,需要考慮的細節繁多。所以MyRaft即使通過了我自己設計的一些單測和整合測試的case,但肯定還存在許多尚未被發現bug,還請多多指教。
  • 部落格中展示的完整程式碼在我的github上:https://github.com/1399852153/MyRaft (release/lab3_log_compaction分支),希望能幫助到對raft演演算法感興趣的小夥伴。