開發raft時用到的比較主流的兩個庫是Etcd Raft 和hashicorp Raft,網上也有一些關於這兩個庫的討論。之前分析過etcd Raft,發現該庫相對hashicorp Raft比較難以理解,其最大的問題是沒有實現網路層,實現難度比較大,因此本文在實現時使用了hashicorp Raft。
下文中會參考consul的一致性協定來講解如何實現Raft協定。
Raft節點總是處於三種狀態之一: follower, candidate, leader。一開始,所有的節點都是follower,該狀態下,節點可以從leader接收log,並參與選舉,如果一段時間內沒有接收到任何Log entry,則節點會自提升到candidate狀態。在candidate狀態下,節點會請求其他節點的選舉,如果一個candidate接收到大部分節點(仲裁數目)的認同,就會被提升為leader。leader必須接收新的Log entry,並複製到所有其他follower。
如果使用者無法接受舊的資料,則所有的請求必須由leader執行。
一旦一個叢集有了leader,就可以接收新的Log entry。使用者端可以請求leader追加一個新的Log entry。Leader會將Log entry寫入持久化儲存,並嘗試將其複製給仲裁數目的follower。一旦Log entry被commit,就可以將該Log entry apply到FSM。FSM是應用特定的儲存,在Consul中,使用 MemDB來維護叢集狀態。
無限量複製log的方式是不可取的。Raft 提供了一種機制,可以對當前狀態進行快照並壓縮log。由於 FSM 的抽象,FSM 的狀態恢復必須與replay log的狀態相同。Raft 可以捕獲某個時刻的 FSM 狀態,然後移除用於達到該狀態的所有log。這些操作可以在沒有使用者干預的情況下自動執行,防止無限使用磁碟,同時最小化replay log所花費的時間。
Raft Consensus中所有的操作都必須經過Leader,因此需要保證所有的請求都能夠傳送到Leader節點,然後由Leader將請求傳送給所有Follower,並等待大部分(仲裁數目的)節點處理完成該命令,leader通過選舉機制產生。之後每個follower會執行如下操作:
如下表述來自Raft Protocol Overview,它的意思是,如果是查詢類的請求,直接從FSM返回結果即可,如果是修改類的請求,則需要通過raft.Apply
來保證變更的一致性
所有raft叢集中的成員都知道當前的leader,當一個RPC請求到達一個非leader的成員時,它會將該請求轉發給當前的leader。如果是查詢型別的RPC,意味著它是唯讀的,leader會基於FSM的當前狀態生成結果;如果是事務型別的RPC,意味著它需要修改狀態,Leader會生成一條新的log entry,並執行
Raft.Apply
,當log entry被提交併apply到FSM之後,事務才算執行完成。
Apply是資料進入Raft的介面,整個Raft的主要作用是維護資料操作的一致性。在上圖中,由兩個apply:一個是Raft.Apply
(內部會通過applyCh
傳遞log
),其也是外部資料的入口。另一個是FSM.Apply,其資料來源頭是Raft.Apply
。FSM基於Raft實現了一致性讀寫。在上圖中可以看到,leader的FSM.Apply
是在資料commit成功(仲裁成功)之後才執行的,這樣就能以Raft的方式保證分散式場景下應用資料的一致性,可以將FSM.Apply
理解為應用資料的寫入操作。
Raft中的一條log
表示一個操作。使用hashicorp/raft
時應該將實現分為兩層:一層是底層的Raft,支援Raft資料的儲存、快照等,叢集的選舉和恢復等,這一部分由Raft模組自實現;另一層是應用層,需要由使用者實現FSM介面,FSM的介面並不對外,在Raft的處理過程中會呼叫FSM的介面來實現應用資料的儲存、備份和恢復等操作。這兩層都有資料的讀寫和快照實現,因此在理解上需要進行區分。
如果是新建的Raft節點,可以使用BootstrapCluster
方法初始化該節點。為避免非新的節點被初始化,在呼叫BootstrapCluster
前可以使用raft.HasExistingState
來判斷範例中是否包含相關狀態(logs,當前term或snapshot):
if (s.config.Bootstrap) && !s.config.ReadReplica {
hasState, err := raft.HasExistingState(log, stable, snap)
if err != nil {
return err
}
if !hasState {
configuration := raft.Configuration{
Servers: []raft.Server{
{
ID: s.config.RaftConfig.LocalID,
Address: trans.LocalAddr(),
},
},
}
if err := raft.BootstrapCluster(s.config.RaftConfig,
log, stable, snap, trans, configuration); err != nil {
return err
}
}
}
Raft節點的建立方法如下,如果儲存非空,則Raft會嘗試恢復該節點:
func NewRaft(conf *Config,
fsm FSM,
logs LogStore,
stable StableStore,
snaps SnapshotStore,
trans Transport) (*Raft, error) {
包括:
fsm
:由應用實現,用於處理應用資料。FSM中的資料來自底層的Raft log
。
logs
,stable
和snaps
:logs
(儲存Raft log
) ,stable
(儲存Raft選舉資訊,如角色、term等資訊) 可以使用raftboltdb.New
進行初始化, snaps
用於Leader和follower之間的批次資料同步以及(手動或自動)叢集恢復,可以使用raft.NewFileSnapshotStore
或raft.NewFileSnapshotStoreWithLogger
進行初始化。
trans
:Transport是raft叢集內部節點之間的資訊通道,節點之間需要通過該通道來同步log、選舉leader等。下面介面中的AppendEntriesPipeline
和AppendEntries
方法用於log
同步,RequestVote
用於leader選舉,InstallSnapshot
用於在follower 的log
落後過多的情況下,給follower傳送snapshot(批次log
)。
可以使用raft.NewTCPTransport
、raft.NewTCPTransportWithLogger
或raft.NewNetworkTransportWithConfig
方法來初始化trans。
type Transport interface {
...
AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error)
AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
// RequestVote sends the appropriate RPC to the target node.
RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error
InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
...
}
NewRaft
方法中會執行如下後臺任務:r.goFunc(r.run) //處理角色變更和RPC請求 r.goFunc(r.runFSM) //負責將logs apply到FSM r.goFunc(r.runSnapshots) //管理FSM的snapshot
為保證資料的一致性,只能通過leader寫入資料,因此需要及時瞭解leader的變更資訊,在Raft的設定中有一個變數NotifyCh chan<- bool
,當Raft變為leader時會將true
寫入該chan,通過讀取該chan來判斷本節點是否是leader。在初始化Raft設定的時候傳入即可:
leaderNotifyCh := make(chan bool, 10)
raftConfig.NotifyCh = leaderNotifyCh
還有其他方式可以獲取leader變更狀態:
如下方法可以生成一個chan,當本節點變為Leader時會傳送
true
,當本節點丟失Leader角色時傳送false
,該方法的用途與上述方式相同,但由於該方法沒有快取,可能導致丟失變更訊號,因此推薦使用上面的方式。func (r *Raft) LeaderCh() <-chan bool
至此已經完成了Raft的初始化。下面就是要實現初始化函數中要求實現的內容,主要就是實現FSM
介面。其中logs
、stable
、snaps
和trans
已經提到,使用現成的方法初始化即可。對於儲存來說,也可以根據需要採用其他方式,如S3。
下面是LogStore
、SnapshotStore
和StableStore
的介面定義。
type LogStore interface {//用於儲存Raft log
// FirstIndex returns the first index written. 0 for no entries.
FirstIndex() (uint64, error)
// LastIndex returns the last index written. 0 for no entries.
LastIndex() (uint64, error)
// GetLog gets a log entry at a given index.
GetLog(index uint64, log *Log) error
// StoreLog stores a log entry.
StoreLog(log *Log) error
// StoreLogs stores multiple log entries.
StoreLogs(logs []*Log) error
// DeleteRange deletes a range of log entries. The range is inclusive.
DeleteRange(min, max uint64) error
}
type SnapshotStore interface {//用於快照的生成和恢復
// Create is used to begin a snapshot at a given index and term, and with
// the given committed configuration. The version parameter controls
// which snapshot version to create.
Create(version SnapshotVersion, index, term uint64, configuration Configuration,
configurationIndex uint64, trans Transport) (SnapshotSink, error)
// List is used to list the available snapshots in the store.
// It should return then in descending order, with the highest index first.
List() ([]*SnapshotMeta, error)
// Open takes a snapshot ID and provides a ReadCloser. Once close is
// called it is assumed the snapshot is no longer needed.
Open(id string) (*SnapshotMeta, io.ReadCloser, error)
}
type StableStore interface { //用於儲存叢集後設資料
Set(key []byte, val []byte) error
// Get returns the value for key, or an empty byte slice if key was not found.
Get(key []byte) ([]byte, error)
SetUint64(key []byte, val uint64) error
// GetUint64 returns the uint64 value for key, or 0 if key was not found.
GetUint64(key []byte) (uint64, error)
}
FSM基於Raft來實現,包含三個方法:
Apply
:在Raft完成commit索引之後,儲存應用資料。Snapshot
:用於支援log壓縮,可以儲存某個時間點的FSM快照。需要注意的是,由於Apply
和Snapshot
執行在同一個執行緒中(如run
和runFSM
執行緒),因此要求函數能夠快速返回,否則會阻塞Apply
的執行。在實現中,該函數只需捕獲指向當前狀態的指標,而對於IO開銷較大的操作,則放到FSMSnapshot.Persist
中執行。Restore
:用於從snapshot恢復FSMtype FSM interface {
// Apply is called once a log entry is committed by a majority of the cluster.
//
// Apply should apply the log to the FSM. Apply must be deterministic and
// produce the same result on all peers in the cluster.
//
// The returned value is returned to the client as the ApplyFuture.Response.
Apply(*Log) interface{}
// Snapshot returns an FSMSnapshot used to: support log compaction, to
// restore the FSM to a previous state, or to bring out-of-date followers up
// to a recent log index.
//
// The Snapshot implementation should return quickly, because Apply can not
// be called while Snapshot is running. Generally this means Snapshot should
// only capture a pointer to the state, and any expensive IO should happen
// as part of FSMSnapshot.Persist.
//
// Apply and Snapshot are always called from the same thread, but Apply will
// be called concurrently with FSMSnapshot.Persist. This means the FSM should
// be implemented to allow for concurrent updates while a snapshot is happening.
Snapshot() (FSMSnapshot, error)
// Restore is used to restore an FSM from a snapshot. It is not called
// concurrently with any other command. The FSM must discard all previous
// state before restoring the snapshot.
Restore(snapshot io.ReadCloser) error
}
FSMSnapshot
是實現快照需要實現的另一個介面,用於儲存持久化FSM狀態,後續可以通過FSM.Restore
方法恢復FSM。該介面不會阻塞Raft.Apply
,但在持久化FSM的資料時需要保證不影響Raft.Apply
的並行存取。
FSMSnapshot.Persist
的入參sink
是呼叫SnapshotStore.Creates
時的返回值。如果是通過raft.NewFileSnapshotStore
初始化了SnapshotStore
,則入參sink
的型別就是FileSnapshotStore。
FSMSnapshot.Persist
執行結束之後需要執行SnapshotSink.Close()
,如果出現錯誤,則執行SnapshotSink.Cancel()
。
// FSMSnapshot is returned by an FSM in response to a Snapshot
// It must be safe to invoke FSMSnapshot methods with concurrent
// calls to Apply.
type FSMSnapshot interface {
// Persist should dump all necessary state to the WriteCloser 'sink',
// and call sink.Close() when finished or call sink.Cancel() on error.
Persist(sink SnapshotSink) error
// Release is invoked when we are finished with the snapshot.
Release()
}
FSM的備份和恢復的邏輯比較難理解,一方面備份的資料儲存在Raft中,FSM介面是由Raft主動呼叫的,另一方面又需要由使用者實現FSM的備份和恢復邏輯,因此需要了解Raft是如何與FSM互動的。
FSM依賴snapshot來實現備份和恢復,snapshot中儲存的也都是FSM資訊。
當用戶執行RecoverCluster
介面時會呼叫FSM.Snapshot
觸發建立一個新的FSM snapshot
手動呼叫如下介面也會觸發建立FSM snapshot:
func (r *Raft) Snapshot() SnapshotFuture
Raft自動備份也會觸發建立FSM snapshot,預設時間為[120s, 240s]之間的隨機時間。
當用戶執行RecoverCluster
介面時會呼叫FSM.Restore
,用於手動恢復叢集
當用戶執行Raft.Restore
介面時會呼叫FSM.Restore
,用於手動恢復叢集
通過NewRaft建立Raft節點時會嘗試恢復snapshot(Raft.restoreSnapshot
-->Raft.tryRestoreSingleSnapshot
-->fsmRestoreAndMeasure
-->fsm.Restore
)
因此在正常情況下,Raft會不定期建立snapshot,且在建立Raft節點(新建或重啟)的時候也會嘗試通過snapshot來恢復FSM。
FSM的備份和恢復與SnapshotStore介面息息相關。
在備份FSM時的邏輯如下,首先通過SnapshotStore.Create
建立一個snapshot,然後初始化一個FSMSnapshot
範例,並通過FSMSnapshot.Persist
將FSM儲存到建立出的snapshot中:
sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans) //建立一個snapshot
snapshot, err := fsm.Snapshot() //初始化一個FSMSnapshot範例
snapshot.Persist(sink) //呼叫FSMSnapshot.Persist將FSM儲存到上面的snapshot中
恢復FSM的邏輯如下,首先通過SnapshotStore.List
獲取snapshots,然後通過SnapshotStore.Open
逐個開啟獲取到的snapshot,最後呼叫FSM.Restore
恢復FSM,其入參可以看做是snapshot的檔案描述符:
snapshots, err = snaps.List()
for _, snapshot := range snapshots {
_, source, err = snaps.Open(snapshot.ID)
crc := newCountingReadCloser(source)
err = fsm.Restore(crc)
// Close the source after the restore has completed
source.Close()
}
下面以consul的實現為例看下它是如何進行FSM的備份和恢復的。
FSM.Snapshot()
的作用就是返回一個SnapshotSink
介面物件,進而呼叫SnapshotSink.Persist
來持久化FSM。
下面是consul的SnapshotSink
實現,邏輯比較簡單,它將FSM持久化到了一個snapshot中,注意它在寫入snapshot前做了編碼(編碼型別為ChunkingStateType
):
// Persist saves the FSM snapshot out to the given sink.
func (s *snapshot) Persist(sink raft.SnapshotSink) error {
...
// Write the header
header := SnapshotHeader{
LastIndex: s.state.LastIndex(),
}
encoder := codec.NewEncoder(sink, structs.MsgpackHandle)
if err := encoder.Encode(&header); err != nil {
sink.Cancel()
return err
}
...
if _, err := sink.Write([]byte{byte(structs.ChunkingStateType)}); err != nil {
return err
}
if err := encoder.Encode(s.chunkState); err != nil {
return err
}
return nil
}
func (s *snapshot) Release() {
s.state.Close()
}
備份時將FSM儲存在了snapshot中,恢復時讀取並解碼對應型別的snapshot即可:
// Restore streams in the snapshot and replaces the current state store with a
// new one based on the snapshot if all goes OK during the restore.
func (c *FSM) Restore(old io.ReadCloser) error {
defer old.Close()
...
handler := func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error {
switch {
case msg == structs.ChunkingStateType: //解碼資料
chunkState := &raftchunking.State{
ChunkMap: make(raftchunking.ChunkMap),
}
if err := dec.Decode(chunkState); err != nil {
return err
}
if err := c.chunker.State(chunkState); err != nil {
return err
}
...
default:
if msg >= 64 {
return fmt.Errorf("msg type <%d> is a Consul Enterprise log entry. Consul OSS cannot restore it", msg)
} else {
return fmt.Errorf("Unrecognized msg type %d", msg)
}
}
return nil
}
if err := ReadSnapshot(old, handler); err != nil {
return err
}
...
return nil
}
// ReadSnapshot decodes each message type and utilizes the handler function to
// process each message type individually
func ReadSnapshot(r io.Reader, handler func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error) error {
// Create a decoder
dec := codec.NewDecoder(r, structs.MsgpackHandle)
// Read in the header
var header SnapshotHeader
if err := dec.Decode(&header); err != nil {
return err
}
// Populate the new state
msgType := make([]byte, 1)
for {
// Read the message type
_, err := r.Read(msgType)
if err == io.EOF {
return nil
} else if err != nil {
return err
}
// Decode
msg := structs.MessageType(msgType[0])
if err := handler(&header, msg, dec); err != nil {
return err
}
}
}
至此已經完成了Raft的開發介紹。需要注意的是,FSM介面都是Raft內部呼叫的,使用者並不會直接與之互動。
更多參見:Raft Developer Documentation
將節點新增到叢集中,節點剛新增到叢集中時狀態是staging,當其ready之後就會被提升為voter,參與選舉。如果節點已經是voter,則該操作會更新服務地址。該方法必須在leader上呼叫:
func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture
如下方法用於新增一個只接收log entry、但不參與投票或commit log的節點:該方法必須在leader上呼叫
func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture
將節點從叢集中移除,如果移除的節點是leader,則會觸發leader選舉。該方法必須在leader上呼叫:
func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture
取消節點的投票權,節點不再參與投票或commit log。該方法必須在leader上呼叫:
func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture
重新載入節點設定:
func (r *Raft) ReloadConfig(rc ReloadableConfig) error
用於阻塞等待FSM apply所有操作。該方法必須在leader上呼叫:
func (r *Raft) Barrier(timeout time.Duration) Future
apply一個命令到FSM,該方法必須在leader上呼叫:
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture
從上面介面可以看到,在Raft協定中,必須通過leader才能寫入(apply)資料,在非leader的節點上執行
Apply()
會返回ErrNotLeader
的錯誤。
Apply
方法會呼叫LogStore
介面的StoreLogs
方法儲存log(cmd
)。Raft.applyCh負責將log傳送給FSM進行處理,最後通過dispatchLogs
將log分發給其他節點(dispatchLogs
會呼叫Transport.AppendEntries
來將log分發給對端)。在分散式環境中,外部請求可能通過LB轉發到非leader節點上,此時非leader節點需要將請求轉發到leader節點上進行處理,在consul中會通過ForwardRPC將請求轉發給leader,再由leader執行
Apply
操作。
當叢集中的節點少於仲裁數目時,叢集將無法正常運作,此時可以手動呼叫如下介面嘗試恢復叢集,但這樣會可能會導致原本正在複製的紀錄檔被commit。
最佳方式是停止所有節點,並在所有節點上執行RecoverCluster
,當叢集重啟之後,會發生選舉,Raft也會恢復運作。
func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport, configuration Configuration) error
通過如下方式可以讓叢集使用外部snapshot(如備份的snapshot)。注意該操作只適用於DR,且只能在Leader上執行。
func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Duration) error
獲取節點的狀態資訊:
func (r *Raft) Stats() map[string]string
返回當前leader的地址和叢集ID。如果當前沒有leader則返回空:
func (r *Raft) LeaderWithID() (ServerAddress, ServerID)
各個節點之間主要通過RPC來互動log和選舉資訊,可以分為RPC使用者端和RPC伺服器端。
RPC使用者端通過呼叫Transport介面方法來傳遞資料(如Leader執行Raft.Apply
log之後會呼叫Transport.AppendEntries
來分發log)。
RPC伺服器端的實現如下,其處理了不同型別的RPC請求,如AppendEntriesRequest
就是Leader執行Transport.AppendEntries
傳遞的請求內容:
func (r *Raft) processRPC(rpc RPC) {
if err := r.checkRPCHeader(rpc); err != nil {
rpc.Respond(nil, err)
return
}
switch cmd := rpc.Command.(type) {
case *AppendEntriesRequest:
r.appendEntries(rpc, cmd)
case *RequestVoteRequest:
r.requestVote(rpc, cmd)
case *InstallSnapshotRequest:
r.installSnapshot(rpc, cmd)
case *TimeoutNowRequest:
r.timeoutNow(rpc, cmd)
default:
r.logger.Error("got unexpected command",
"command", hclog.Fmt("%#v", rpc.Command))
rpc.Respond(nil, fmt.Errorf("unexpected command"))
}
}
實現Raft時需要考慮如下幾點:
FSM
和FSMSnapshot
這兩個介面Raft.Apply
介面實現,以保證FSM的資料一致性,而在讀取應用資料時,如果要求資料強一致,則需要從leader的FSM讀取,否則也可以從follower的FSM讀取在此次實現Raft的過程中,主要參考了stcache和consul的原始碼,其中FSM的實現參考了前者,而Raft的初始化和節點發現參考了後者。
原始碼結構如下:
- src
discovery #節點發現程式碼
raft #raft管理程式碼
rpc #請求轉發程式碼
service #主服務管理程式碼
discovery:採用serf來實現節點發現,它底層採用的還是memberlist,通過gossip來管理節點。
rpc:實現了非Leader節點向Leader節點轉發請求的功能,本demo僅實現了/api/v1/set
介面轉發,對於/api/v1/get
介面,則直接從本節點的FSM中獲取資料,因此get
介面不是強一致性的。
使用如下命令可以生成rpc模組的pb.go檔案:
$ protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./forward.proto
下面啟動3個節點來組成Raft叢集:
入參描述如下:
- httpAddress:與使用者互動的服務
- raftTCPAddress:Raft服務
- rpcAddress:請求轉發的gRpc服務
- serfAddress:serf節點發現服務
- dataDir:Raft儲存路徑,建立Raft節點時會用到
- bootstrap:該節點是否需要使用bootstrap方式啟動
- joinAddress:加入Raft叢集的地址,為
serfAddress
,可以新增多個,如add1,add2
第一個節點啟動時並沒有需要加入的叢集,因此第一個節點以bootstrap方式啟動,啟動後成為leader。
$ raft-example --httpAddress 0.0.0.0:5000 --raftTCPAddress 192.168.1.42:6000 --rpcAddress=0.0.0.0:7000 --serfAddress 192.168.1.42:8000 --dataDir /Users/charlie.liu/home/raftDatadir/node0 --bootstrap true
注意:
raftTCPAddress
不能為0.0.0.0
,否則raft會報錯誤:"local bind address is not advertisable",serfAddress
的地址最好也不要使用0.0.0.0
。
啟動第2、3個節點,後續的節點啟動的時候需要加入叢集,啟動的時候指定第一個節點的地址:
$ raft-example --httpAddress 0.0.0.0:5001 --raftTCPAddress 192.168.1.42:6001 --rpcAddress=0.0.0.0:7001 --serfAddress 192.168.1.42:8001 --dataDir /Users/charlie.liu/home/raftDatadir/node1 --joinAddress 192.168.1.42:8000
$ raft-example --httpAddress 0.0.0.0:5002 --raftTCPAddress 192.168.1.42:6002 --rpcAddress=0.0.0.0:7002 --serfAddress 192.168.1.42:8002 --dataDir /Users/charlie.liu/home/raftDatadir/node2 --joinAddress 192.168.1.42:8000
在節點啟動之後,就可以在Leader的標準輸出中可以看到Raft叢集中的成員資訊:
[INFO] raft: updating configuration: command=AddVoter server-id=192.168.1.42:6002 server-addr=192.168.1.42:6002 servers="[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]"
使用/api/maintain/stats
介面可以檢視各個節點的狀態,num_peers
展示了對端節點數目,state
展示了當前節點的角色。
$ curl 0.0.0.0:5000/api/maintain/stats|jq //node0為Leader
{
"applied_index": "6",
"commit_index": "6",
"fsm_pending": "0",
"last_contact": "0",
"last_log_index": "6",
"last_log_term": "2",
"last_snapshot_index": "0",
"last_snapshot_term": "0",
"latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
"latest_configuration_index": "0",
"num_peers": "2",
"protocol_version": "3",
"protocol_version_max": "3",
"protocol_version_min": "0",
"snapshot_version_max": "1",
"snapshot_version_min": "0",
"state": "Leader",
"term": "2"
}
$ curl 0.0.0.0:5001/api/maintain/stats|jq //node2為Follower
{
"applied_index": "6",
"commit_index": "6",
"fsm_pending": "0",
"last_contact": "15.996792ms",
"last_log_index": "6",
"last_log_term": "2",
"last_snapshot_index": "0",
"last_snapshot_term": "0",
"latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
"latest_configuration_index": "0",
"num_peers": "2",
"protocol_version": "3",
"protocol_version_max": "3",
"protocol_version_min": "0",
"snapshot_version_max": "1",
"snapshot_version_min": "0",
"state": "Follower",
"term": "2"
}
$ curl 0.0.0.0:5002/api/maintain/stats|jq //node2為Follower
{
"applied_index": "6",
"commit_index": "6",
"fsm_pending": "0",
"last_contact": "76.764584ms",
"last_log_index": "6",
"last_log_term": "2",
"last_snapshot_index": "0",
"last_snapshot_term": "0",
"latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
"latest_configuration_index": "0",
"num_peers": "2",
"protocol_version": "3",
"protocol_version_max": "3",
"protocol_version_min": "0",
"snapshot_version_max": "1",
"snapshot_version_min": "0",
"state": "Follower",
"term": "2"
}
停掉上述Demo中的Leader節點(node0),可以看到node1稱為新的leader,且term
變為4:
$ curl 0.0.0.0:5001/api/maintain/stats|jq //新的Leader
{
"applied_index": "15",
"commit_index": "15",
"fsm_pending": "0",
"last_contact": "0",
"last_log_index": "15",
"last_log_term": "4",
"last_snapshot_index": "0",
"last_snapshot_term": "0",
"latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
"latest_configuration_index": "0",
"num_peers": "2",
"protocol_version": "3",
"protocol_version_max": "3",
"protocol_version_min": "0",
"snapshot_version_max": "1",
"snapshot_version_min": "0",
"state": "Leader",
"term": "4"
}
$ curl 0.0.0.0:5002/api/maintain/stats|jq
{
"applied_index": "15",
"commit_index": "15",
"fsm_pending": "0",
"last_contact": "42.735ms",
"last_log_index": "15",
"last_log_term": "4",
"last_snapshot_index": "0",
"last_snapshot_term": "0",
"latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
"latest_configuration_index": "0",
"num_peers": "2",
"protocol_version": "3",
"protocol_version_max": "3",
"protocol_version_min": "0",
"snapshot_version_max": "1",
"snapshot_version_min": "0",
"state": "Follower",
"term": "4"
}
在本實現中,如果停止一個Raft節點,則Leader節點會一直列印連線該節點失敗的紀錄檔,原因是在
Ctrl+c
停止Raft節點的時候沒有呼叫Raft.RemoveServer
來移除該節點。這種處理方式是合理的,因為當一個節點重啟或故障的時候,不應該從Raft中移除,此時應該查明原因,恢復叢集。本實現中沒有主動移除Raft節點的介面,也可以新增一個介面來呼叫
Raft.RemoveServer
,進而移除預期的節點,注意只能在Leader節點上執行Raft.RemoveServer
。
下面我們驗證應用資料的寫入和讀取。
向非Leader節點寫入資料,其會將寫入請求轉發給leader,由leader執行資料寫入。下面展示向非Leader節寫入資料的場景:
$ curl 0.0.0.0:5001/api/maintain/stats|jq
{
"applied_index": "64",
"commit_index": "64",
"fsm_pending": "0",
"last_contact": "4.312667ms",
"last_log_index": "64",
"last_log_term": "137",
"last_snapshot_index": "0",
"last_snapshot_term": "0",
"latest_configuration": "[{Suffrage:Voter ID:0.0.0.0:7000 Address:192.168.1.42:6000} {Suffrage:Voter ID:0.0.0.0:7001 Address:192.168.1.42:6001} {Suffrage:Voter ID:0.0.0.0:7002 Address:192.168.1.42:6002}]",
"latest_configuration_index": "0",
"num_peers": "2",
"protocol_version": "3",
"protocol_version_max": "3",
"protocol_version_min": "0",
"snapshot_version_max": "1",
"snapshot_version_min": "0",
"state": "Follower", #非Leader節點
"term": "137"
}
$ curl -XPOST localhost:5001/api/v1/set --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
{
"key" : "testKey",
"value" : "testValue"
}'
向所有節點查詢寫入的資料,可以看到所有節點都可以查詢到該資料:
$ curl -XGET localhost:5000/api/v1/get --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
{
"key" : "testKey"
}'
testValue
$ curl -XGET localhost:5001/api/v1/get --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
{
"key" : "testKey"
}'
testValue
$curl -XGET localhost:5002/api/v1/get --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
{
"key" : "testKey"
}'
testValue
--dataDir
目錄來清除叢集後設資料candidate
狀態,無法變為Leader
,因此要保證叢集中有足夠的節點,避免一次停掉過多節點。本文來自部落格園,作者:charlieroro,轉載請註明原文連結:https://www.cnblogs.com/charlieroro/p/17486646.html