raft的論文中將raft演算法的功能分解為4個模組:leader選舉、紀錄檔複製、紀錄檔壓縮和叢集成員動態變更。
其中前兩項「leader選舉」和「紀錄檔複製」是raft演算法的基礎,而後兩項「紀錄檔壓縮」和「叢集成員動態變更」屬於raft演算法在功能上的重要優化。
通過raft的論文或者其它相關資料,讀者基本能大致理解raft的工作原理。
但紙上得來終覺淺,絕知此事要躬行。親手實踐才能更好地把握raft中的精巧細節,加深對raft演算法的理解,從而更有效地閱讀基於raft或其它一致性協定的開源專案原始碼。
在這個系列部落格中,我會帶領讀者一步步實現一個基於raft演算法的簡易KV資料庫,即MyRaft。MyRaft基於原始的raft演算法實現,沒有做額外的優化,目的是保證實現的簡單性。
MyRaft實現了raft論文中提到的三個功能,即「leader選舉」、「紀錄檔複製」和「紀錄檔壓縮」(在實踐中發現「叢集成員動態變更」需要對原有邏輯做較大改動,會大幅增加複雜度,限於個人水平暫不實現)。
三個功能會通過三次迭代實驗逐步完成,其中每個迭代都會以部落格的形式分享出來。
public interface RaftService {
/**
* Request vote (requestVote RPC): invoked by a candidate to collect votes.
*
* Receiver implementation:
* 1. Reply false if term < currentTerm (§5.1)
* 2. If votedFor is null or candidateId, and candidate’s log is at
* least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
*
* In other words, the receiver must:
* 1. Refuse the vote (voteGranted = false) if the term in the request is smaller than its own currentTerm.
* 2. Grant the vote (voteGranted = true) if it has not voted yet (first come, first served) or has already
*    voted for the node identified by candidateId (idempotent retry), AND the candidate's log is at least
*    as up-to-date as its own (compare the term and index of the last log entry).
*
* @param requestVoteRpcParam the candidate's term, id and last-log position
* @return the receiver's current term and whether it granted the vote
* */
RequestVoteRpcResult requestVote(RequestVoteRpcParam requestVoteRpcParam);
/**
* Append log entries (AppendEntries RPC): invoked by the leader.
* AppendEntriesRpcParam in this listing only carries term and leaderId, so the call serves as the
* leader's heartbeat.
*
* @param appendEntriesRpcParam the leader's term and id
* @return the receiver's current term and whether the request was accepted
* */
AppendEntriesRpcResult appendEntries(AppendEntriesRpcParam appendEntriesRpcParam);
}
/**
* Parameter object of the requestVote RPC, sent by a candidate to every other node.
*/
public class RequestVoteRpcParam implements Serializable {
/**
* The candidate's term.
* */
private int term;
/**
* The candidate's server id.
* */
private String candidateId;
/**
* Index of the candidate's last log entry.
* NOTE(review): the election code in this listing only sets term and candidateId, so this stays at its default.
* */
private long lastLogIndex;
/**
* Term of the candidate's last log entry.
* NOTE(review): not populated by the election code in this listing either.
* */
private int lastLogTerm;
}
/**
* Response object of the requestVote RPC.
* */
public class RequestVoteRpcResult implements Serializable {
/**
* The callee's current term, so a stale candidate can learn about a newer term.
* */
private int term;
/**
* Whether the callee granted its vote to the caller.
* */
private boolean voteGranted;
}
/**
* Parameter object of the appendEntries RPC.
* In this listing it carries no log entries, so it is only used as the leader's heartbeat.
* */
public class AppendEntriesRpcParam implements Serializable {
/**
* The current leader's term.
* */
private int term;
/**
* The leader's server id.
* */
private String leaderId;
}
/**
* Response object of the appendEntries RPC.
* */
public class AppendEntriesRpcResult implements Serializable {
/**
* The callee's current term, so a stale leader can learn about a newer term.
* */
private int term;
/**
* Whether the callee accepted the request (false when the leader's term is older than the callee's).
* */
private boolean success;
}
/**
 * RPC-facing raft server.
 *
 * <p>Exposes this node's {@link RaftService} implementation through the RPC framework and builds
 * RPC client stubs for the other members of the cluster.
 */
public class RaftRpcServer extends RaftServer {

    private final Registry registry;
    private final RaftNodeConfig currentNodeConfig;

    public RaftRpcServer(RaftConfig raftConfig, Registry registry) {
        super(raftConfig);
        this.currentNodeConfig = raftConfig.getCurrentNodeConfig();
        this.registry = registry;
    }

    /**
     * Initializes the inherited raft modules first and only then exports this node over RPC,
     * so no request can arrive before the modules exist.
     */
    @Override
    public void init(List<RaftService> otherNodeInCluster) {
        super.init(otherNodeInCluster);
        initRpcServer();
    }

    /**
     * Builds one RPC-backed {@link RaftService} per peer in {@code otherNodeInCluster}.
     */
    public List<RaftService> getRpcProxyList(List<RaftNodeConfig> otherNodeInCluster) {
        return initRpcConsumer(otherNodeInCluster);
    }

    /** Registers a single shared RPC proxy and wraps it once per peer with that peer's address. */
    private List<RaftService> initRpcConsumer(List<RaftNodeConfig> otherNodeInCluster) {
        ConsumerBootstrap bootstrap = new ConsumerBootstrap()
                .registry(registry)
                .loadBalance(new SimpleRoundRobinBalance());
        Consumer<RaftService> raftConsumer =
                bootstrap.registerConsumer(RaftService.class, new FastFailInvoker());
        RaftService sharedProxy = raftConsumer.getProxy();

        List<RaftService> peerClients = new ArrayList<>();
        otherNodeInCluster.forEach(
                peerConfig -> peerClients.add(new RaftRpcConsumer(peerConfig, sharedProxy)));
        return peerClients;
    }

    /** Publishes this server as the RaftService provider and starts the netty listener on its ip/port. */
    private void initRpcServer() {
        URLAddress localAddress = new URLAddress(currentNodeConfig.getIp(), currentNodeConfig.getPort());

        Provider<RaftService> raftProvider = new Provider<>();
        raftProvider.setInterfaceClass(RaftService.class);
        raftProvider.setRef(this);
        raftProvider.setUrlAddress(localAddress);
        raftProvider.setRegistry(registry);
        raftProvider.export();

        new NettyServer(localAddress).init();
    }
}
/**
 * {@link RaftService} client bound to one specific peer.
 *
 * <p>All peers share one RPC proxy; before each call this wrapper pins the RPC target
 * to its own peer's ip/port and then delegates to that proxy.
 */
public class RaftRpcConsumer implements RaftService {
    private static final Logger logger = LoggerFactory.getLogger(RaftRpcConsumer.class);

    private final RaftNodeConfig targetNodeConfig;
    private final RaftService raftServiceProxy;

    public RaftRpcConsumer(RaftNodeConfig targetNodeConfig, RaftService proxyRaftService) {
        this.targetNodeConfig = targetNodeConfig;
        this.raftServiceProxy = proxyRaftService;
    }

    @Override
    public RequestVoteRpcResult requestVote(RequestVoteRpcParam requestVoteRpcParam) {
        setTargetProviderUrl();
        return raftServiceProxy.requestVote(requestVoteRpcParam);
    }

    @Override
    public AppendEntriesRpcResult appendEntries(AppendEntriesRpcParam appendEntriesRpcParam) {
        setTargetProviderUrl();
        return raftServiceProxy.appendEntries(appendEntriesRpcParam);
    }

    /** Forces the next RPC issued from the current context to go to this client's peer. */
    private void setTargetProviderUrl() {
        ConsumerRpcContext rpcContext = ConsumerRpcContextHolder.getConsumerRpcContext();
        String targetIp = targetNodeConfig.getIp();
        int targetPort = targetNodeConfig.getPort();
        rpcContext.setTargetProviderAddress(new URLAddress(targetIp, targetPort));
    }
}
/**
* Snapshot of the raft metadata that must survive restarts (currentTerm and votedFor).
* It is serialized to / deserialized from JSON by RaftServerMetaDataPersistentModule.
*/
public class RaftServerMetaData {
/**
* This server's current term.
* */
private int currentTerm;
/**
* Who this server has voted for.
* (The candidate's serverId, or null if it has not voted.)
* */
private String votedFor;
}
/**
 * Holds this server's persistent raft metadata (currentTerm and votedFor) and writes it to disk
 * whenever it changes.
 *
 * <p>The data is stored as JSON in {@code <user.dir>/<serverId>/raftServerMetaData-<serverId>.txt}.
 * Getters take the read lock. Setters take the write lock, so the in-memory field and the file
 * are updated together.
 */
public class RaftServerMetaDataPersistentModule {

    /** This server's current term. */
    private volatile int currentTerm;

    /** Server id of the candidate this server voted for, or null if it has not voted. */
    private volatile String votedFor;

    private final File persistenceFile;

    private final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();
    private final ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();

    /**
     * Creates the module for {@code serverId}.
     *
     * <p>It calls MyRaftFileUtil.createFile on the persistence file (presumably a no-op when the
     * file already exists — confirm), then loads the metadata stored in it. When the file is empty,
     * RaftServerMetaData.getDefault() is used instead.
     */
    public RaftServerMetaDataPersistentModule(String serverId) {
        String userPath = System.getProperty("user.dir") + File.separator + serverId;
        this.persistenceFile = new File(userPath + File.separator + "raftServerMetaData-" + serverId + ".txt");
        MyRaftFileUtil.createFile(persistenceFile);
        // load what was persisted on disk by a previous run
        RaftServerMetaData raftServerMetaData = readRaftServerMetaData(persistenceFile);
        this.currentTerm = raftServerMetaData.getCurrentTerm();
        this.votedFor = raftServerMetaData.getVotedFor();
    }

    public int getCurrentTerm() {
        readLock.lock();
        try {
            return currentTerm;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Sets the current term and persists the metadata.
     * Setting the same value again is a no-op, so no disk write happens (same as setVotedFor).
     */
    public void setCurrentTerm(int currentTerm) {
        writeLock.lock();
        try {
            if (this.currentTerm == currentTerm) {
                // unchanged: nothing new to persist, skip the disk write
                return;
            }
            this.currentTerm = currentTerm;
            // persist after the update
            persistentRaftServerMetaData(new RaftServerMetaData(this.currentTerm, this.votedFor), persistenceFile);
        } finally {
            writeLock.unlock();
        }
    }

    public String getVotedFor() {
        readLock.lock();
        try {
            return votedFor;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Sets votedFor (may be null) and persists the metadata.
     * Setting an equal value is a no-op, so no disk write happens.
     */
    public void setVotedFor(String votedFor) {
        writeLock.lock();
        try {
            if (Objects.equals(this.votedFor, votedFor)) {
                // unchanged: skip the disk write
                return;
            }
            this.votedFor = votedFor;
            // persist after the update
            persistentRaftServerMetaData(new RaftServerMetaData(this.currentTerm, this.votedFor), persistenceFile);
        } finally {
            writeLock.unlock();
        }
    }

    /** Reads the JSON metadata from the file, falling back to the default metadata when the file has no text. */
    private static RaftServerMetaData readRaftServerMetaData(File persistenceFile) {
        String content = MyRaftFileUtil.getFileContent(persistenceFile);
        if (StringUtils.hasText(content)) {
            return JsonUtil.json2Obj(content, RaftServerMetaData.class);
        } else {
            return RaftServerMetaData.getDefault();
        }
    }

    /** Serializes the metadata to JSON and writes it to the file. */
    private static void persistentRaftServerMetaData(RaftServerMetaData raftServerMetaData, File persistenceFile) {
        String content = JsonUtil.obj2Str(raftServerMetaData);
        MyRaftFileUtil.writeInFile(persistenceFile, content);
    }
}
raft的leader選舉在論文中有較詳細的描述,這裡說一下我認為的關鍵細節。
下面基於原始碼展開介紹MyRaft是如何實現raft領導者選舉的。
大致分為以下幾部分:
/**
* Configuration of one raft server.
*
* NOTE(review): getDebugElectionTimeout()/setDebugElectionTimeout() and setLeaderAutoFailCount() are
* called by other code, but the corresponding fields are not declared in this listing.
*/
public class RaftConfig {
/**
* Id of this server (unique within the cluster).
* */
private final String serverId;
/**
* This node's own configuration (id, ip, port).
* */
private final RaftNodeConfig currentNodeConfig;
/**
* Configuration of every node in the cluster, this node included.
* */
private final List<RaftNodeConfig> raftNodeConfigList;
/**
* Majority size of the cluster (e.g. 5 nodes -> majorityNum = 3, 6 nodes -> majorityNum = 4).
* */
private final int majorityNum;
/**
* Election timeout, in seconds.
* */
private int electionTimeout;
/**
* Range of the random extra delay added to the election timeout, in milliseconds.
* */
private Range<Integer> electionTimeoutRandomRange;
/**
* Heartbeat interval, in seconds.
* NOTE(review): the name is a misspelling of "heartbeatInterval"; it is kept as-is because
* getHeartbeatInternal()/setHeartbeatInternal() are referenced by other code.
* */
private int HeartbeatInternal;
}
/**
* Identity and network address of one raft node: its server id plus the ip/port its RPC server listens on.
*/
public class RaftNodeConfig {
private String serverId;
private String ip;
private int port;
}
/**
 * Leader-election module of a raft server.
 *
 * <p>Tracks when the last heartbeat was seen and schedules the randomized heartbeat-timeout
 * check that may start an election. It also answers requestVote RPCs.
 */
public class RaftLeaderElectionModule {
    private static final Logger logger = LoggerFactory.getLogger(RaftLeaderElectionModule.class);

    private final RaftServer currentServer;

    /** Time of the most recent heartbeat (received from the leader, or sent when this node is the leader). */
    private volatile Date lastHeartbeatTime;

    private final ScheduledExecutorService scheduledExecutorService;

    /**
     * Pool used for requestVote RPCs. It has two threads per peer: one runs the RPC and one waits
     * on its future.
     */
    private final ExecutorService rpcThreadPool;

    public RaftLeaderElectionModule(RaftServer currentServer) {
        this.currentServer = currentServer;
        this.lastHeartbeatTime = new Date();
        this.scheduledExecutorService = Executors.newScheduledThreadPool(3);
        this.rpcThreadPool = Executors.newFixedThreadPool(
                Math.max(currentServer.getOtherNodeInCluster().size() * 2, 1));
        registerHeartbeatTimeoutCheckTaskWithRandomTimeout();
    }

    /**
     * Schedules the next one-shot heartbeat-timeout check.
     *
     * <p>The delay is electionTimeout (seconds) * 1000 plus a random number of milliseconds drawn
     * from electionTimeoutRandomRange. When the current term is above 0 and a debugElectionTimeout
     * is configured, that value replaces electionTimeout, leaving more time while debugging.
     */
    public void registerHeartbeatTimeoutCheckTaskWithRandomTimeout() {
        int electionTimeout = currentServer.getRaftConfig().getElectionTimeout();
        if (currentServer.getCurrentTerm() > 0 && currentServer.getRaftConfig().getDebugElectionTimeout() != null) {
            // wait longer while debugging
            electionTimeout = currentServer.getRaftConfig().getDebugElectionTimeout();
        }
        long randomElectionTimeout = getRandomElectionTimeout();
        // base election timeout plus a random extra delay, so nodes rarely time out together
        long delayTime = randomElectionTimeout + electionTimeout * 1000L;
        logger.debug("registerHeartbeatTimeoutCheckTaskWithRandomTimeout delayTime={}", delayTime);
        scheduledExecutorService.schedule(
                new HeartbeatTimeoutCheckTask(currentServer, this), delayTime, TimeUnit.MILLISECONDS);
    }

    /**
     * Handles a requestVote RPC.
     *
     * <p>synchronized so that vote requests from different candidates are handled one at a time
     * (first come, first served).
     *
     * <ul>
     *   <li>candidate term lower than ours: refuse;</li>
     *   <li>candidate term higher than ours: adopt its term, vote for it and become follower;</li>
     *   <li>same term: grant only if we have not voted yet or already voted for this candidate.</li>
     * </ul>
     * NOTE: the log up-to-date check (§5.4) is not performed; lastLogIndex/lastLogTerm are never read here.
     *
     * @return this node's current term and whether the vote was granted
     */
    public synchronized RequestVoteRpcResult requestVoteProcess(RequestVoteRpcParam requestVoteRpcParam) {
        if (this.currentServer.getCurrentTerm() > requestVoteRpcParam.getTerm()) {
            // Reply false if term < currentTerm (§5.1)
            // the candidate's term is older than ours: refuse to vote for it
            logger.info("reject requestVoteProcess! term < currentTerm, currentServerId={}", currentServer.getServerId());
            return new RequestVoteRpcResult(this.currentServer.getCurrentTerm(), false);
        }

        // the candidate's term is newer: vote for it unconditionally (the higher term wins)
        if (this.currentServer.getCurrentTerm() < requestVoteRpcParam.getTerm()) {
            // adopt the new term and record the vote in one metadata refresh
            this.currentServer.refreshRaftServerMetaData(
                    new RaftServerMetaData(requestVoteRpcParam.getTerm(), requestVoteRpcParam.getCandidateId()));
            // our term was older, so step down to follower
            this.currentServer.setServerStatusEnum(ServerStatusEnum.FOLLOWER);
            return new RequestVoteRpcResult(this.currentServer.getCurrentTerm(), true);
        }

        // same term: never vote for two different nodes in one term, or two leaders could be elected
        if (this.currentServer.getVotedFor() != null && !this.currentServer.getVotedFor().equals(requestVoteRpcParam.getCandidateId())) {
            // If votedFor is null or candidateId (guard clause with the condition negated)
            // we already voted for someone else in this term: refuse
            logger.info("reject requestVoteProcess! votedFor={},currentServerId={}",
                    currentServer.getVotedFor(), currentServer.getServerId());
            return new RequestVoteRpcResult(this.currentServer.getCurrentTerm(), false);
        }

        // checks passed: record the vote
        this.currentServer.refreshRaftServerMetaData(
                new RaftServerMetaData(requestVoteRpcParam.getTerm(), requestVoteRpcParam.getCandidateId()));
        this.currentServer.processCommunicationHigherTerm(requestVoteRpcParam.getTerm());
        return new RequestVoteRpcResult(this.currentServer.getCurrentTerm(), true);
    }

    /**
     * Records that a heartbeat was just received (or, on the leader, just sent).
     *
     * <p>The vote cast in the current term is deliberately kept. Raft's votedFor is per term, and
     * clearing it here would let this node grant a second vote in the same term to another
     * candidate, making two leaders for one term possible. votedFor is overwritten when a newer
     * term is adopted through requestVote, or when this node starts its own election.
     */
    public void refreshLastHeartbeatTime() {
        this.lastHeartbeatTime = new Date();
    }

    /** Returns a random number of milliseconds in the inclusive range electionTimeoutRandomRange. */
    private long getRandomElectionTimeout() {
        long min = currentServer.getRaftConfig().getElectionTimeoutRandomRange().getLeft();
        long max = currentServer.getRaftConfig().getElectionTimeoutRandomRange().getRight();
        // uniform integer in [min, max]: nextLong(max - min + 1) + min
        return ThreadLocalRandom.current().nextLong(max - min + 1) + min;
    }
}
/**
 * Heartbeat-timeout check task.
 *
 * <p>Scheduled as a one-shot task and always re-registers the next check when it finishes.
 * On a non-leader, if no heartbeat was seen within electionTimeout seconds, it starts a new election.
 */
public class HeartbeatTimeoutCheckTask implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(HeartbeatTimeoutCheckTask.class);

    private final RaftServer currentServer;
    private final RaftLeaderElectionModule raftLeaderElectionModule;

    public HeartbeatTimeoutCheckTask(RaftServer currentServer, RaftLeaderElectionModule raftLeaderElectionModule) {
        this.currentServer = currentServer;
        this.raftLeaderElectionModule = raftLeaderElectionModule;
    }

    @Override
    public void run() {
        // a leader does not need to handle heartbeat timeouts
        if (currentServer.getServerStatusEnum() != ServerStatusEnum.LEADER) {
            try {
                doTask();
            } catch (Exception e) {
                logger.info("do HeartbeatTimeoutCheckTask error! ignore", e);
            }
        }
        // the check chain must continue regardless of role or failure
        raftLeaderElectionModule.registerHeartbeatTimeoutCheckTaskWithRandomTimeout();
    }

    /** Starts an election when the time since the last heartbeat exceeds the election timeout. */
    private void doTask() {
        logger.debug("do HeartbeatTimeoutCheck start {}", currentServer.getServerId());

        int electionTimeout = currentServer.getRaftConfig().getElectionTimeout();
        Date currentDate = new Date();
        Date lastHeartbeatTime = raftLeaderElectionModule.getLastHeartbeatTime();
        long diffTime = currentDate.getTime() - lastHeartbeatTime.getTime();
        logger.debug("currentDate={}, lastHeartbeatTime={}, diffTime={}, serverId={}",
                currentDate, lastHeartbeatTime, diffTime, currentServer.getServerId());

        if (diffTime > (electionTimeout * 1000L)) {
            logger.info("HeartbeatTimeoutCheck check fail, trigger new election! serverId={}", currentServer.getServerId());
            triggerNewElection();
        } else {
            // heartbeat is fine, nothing to do
            logger.debug("HeartbeatTimeoutCheck check success {}", currentServer.getServerId());
        }

        logger.debug("do HeartbeatTimeoutCheck end {}", currentServer.getServerId());
    }

    /**
     * Runs one election round.
     *
     * <p>Increments the term, votes for itself, becomes CANDIDATE and sends requestVote to all
     * peers in parallel. It becomes leader only if it gathered a majority AND is still the
     * candidate of the same term once the votes are in.
     */
    private void triggerNewElection() {
        // no heartbeat within the election timeout: start a new term
        currentServer.setCurrentTerm(currentServer.getCurrentTerm() + 1);
        // the candidate votes for itself first
        currentServer.setVotedFor(currentServer.getServerId());
        currentServer.setServerStatusEnum(ServerStatusEnum.CANDIDATE);
        // the term this election is for; the node may move past it while votes are collected
        int electionTerm = currentServer.getCurrentTerm();

        // send requestVote to every other node in parallel
        List<RaftService> otherNodeInCluster = currentServer.getOtherNodeInCluster();
        List<Future<RequestVoteRpcResult>> futureList = new ArrayList<>(otherNodeInCluster.size());
        RequestVoteRpcParam requestVoteRpcParam = new RequestVoteRpcParam();
        requestVoteRpcParam.setTerm(electionTerm);
        requestVoteRpcParam.setCandidateId(currentServer.getServerId());
        for (RaftService node : otherNodeInCluster) {
            Future<RequestVoteRpcResult> future = raftLeaderElectionModule.getRpcThreadPool().submit(
                    () -> {
                        RequestVoteRpcResult rpcResult = node.requestVote(requestVoteRpcParam);
                        // react to a higher term reported by the peer
                        currentServer.processCommunicationHigherTerm(rpcResult.getTerm());
                        return rpcResult;
                    }
            );
            futureList.add(future);
        }

        List<RequestVoteRpcResult> requestVoteRpcResultList = CommonUtil.concurrentGetRpcFutureResult(
                "requestVote", futureList,
                raftLeaderElectionModule.getRpcThreadPool(), 1, TimeUnit.SECONDS);

        // granted votes, plus the vote for ourselves
        int getRpcVoted = (int) requestVoteRpcResultList.stream().filter(RequestVoteRpcResult::isVoteGranted).count() + 1;
        logger.info("HeartbeatTimeoutCheck election, getRpcVoted={}, currentServerId={}", getRpcVoted, currentServer.getServerId());

        boolean majorVoted = getRpcVoted >= this.currentServer.getRaftConfig().getMajorityNum();
        // Votes only count for the term they were requested in. While waiting, this node may have
        // stepped down (e.g. an AppendEntries from a leader sets it to FOLLOWER) or moved to a higher
        // term (e.g. via processCommunicationHigherTerm). Becoming leader then would claim a term it
        // never won.
        boolean stillCandidateOfElectionTerm =
                currentServer.getServerStatusEnum() == ServerStatusEnum.CANDIDATE
                        && currentServer.getCurrentTerm() == electionTerm;

        if (majorVoted && stillCandidateOfElectionTerm) {
            logger.info("HeartbeatTimeoutCheck election result: become a leader! {}, currentTerm={}", currentServer.getServerId(), currentServer.getCurrentTerm());
            currentServer.setServerStatusEnum(ServerStatusEnum.LEADER);
            currentServer.setCurrentLeader(currentServer.getServerId());
            // Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server;
            // repeat during idle periods to prevent election timeouts (§5.2)
            HeartbeatBroadcastTask.doHeartbeatBroadcast(currentServer);
        } else {
            logger.info("HeartbeatTimeoutCheck election result: not become a leader! {}", currentServer.getServerId());
        }
        // votedFor is intentionally NOT cleared: this node voted for itself in electionTerm and must
        // not grant another vote in that same term.
    }
}
public class CommonUtil {
private static final Logger logger = LoggerFactory.getLogger(CommonUtil.class);
/**
* 並行的獲得future列表的結果
* */
public static <T> List<T> concurrentGetRpcFutureResult(
String info, List<Future<T>> futureList, ExecutorService threadPool, long timeout, TimeUnit timeUnit){
CountDownLatch countDownLatch = new CountDownLatch(futureList.size());
List<T> resultList = new ArrayList<>(futureList.size());
for(Future<T> futureItem : futureList){
threadPool.execute(()->{
try {
logger.debug(info + " concurrentGetRpcFutureResult start!");
T result = futureItem.get(timeout,timeUnit);
logger.debug(info + " concurrentGetRpcFutureResult end!");
resultList.add(result);
} catch (Exception e) {
// rpc異常不考慮
logger.error( "{} getFutureResult error!",info,e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new MyRaftException("getFutureResult error!",e);
}
return resultList;
}
}
/**
 * Heartbeat-broadcast module of a raft server.
 *
 * <p>Owns the RPC thread pool used for heartbeats and schedules HeartbeatBroadcastTask at a fixed
 * rate. The interval is heartbeatInternal seconds and the first run has no initial delay.
 */
public class RaftHeartbeatBroadcastModule {
    private final RaftServer currentServer;
    private final ScheduledExecutorService scheduledExecutorService;
    private final ExecutorService rpcThreadPool;

    public RaftHeartbeatBroadcastModule(RaftServer currentServer) {
        this.currentServer = currentServer;
        this.scheduledExecutorService = Executors.newScheduledThreadPool(1);

        // two threads per peer: one runs the heartbeat RPC, one waits on its future
        int peerCount = currentServer.getOtherNodeInCluster().size();
        this.rpcThreadPool = Executors.newFixedThreadPool(Math.max(peerCount * 2, 1));

        int heartbeatIntervalSeconds = currentServer.getRaftConfig().getHeartbeatInternal();
        HeartbeatBroadcastTask broadcastTask = new HeartbeatBroadcastTask(currentServer, this);
        // heartbeats must go out at a fixed rate, hence scheduleAtFixedRate
        scheduledExecutorService.scheduleAtFixedRate(
                broadcastTask, 0, heartbeatIntervalSeconds, TimeUnit.SECONDS);
    }
}
/**
 * Leader heartbeat broadcast task, run periodically with scheduleAtFixedRate.
 * It does nothing unless this node is currently the leader.
 */
public class HeartbeatBroadcastTask implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(HeartbeatBroadcastTask.class);

    private final RaftServer currentServer;
    private final RaftHeartbeatBroadcastModule raftHeartbeatBroadcastModule;

    // NOTE(review): not used in this listing; presumably meant for the leaderAutoFailCount simulation
    private int HeartbeatCount = 0;

    public HeartbeatBroadcastTask(RaftServer currentServer, RaftHeartbeatBroadcastModule raftHeartbeatBroadcastModule) {
        this.currentServer = currentServer;
        this.raftHeartbeatBroadcastModule = raftHeartbeatBroadcastModule;
    }

    @Override
    public void run() {
        if (currentServer.getServerStatusEnum() != ServerStatusEnum.LEADER) {
            // only the leader broadcasts heartbeats
            return;
        }
        try {
            doHeartbeatBroadcast(currentServer);
        } catch (Exception e) {
            // With scheduleAtFixedRate, an exception escaping run() suppresses every later execution.
            // One failed broadcast must not stop the leader's heartbeats for good.
            logger.error("do HeartbeatBroadcast error! ignore", e);
        }
    }

    /**
     * Sends one round of heartbeats (appendEntries without entries) to every peer in parallel.
     *
     * @return true if a majority (counting this node) acknowledged the heartbeat and this node is
     *         still the leader afterwards; false otherwise, including when responses time out
     */
    public static boolean doHeartbeatBroadcast(RaftServer currentServer) {
        logger.info("do HeartbeatBroadcast start {}", currentServer.getServerId());
        // the leader refreshes its own heartbeat time first
        currentServer.getRaftLeaderElectionModule().refreshLastHeartbeatTime();

        List<RaftService> otherNodeInCluster = currentServer.getOtherNodeInCluster();
        List<Future<AppendEntriesRpcResult>> futureList = new ArrayList<>(otherNodeInCluster.size());
        // heartbeat request: term and leader id only, no entries
        AppendEntriesRpcParam appendEntriesRpcParam = new AppendEntriesRpcParam();
        appendEntriesRpcParam.setTerm(currentServer.getCurrentTerm());
        appendEntriesRpcParam.setLeaderId(currentServer.getServerId());
        for (RaftService node : otherNodeInCluster) {
            Future<AppendEntriesRpcResult> future = currentServer.getRaftHeartbeatBroadcastModule().getRpcThreadPool().submit(
                    () -> {
                        AppendEntriesRpcResult rpcResult = node.appendEntries(appendEntriesRpcParam);
                        // react to a peer reporting a higher term than ours
                        currentServer.processCommunicationHigherTerm(rpcResult.getTerm());
                        return rpcResult;
                    }
            );
            futureList.add(future);
        }

        List<AppendEntriesRpcResult> appendEntriesRpcResultList = CommonUtil.concurrentGetRpcFutureResult("doHeartbeatBroadcast", futureList,
                currentServer.getRaftHeartbeatBroadcastModule().getRpcThreadPool(), 1, TimeUnit.SECONDS);

        // successful acknowledgements, +1 for this node itself
        int successResponseCount = (int) (appendEntriesRpcResultList.stream().filter(AppendEntriesRpcResult::isSuccess).count() + 1);
        // Still LEADER means no peer reported a higher term that made this node step down.
        // Missing or timed-out responses count as failures.
        return successResponseCount >= currentServer.getRaftConfig().getMajorityNum()
                && currentServer.getServerStatusEnum() == ServerStatusEnum.LEADER;
    }
}
處理requestVote請求
處理心跳請求
/**
 * Core raft server: owns the node's role, persistent metadata and the election / heartbeat modules,
 * and implements the RaftService RPCs.
 */
public class RaftServer implements RaftService {
    private static final Logger logger = LoggerFactory.getLogger(RaftServer.class);

    /** Id of this server (unique within the cluster). */
    private final String serverId;

    /** Raft server configuration. */
    private final RaftConfig raftConfig;

    /** Current role of this server. */
    private volatile ServerStatusEnum serverStatusEnum;

    /** Persistent raft metadata (currentTerm and votedFor). */
    private final RaftServerMetaDataPersistentModule raftServerMetaDataPersistentModule;

    /** Id of the node this server currently believes is the leader. */
    private volatile String currentLeader;

    /** The other raft nodes of the cluster. */
    protected List<RaftService> otherNodeInCluster;

    private RaftLeaderElectionModule raftLeaderElectionModule;

    private RaftHeartbeatBroadcastModule raftHeartbeatBroadcastModule;

    public RaftServer(RaftConfig raftConfig) {
        this.serverId = raftConfig.getServerId();
        this.raftConfig = raftConfig;
        // every server starts as a follower
        this.serverStatusEnum = ServerStatusEnum.FOLLOWER;
        // load persisted metadata
        this.raftServerMetaDataPersistentModule = new RaftServerMetaDataPersistentModule(raftConfig.getServerId());
    }

    /** Wires in the peers and starts the election and heartbeat modules. */
    public void init(List<RaftService> otherNodeInCluster) {
        this.otherNodeInCluster = otherNodeInCluster;
        raftLeaderElectionModule = new RaftLeaderElectionModule(this);
        raftHeartbeatBroadcastModule = new RaftHeartbeatBroadcastModule(this);
        logger.info("raft server init end! otherNodeInCluster={}, currentServerId={}", otherNodeInCluster, serverId);
    }

    @Override
    public RequestVoteRpcResult requestVote(RequestVoteRpcParam requestVoteRpcParam) {
        RequestVoteRpcResult requestVoteRpcResult = raftLeaderElectionModule.requestVoteProcess(requestVoteRpcParam);
        processCommunicationHigherTerm(requestVoteRpcParam.getTerm());
        logger.info("do requestVote requestVoteRpcParam={},requestVoteRpcResult={}, currentServerId={}",
                requestVoteRpcParam, requestVoteRpcResult, this.serverId);
        return requestVoteRpcResult;
    }

    /**
     * Handles an appendEntries RPC; in this version it is always a heartbeat (no entries).
     *
     * <p>Requests from a leader with an older term are rejected. Otherwise the sender is accepted
     * as leader of its term: this node becomes follower, records the leader and refreshes its
     * heartbeat time. Only when the request carries a newer term is that term adopted and
     * persisted, and votedFor reset, since no vote has been cast in the new term yet.
     */
    @Override
    public AppendEntriesRpcResult appendEntries(AppendEntriesRpcParam appendEntriesRpcParam) {
        int currentTerm = this.raftServerMetaDataPersistentModule.getCurrentTerm();
        if (appendEntriesRpcParam.getTerm() < currentTerm) {
            // Reply false if term < currentTerm (§5.1)
            // reject requests from an old leader whose term is lower than ours
            logger.info("doAppendEntries term < currentTerm");
            return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(), false);
        }

        // term >= currentTerm: the sender is a legitimate leader for that term, follow it
        this.serverStatusEnum = ServerStatusEnum.FOLLOWER;
        this.currentLeader = appendEntriesRpcParam.getLeaderId();
        if (appendEntriesRpcParam.getTerm() > currentTerm) {
            // a newer term begins: adopt it (persisting only on an actual change) and forget the
            // vote that belonged to the old term. A vote cast in the current term is kept, so this
            // node cannot vote twice in one term.
            this.raftServerMetaDataPersistentModule.setCurrentTerm(appendEntriesRpcParam.getTerm());
            this.cleanVotedFor();
        }

        // heartbeat received: push back this node's election timeout
        raftLeaderElectionModule.refreshLastHeartbeatTime();
        return new AppendEntriesRpcResult(this.raftServerMetaDataPersistentModule.getCurrentTerm(), true);
    }
}
在工程的test目錄下,可以啟動一個5節點的MyRaft的服務叢集(用main方法啟動即可),通過修改其中的RaftClusterGlobalConfig類可以修改相關的設定。
/**
 * Global settings of the local 5-node test cluster.
 * Each node is started by calling {@link #initRaftRpcServer(String)} with its server id.
 */
public class RaftClusterGlobalConfig {

    public static Registry registry = RegistryFactory.getRegistry(
            new RegistryConfig(RegistryCenterTypeEnum.FAKE_REGISTRY.getCode(), "127.0.0.1:2181"));

    /** Cluster membership: five nodes on 127.0.0.1, ports 8001-8005. */
    public static final List<RaftNodeConfig> raftNodeConfigList = Arrays.asList(
            new RaftNodeConfig("raft-1", "127.0.0.1", 8001),
            new RaftNodeConfig("raft-2", "127.0.0.1", 8002),
            new RaftNodeConfig("raft-3", "127.0.0.1", 8003),
            new RaftNodeConfig("raft-4", "127.0.0.1", 8004),
            new RaftNodeConfig("raft-5", "127.0.0.1", 8005)
    );

    /** Base election timeout, in seconds. */
    public static final int electionTimeout = 3;

    /** When non-null and the term is above 0, replaces electionTimeout (seconds) to ease debugging. */
    public static final Integer debugElectionTimeout = null;

    /** Leader heartbeat interval, in seconds. */
    public static final int HeartbeatInterval = 1;

    /**
     * After N heartbeats the leader simulates a failure (falls back to follower and stops broadcasting).
     * N <= 0 disables the simulated failure.
     */
    public static final int leaderAutoFailCount = 0;

    /** Range of the random extra election delay, in milliseconds. */
    public static final Range<Integer> electionTimeoutRandomRange = new Range<>(150, 500);

    /**
     * Builds the configuration for {@code serverId}, creates its RPC server plus clients for every
     * other node, and starts it.
     *
     * @throws MyRaftException if {@code serverId} is not part of raftNodeConfigList
     */
    public static void initRaftRpcServer(String serverId) {
        List<RaftNodeConfig> allNodes = RaftClusterGlobalConfig.raftNodeConfigList;

        RaftNodeConfig selfNode = allNodes.stream()
                .filter(node -> node.getServerId().equals(serverId))
                .findAny()
                .orElseThrow(() -> new MyRaftException("serverId must in raftNodeConfigList"));
        List<RaftNodeConfig> peerNodes = allNodes.stream()
                .filter(node -> !node.getServerId().equals(serverId))
                .collect(Collectors.toList());

        RaftConfig config = new RaftConfig(selfNode, allNodes);
        config.setElectionTimeout(RaftClusterGlobalConfig.electionTimeout);
        config.setDebugElectionTimeout(RaftClusterGlobalConfig.debugElectionTimeout);
        config.setHeartbeatInternal(RaftClusterGlobalConfig.HeartbeatInterval);
        config.setLeaderAutoFailCount(RaftClusterGlobalConfig.leaderAutoFailCount);
        config.setElectionTimeoutRandomRange(RaftClusterGlobalConfig.electionTimeoutRandomRange);

        RaftRpcServer server = new RaftRpcServer(config, RaftClusterGlobalConfig.registry);
        List<RaftService> peerClients = server.getRpcProxyList(peerNodes);
        // start the raft service
        server.init(peerClients);
    }
}
驗證lab1中MyRaft leader選舉實現的正確性,可以通過以下幾個case簡單的驗證下:
在原始的raft演算法的leader選舉中存在一個問題。具體場景舉例如下:
從本質上來說,這個分割區恢復後進行的新選舉是無意義的。且由於進行選舉會造成叢集短暫的不可用,因此最好能避免這個問題。
業界給出的解決方法是在真正的選舉前先發起一輪預選舉(preVote)。
MyRaft為了保持實現的簡單性,並沒有實現預選舉機制。但etcd、sofa-jraft等流行的開源raft系統都是實現了預選舉優化的,所以在這裡還是簡單介紹一下。