【Elasticsearch】ES選主流程分析

2022-09-13 12:02:06

Raft協定

Raft是分散式系統中的一種共識演演算法,用於在叢集中選舉Leader管理叢集。Raft協定中有以下角色:

Leader(領導者):叢集中的領導者,負責管理叢集。

Candidate(候選者):具有競選Leader資格的角色,如果叢集需要選舉Leader,節點需要先轉為候選者角色才可以發起競選。

Follower(跟隨者 ):Leader的跟隨者,接收和處理來自Leader的訊息,與Leader之間保持通訊,如果通訊超時或者其他原因導致節點與Leader之間通訊失敗,節點會認為叢集中沒有Leader,就會轉為候選者發起競選,推薦自己成為Leader。

Raft協定中還有一個Term(任期)的概念,任期是隨著選舉的舉行而變化,一般是單調進行遞增,比如說叢集中當前的任期為1,此時某個節點發現叢集中沒有Leader,開始發起競選,此時任期編號就會增加為2,表示進行了新一輪的選舉。一般會為Term較大的那個節點進行投票,當某個節點收到的投票數達到了Quorum,一般是叢集中的節點數/2 + 1,將會被選舉為Leader。

Elasticsearch選主

Elasticsearch在7.0版本以前採用Bully演演算法進行選主,7.0以後使用了Raft協定,但沒有完全按照Raft協定來實現,而是做了一些調整,ES選主流程如下:

  1. 節點的初始化狀態為Candidate;

  2. 啟動選舉任務,向探測到的叢集中其他節點傳送PRE_VOTE投票請求,請求中會攜帶節點的Term資訊;

  3. 其他節點收到PRE_VOTE投票請求後,對請求進行處理:

    (1)更新自己收到過的最大的Term

    如果請求中的Term比自己的Term大並且當前節點是Leader節點,意味著當前的Leader可能已經過期,其他節點已經開始競選Leader,所以此時當前節點需要放棄Leader的身份,重新發起選舉。

    (2)根據當前節點記錄的Leader資訊決定是否投票給發起者,然後向發起者返回投票響應資訊:

    • 如果當前節點記錄的叢集Leader為空,同意投票給發起者。

    • 如果當前節點記錄的叢集Leader不為空,但是與本次發起的節點一致,同樣同意投票。

    • 如果當前節點記錄的叢集Leader為空,但是與本次發起的節點不同,拒絕投票給發起者。

  4. 發起者收到其他節點對PRE_VOTE投票請求的響應,判斷是否得到了大多數投票,如果是進入下一步;

  5. 發起者向叢集中的節點傳送StartJoin請求,邀請節點加入叢集,傳送StartJoin請求的時候會將Term增加1,但是發起者的Term暫不更新,這與Raft協定在發起選舉的時候就對Term增加的操作不一樣;

  6. 其他節點收到StartJoin請求,更新自己的Term資訊,處理完畢後向發起者傳送JOIN請求,JOIN請求中攜帶了節點的Term資訊

    收到StartJoin請求時,只要請求中的Term比當前節點的Term大,當前節點都會同意為發起者進行投票,這裡也與Raft協定規定的每個任期內只能為一個節點進行投票不一致。

    既然節點可以多次進行投票,那麼就有可能產生多個Leader,對於這種情況,Elasticsearch會選擇最後那個選舉成功的節點成為Leader。

  7. 發起者收到其他節點傳送的JOIN請求後,會統計收到的JOIN請求個數,如果達到了大多數投票,即可成為Leader;

    發起者收到JOIN請求時也會校驗自己的Term是否比JOIN請求中的Term大,在第5步中發起者並未更新自己的Term,所以首次收到JOIN請求後,Term資訊會小於JOIN請求中的Term,這裡發起者會模擬一個JOIN請求給自己,也就是自己為自己投一票。

  8. 發起者成為Leader;

ES選主記憶體在的問題
由於每個節點可以多次進行投票,有可能出現節點競爭激烈導致一直未選出leader的問題。關於問題的解決方案可以參考以下兩篇文章:
【張超】留意Elasticsearch 7.x 可能無法選主的問題

【Guohang Huang】騰訊 Elasticsearch 7.x 大叢集選主優化

Elasticsearch選舉流程分析

在ES啟動節點的時候,會呼叫Coordinator的startInitialJoin方法開啟選舉:

// Node
public class Node implements Closeable {
   public Node start() throws NodeValidationException {
       // ...
       // 啟動叢集選舉
       coordinator.startInitialJoin();
       // ...
   }
}

// Coordinator
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    public void startInitialJoin() {
        synchronized (mutex) {
            // 先轉為候選者
            becomeCandidate("startInitialJoin");
        }
        // 啟動選舉任務
        clusterBootstrapService.scheduleUnconfiguredBootstrap();
    }
}

成為候選節點

becomeCandidate方法主要做一些Leader選舉的前置工作:

  1. 判斷節點的角色是否是候選者,因為Raft協定中候選者才可以發起leader選舉,所以第一步需要把當前節點轉為候選者節點;
  2. 初始化PreVoteCollector裡面狀態資訊,它是一個二元組Tuple<DiscoveryNode, PreVoteResponse>DiscoveryNode記錄了叢集的leader節點,PreVoteResponse裡面記錄節點的Term資訊,包括當前Term、上一次接受的Term(叢集Term)和上一次接受的版本(叢集版本),在投票選舉的時候會用到;
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {

    void becomeCandidate(String method) {
        // 判斷是否持有鎖
        assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
        logger.debug("{}: coordinator becoming CANDIDATE in term {} (was {}, lastKnownLeader was [{}])", method,
            getCurrentTerm(), mode, lastKnownLeader);
        // 如果不是CANDIDATE
        if (mode != Mode.CANDIDATE) {
            final Mode prevMode = mode;
            // 設定為CANDIDATE
            mode = Mode.CANDIDATE;
            cancelActivePublication("become candidate: " + method);
          
            //...
          
            // 如果之前是Leader
            if (prevMode == Mode.LEADER) {
                // 清除Master相關資訊
                cleanMasterService();
            }

            // ...
        }
        // 更新PreVoteCollector裡面記錄的leader節點和Term資訊,這裡還沒有選舉出leader,所以傳入的是null
        preVoteCollector.update(getPreVoteResponse(), null);
    }

    private PreVoteResponse getPreVoteResponse() {
        // 建立PreVoteResponse,記錄當前Term、上一次接受的Term和上一次接受的版本
        return new PreVoteResponse(
            getCurrentTerm(),
            coordinationState.get().getLastAcceptedTerm(),
            coordinationState.get().getLastAcceptedState().version()
        );
    }
  
 
}

PreVoteCollector的二元組如下,DiscoveryNode為leader節點,PreVoteResponse記錄了Term相關資訊,其他節點發起選舉時,返回給發起者的投票結果就是PreVoteResponse

public class PreVoteCollector {
    // 二元組
    private volatile Tuple<DiscoveryNode, PreVoteResponse> state; 

    public void update(final PreVoteResponse preVoteResponse, @Nullable final DiscoveryNode leader) {
        logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader);
        // 初始化狀態資訊
        state = new Tuple<>(leader, preVoteResponse);
    }
}

Leader選舉

scheduleUnconfiguredBootstrap方法中,對節點是否有Master角色許可權進行了判斷,如果沒有Master角色許可權,直接返回終止選舉,否則啟動選舉任務,獲取叢集中發現的節點,呼叫startBootstrap開始啟動:

public class ClusterBootstrapService {
    scheduleUnconfiguredBootstrap() {
        if (unconfiguredBootstrapTimeout == null) {
            return;
        }
        // Master角色許可權校驗
        if (transportService.getLocalNode().isMasterNode() == false) {
            return;
        }
        logger.info(
            "no discovery configuration found, will perform best-effort cluster bootstrapping after [{}] "
                + "unless existing master is discovered",
            unconfiguredBootstrapTimeout
        );
        // 執行啟動任務
        transportService.getThreadPool().scheduleUnlessShuttingDown(unconfiguredBootstrapTimeout, Names.GENERIC, new Runnable() {
            @Override
            public void run() {
                // 獲取叢集中發現的節點
                final Set<DiscoveryNode> discoveredNodes = getDiscoveredNodes();
                logger.debug("performing best-effort cluster bootstrapping with {}", discoveredNodes);
                // 啟動
                startBootstrap(discoveredNodes, emptyList());
            }
            // ...
        });
    }
}

啟動選舉

startBootstrap方法中,首先判斷探測到的叢集節點discoveryNodes是否有Master角色許可權,然後呼叫doBootstrap進行啟動。

doBootstrap方法中,建立了VotingConfiguration,然後呼叫votingConfigurationConsumer觸發選舉,並進行了異常捕捉,如果出現異常進行重試:

public class ClusterBootstrapService {
    private void startBootstrap(Set<DiscoveryNode> discoveryNodes, List<String> unsatisfiedRequirements) {
        // 判斷髮現的節點是否有Master角色許可權
        assert discoveryNodes.stream().allMatch(DiscoveryNode::isMasterNode) : discoveryNodes;
        assert unsatisfiedRequirements.size() < discoveryNodes.size() : discoveryNodes + " smaller than " + unsatisfiedRequirements;
        if (bootstrappingPermitted.compareAndSet(true, false)) {
            // 啟動
            doBootstrap(
                // 建立VotingConfiguration
                new VotingConfiguration(
                    Stream.concat(
                        discoveryNodes.stream().map(DiscoveryNode::getId),
                        unsatisfiedRequirements.stream().map(s -> BOOTSTRAP_PLACEHOLDER_PREFIX + s)
                    ).collect(Collectors.toSet())
                )
            );
        }
    }
  
    private void doBootstrap(VotingConfiguration votingConfiguration) {
        assert transportService.getLocalNode().isMasterNode();
        try {
            // 觸發投票
            votingConfigurationConsumer.accept(votingConfiguration);
        } catch (Exception e) {
            logger.warn(() -> "exception when bootstrapping with " + votingConfiguration + ", rescheduling", e);
            // 如果出現異常,進行重試
            transportService.getThreadPool().scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), Names.GENERIC, new Runnable() {
                @Override
                public void run() {
                    doBootstrap(votingConfiguration);
                }
                // ... 
            });
        }
    }

}

votingConfigurationConsumer是一個函數語言程式設計介面,它接收一個表示式,在Coordinator的建構函式中可以看到對ClusterBootstrapService進行範例化時,傳入的是setInitialConfiguration方法,所以votingConfigurationConsumer.accept(votingConfiguration)會執行CoordinatorsetInitialConfiguration方法:

public class ClusterBootstrapService {
    // votingConfigurationConsumer
    private final Consumer<VotingConfiguration> votingConfigurationConsumer;
  
    public ClusterBootstrapService(
        Settings settings,
        TransportService transportService,
        Supplier<Iterable<DiscoveryNode>> discoveredNodesSupplier,
        BooleanSupplier isBootstrappedSupplier,
        Consumer<VotingConfiguration> votingConfigurationConsumer
    ) {
       //...
       // 設定votingConfigurationConsumer
       this.votingConfigurationConsumer = votingConfigurationConsumer;
    }
}

public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {

    public Coordinator(
       // ...
    ) {
        // ...
        // 初始化ClusterBootstrapService
        this.clusterBootstrapService = new ClusterBootstrapService(
            settings,
            transportService,
            this::getFoundPeers,
            this::isInitialConfigurationSet,
            this::setInitialConfiguration  // 傳入setInitialConfiguration方法
        );
        // ...
    }
}

setInitialConfiguration方法的處理邏輯如下:

  1. 首先進行一系列的校驗,如果校驗不通過不能進行選舉:
    • 是否已經初始化過;
    • 當前節點是有Master角色許可權;
    • 叢集中的節點是否包含當前節點;
    • 叢集中的節點個數是否達到了Quorum個;
  2. 呼叫preVoteCollector的update方法,更新當前節點記錄的Leader節點和Term資訊;
  3. 呼叫startElectionScheduler方法啟動選舉;
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    public boolean setInitialConfiguration(final VotingConfiguration votingConfiguration) {
        synchronized (mutex) {
            // 獲取叢集狀態
            final ClusterState currentState = getStateForMasterService();
            // 判斷是否初始化過
            if (isInitialConfigurationSet()) {
                logger.debug("initial configuration already set, ignoring {}", votingConfiguration);
                return false;
            }
            // 校驗Master角色許可權
            if (getLocalNode().isMasterNode() == false) {
                logger.debug("skip setting initial configuration as local node is not a master-eligible node");
                throw new CoordinationStateRejectedException(
                    "this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node"
                );
            }
            // 如果節點ID中不包含當前節點的ID
            if (votingConfiguration.getNodeIds().contains(getLocalNode().getId()) == false) {
                logger.debug("skip setting initial configuration as local node is not part of initial configuration");
                throw new CoordinationStateRejectedException("local node is not part of initial configuration");
            }
            // ...
            // 判斷節點個數是否達到Quorum
            if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).toList()) == false) {
                // ...
                throw new CoordinationStateRejectedException(
                    "not enough nodes discovered to form a quorum in the initial configuration "
                        + "[knownNodes="
                        + knownNodes
                        + ", "
                        + votingConfiguration
                        + "]"
                );
            }
            // ...
            // 更新
            preVoteCollector.update(getPreVoteResponse(), null); 
            // 開始選舉 
            startElectionScheduler();
            return true;
        }
    }
}

發起選舉

startElectionScheduler方法用於啟動選舉任務,任務是非同步執行的:

  1. 校驗節點是否是CANDIDATE節點,如果是繼續往下進行;
  2. 如果當前節點叢集健康狀態處於UNHEALTHY,直接返回;
  3. 呼叫PreVoteCollector的start方法發起投票;
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
   private void startElectionScheduler() {
        assert electionScheduler == null : electionScheduler;
        // 校驗Master角色許可權
        if (getLocalNode().isMasterNode() == false) {
            return;
        }
        final TimeValue gracePeriod = TimeValue.ZERO;
        // 啟動選舉任務
        electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
            @Override
            public void run() {
                synchronized (mutex) {
                    // 如果是CANDIDATE節點
                    if (mode == Mode.CANDIDATE) {
                        // 獲取之前的叢集狀態
                        final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
                        if (localNodeMayWinElection(lastAcceptedState) == false) {
                            logger.trace("skip prevoting as local node may not win election: {}", lastAcceptedState.coordinationMetadata());
                            return;
                        }
                        // 獲取叢集狀態資訊
                        final StatusInfo statusInfo = nodeHealthService.getHealth();
                        // 如果處於UNHEALTHY狀態
                        if (statusInfo.getStatus() == UNHEALTHY) {
                            logger.debug("skip prevoting as local node is unhealthy: [{}]", statusInfo.getInfo());
                            return;
                        }
                        if (prevotingRound != null) {
                            prevotingRound.close();
                        }
                        // 發起投票
                        prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
                    }
                }
            }
            // ...
        });
    }
}

PreVoteCollector的start方法中,建立了PreVotingRound,然後呼叫PreVotingRound的start的方法發起投票:

 public class PreVoteCollector {
    public Releasable start(final ClusterState clusterState, final Iterable<DiscoveryNode> broadcastNodes) {
        // 建立PreVotingRound
        PreVotingRound preVotingRound = new PreVotingRound(clusterState, state.v2().getCurrentTerm());
        // 發起投票
        preVotingRound.start(broadcastNodes);
        return preVotingRound;
    }
}

傳送PRE_VOTE投票請求

PreVotingRoundPreVoteCollector的內部類,在start方法中,會遍歷探測到的叢集節點,然後進行遍歷,向每一個節點傳送PRE_VOTE投票請求,投票請求響應資訊處理是在handlePreVoteResponse方法中處理的:

public class PreVoteCollector {        
    private class PreVotingRound implements Releasable {
        PreVotingRound(final ClusterState clusterState, final long currentTerm) {
            // 叢集狀態
            this.clusterState = clusterState;
            // 構建投票請求
            preVoteRequest = new PreVoteRequest(transportService.getLocalNode(), currentTerm);
        }

        void start(final Iterable<DiscoveryNode> broadcastNodes) {
            logger.debug("{} requesting pre-votes from {}", this, broadcastNodes);
            // 遍歷發現的節點,當前節點向每一個節點傳送投票請求
            broadcastNodes.forEach(
                // 傳送PRE_VOTE請求
                n -> transportService.sendRequest(
                    n,
                    REQUEST_PRE_VOTE_ACTION_NAME,
                    preVoteRequest,
                    new TransportResponseHandler<PreVoteResponse>() {
                        // ...
                        @Override
                        public void handleResponse(PreVoteResponse response) {
                            // 處理返回的響應
                            handlePreVoteResponse(response, n);
                        }
                        // ...
                    }
                )
            );
        }
     }
}

節點對PRE_VOTE投票請求的處理

PreVoteCollector的建構函式中可以看到,註冊了REQUEST_PRE_VOTE_ACTION_NAME請求處理器,對PRE_VOTE請求的處理是呼叫handlePreVoteRequest方法進行的,處理完畢後呼叫sendResponse返回響應資訊:

public class PreVoteCollector {
    // 選舉任務
    private final Runnable startElection;
    // 更新最大Term
    private final LongConsumer updateMaxTermSeen;
  
    PreVoteCollector(
        final TransportService transportService,
        final Runnable startElection,
        final LongConsumer updateMaxTermSeen,
        final ElectionStrategy electionStrategy,
        NodeHealthService nodeHealthService
    ) {
        this.transportService = transportService;
        this.startElection = startElection;
        this.updateMaxTermSeen = updateMaxTermSeen;
        this.electionStrategy = electionStrategy;
        this.nodeHealthService = nodeHealthService;
        // 註冊PRE_VOTE請求處理器
        transportService.registerRequestHandler(
            REQUEST_PRE_VOTE_ACTION_NAME,
            Names.CLUSTER_COORDINATION,
            false,
            false,
            PreVoteRequest::new,
            (request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request)) // 呼叫handlePreVoteRequest處理請求
        );
    }
}

handlePreVoteRequest之前,首先看Coordinator的建構函式對PreVoteCollector範例化時傳入的引數,主要關注startElectionupdateMaxTermSeen,它們都是函數語言程式設計介面,從範例化的程式碼中可以看到分別對應Coordinator的傳入的startElectionupdateMaxTermSeen方法,在後面會用到這兩個方法:

public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher { 
   public Coordinator(
   // ...
   ){
       // ...
       this.preVoteCollector = new PreVoteCollector(
            transportService,
            this::startElection, // 傳入startElection方法,啟動選舉
            this::updateMaxTermSeen, // 傳入updateMaxTermSeen,更新收到的最大Term
            electionStrategy,
            nodeHealthService
        );
        // ...
   }
}

handlePreVoteRequest方法處理邏輯如下:

  1. 對term進行比較,呼叫updateMaxTermSeen.accept()更新收到的最大Term;
  2. 獲取當前節點記錄的叢集Leader節點和Term資訊;
  3. 如果Leader節點為空,表示還沒有Leader節點,返回響應同意發起投票的節點成為leader;
  4. 如果leader不為空,但是與發起請求的節點是同一個節點,同樣支援發起請求的節點成為leader;
  5. 其他情況,表示已經存在leader,拒絕投票請求
public class PreVoteCollector {
    
    private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
        // 比較Term,更新maxTermSeen
        updateMaxTermSeen.accept(request.getCurrentTerm());
        Tuple<DiscoveryNode, PreVoteResponse> state = this.state;
        assert state != null : "received pre-vote request before fully initialised";
        // 獲取當前節點記錄的叢集Leader節點
        final DiscoveryNode leader = state.v1();
        // 獲取當前節點的Term資訊
        final PreVoteResponse response = state.v2();
        // 獲取健康狀態
        final StatusInfo statusInfo = nodeHealthService.getHealth();
        // 如果當前節點的狀態處於UNHEALTHY
        if (statusInfo.getStatus() == UNHEALTHY) {
            String message = "rejecting " + request + " on unhealthy node: [" + statusInfo.getInfo() + "]";
            logger.debug(message);
            throw new NodeHealthCheckFailureException(message);
        }
        // 如果leader為空,表示還沒有Leader節點,返回響應同意發起投票的節點成為leader
        if (leader == null) {
            return response;
        }
        // 如果leader不為空,但是與發起請求的節點是同一個節點,同樣支援發起請求的節點成為leader
        if (leader.equals(request.getSourceNode())) {
            return response;
        }
        // 其他情況,表示已經存在leader,拒絕投票請求
        throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
    }
}
updateMaxTermSeen

上面說過updateMaxTermSeen指向CoordinatorupdateMaxTermSeen方法,處理邏輯如下:

  1. 比較當前節點收到過的最大的Term與請求中的Term,選擇較大的那個作為maxTermSeen的值進行更新;
  2. 如果當前節點是Leader並且請求中的Term大於當前節點的Term,表示當前節點的資訊可能已經過期,需要放棄當前的Leader角色,重新發起選舉
    • 呼叫ensureTermAtLeast檢查Term,確保是最新的Term,在ensureTermAtLeast方法中會判斷,如果當前節點Term小於請求中的Term將當前節點轉為候選者;
    • 呼叫startElection方法重新進行選舉;
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    private void updateMaxTermSeen(final long term) {
        synchronized (mutex) {
            // 當前節點收到過的最大的Term與請求中的term,如果請求中的Term較大,maxTermSeen的值將被更新為請求中的Term的值
            maxTermSeen = Math.max(maxTermSeen, term);
            // 獲取當前節點的term
            final long currentTerm = getCurrentTerm();
            // 如果當前節點是Leader並且maxTermSeen大於當前節點的Term,請求中的Term較大,這裡maxTermSeen的值就是請求中的Term,所以也是在比較請求中的Term是否大於當前節點的Term
            if (mode == Mode.LEADER && maxTermSeen > currentTerm) {
                if (publicationInProgress()) {
                    logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm);
                } else {
                    try {
                        logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, bumping term", maxTermSeen, currentTerm);
                        // 確保Term是最新
                        ensureTermAtLeast(getLocalNode(), maxTermSeen);
                        // 發起選舉
                        startElection();
                    } catch (Exception e) {
                        logger.warn(new ParameterizedMessage("failed to bump term to {}", maxTermSeen), e);
                        becomeCandidate("updateMaxTermSeen");
                    }
                }
            }
        }
    }
}
ensureTermAtLeast

ensureTermAtLeast方法中,判斷當前節點的Term是否小於請求中的Term:

  • 如果是則建立StartJoinRequest然後呼叫joinLeaderInTerm方法,joinLeaderInTerm方法會返回一個JOIN資訊;

    在叢集選舉Leader的時候,某個節點成為Leader之前,會向其他節點傳送StartJoin請求,這裡進行模擬傳送,當前節點向自己傳送一個StartJoinRequest進行處理,更新當前節點的Term資訊,後面會詳細講解StartJoin請求的處理。

  • 如果不是,返回一個空的JOIN資訊;

joinLeaderInTerm方法中,會呼叫handleStartJoin處理StartJoin請求,它會更新當前節點Term資訊為最新,之後判斷當前節點是否是CANDIDATE,如果不是需要將節點轉為CANDIDATE:

public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
        assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
        // 判斷當前節點Term是否小於請求中的Term
        if (getCurrentTerm() < targetTerm) {
            // 呼叫joinLeaderInTerm
            return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm)));
        }
        return Optional.empty();
    }
  
    private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
        synchronized (mutex) {
            logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
            final Join join = coordinationState.get().handleStartJoin(startJoinRequest);
            lastJoin = Optional.of(join);
            peerFinder.setCurrentTerm(getCurrentTerm());
            // 如果不是CANDIDATE轉為CANDIDATE
            if (mode != Mode.CANDIDATE) {
                becomeCandidate("joinLeaderInTerm");
                followersChecker.updateFastResponseState(getCurrentTerm(), mode);
                preVoteCollector.update(getPreVoteResponse(), null);
            }
            return join;
        }
    }
}

PRE_VOTE響應處理

發起者收到叢集節點返回的PRE_VOTE請求響應時,在handlePreVoteResponse方法中進行處理:

  1. 同樣呼叫updateMaxTermSeen更新當前節點收到的最大Term;
  2. 如果響應中的Term大於當前節點的Term, 或者Term相等但是版本號大於當前節點的版本號,直接返回不進行處理,否則進入下一步;
  3. 走到這裡表示認同當前節點成為Leader節點,將得到的投票資訊放入preVotesReceived
  4. 判斷是否得到了大多數投票,也就是收到的投票數是否超過了Quorum,如果未超過直接返回,如果超過表示當前節點可以成為Leader;
  5. 通過startElection開始處理成為Leader前的操作;
public class PreVoteCollector { 
    private class PreVotingRound implements Releasable {      
        private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
            if (isClosed.get()) {
                logger.debug("{} is closed, ignoring {} from {}", this, response, sender);
                return;
            }
            // 處理最大Term
            updateMaxTermSeen.accept(response.getCurrentTerm());
            // 如果響應中的Term大於當前節點的Term, 或者Term相等但是版本號大於當前節點的版本號
            if (response.getLastAcceptedTerm() > clusterState.term()
                || (response.getLastAcceptedTerm() == clusterState.term() && response.getLastAcceptedVersion() > clusterState.version())) {
                logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
                return;
            }
            // 記錄得到的投票
            preVotesReceived.put(sender, response);
            // ...
            // 判斷是否得到了大多數投票
            if (electionStrategy.isElectionQuorum(
                clusterState.nodes().getLocalNode(),
                localPreVoteResponse.getCurrentTerm(),
                localPreVoteResponse.getLastAcceptedTerm(),
                localPreVoteResponse.getLastAcceptedVersion(),
                clusterState.getLastCommittedConfiguration(),
                clusterState.getLastAcceptedConfiguration(),
                voteCollection
            ) == false) {
                logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
                return;
            }
            // ...
            // 開始選舉
            startElection.run();
        }
    }
}

成為Leader

邀請節點加入叢集

在成為Leader前,需要向叢集中的節點傳送StartJoin請求,邀請節點加入叢集:

  1. 建立StartJoin請求,請求中設定了Term資訊,取當前節點的Term和收到過最大的Term中較大的那個值並加1
  2. 呼叫sendStartJoinRequest傳送StartJoin請求;
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher { 
   private void startElection() {
        synchronized (mutex) {
            // 是否是CANDIDATE
            if (mode == Mode.CANDIDATE) {
                if (localNodeMayWinElection(getLastAcceptedState()) == false) {
                    logger.trace("skip election as local node may not win it: {}", getLastAcceptedState().coordinationMetadata());
                    return;
                }
                // 建立StartJoin請求,這裡可以看到在請求中的Term,設定為最大Term + 1
                final StartJoinRequest startJoinRequest = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
                logger.debug("starting election with {}", startJoinRequest);
                // 呼叫sendStartJoinRequest傳送StartJoin請求
                getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
            }
        }
   }
}
StartJoin請求傳送

StartJoin請求表示邀請節點加入叢集資訊,接收者收到請求後會向發起者傳送JOIN請求表示進行加入,所以發起者對StartJoin的響應不需要做什麼處理,等待接收者傳送JOIN請求即可:

public class JoinHelper {
    void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
        assert startJoinRequest.getSourceNode().isMasterNode()
            : "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode();
        // 傳送START_JOIN請求
        transportService.sendRequest(destination, START_JOIN_ACTION_NAME, startJoinRequest, new TransportResponseHandler.Empty() {
            @Override
            public void handleResponse(TransportResponse.Empty response) {
                // 什麼也不處理
                logger.debug("successful response to {} from {}", startJoinRequest, destination);
            }

            @Override
            public void handleException(TransportException exp) {
                logger.debug(new ParameterizedMessage("failure in response to {} from {}", startJoinRequest, destination), exp);
            }
        });
    }
}
StartJoin請求處理

JoinHelper的建構函式中,註冊了START_JOIN請求處理器,在收到START_JOIN請求時,會呼叫joinLeaderInTerm處理,然後呼叫sendJoinRequest向傳送者傳送JOIN請求:

public class JoinHelper {
    JoinHelper(
       // ...
    ) {
        // 註冊START_JOIN_ACTION_NAME請求處理
        transportService.registerRequestHandler(
            START_JOIN_ACTION_NAME,
            Names.CLUSTER_COORDINATION,
            false,
            false,
            StartJoinRequest::new,
            (request, channel, task) -> {
                final DiscoveryNode destination = request.getSourceNode();
                // 傳送join請求
                sendJoinRequest(destination, currentTermSupplier.getAsLong(), Optional.of(joinLeaderInTerm.apply(request))); // 呼叫joinLeaderInTerm處理
                channel.sendResponse(Empty.INSTANCE);
            }
        );
    }
}
joinLeaderInTerm

joinLeaderInTerm方法用於處理StartJoin請求,返回一個Join物件並行送給發起者,發起者會根據返回的Join資訊計算得到的票數,以此決定是否成為LeaderjoinLeaderInTerm處理邏輯如下:

  1. 呼叫handleStartJoin處理StartJoin請求,它會從請求中獲取Term資訊並更新到當前節點的CurrentTerm中,並返回Join物件,用於向發起者回復投票結果;
  2. 如果節點不是CANDIDATE,將節點轉為CANDIDATE
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {

    private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
        synchronized (mutex) {
            logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
            // 處理StartJoin請求
            final Join join = coordinationState.get().handleStartJoin(startJoinRequest);
            lastJoin = Optional.of(join);
            peerFinder.setCurrentTerm(getCurrentTerm());
            // 如果節點不是CANDIDATE,轉為CANDIDATE
            if (mode != Mode.CANDIDATE) {
                becomeCandidate("joinLeaderInTerm"); 
            } else {
                followersChecker.updateFastResponseState(getCurrentTerm(), mode);
                preVoteCollector.update(getPreVoteResponse(), null);
            }
            return join;
        }
    }
}
更新CurrentTerm

在handleStartJoin方法中從請求中獲取Term資訊並更新到當前節點的CurrentTerm中

  1. 如果StartJoin請求中的Term小於或者等於當前節點的Term,丟擲異常;
  2. 更新當前節點的CurrentTerm為StartJoin請求中的Term
  3. 返回一個Join物件,裡面記錄當前節點加入叢集的資訊,包括當前節點資訊、傳送startJoin請求的節點(選舉為Leader的節點),當前節點的Term,當前節點上一次接受的Term、當前節點上一次接受的版本

handleStartJoin方法中只要請求中的Term大於當前節點的Term,都會繼續往下進行,最後返回一個Join物件,這意味著當前節點同意為發起者進行投票,也就是說Elasticsearch允許一個節點多次進行投票,並沒有按照Raft協定中的規定每個任期內只能給一個節點投票。

public class CoordinationState {
  
    public Join handleStartJoin(StartJoinRequest startJoinRequest) {
        // 如果StartJoin請求中的Term小於或者等於當前節點的Term,丟擲異常
        if (startJoinRequest.getTerm() <= getCurrentTerm()) {
            logger.debug(
                "handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]",
                startJoinRequest,
                getCurrentTerm()
            );
            throw new CoordinationStateRejectedException(
                "incoming term " + startJoinRequest.getTerm() + " not greater than current term " + getCurrentTerm()
            );
        }
        logger.debug("handleStartJoin: leaving term [{}] due to {}", getCurrentTerm(), startJoinRequest);
        // ...
        // 更新當前節點的CurrentTerm
        persistedState.setCurrentTerm(startJoinRequest.getTerm());
        // 判斷當前節點的Term是否與startJoin請求的一致
        assert getCurrentTerm() == startJoinRequest.getTerm();
        lastPublishedVersion = 0;
        lastPublishedConfiguration = getLastAcceptedConfiguration();
        startedJoinSinceLastReboot = true;
        electionWon = false;
        joinVotes = new VoteCollection();
        publishVotes = new VoteCollection();
        // 返回JOIN資訊,包括當前節點、傳送startJoin請求的節點、當前節點的Term、當前節點上一次接受的Term、當前節點上一次接受的版本
        return new Join(localNode, startJoinRequest.getSourceNode(), getCurrentTerm(), getLastAcceptedTerm(), getLastAcceptedVersion());
    }
}

節點加入叢集

向Leader傳送JOIN請求

StartJoin請求處理完畢後呼叫sendJoinRequest向發起者傳送JOIN請求,表示加入叢集:

public class JoinHelper {
    public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin) {
        assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
        final StatusInfo statusInfo = nodeHealthService.getHealth();
        // 如果處於UNHEALTHY狀態不進行傳送
        if (statusInfo.getStatus() == UNHEALTHY) {
            logger.debug("dropping join request to [{}]: [{}]", destination, statusInfo.getInfo());
            return;
        }
        // 構建JOIN請求體
        final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
        // ...
        if (pendingOutgoingJoins.putIfAbsent(dedupKey, pendingJoinInfo) == null) {
            logger.debug("attempting to join {} with {}", destination, joinRequest);
            pendingJoinInfo.message = PENDING_JOIN_CONNECTING;
            // 連線節點
            transportService.connectToNode(destination, new ActionListener<>() {
                @Override
                public void onResponse(Releasable connectionReference) {
                    // ...
                    clusterApplier.onNewClusterState(
                        "joining " + destination.descriptionWithoutAttributes(),
                        () -> null,
                        new ActionListener<>() {
                            @Override
                            public void onResponse(Void unused) {
                                // ....
                                pendingJoinInfo.message = PENDING_JOIN_WAITING_RESPONSE;
                                // 傳送JOIN請求
                                transportService.sendRequest(
                                    destination,
                                    JOIN_ACTION_NAME,
                                    joinRequest,
                                    TransportRequestOptions.of(null, TransportRequestOptions.Type.PING),
                                    new TransportResponseHandler.Empty() {
                                        @Override
                                        public void handleResponse(TransportResponse.Empty response) {
                                            pendingJoinInfo.message = PENDING_JOIN_WAITING_STATE;
                                            pendingOutgoingJoins.remove(dedupKey);
                                            logger.debug("successfully joined {} with {}", destination, joinRequest);
                                            lastFailedJoinAttempt.set(null);
                                        }
                                        // ...
                                    }
                                );
                            }

                            // ...
                        }
                    );
                }
                // ...
            });

        } else {
            logger.debug("already attempting to join {} with request {}, not sending request", destination, joinRequest);
        }
    }
}
Leader處理Join請求

JoinHelper的建構函式中,註冊了JOIN請求處理器,是通過joinHandler來處理請求的,它同樣是函數語言程式設計介面,在Coordinator對JoinHelper進行範例化的時候,可以看到傳入的是handleJoinRequest方法:

public class JoinHelper {
    JoinHelper(
      // ...
      BiConsumer<JoinRequest, ActionListener<Void>> joinHandler,
      // ...
    ) {
        // ...
        transportService.registerRequestHandler(
            JOIN_ACTION_NAME,
            Names.CLUSTER_COORDINATION,
            false,
            false,
            JoinRequest::new,
            (request, channel, task) -> joinHandler.accept(
                request,
                new ChannelActionListener<Empty, JoinRequest>(channel, JOIN_ACTION_NAME, request).map(ignored -> Empty.INSTANCE)
            )
        );
        // ...
    }
}
// Coordinator
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    public Coordinator(
       // ...
    ) {
        // ...
        this.joinHelper = new JoinHelper(
            allocationService,
            masterService,
            clusterApplier,
            transportService,
            this::getCurrentTerm,
            this::handleJoinRequest, // handleJoinRequest方法
            // ...
        );
        // ...
    }
}

Coordinator的handleJoinRequest方法中,會對傳送JOIN的節點進行連線,進行JOIN請求驗證:

  1. 先呼叫processJoinRequest處理收到的JOIN請求;
  2. 呼叫validateJoinRequest方法對JOIN請求進行驗證;
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    private void handleJoinRequest(JoinRequest joinRequest, ActionListener<Void> joinListener) {
        // ...
        // 連線節點
        transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() {
            @Override
            public void onResponse(Releasable response) {
                boolean retainConnection = false;
                try {
                    // 對JOIN請求進行驗證
                    validateJoinRequest(
                        joinRequest,
                        ActionListener.runBefore(joinListener, () -> Releasables.close(response))
                            .delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l)) // 處理請求
                    );
                    retainConnection = true;
                } catch (Exception e) {
                    joinListener.onFailure(e);
                } finally {
                    if (retainConnection == false) {
                        Releasables.close(response);
                    }
                }
            }
           // ...
        });
    }
}
JOIN請求處理

processJoinRequest處理邏輯如下:

  1. 呼叫updateMaxTermSeen更新收到最大的Term;
  2. 判斷是否已經成功競選為Leader,因為發起者會收到多個節點傳送的JOIN請求,每次處理JOIN請求會判斷是否獲取了大多數投票,並將結果更新到CoordinationStateelectionWon變數中,為了不重複呼叫becomeLeader,這裡先獲取最近一次更新的值,記為prevElectionWon,用於判斷後面是否需要呼叫becomeLeader成為Leader;
  3. 呼叫handleJoin進行處理,處理的時候會判斷是否獲取了大多數的投票,並更新CoordinationStateelectionWon的值;
  4. 再次從CoordinationState中獲取electionWon值進行判斷,如果prevElectionWon為false但是當前的electionWon為true,也就是之前未收到大多數投票的,但是處理當前的JOIN請求時達到了大多數投票,成功競選為Leader,則呼叫becomeLeader成為Leader;
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    private void processJoinRequest(JoinRequest joinRequest, ActionListener<Void> joinListener) {
        assert Transports.assertNotTransportThread("blocking on coordinator mutex and maybe doing IO to increase term");
        // 獲取JOIN資訊
        final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
        try {
            synchronized (mutex) {
                // 更新最大Term
                updateMaxTermSeen(joinRequest.getTerm());
                // 獲取叢集協調狀態
                final CoordinationState coordState = coordinationState.get();
                // 獲取上一次的狀態,是否成功選舉為Leader
                final boolean prevElectionWon = coordState.electionWon();
                // 處理JOIN
                optionalJoin.ifPresent(this::handleJoin);
                joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinListener);
                // 如果之前未成為Leader並且當前選舉Leader成功
                if (prevElectionWon == false && coordState.electionWon()) {
                    // 成為Leader
                    becomeLeader();
                }
            }
        } catch (Exception e) {
            joinListener.onFailure(e);
        }
    }
}

接下來看下handleJoin的處理過程:

  1. 首先呼叫ensureTermAtLeast方法確保當前節點是最新的Term,ensureTermAtLeast前面已經講過,會確保當前的節點Term是最新,如果已經是最新什麼也不做,如果不是將建立StartJoinRequest然後呼叫joinLeaderInTerm方法,joinLeaderInTerm方法會返回一個JOIN資訊,表示當前節點要加入一個叢集的資訊;

    在節點傳送StartJoin請求時可知,對請求中的Term進行了加1但是節點自己的Term並未更新,所以首次收到發回的JOIN請求進入handleJoin時,JOIN請求中的Term會比當前節點的Term大1,那麼ensureTermAtLeast就會返回一個JOIN資訊,然後再次呼叫handleJoin處理JOIN請求,這裡可以理解為節點向自己發了一個JOIN請求(通過建立JOIN物件的方式),給自己投一票

  2. 上面說過CoordinationStateelectionWon記錄了是否已經選舉為Leader,所以這裡進行判斷,如果已經被選舉成為了Leader,呼叫handleJoinIgnoringExceptions處理JOIN請求,這個方法底層還是呼叫CoordinationStatehandleJoin進行處理,只不過在外層進行了異常捕捉,會忽略丟擲的異常,因為節點之前已經成功選舉了Leader,所以本次JION請求處理無關緊要,為了不讓異常影響後續的流程,所以對異常進行一個捕捉;

  3. 如果還未成功選舉為Leader,呼叫CoordinationStatehandleJoin處理請求,與第一步不一樣的是這個不會對異常進行捕捉,因為此時還沒成為Leader,如果有異常資訊需要丟擲;

public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    // 獲取CoordinationState
    private final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); 
    private void handleJoin(Join join) {
        synchronized (mutex) {
            // 確保Term最新,如果不是最新,會返回一個JOIN物件,呼叫handleJoin進行處理,這裡可以理解為節點給自己投了一票
            ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin);
            // 如果已經被選舉為Leader
            if (coordinationState.get().electionWon()) {
                // 呼叫對異常進行捕捉的handleJoin方法
                final boolean isNewJoinFromMasterEligibleNode = handleJoinIgnoringExceptions(join);
                final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm();
                if (isNewJoinFromMasterEligibleNode && establishedAsMaster && publicationInProgress() == false) {
                    scheduleReconfigurationIfNeeded();
                }
            } else { // 如果還未為成為Leader
                // CoordinationState的handleJoin處理請求
                coordinationState.get().handleJoin(join); 
            }
        }
    }
  
    private boolean handleJoinIgnoringExceptions(Join join) {
        try {
            // CoordinationState的handleJoin處理請求
            return coordinationState.get().handleJoin(join);
        } catch (CoordinationStateRejectedException e) {
            logger.debug(() -> "failed to add " + join + " - ignoring", e);
            return false;
        }
    }
}

在CoordinationState的handleJoin中,首先會對Term和版本資訊進行一系列的校驗,如果校驗通過,記錄收到的JOIN請求個數,表示當前已經成功收到的投票數,然後呼叫isElectionQuorum判斷是否獲得了大多數的投票,也就是獲得的投票數達到了Quorum,並將值更新到electionWon中:

public class CoordinationState { 
    public boolean handleJoin(Join join) {
        assert join.targetMatches(localNode) : "handling join " + join + " for the wrong node " + localNode;
        // 如果收到的JOIN請求Term與當前節點的Term不一致丟擲異常
        if (join.getTerm() != getCurrentTerm()) {
            logger.debug("handleJoin: ignored join due to term mismatch (expected: [{}], actual: [{}])", getCurrentTerm(), join.getTerm());
            throw new CoordinationStateRejectedException(
                "incoming term " + join.getTerm() + " does not match current term " + getCurrentTerm()
            );
        }
        // ...
        // 獲取上一次的Term
        final long lastAcceptedTerm = getLastAcceptedTerm();
        // 如果請求中的上一次接受的Term大於當前節點的lastAcceptedTerm,丟擲異常
        if (join.getLastAcceptedTerm() > lastAcceptedTerm) {
            logger.debug( "handleJoin: ignored join as joiner has a better last accepted term (expected: <=[{}], actual: [{}])", lastAcceptedTerm, join.getLastAcceptedTerm());
            throw new CoordinationStateRejectedException( "incoming last accepted term "
                    + join.getLastAcceptedTerm() + " of join higher than current last accepted term "
                    + lastAcceptedTerm
            );
        }
        // 對比版本
        if (join.getLastAcceptedTerm() == lastAcceptedTerm && join.getLastAcceptedVersion() > getLastAcceptedVersion()) {
            logger.debug("handleJoin: ignored join as joiner has a better last accepted version (expected: <=[{}], actual: [{}]) in term {}", getLastAcceptedVersion(), join.getLastAcceptedVersion(), lastAcceptedTerm);
            throw new CoordinationStateRejectedException("incoming last accepted version "
                    + join.getLastAcceptedVersion() + " of join higher than current last accepted version "
                    + getLastAcceptedVersion() + " in term " + lastAcceptedTerm);
        }
        // ...
        // 記錄JOIN投票
        boolean added = joinVotes.addJoinVote(join);
        boolean prevElectionWon = electionWon;
        // 判斷是否得到了大多數投票,這裡會更新electionWon的值
        electionWon = isElectionQuorum(joinVotes);
        assert prevElectionWon == false || electionWon : // we cannot go from won to not won
            "locaNode= " + localNode + ", join=" + join + ", joinVotes=" + joinVotes;
        logger.debug(
            "handleJoin: added join {} from [{}] for election, electionWon={} lastAcceptedTerm={} lastAcceptedVersion={}", join, join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion()
        );
        // 如果得到了大多數投票並且上一次沒有選舉為Leader
        if (electionWon && prevElectionWon == false) {
            logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes;
            lastPublishedVersion = getLastAcceptedVersion();
        }
        return added;
    }
}

轉為Leader

當節點收到了大多數投票後,就會呼叫becomeLeader轉為Leader,這裡會將節點由CANDIDATE轉為LEADER角色,然後呼叫preVoteCollector的update更新Term和Leader節點資訊:

public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
    private void becomeLeader() {
        assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
        // 是否是CANDIDATE
        assert mode == Mode.CANDIDATE : "expected candidate but was " + mode;
        // 是否有Master角色許可權
        assert getLocalNode().isMasterNode() : getLocalNode() + " became a leader but is not master-eligible";
        logger.debug("handleJoinRequest: coordinator becoming LEADER in term {} (was {}, lastKnownLeader was [{}])", getCurrentTerm(), mode,lastKnownLeader);
        // 轉為Leader
        mode = Mode.LEADER;
        joinAccumulator.close(mode);
        // 設定為LeaderJoinAccumulator
        joinAccumulator = joinHelper.new LeaderJoinAccumulator();
        lastKnownLeader = Optional.of(getLocalNode());
        peerFinder.deactivate(getLocalNode());
        clusterFormationFailureHelper.stop();
        closePrevotingAndElectionScheduler();
        // 更新Leader資訊和Term資訊
        preVoteCollector.update(getPreVoteResponse(), getLocalNode());
        assert leaderChecker.leader() == null : leaderChecker.leader();
        followersChecker.updateFastResponseState(getCurrentTerm(), mode);
    }
}

參考

【張超】深入理解 Elasticsearch 7.x 新的叢集協調層

【陳亮亮】Elasticsearch 新版選主流程

【政採雲技術團隊】Elasticsearch系列之二選主7.x之後

Elasticsearch版本:8.3