A Walkthrough of the etcd/raft Election Source Code

2023-04-08 15:01:15

Notes on etcd/raft

0. Introduction

This post is based on etcd v3.5.7. It first gives a brief overview of the election-related optimizations etcd/raft applies to the Raft algorithm, and then walks through etcd/raft's election implementation via the source code.

1. etcd's optimizations to the Raft election algorithm

All of these optimizations are described in the Raft PhD dissertation.

The election-related optimizations implemented in etcd/raft are Pre-Vote, Check Quorum, and Leader Lease. Of the three, only Pre-Vote and Leader Lease were originally optimizations to the election process; Check Quorum was introduced to implement linearizable reads more efficiently. But because Leader Lease depends on Check Quorum, it is covered here as well.

1.1 Pre-Vote

As shown in the figure below, when the network of a Raft cluster partitions, partitions whose node count cannot reach quorum (the minimum number of nodes required to reach consensus) may appear, such as Partition 1 in the figure.

In a partition whose node count reaches quorum, the election proceeds normally, and the terms of all nodes in that partition eventually settle at the term of the newly elected leader. Unfortunately, in a partition that cannot reach quorum, if no leader is present, nodes can never receive a quorum of votes and therefore never elect a new leader; every time the election timeout expires they increment their term and start the next round of elections, so the terms of the nodes in that partition keep growing.

If the network never recovers, this is not a problem. But once the partition heals, the terms of the nodes in the minority partition will be far larger than those in the quorum partition. This forces the leader of the quorum partition to step down and adopt the larger term, causing an unnecessary round of elections in the cluster.

The Pre-Vote mechanism was designed to solve this problem. The idea is to prevent a partition that cannot reach quorum from entering the voting process at all, thereby preventing its terms from growing. To this end, Pre-Vote introduces a "pre-vote" phase: when a node's election timeout expires, it does not immediately increment its term and request votes; instead, it first launches a round of pre-votes. Nodes that receive a pre-vote request do not step down. Only after collecting a quorum of granted pre-vote responses may a node increment its term and send real vote requests. This way, nodes in a partition without quorum can never increase their term, and thus cannot trigger an unnecessary round of voting once the partition heals.

1.2 Check Quorum

In the Raft algorithm, the simplest way to guarantee linearizable reads is to treat a read request as just another Raft proposal, executed the same way as any other log entry; this approach is therefore called Log Read. Obviously, Log Read performs poorly, yet read-heavy workloads are very common. To improve read performance, one has to try to bypass the log machinery.

However, reading directly from the leader while bypassing the log may return out-of-date data; that is, there is a stale read problem. In the scenario shown in the figure below, suppose Node 5 was the leader of the whole cluster before the network partition. After the partition, Partition 0 elects a new leader, Node 1.

Because of the partition, Node 5 cannot receive messages from the nodes in Partition 0 and never realizes that the cluster has a new leader. At this point it can no longer commit log entries successfully, but if reads bypass the log it can still serve them, so clients connected to Node 5 may read stale data.

Check Quorum mitigates the impact of this problem, and its mechanism is very simple: the leader periodically checks whether its followers are active. If the number of active followers does not reach quorum, the leader may well be a stale leader from before a partition, so it voluntarily steps down to follower.
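
In etcd/raft this check rides on the leader's logical clock: tickHeartbeat fires a local MsgCheckQuorum message once per election timeout, and stepLeader steps down if the progress tracker reports that no quorum of followers has been active recently. The sketch below is abridged from raft.go in v3.5 (logging, leader-transfer handling, and the RecentActive bookkeeping are elided):

// tickHeartbeat is run by the leader; r.electionElapsed doubles as the
// check-quorum timer.
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		if r.checkQuorum {
			// Ask ourselves whether a quorum of followers is still active.
			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
		}
		// ... leader-transfer timeout handling elided ...
	}
	// ... MsgBeat (heartbeat broadcast) handling elided ...
}

// In stepLeader:
case pb.MsgCheckQuorum:
	if !r.prs.QuorumActive() {
		// No quorum of followers was heard from within the last election
		// timeout: this leader may be on the minority side of a partition,
		// so it steps down.
		r.becomeFollower(r.Term, None)
	}
	// ... RecentActive flags are then reset for the next round ...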

Note that Check Quorum cannot completely prevent stale reads; it only narrows the window in which they can occur and thus reduces their impact. Strict linearizability must be achieved through other mechanisms.

1.3 Leader Lease

The network environment of a distributed system is complex, and sometimes a partial network partition occurs: the network topology as a whole remains a connected graph, yet not every pair of nodes can reach each other.

This phenomenon is not limited to network faults; it can also arise from membership changes. When a node is removed via ConfChange, different nodes may apply the ConfChange at different times, which can produce the same situation. For example, nodes that have already applied the removal stop communicating with the removed node, while nodes that have not yet applied it still exchange messages with it.

In the scenario shown in the figure above, Node 1 and Node 2 cannot communicate. If Node 1 was the cluster's leader before their link broke, then after the break Node 2 no longer receives heartbeats from Node 1, so Node 2 starts an election. If neither Node 1 nor Node 3 appended new log entries before Node 2 campaigned, Node 2 can still collect a quorum of votes (its own vote plus Node 3's) and become leader.

Leader Lease introduces a new constraint on voting to solve this problem: a node that has heard from a leader within the election timeout will not vote for any other node that requests a vote or pre-vote. In other words, Leader Lease prevents the nodes of a normally functioning cluster from voting for other nodes.

Leader Lease requires Check Quorum in order to work correctly. The following example shows why.

Suppose a five-node Raft cluster partitions as shown in the figure below: Node 1 and Node 2 can reach each other; Node 2, Node 3, and Node 4 can reach one another pairwise; Node 5 cannot reach any other node. Before the partition, Node 1 was the cluster's leader.

With neither Leader Lease nor Check Quorum enabled, Node 3 and Node 4 would start elections after failing to receive the leader's heartbeats. Since Node 2, Node 3, and Node 4 are interconnected, this group reaches quorum, so they could elect a new leader.

With Leader Lease but without Check Quorum, Node 2 still receives heartbeats from the original leader Node 1, so under the Leader Lease constraint it refuses to vote for other nodes. As a result, even though the cluster contains a partition whose available nodes reach quorum, the cluster still cannot make progress.

With both Leader Lease and Check Quorum enabled, in the situation above Node 1 steps down to follower after an election timeout elapses, because it cannot detect a quorum of active nodes. The election among Node 2, Node 3, and Node 4 can then proceed normally.
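
All three optimizations are exposed to applications through just two fields of raft.Config: PreVote and CheckQuorum (Leader Lease has no separate switch; as discussed below, enabling Check Quorum enables it). Here is a minimal sketch of a configuration enabling both; the helper name and the numeric values are illustrative, not taken from etcd:

import "go.etcd.io/etcd/raft/v3"

// newElectionConfig is a hypothetical helper; the tick counts and size
// limits are typical example values, not official recommendations.
func newElectionConfig(id uint64, storage *raft.MemoryStorage) *raft.Config {
	return &raft.Config{
		ID:              id,
		ElectionTick:    10, // election timeout: 10 ticks
		HeartbeatTick:   1,  // heartbeat interval: 1 tick
		Storage:         storage,
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,
		PreVote:         true, // enable Pre-Vote
		CheckQuorum:     true, // enable Check Quorum (and with it Leader Lease)
	}
}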

1.4 New problems introduced, and their solutions

Introducing Pre-Vote and Check Quorum (in etcd/raft, enabling Check Quorum automatically enables Leader Lease) brings some new problems to the Raft algorithm.

When a node receives a message whose term is lower than its own, the original logic is to simply ignore it, since a lower-term message could only be an old message delayed by the network. With these mechanisms enabled, however, the following scenarios break that assumption:

Scenario 1: As shown in the figure above, with Check Quorum / Leader Lease enabled (assume Pre-Vote is off; the Pre-Vote case is discussed in the next scenario), the leader of a partition that cannot reach quorum steps down, that partition can never elect a leader, and so the terms of its nodes keep growing. After the network between that partition and the rest of the cluster recovers, the vote requests of these higher-term nodes are dropped, because the nodes of the original quorum partition are working normally under Check Quorum / Leader Lease. At the same time, such a node drops the original leader's messages, because its own term is larger than that leader's. As a result, the node can never rejoin the cluster, nor be elected the new leader. (See issue #5451 and issue #5468.)

Scenario 2: Pre-Vote has a similar problem. As shown in the figure above, suppose a node wins a pre-vote and is partitioned away just as it is about to send its real vote requests. Its term is now higher than the original cluster's, while the original cluster, never having received a real vote request, does not update its term and keeps running normally. After the partition heals, the original cluster's term is lower than the partitioned node's, but its log is newer. Now the node's pre-vote requests are rejected because its log is out of date, while the messages the original cluster's leader sends to it are dropped because their term is lower than its own. Again, the node can never rejoin the cluster, nor be elected the new leader. (See issue #8501 and issue #8525.)

Scenario 3: More complex cases exist. For example, a configuration change may turn on Pre-Vote in a cluster that previously ran without it. A situation similar to the one above can then occur: a node with a higher term but an older log can deadlock the whole cluster, with no node able to win a pre-vote. This is more dangerous than the previous case: there, only the formerly partitioned node could not rejoin the cluster; here, the entire cluster becomes unavailable. (See issue #8501 and issue #8525.)

To solve these problems, a node must give special treatment to requests whose term is lower than its own. The handling logic is simple:

  1. If a node receives a message from a lower-term leader while Check Quorum / Leader Lease or Pre-Vote is enabled, it replies with a message carrying its current term, prompting the stale leader to step down to follower. (Addresses scenarios 1 and 2; see the abridged code after this list.)
  2. For a pre-vote request with a lower term, whether or not Check Quorum / Leader Lease or Pre-Vote is enabled, the node replies with a rejection carrying its current term, forcing the sender to revert to follower and update its term. (Addresses scenario 3.)
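
Both rules live in the m.Term < r.Term branch of raft.Step, which the listing in section 2.2 elides; abridged from etcd v3.5:

	case m.Term < r.Term:
		if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
			// Rule 1: reply so that the stale leader observes the newer term
			// and steps down (r.send stamps the message with r.Term).
			r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
		} else if m.Type == pb.MsgPreVote {
			// Rule 2: reject the pre-vote with our current term so that the
			// pre-candidate reverts to follower and picks up the term.
			r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
		} else {
			// Otherwise the out-of-date message is simply ignored.
		}
		return nil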

2. The implementation of Raft elections in etcd

2.1 Starting a vote or pre-vote

2.1.1 Election timeout

When the cluster first starts, every node is a follower, waiting for a timeout to trigger a leader election. The timeout is configured through Config. etcd/raft does not use wall-clock time; it uses a logical clock in which a timeout fires once tick has been called a specified number of times. For a follower or candidate, tick checks whether the election timeout has expired and, if so, locally generates a MsgHup message to trigger a leader election:

// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}
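
pastElectionTimeout compares the elapsed ticks against a randomized timeout that is re-drawn on every state reset, which keeps nodes from timing out in lockstep and splitting the vote; from the same file:

// pastElectionTimeout returns true iff r.electionElapsed is greater
// than or equal to the randomized election timeout in
// [electiontimeout, 2 * electiontimeout - 1].
func (r *raft) pastElectionTimeout() bool {
	return r.electionElapsed >= r.randomizedElectionTimeout
}

func (r *raft) resetRandomizedElectionTimeout() {
	r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}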

2.1.2 Handling MsgHup, and the hup method

etcd/raft drives the Raft state machine's transitions through the raft struct's Step method. Step is the entry point of message processing; because each state handles a different set of messages in different ways, there are several step methods:

  • raft.Step(): the entry point of message processing. It performs common checks (for example on the term) and handles messages that every state must process; when further processing is needed, it dispatches by state to one of the following:
    • raft.stepLeader(): message handling in the leader state;
    • raft.stepFollower(): message handling in the follower state;
    • raft.stepCandidate(): message handling in the candidate and pre-candidate states.
func (r *raft) Step(m pb.Message) error {
	// ... ...
	switch m.Type {
	case pb.MsgHup:
		if r.preVote {
			r.hup(campaignPreElection)
		} else {
			r.hup(campaignElection)
		}
	// ... ...
	}
	// ... ...
}

When Step handles a MsgHup message, it calls hup with a different CampaignType depending on whether Pre-Vote is enabled in the configuration. CampaignType is an enumerated type (in Go's idiomatic typed-constant style); its possible values are listed below.

campaignPreElection: the pre-election phase of the Pre-Vote mechanism.
campaignElection: a normal election (triggered only by election timeout; Leader Transfer not included).
campaignTransfer: an election initiated by a Leader Transfer.
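
The declaration in raft.go uses typed string constants; note that the string value of campaignTransfer is also what campaign places into the Context field of MsgVote messages, and what Step later compares against with bytes.Equal:

// CampaignType represents the type of campaigning. A string is used instead
// of uint64 because it is simpler to compare and to fill into raft entries.
type CampaignType string

const (
	// campaignPreElection represents the first phase of a normal election
	// when Config.PreVote is true.
	campaignPreElection CampaignType = "CampaignPreElection"
	// campaignElection represents a normal (time-based) election.
	campaignElection CampaignType = "CampaignElection"
	// campaignTransfer represents the type of leader transfer.
	campaignTransfer CampaignType = "CampaignTransfer"
)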

Next, let's analyze the implementation of hup.

func (r *raft) hup(t CampaignType) {
	if r.state == StateLeader {
		r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
		return
	}

	if !r.promotable() {
		r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
		return
	}
	ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
	if err != nil {
		r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
	}
	if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
		r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
		return
	}

	r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
	r.campaign(t)
}
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
	pr := r.prs.Progress[r.id]
	return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
}

To summarize, a node cannot start a campaign in the following situations:

  1. the node has been removed from the cluster;
  2. the node is a learner;
  3. the node has a snapshot that has not yet been persisted to stable storage;
  4. the node still has committed but not yet applied ConfChange entries (checked via numOfPendingConf; see the excerpt after this list).
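
Condition 4 is implemented by numOfPendingConf, which counts configuration-change entries among the committed-but-unapplied entries that hup sliced out of the log; abridged from the source:

func numOfPendingConf(ents []pb.Entry) int {
	n := 0
	for i := range ents {
		if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
			n++
		}
	}
	return n
}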

2.1.3 campaign

The official comments are thorough, so little additional explanation is needed.

// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
	// campaign is called from more places than just hup, so it still checks promotable() first.
	if !r.promotable() {
		// This path should not be hit (callers are supposed to check), but
		// better safe than sorry.
		r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
	}
	var term uint64
	var voteMsg pb.MessageType
	if t == campaignPreElection {
		r.becomePreCandidate()
		voteMsg = pb.MsgPreVote
		// PreVote RPCs are sent for the next term before we've incremented r.Term.
		term = r.Term + 1
	} else {
		r.becomeCandidate()
		voteMsg = pb.MsgVote
		term = r.Term
	}
	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
		// We won the election after voting for ourselves (which must mean that
		// this is a single-node cluster). Advance to the next state.
		if t == campaignPreElection {
			r.campaign(campaignElection)
		} else {
			r.becomeLeader()
		}
		return
	}
	var ids []uint64
	{
		// Vote requests are not sent to learners: learner IDs are not part of the incoming/outgoing voter configurations.
		idMap := r.prs.Voters.IDs()
		ids = make([]uint64, 0, len(idMap))
		for id := range idMap {
			ids = append(ids, id)
		}
		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	}
	for _, id := range ids {
		if id == r.id {
			continue
		}
		r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

		var ctx []byte
		if t == campaignTransfer {
			ctx = []byte(t)
		}
		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}
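
The essential difference between the two transitions at the top of campaign is that becomeCandidate increments the term and votes for itself, while becomePreCandidate deliberately changes neither r.Term nor r.Vote; abridged (logging elided):

func (r *raft) becomeCandidate() {
	if r.state == StateLeader {
		panic("invalid transition [leader -> candidate]")
	}
	r.step = stepCandidate
	r.reset(r.Term + 1) // a real election bumps the term
	r.tick = r.tickElection
	r.Vote = r.id // vote for ourselves
	r.state = StateCandidate
}

func (r *raft) becomePreCandidate() {
	if r.state == StateLeader {
		panic("invalid transition [leader -> pre-candidate]")
	}
	// Becoming a pre-candidate changes our step functions and state,
	// but doesn't change anything else. In particular it does not increase
	// r.Term or change r.Vote.
	r.step = stepCandidate
	r.prs.ResetVotes()
	r.tick = r.tickElection
	r.lead = None
	r.state = StatePreCandidate
}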

At this point, the node has sent MsgVote or MsgPreVote messages to the other nodes.

2.2 Handling incoming vote and pre-vote messages

Vote and pre-vote messages are handled entirely inside Step; they never reach the per-state step methods. Note that one of the canVote conditions under which a MsgPreVote can be granted is m.Term > r.Term (pre-votes are always sent with a future term).

The official comments are detailed and easy to follow, so they are left to speak for themselves.

func (r *raft) Step(m pb.Message) error {
	// Handle the message term, which may result in our stepping down to a follower.
	switch {
	case m.Term == 0:
		// local message
	case m.Term > r.Term:
		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
			force := bytes.Equal(m.Context, []byte(campaignTransfer))
			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
			if !force && inLease {
				// If a server receives a RequestVote request within the minimum election timeout
				// of hearing from a current leader, it does not update its term or grant its vote
				return nil
			}
		}
		switch {
		case m.Type == pb.MsgPreVote:
			// Never change our term in response to a PreVote
		case m.Type == pb.MsgPreVoteResp && !m.Reject:
			// We send pre-vote requests with a term in our future. If the
			// pre-vote is granted, we will increment our term when we get a
			// quorum. If it is not, the term comes from the node that
			// rejected our vote so we should become a follower at the new
			// term.
		default:
			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
				r.becomeFollower(m.Term, m.From)
			} else {
				r.becomeFollower(m.Term, None)
			}
		}

	case m.Term < r.Term:
        // ........
	}

	switch m.Type {
	case pb.MsgHup:
        // ........
	case pb.MsgVote, pb.MsgPreVote:
		// We can vote if this is a repeat of a vote we've already cast...
		canVote := r.Vote == m.From ||
			// ...we haven't voted and we don't think there's a leader yet in this term...
			(r.Vote == None && r.lead == None) ||
			// ...or this is a PreVote for a future term...
			(m.Type == pb.MsgPreVote && m.Term > r.Term)
		// ...and we believe the candidate is up to date.
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			// Note: it turns out that that learners must be allowed to cast votes.
			// This seems counter- intuitive but is necessary in the situation in which
			// a learner has been promoted (i.e. is now a voter) but has not learned
			// about this yet.
			// For example, consider a group in which id=1 is a learner and id=2 and
			// id=3 are voters. A configuration change promoting 1 can be committed on
			// the quorum `{2,3}` without the config change being appended to the
			// learner's log. If the leader (say 2) fails, there are de facto two
			// voters remaining. Only 3 can win an election (due to its log containing
			// all committed entries), but to do so it will need 1 to vote. But 1
			// considers itself a learner and will continue to do so until 3 has
			// stepped up as leader, replicates the conf change to 1, and 1 applies it.
			// Ultimately, by receiving a request to vote, the learner realizes that
			// the candidate believes it to be a voter, and that it should act
			// accordingly. The candidate's config may be stale, too; but in that case
			// it won't win the election, at least in the absence of the bug discussed
			// in:
			// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.

			// When responding to Msg{Pre,}Vote messages we include the term
			// from the message, not the local term. To see why, consider the
			// case where a single node was previously partitioned away and
			// it's local term is now out of date. If we include the local term
			// (recall that for pre-votes we don't update the local term), the
			// (pre-)campaigning node on the other end will proceed to ignore
			// the message (it ignores all out of date messages).
			// The term in the original message and current local term are the
			// same in the case of regular votes, but different for pre-votes.
			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
			if m.Type == pb.MsgVote {
				// Only record real votes.
				r.electionElapsed = 0
				r.Vote = m.From
			}
		} else {
			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
		}

	default:
        // ...........
	}
	return nil
}

Note: a granted vote response carries m.Term, while a rejection carries r.Term. Consequently, if a MsgPreVote is rejected, the pre-candidate that sent it becomes a follower at the rejecting node's term r.Term, as shown in section 2.3.1.

2.3 Handling MsgPreVoteResp and MsgVoteResp messages

2.3.1 Handling in Step

As seen in section 2.2, Step contains the following code; it is explained at the end of that section, and the official comments cover it as well:

		switch {
		case m.Type == pb.MsgPreVote:
			// Never change our term in response to a PreVote
		case m.Type == pb.MsgPreVoteResp && !m.Reject:
			// We send pre-vote requests with a term in our future. If the
			// pre-vote is granted, we will increment our term when we get a
			// quorum. If it is not, the term comes from the node that
			// rejected our vote so we should become a follower at the new
			// term.
		default:
			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
				r.becomeFollower(m.Term, m.From)
			} else {
				r.becomeFollower(m.Term, None)
			}
		}

2.3.2 Handling in stepCandidate

	case myVoteRespType:
		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
		switch res {
		case quorum.VoteWon:
			if r.state == StatePreCandidate {
				r.campaign(campaignElection)
			} else {
				r.becomeLeader()
				r.bcastAppend()
			}
		case quorum.VoteLost:
			// pb.MsgPreVoteResp contains future term of pre-candidate
			// m.Term > r.Term; reuse r.Term
			r.becomeFollower(r.Term, None)
		}
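
Here myVoteRespType is pb.MsgPreVoteResp while the node is in StatePreCandidate and pb.MsgVoteResp otherwise, and poll records the incoming (pre-)vote in the progress tracker and tallies the outcome; abridged (logging elided):

// At the top of stepCandidate:
var myVoteRespType pb.MessageType
if r.state == StatePreCandidate {
	myVoteRespType = pb.MsgPreVoteResp
} else {
	myVoteRespType = pb.MsgVoteResp
}

// poll records that id granted (v == true) or rejected the vote, then
// tallies the result: quorum.VoteWon, quorum.VoteLost, or quorum.VotePending.
func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
	r.prs.RecordVote(id, v)
	return r.prs.TallyVotes()
}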

If the pre-vote wins, a new round of real voting is started. If the real vote wins, the node becomes leader and carries on with the follow-up work.

2.4 Becoming leader

2.4.1 becomeLeader()

func (r *raft) becomeLeader() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateFollower {
		panic("invalid transition [follower -> leader]")
	}
	r.step = stepLeader
	r.reset(r.Term)
	r.tick = r.tickHeartbeat
	r.lead = r.id
	r.state = StateLeader
	// Followers enter replicate mode when they've been successfully probed
	// (perhaps after having received a snapshot as a result). The leader is
	// trivially in this state. Note that r.reset() has initialized this
	// progress with the last index already.
	r.prs.Progress[r.id].BecomeReplicate()

	// Conservatively set the pendingConfIndex to the last index in the
	// log. There may or may not be a pending config change, but it's
	// safe to delay any future proposals until we commit all our
	// pending log entries, and scanning the entire tail of the log
	// could be expensive.
	r.pendingConfIndex = r.raftLog.lastIndex()

	emptyEnt := pb.Entry{Data: nil}
	if !r.appendEntry(emptyEnt) {
		// This won't happen because we just called reset() above.
		r.logger.Panic("empty entry was dropped")
	}
	// As a special case, don't count the initial empty entry towards the
	// uncommitted log quota. This is because we want to preserve the
	// behavior of allowing one entry larger than quota if the current
	// usage is zero.
	r.reduceUncommittedSize([]pb.Entry{emptyEnt})
	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

When a candidate becomes leader, it appends an empty entry at its current term to its own log and broadcasts it to the other nodes.
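
bcastAppend is what fans the freshly appended entry out to every peer tracked in r.prs; abridged from the source:

// bcastAppend sends an append RPC with new entries (if any) to all peers
// that are not yet up to date according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
		if id == r.id {
			return // the leader does not replicate to itself
		}
		r.sendAppend(id)
	})
}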