在上一篇部落格中MyRaft實現了紀錄檔複製功能,按照計劃接下來需要實現紀錄檔壓縮。
我們知道raft協定是基於紀錄檔複製的協定,紀錄檔資料是raft的核心。但隨著raft叢集的持續工作,raft的紀錄檔檔案將會維護越來越多的紀錄檔,而這會帶來以下幾個問題。
考慮到絕大多數的狀態機中儲存的資料並不都是新增,而更多的是對已有資料的更新,則狀態機中所儲存的資料量通常會遠小於raft紀錄檔的總大小。例如K/V資料庫,對相同key的N次操作(整體更新操作),只有最後一次操作是實際有效的,而在此之前的針對該key的raft紀錄檔其實已經沒有儲存的必要了。
因此raft的作者在論文的紀錄檔壓縮一節中提到了幾種紀錄檔壓縮的演算法(基於快照的、基於LSM樹的),raft選擇了更容易理解和實現的、基於狀態機快照的演算法作為紀錄檔壓縮的基礎。
raft紀錄檔壓縮實現中有以下幾個關鍵點:
下面開始結合原始碼分析MyRaft的紀錄檔壓縮功能
/**
 * A raft snapshot: the serialized state machine plus the position in the log
 * (index and term of the last entry) that the snapshot covers.
 *
 * <p>The accessors are spelled out explicitly because the snapshot, log and
 * RPC code all read and write these fields through getters/setters.
 */
public class RaftSnapshot {

    private static final byte[] EMPTY_DATA = new byte[0];

    /**
     * Index of the last log entry contained in the snapshot.
     */
    private long lastIncludedIndex;

    /**
     * Term of the last log entry contained in the snapshot.
     */
    private int lastIncludedTerm;

    /**
     * Serialized snapshot content.
     * (Note: snapshots too large to fit into a single byte array are not handled yet.)
     */
    private byte[] snapshotData = EMPTY_DATA;

    public long getLastIncludedIndex() {
        return lastIncludedIndex;
    }

    public void setLastIncludedIndex(long lastIncludedIndex) {
        this.lastIncludedIndex = lastIncludedIndex;
    }

    public int getLastIncludedTerm() {
        return lastIncludedTerm;
    }

    public void setLastIncludedTerm(int lastIncludedTerm) {
        this.lastIncludedTerm = lastIncludedTerm;
    }

    /**
     * @return the snapshot bytes; never {@code null} (empty when no data was set)
     */
    public byte[] getSnapshotData() {
        return snapshotData;
    }

    /**
     * @param snapshotData the snapshot bytes; {@code null} is stored as an empty array so
     *                     readers can always use {@code getSnapshotData().length}
     */
    public void setSnapshotData(byte[] snapshotData) {
        this.snapshotData = (snapshotData == null) ? EMPTY_DATA : snapshotData;
    }

    /**
     * Prints only the metadata and the payload size, so logging a snapshot
     * never dumps the whole byte array.
     */
    @Override
    public String toString() {
        return "RaftSnapshot{"
            + "lastIncludedIndex=" + lastIncludedIndex
            + ", lastIncludedTerm=" + lastIncludedTerm
            + ", snapshotData.length=" + snapshotData.length
            + '}';
    }
}
public class SnapshotModule {
private static final Logger logger = LoggerFactory.getLogger(SnapshotModule.class);
private final RaftServer currentServer;
private final File snapshotFile;
private final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
private static final String snapshotFileName = "snapshot.txt";
private static final String snapshotTempFileName = "snapshot-temp.txt";
/**
* 存放快照實際資料的偏移量(lastIncludedIndex + lastIncludedTerm 共兩個欄位後存放快照)
* */
private static final int actualDataOffset = 4 + 8;
public SnapshotModule(RaftServer currentServer) {
this.currentServer = currentServer;
// 保證目錄是存在的
String snapshotFileDir = getSnapshotFileDir();
new File(snapshotFileDir).mkdirs();
snapshotFile = new File(snapshotFileDir + File.separator + snapshotFileName);
File snapshotTempFile = new File(snapshotFileDir + File.separator + snapshotTempFileName);
if(!snapshotFile.exists() && snapshotTempFile.exists()){
// 快照檔案不存在,但是快照的臨時檔案存在。說明在寫完臨時檔案並重新命名之前宕機了(臨時檔案是最新的完整快照)
// 將tempFile重新命名為快照檔案
snapshotTempFile.renameTo(snapshotFile);
logger.info("snapshot-temp file rename to snapshot file success!");
}
}
/**
* 持久化一個新的快照檔案
* (暫不考慮快照太大的問題)
* */
public void persistentNewSnapshotFile(RaftSnapshot raftSnapshot){
logger.info("do persistentNewSnapshotFile raftSnapshot={}",raftSnapshot);
writeLock.lock();
try {
String userPath = getSnapshotFileDir();
// 新的檔名是tempFile
String newSnapshotFilePath = userPath + File.separator + snapshotTempFileName;
logger.info("do persistentNewSnapshotFile newSnapshotFilePath={}", newSnapshotFilePath);
File snapshotTempFile = new File(newSnapshotFilePath);
try (RandomAccessFile newSnapshotFile = new RandomAccessFile(snapshotTempFile, "rw")) {
MyRaftFileUtil.createFile(snapshotTempFile);
newSnapshotFile.writeInt(raftSnapshot.getLastIncludedTerm());
newSnapshotFile.writeLong(raftSnapshot.getLastIncludedIndex());
newSnapshotFile.write(raftSnapshot.getSnapshotData());
logger.info("do persistentNewSnapshotFile success! raftSnapshot={}", raftSnapshot);
} catch (IOException e) {
throw new MyRaftException("persistentNewSnapshotFile error", e);
}
// 先刪掉原來的快照檔案,然後把臨時檔案重名名為快照檔案(delete後、重新命名前可能宕機,但是沒關係,重啟後構造方法裡做了對應處理)
snapshotFile.delete();
snapshotTempFile.renameTo(snapshotFile);
}finally {
writeLock.unlock();
}
}
/**
* 安裝快照
* */
public void appendInstallSnapshot(InstallSnapshotRpcParam installSnapshotRpcParam){
logger.info("do appendInstallSnapshot installSnapshotRpcParam={}",installSnapshotRpcParam);
writeLock.lock();
String userPath = getSnapshotFileDir();
// 新的檔名是tempFile
String newSnapshotFilePath = userPath + File.separator + snapshotTempFileName;
logger.info("do appendInstallSnapshot newSnapshotFilePath={}", newSnapshotFilePath);
File snapshotTempFile = new File(newSnapshotFilePath);
try (RandomAccessFile newSnapshotFile = new RandomAccessFile(snapshotTempFile, "rw")) {
MyRaftFileUtil.createFile(snapshotTempFile);
if(installSnapshotRpcParam.getOffset() == 0){
newSnapshotFile.setLength(0);
}
newSnapshotFile.seek(0);
newSnapshotFile.writeInt(installSnapshotRpcParam.getLastIncludedTerm());
newSnapshotFile.writeLong(installSnapshotRpcParam.getLastIncludedIndex());
// 檔案指標偏移,找到實際應該寫入快照資料的地方
newSnapshotFile.seek(actualDataOffset + installSnapshotRpcParam.getOffset());
// 寫入快照資料
newSnapshotFile.write(installSnapshotRpcParam.getData());
logger.info("do appendInstallSnapshot success! installSnapshotRpcParam={}", installSnapshotRpcParam);
} catch (IOException e) {
throw new MyRaftException("appendInstallSnapshot error", e);
} finally {
writeLock.unlock();
}
if(installSnapshotRpcParam.isDone()) {
writeLock.lock();
try {
// 先刪掉原來的快照檔案,然後把臨時檔案重名名為快照檔案(delete後、重新命名前可能宕機,但是沒關係,重啟後構造方法裡做了對應處理)
snapshotFile.delete();
snapshotTempFile.renameTo(snapshotFile);
} finally {
writeLock.unlock();
}
}
}
/**
* 沒有實際快照資料,只有後設資料
* */
public RaftSnapshot readSnapshotMetaData(){
if(this.snapshotFile.length() == 0){
return null;
}
readLock.lock();
try(RandomAccessFile latestSnapshotRaFile = new RandomAccessFile(this.snapshotFile, "r")) {
RaftSnapshot raftSnapshot = new RaftSnapshot();
raftSnapshot.setLastIncludedTerm(latestSnapshotRaFile.readInt());
raftSnapshot.setLastIncludedIndex(latestSnapshotRaFile.readLong());
return raftSnapshot;
} catch (IOException e) {
throw new MyRaftException("readSnapshotNoData error",e);
} finally {
readLock.unlock();
}
}
public RaftSnapshot readSnapshot(){
logger.info("do readSnapshot");
if(this.snapshotFile.length() == 0){
logger.info("snapshotFile is empty!");
return null;
}
readLock.lock();
try(RandomAccessFile latestSnapshotRaFile = new RandomAccessFile(this.snapshotFile, "r")) {
logger.info("do readSnapshot");
RaftSnapshot latestSnapshot = new RaftSnapshot();
latestSnapshot.setLastIncludedTerm(latestSnapshotRaFile.readInt());
latestSnapshot.setLastIncludedIndex(latestSnapshotRaFile.readLong());
// 讀取snapshot的實際資料(簡單起見,暫不考慮快照太大記憶體溢位的問題,可以優化為按照偏移量多次讀取)
byte[] snapshotData = new byte[(int) (this.snapshotFile.length() - actualDataOffset)];
latestSnapshotRaFile.read(snapshotData);
latestSnapshot.setSnapshotData(snapshotData);
logger.info("readSnapshot success! readSnapshot={}",latestSnapshot);
return latestSnapshot;
} catch (IOException e) {
throw new MyRaftException("readSnapshot error",e);
} finally {
readLock.unlock();
}
}
private String getSnapshotFileDir(){
return System.getProperty("user.dir")
+ File.separator + currentServer.getServerId()
+ File.separator + "snapshot";
}
}
/**
 * Checks whether the log file has reached the configured size threshold and, if so,
 * builds a state machine snapshot, persists it, and rewrites the log file without the
 * committed entries.
 *
 * <p>Returns without doing anything when the read lock cannot be acquired within one
 * second, when the threshold is not reached, or when the last committed entry is not
 * present in the local log file.
 *
 * @throws MyRaftException if the thread is interrupted while waiting for the lock
 */
private void buildSnapshotCheck() {
    try {
        if(!readLock.tryLock(1,TimeUnit.SECONDS)){
            logger.info("buildSnapshotCheck lock fail, quick return!");
            return;
        }
    } catch (InterruptedException e) {
        // restore the interrupt flag so callers further up can still observe the interruption
        Thread.currentThread().interrupt();
        throw new MyRaftException("buildSnapshotCheck tryLock error!",e);
    }

    try {
        long logFileLength = this.logFile.length();
        long logFileThreshold = currentServer.getRaftConfig().getLogFileThreshold();
        if (logFileLength < logFileThreshold) {
            logger.info("logFileLength not reach threshold, do nothing. logFileLength={},threshold={}", logFileLength, logFileThreshold);
            return;
        }

        logger.info("logFileLength already reach threshold, start buildSnapshot! logFileLength={},threshold={}", logFileLength, logFileThreshold);

        // Resolve the commit point before building the snapshot: if the state machine advances
        // in between, the snapshot can only contain more than it claims, never less.
        LogEntry lastCommittedLogEntry = readLocalLog(this.lastCommittedIndex);
        if (lastCommittedLogEntry == null) {
            // nothing committed yet, or the committed entry was already compressed away:
            // there is no term/index to stamp the snapshot with
            logger.info("buildSnapshotCheck lastCommittedLogEntry not found in log file, do nothing. lastCommittedIndex={}", this.lastCommittedIndex);
            return;
        }

        byte[] snapshot = currentServer.getKvReplicationStateMachine().buildSnapshot();

        RaftSnapshot raftSnapshot = new RaftSnapshot();
        raftSnapshot.setLastIncludedTerm(lastCommittedLogEntry.getLogTerm());
        raftSnapshot.setLastIncludedIndex(lastCommittedLogEntry.getLogIndex());
        raftSnapshot.setSnapshotData(snapshot);

        // persist the newest snapshot
        currentServer.getSnapshotModule().persistentNewSnapshotFile(raftSnapshot);
    }finally {
        readLock.unlock();
    }

    // NOTE(review): the log rewrite below re-reads the committed index after the lock was
    // released; confirm it cannot have moved past the index stored in the snapshot.
    try {
        buildNewLogFileRemoveCommittedLog();
    } catch (IOException e) {
        logger.error("buildNewLogFileRemoveCommittedLog error",e);
    }
}
/**
 * Builds a new log file that no longer contains the committed entries (they are covered by
 * the snapshot), together with a matching metadata file, and swaps both in place of the
 * current files.
 *
 * <p>The new files are written as temp files first; the swap itself and the update of the
 * in-memory write offset happen under the write lock.
 *
 * @throws IOException if the temp files cannot be written, or if a temp file cannot be
 *                     renamed over the file it replaces
 */
private void buildNewLogFileRemoveCommittedLog() throws IOException {
    long lastCommitted = getLastCommittedIndex();
    long lastIndex = getLastIndex();

    // reading too many entries at once (memory pressure) is not handled yet
    List<LocalLogEntry> logEntryList;
    if(lastCommitted == lastIndex){
        // every entry is committed: the new log file is simply empty
        logEntryList = new ArrayList<>();
    }else{
        // carry the entries that are not committed yet over into the new log file
        logEntryList = readLocalLog(lastCommitted+1,lastIndex);
    }
    // NOTE(review): entries appended between this read and the swap below would not be
    // carried over - confirm callers prevent concurrent appends.

    File tempLogFile = new File(getLogFileDir() + File.separator + logTempFileName);
    MyRaftFileUtil.createFile(tempLogFile);

    long newLogFileOffset = 0;
    try(RandomAccessFile randomAccessTempLogFile = new RandomAccessFile(tempLogFile,"rw")) {
        // a temp file left over from an earlier interrupted run must not leave a stale tail
        randomAccessTempLogFile.setLength(0);

        for (LogEntry logEntry : logEntryList) {
            writeLog(randomAccessTempLogFile, logEntry, newLogFileOffset);
            newLogFileOffset = randomAccessTempLogFile.getFilePointer();
        }
    }

    File tempLogMeteDataFile = new File(getLogFileDir() + File.separator + logMetaDataTempFileName);
    MyRaftFileUtil.createFile(tempLogMeteDataFile);
    // write the metadata that belongs to the new log file into the temp metadata file
    refreshMetadata(tempLogMeteDataFile,newLogFileOffset);

    writeLock.lock();
    try{
        // Delete the old log file, then rename the temp file to take its place.
        // (A crash between delete and rename is handled by the constructor on restart.)
        this.logFile.delete();
        if(!tempLogFile.renameTo(this.logFile)){
            logger.error("renameLogFile error!");
            throw new IOException("rename temp log file to log file fail! tempLogFile=" + tempLogFile);
        }

        // publish the new write offset only once the file it refers to is actually in place
        this.currentOffset = newLogFileOffset;

        // same delete-then-rename procedure for the metadata file
        this.logMetaDataFile.delete();
        if(!tempLogMeteDataFile.renameTo(this.logMetaDataFile)){
            logger.error("renameTempLogMeteDataFile error!");
            throw new IOException("rename temp log metadata file to log metadata file fail! tempLogMeteDataFile=" + tempLogMeteDataFile);
        }
    }finally {
        writeLock.unlock();
    }
}
相比lab2,在引入了快照壓縮功能後,leader側的紀錄檔複製邏輯需要進行一點小小的拓展。
即當要向follower同步某一條紀錄檔時,對應紀錄檔可能已經被壓縮掉了,因此此時需要改為使用installSnapshotRpc來完成快照的安裝。
/**
 * Leader side: replicates the log up to {@code lastEntry} to every other node in the cluster.
 *
 * <p>One task per follower is submitted to the rpc thread pool. Each task sends
 * appendEntries batches starting at the follower's nextIndex until the follower has
 * {@code lastEntry}; when the entries a follower needs were already compressed into the
 * snapshot, the snapshot is installed on the follower first.
 *
 * @param lastEntry the newest entry that has to reach the followers
 * @return the results collected from the followers within the timeout
 */
public List<AppendEntriesRpcResult> replicationLogEntry(LogEntry lastEntry) {
    List<RaftService> otherNodeInCluster = currentServer.getOtherNodeInCluster();

    List<Future<AppendEntriesRpcResult>> futureList = new ArrayList<>(otherNodeInCluster.size());

    for(RaftService node : otherNodeInCluster){
        // send the rpcs in parallel, asking each follower to replicate the log
        Future<AppendEntriesRpcResult> future = this.rpcThreadPool.submit(()->{
            logger.info("replicationLogEntry start!");

            long nextIndex = this.currentServer.getNextIndexMap().get(node);

            AppendEntriesRpcResult finallyResult = null;

            // If last log index ≥ nextIndex for a follower: send AppendEntries RPC with log entries starting at nextIndex
            while(lastEntry.getLogIndex() >= nextIndex){
                AppendEntriesRpcParam appendEntriesRpcParam = new AppendEntriesRpcParam();
                appendEntriesRpcParam.setLeaderId(currentServer.getServerId());
                appendEntriesRpcParam.setTerm(currentServer.getCurrentTerm());
                appendEntriesRpcParam.setLeaderCommit(this.lastCommittedIndex);

                int appendLogEntryBatchNum = this.currentServer.getRaftConfig().getAppendLogEntryBatchNum();

                // Highest index to send in this round (closed interval): a full batch starting at
                // nextIndex while catching up, otherwise everything up to lastEntry.
                long logIndexEnd = Math.min(nextIndex+(appendLogEntryBatchNum-1), lastEntry.getLogIndex());
                // Read [nextIndex-1, logIndexEnd]; the extra entry in front provides the prevLog information.
                List<LocalLogEntry> localLogEntryList = this.readLocalLog(nextIndex-1,logIndexEnd);

                logger.info("replicationLogEntry doing! nextIndex={},logIndexEnd={},LocalLogEntryList={}",
                    nextIndex,logIndexEnd,JsonUtil.obj2Str(localLogEntryList));

                List<LogEntry> logEntryList = localLogEntryList.stream()
                    .map(LogEntry::toLogEntry)
                    .collect(Collectors.toList());

                // size of the index range that should be sent
                long indexRange = (logIndexEnd - nextIndex + 1);

                // With an index range of size N there are three cases:
                // 1. N+1 entries found (N plus prevLog): nothing that is needed has been compressed
                // 2. 1..N entries found: prevLog (and maybe more) was compressed into the snapshot,
                //    or the log simply starts here (first entries ever written)
                // 3. 0 entries found: everything needed was compressed (entries are persisted before
                //    they are broadcast, so neither log nor snapshot would mean a bug)
                List<LogEntry> needAppendLogList;
                if(logEntryList.size() == indexRange+1){
                    // case 1: the whole range was found
                    logger.info("find log size match!");
                    // prevLog
                    LogEntry preLogEntry = logEntryList.get(0);
                    // the entries that actually have to be transferred
                    needAppendLogList = logEntryList.subList(1,logEntryList.size());
                    appendEntriesRpcParam.setEntries(needAppendLogList);
                    appendEntriesRpcParam.setPrevLogIndex(preLogEntry.getLogIndex());
                    appendEntriesRpcParam.setPrevLogTerm(preLogEntry.getLogTerm());
                }else if(logEntryList.size() > 0 && logEntryList.size() <= indexRange){
                    // case 2: only part of the range was found
                    // With log compression, fewer entries than requested does not necessarily mean the
                    // very first entry was reached; the missing ones may have been compressed.
                    logger.info("find log size not match!");

                    RaftSnapshot readSnapshotNoData = currentServer.getSnapshotModule().readSnapshotMetaData();
                    if(readSnapshotNoData != null){
                        logger.info("has snapshot! readSnapshotNoData={}",readSnapshotNoData);

                        // a snapshot exists: use the last entry it covers as prevLog
                        appendEntriesRpcParam.setPrevLogIndex(readSnapshotNoData.getLastIncludedIndex());
                        appendEntriesRpcParam.setPrevLogTerm(readSnapshotNoData.getLastIncludedTerm());
                    }else{
                        logger.info("no snapshot! prevLogIndex=-1, prevLogTerm=-1");

                        // no snapshot: the very first entry is being sent
                        // (e.g. appendLogEntryBatchNum=5 but only 3 entries exist in total);
                        // prev index and term of the first entry are -1 by convention
                        appendEntriesRpcParam.setPrevLogIndex(-1);
                        appendEntriesRpcParam.setPrevLogTerm(-1);
                    }

                    needAppendLogList = logEntryList;
                    appendEntriesRpcParam.setEntries(needAppendLogList);
                } else if(logEntryList.isEmpty()){
                    // case 3: compression removed the entries to send, installSnapshot is the only way
                    logger.info("can not find and log entry,maybe delete for log compress");
                    RaftSnapshot raftSnapshot = currentServer.getSnapshotModule().readSnapshot();
                    if(raftSnapshot == null){
                        // neither log entries nor a snapshot: fail with a clear message instead of an NPE
                        throw new MyRaftException("replicationLogEntry can not find log entry or snapshot!" +
                            " nextIndex=" + nextIndex + " logIndexEnd=" + logIndexEnd);
                    }
                    doInstallSnapshotRpc(node,raftSnapshot,currentServer);

                    // Normally the snapshot is installed now and the follower has everything up to
                    // lastIncludedIndex. If the install returned early because this node became a
                    // follower, one more loop iteration ends the replication.
                    // NOTE(review): that extra iteration still sends one appendEntries - confirm intended.
                    nextIndex = raftSnapshot.getLastIncludedIndex() + 1;
                    continue;
                } else{
                    // more entries than requested: unexpected, the log module has a bug
                    throw new MyRaftException("replicationLogEntry logEntryList size error!" +
                        " nextIndex=" + nextIndex + " logEntryList.size=" + logEntryList.size());
                }

                logger.info("leader do appendEntries start, node={}, appendEntriesRpcParam={}",node,appendEntriesRpcParam);
                AppendEntriesRpcResult appendEntriesRpcResult = node.appendEntries(appendEntriesRpcParam);
                logger.info("leader do appendEntries end, node={}, appendEntriesRpcResult={}",node,appendEntriesRpcResult);

                finallyResult = appendEntriesRpcResult;
                // handle a response carrying a higher term
                boolean beFollower = currentServer.processCommunicationHigherTerm(appendEntriesRpcResult.getTerm());
                if(beFollower){
                    return appendEntriesRpcResult;
                }

                if(appendEntriesRpcResult.isSuccess()){
                    logger.info("appendEntriesRpcResult is success, node={}",node);

                    // If successful: update nextIndex and matchIndex for follower (§5.3)
                    // The follower now holds every entry of the batch, so continue after the last
                    // entry sent instead of re-sending the rest of the batch one index at a time.
                    long lastSentIndex = needAppendLogList.isEmpty()
                        ? nextIndex
                        : needAppendLogList.get(needAppendLogList.size()-1).getLogIndex();

                    this.currentServer.getNextIndexMap().put(node,lastSentIndex+1);
                    this.currentServer.getMatchIndexMap().put(node,lastSentIndex);
                    nextIndex = lastSentIndex+1;
                }else{
                    // the consistency check failed on the follower, step one entry back and retry
                    logger.info("appendEntriesRpcResult is false, node={}",node);

                    // If AppendEntries fails because of log inconsistency: decrement nextIndex and retry (§5.3)
                    nextIndex--;
                    this.currentServer.getNextIndexMap().put(node,nextIndex);
                }
            }

            if(finallyResult == null){
                // no appendEntries was sent at all: indicates a bug
                throw new MyRaftException("replicationLogEntry finallyResult is null!");
            }

            // arguments in the same order as the placeholders
            logger.info("finallyResult={},node={}",finallyResult,node);

            return finallyResult;
        });

        futureList.add(future);
    }

    // collect the results
    List<AppendEntriesRpcResult> appendEntriesRpcResultList = CommonUtil.concurrentGetRpcFutureResult(
        "do appendEntries", futureList,
        this.rpcThreadPool,2, TimeUnit.SECONDS);

    logger.info("leader replicationLogEntry appendEntriesRpcResultList={}",appendEntriesRpcResultList);

    return appendEntriesRpcResultList;
}
前面提到,follower側在進行紀錄檔一致性校驗時,也可能出現恰好前一條紀錄檔被壓縮到快照裡的情況。
因此需要在當前紀錄檔不存在時,嘗試通過SnapshotModule讀取快照資料中的前一條紀錄檔資訊來進行比對。
/**
 * Follower side handling of the appendEntries rpc (heartbeat and real log replication).
 *
 * <p>Requests from a leader with a lower term are rejected. An empty entry list is treated
 * as a heartbeat. Otherwise the entries are written after {@code prevLogIndex} once the
 * consistency check on (prevLogIndex, prevLogTerm) has passed; when the local log no longer
 * contains the entry at prevLogIndex, the last entry covered by the local snapshot is used
 * for the check instead.
 *
 * @param appendEntriesRpcParam the request sent by the leader
 * @return the current term together with the success flag
 */
public AppendEntriesRpcResult appendEntries(AppendEntriesRpcParam appendEntriesRpcParam) {
    if(appendEntriesRpcParam.getTerm() < this.raftServerMetaDataPersistentModule.getCurrentTerm()){
        // Reply false if term < currentTerm (§5.1)
        // reject requests of an old leader whose term is lower than the local one
        logger.info("doAppendEntries term < currentTerm");
        return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(),false);
    }

    if(appendEntriesRpcParam.getTerm() >= this.raftServerMetaDataPersistentModule.getCurrentTerm()){
        // The sender is a leader of the current or a newer term:
        // become (or stay) follower and adopt the sender's term.
        this.serverStatusEnum = ServerStatusEnum.FOLLOWER;
        this.currentLeader = appendEntriesRpcParam.getLeaderId();
        this.raftServerMetaDataPersistentModule.setCurrentTerm(appendEntriesRpcParam.getTerm());
    }

    if(appendEntriesRpcParam.getEntries() == null || appendEntriesRpcParam.getEntries().isEmpty()){
        // heartbeat from the leader: clear the votedFor of the previous election
        this.cleanVotedFor();

        // no entries means heartbeat, refresh the time the last heartbeat was received
        raftLeaderElectionModule.refreshLastHeartbeatTime();

        long currentLastCommittedIndex = logModule.getLastCommittedIndex();
        logger.debug("doAppendEntries heartbeat leaderCommit={},currentLastCommittedIndex={}",
            appendEntriesRpcParam.getLeaderCommit(),currentLastCommittedIndex);

        if(appendEntriesRpcParam.getLeaderCommit() > currentLastCommittedIndex) {
            // The leader has committed further than this node, let the local state machine catch up.
            // leaderCommit >= lastIndex: this node is behind, apply everything it has.
            // leaderCommit <  lastIndex: this node holds entries the leader has not committed yet,
            //                            apply only the part the leader has committed.
            long minNeedCommittedIndex = Math.min(appendEntriesRpcParam.getLeaderCommit(), logModule.getLastIndex());
            pushStatemachineApply(minNeedCommittedIndex);
        }

        // heartbeat request, return directly
        return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(),true);
    }

    // entries are present: a real log replication rpc
    logger.info("do real log append! appendEntriesRpcParam={}",appendEntriesRpcParam);

    // Consistency check: if prevLogIndex / prevLogTerm do not match, reply false
    // so that the leader sends earlier entries.
    {
        LogEntry localPrevLogEntry = logModule.readLocalLog(appendEntriesRpcParam.getPrevLogIndex());
        if(localPrevLogEntry == null){
            // no local entry at prevLogIndex, two possibilities
            RaftSnapshot raftSnapshot = snapshotModule.readSnapshotMetaData();
            localPrevLogEntry = new LogEntry();
            if(raftSnapshot == null){
                // no entry and no snapshot: compare against the "nothing yet" marker (-1/-1 by convention)
                localPrevLogEntry.setLogIndex(-1);
                localPrevLogEntry.setLogTerm(-1);
            }else{
                // not in the log but a snapshot exists: compare against the last entry it covers
                localPrevLogEntry.setLogIndex(raftSnapshot.getLastIncludedIndex());
                localPrevLogEntry.setLogTerm(raftSnapshot.getLastIncludedTerm());
            }
        }

        // The index has to match as well as the term: the fallback entry built above may sit at a
        // different index than prevLogIndex while happening to carry the same term, and accepting
        // it would write the new entries behind a gap.
        if (localPrevLogEntry.getLogIndex() != appendEntriesRpcParam.getPrevLogIndex()
            || localPrevLogEntry.getLogTerm() != appendEntriesRpcParam.getPrevLogTerm()) {
            // Reply false if log doesn’t contain an entry at prevLogIndex
            // whose term matches prevLogTerm (§5.3)
            logger.info("doAppendEntries localPrevLogEntry not match, localLogEntry={}",localPrevLogEntry);

            return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(),false);
        }
    }

    // reaching this point means the newest matching entry was found
    logger.info("doAppendEntries localEntry is match");

    List<LogEntry> newLogEntryList = appendEntriesRpcParam.getEntries();

    // 1. Append any new entries not already in the log
    // 2. If an existing entry conflicts with a new one (same index but different terms),
    //    delete the existing entry and all that follow it (§5.3)
    // Everything after prevLogIndex is overwritten with what the leader sent.
    logModule.writeLocalLog(newLogEntryList, appendEntriesRpcParam.getPrevLogIndex());

    // If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
    if(appendEntriesRpcParam.getLeaderCommit() > logModule.getLastCommittedIndex()){
        // leaderCommit larger than the last new entry: this node is still behind, commit up to the last new entry.
        // last new entry larger than leaderCommit: this node is in step, commit up to leaderCommit.
        LogEntry lastNewEntry = newLogEntryList.get(newLogEntryList.size()-1);
        long lastCommittedIndex = Math.min(appendEntriesRpcParam.getLeaderCommit(), lastNewEntry.getLogIndex());
        pushStatemachineApply(lastCommittedIndex);
    }

    // success
    return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(), true);
}
/**
 * Leader side: transfers a snapshot to one follower in blocks of the configured size.
 *
 * <p>Every block is sent with its byte offset; the block that reaches the end of the data
 * is flagged as done. The transfer stops early when a response shows that this node is no
 * longer the leader.
 *
 * @param targetNode    the follower that receives the snapshot
 * @param raftSnapshot  the snapshot (metadata and data) to transfer
 * @param currentServer the local server (configuration, term, id)
 * @throws IllegalArgumentException if the configured block size is not positive
 */
public static void doInstallSnapshotRpc(RaftService targetNode, RaftSnapshot raftSnapshot, RaftServer currentServer){
    int installSnapshotBlockSize = currentServer.getRaftConfig().getInstallSnapshotBlockSize();
    if(installSnapshotBlockSize <= 0){
        // a block size of 0 would never advance the offset (endless loop), a negative one
        // would end in a NegativeArraySizeException further down
        throw new IllegalArgumentException(
            "installSnapshotBlockSize must be positive! installSnapshotBlockSize=" + installSnapshotBlockSize);
    }

    byte[] completeSnapshotData = raftSnapshot.getSnapshotData();

    int currentOffset = 0;
    while(true){
        InstallSnapshotRpcParam installSnapshotRpcParam = new InstallSnapshotRpcParam();
        installSnapshotRpcParam.setLastIncludedIndex(raftSnapshot.getLastIncludedIndex());
        installSnapshotRpcParam.setTerm(currentServer.getCurrentTerm());
        installSnapshotRpcParam.setLeaderId(currentServer.getServerId());
        installSnapshotRpcParam.setLastIncludedTerm(raftSnapshot.getLastIncludedTerm());
        installSnapshotRpcParam.setOffset(currentOffset);

        // cut out the block for this round (the last block may be shorter)
        int blockSize = Math.min(installSnapshotBlockSize,completeSnapshotData.length-currentOffset);
        byte[] block = new byte[blockSize];
        System.arraycopy(completeSnapshotData,currentOffset,block,0,blockSize);
        installSnapshotRpcParam.setData(block);

        // advance by what was actually put into the block; reaching the end marks the last block
        currentOffset += blockSize;
        installSnapshotRpcParam.setDone(currentOffset >= completeSnapshotData.length);

        InstallSnapshotRpcResult installSnapshotRpcResult = targetNode.installSnapshot(installSnapshotRpcParam);

        boolean beFollower = currentServer.processCommunicationHigherTerm(installSnapshotRpcResult.getTerm());
        if(beFollower){
            // found out during the transfer that this node is no longer the leader, stop right away
            logger.info("doInstallSnapshotRpc beFollower quick return!");
            return;
        }

        if(installSnapshotRpcParam.isDone()){
            // the whole snapshot has been transferred
            logger.info("doInstallSnapshotRpc isDone!");
            return;
        }
    }
}
/**
 * Follower side handling of the installSnapshot rpc.
 *
 * <p>A request from a leader with a lower term is answered immediately. Otherwise the
 * received block is handed to the snapshot module; once the last block has arrived, the
 * log is compressed and the state machine is reset from the installed snapshot.
 *
 * @param installSnapshotRpcParam one block of the snapshot together with its metadata
 * @return a result carrying the current term of this server
 */
public InstallSnapshotRpcResult installSnapshot(InstallSnapshotRpcParam installSnapshotRpcParam) {
    logger.info("installSnapshot start! serverId={},installSnapshotRpcParam={}",this.serverId,installSnapshotRpcParam);

    boolean fromStaleLeader =
        installSnapshotRpcParam.getTerm() < this.raftServerMetaDataPersistentModule.getCurrentTerm();
    if(fromStaleLeader){
        // Reply immediately if term < currentTerm
        logger.info("installSnapshot term < currentTerm");
        return new InstallSnapshotRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm());
    }

    // store the received block
    this.snapshotModule.appendInstallSnapshot(installSnapshotRpcParam);

    if(installSnapshotRpcParam.isDone()){
        applyCompletelyInstalledSnapshot(installSnapshotRpcParam);
    }

    logger.info("installSnapshot end! serverId={}",this.serverId);
    return new InstallSnapshotRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm());
}

/**
 * Runs after the last block of a snapshot has been stored: compresses the log and
 * resets the state machine from the snapshot that was just installed.
 */
private void applyCompletelyInstalledSnapshot(InstallSnapshotRpcParam installSnapshotRpcParam) {
    // discard any existing or partial snapshot with a smaller index
    // (drops every log entry whose index is <= lastIncludedIndex of the snapshot)
    logModule.compressLogBySnapshot(installSnapshotRpcParam);

    // Reset state machine using snapshot contents (and load snapshot’s cluster configuration)
    RaftSnapshot installedSnapshot = this.snapshotModule.readSnapshot();
    byte[] installedData = installedSnapshot.getSnapshotData();
    kvReplicationStateMachine.installSnapshot(installedData);
}
/**
 * discard any existing or partial snapshot with a smaller index
 *
 * <p>Called once a snapshot has been installed completely: moves the committed index to
 * the snapshot's lastIncludedIndex (pulling the last index up to it when it is behind)
 * and rewrites the log file without the entries the snapshot covers.
 *
 * @param installSnapshotRpcParam the request that completed the snapshot installation
 * @throws MyRaftException if rewriting the log file fails
 */
public void compressLogBySnapshot(InstallSnapshotRpcParam installSnapshotRpcParam){
    this.lastCommittedIndex = installSnapshotRpcParam.getLastIncludedIndex();

    // the log can never end before the committed index
    boolean lastIndexBehindCommit = this.lastCommittedIndex > this.lastIndex;
    if(lastIndexBehindCommit){
        this.lastIndex = this.lastCommittedIndex;
    }

    try {
        buildNewLogFileRemoveCommittedLog();
    } catch (IOException ioException) {
        throw new MyRaftException("compressLogBySnapshot error",ioException);
    }
}
和lab2中一樣,通過啟動一個raft叢集並觸發幾個case可以驗證MyRaft紀錄檔壓縮功能的正確性。