使用hashicorp Raft開發分散式服務

2023-06-17 06:00:59

使用hashicorp Raft開發高可用服務

開發raft時用到的比較主流的兩個庫是Etcd Raft 和hashicorp Raft,網上也有一些關於這兩個庫的討論之前分析過etcd Raft,發現該庫相對hashicorp Raft比較難以理解,其最大的問題是沒有實現網路層,實現難度比較大,因此本文在實現時使用了hashicorp Raft。

下文中會參考consul的一致性協定來講解如何實現Raft協定。

Raft概述

術語

  • Log entry:Raft的主要單元。Raft將一致性問題分解為紀錄檔複製。紀錄檔是一個有序的表項,其包含了Raft的叢集變更資訊(如新增/移除節點)以及對應用資料的操作等。
  • FSM:Finite State Machine。FSM是有限狀態的集合。當一條紀錄檔被Raft apply後,可以對FSM進行狀態轉換。相同順序的紀錄檔在apply之後必須產生相同的結果,即行為必須是確定性的。
  • Peer set:指所有參與紀錄檔複製的成員。
  • Quorum:仲裁指peer set中的大部分成員:對於包含N個成員的peer set,仲裁要求有(N/2)+1個成員。如果出於某些原因導致仲裁節點不可用,則叢集會變為unavailable狀態,且新的紀錄檔也不會被commit。
  • Committed Entry:當一個Log entry持久化到仲裁數量的節點後,該認為該Log entry是Committed的。只有當Log entry 被Committed之後,它才會被FSM apply。
  • Leader:任何時間,peer set會選舉一個節點作為leader。leader負責處理新的Log entry,並將其複製給follower,以及決定何時將Log entry判定為committed狀態。

Raft機制簡介

Raft節點總是處於三種狀態之一: follower, candidate, leader。一開始,所有的節點都是follower,該狀態下,節點可以從leader接收log,並參與選舉,如果一段時間內沒有接收到任何Log entry,則節點會自提升到candidate狀態。在candidate狀態下,節點會請求其他節點的選舉,如果一個candidate接收到大部分節點(仲裁數目)的認同,就會被提升為leader。leader必須接收新的Log entry,並複製到所有其他follower。

如果使用者無法接受舊的資料,則所有的請求必須由leader執行。

一旦一個叢集有了leader,就可以接收新的Log entry。使用者端可以請求leader追加一個新的Log entry。Leader會將Log entry寫入持久化儲存,並嘗試將其複製給仲裁數目的follower。一旦Log entry被commit,就可以將該Log entry apply到FSM。FSM是應用特定的儲存,在Consul中,使用 MemDB來維護叢集狀態。

無限量複製log的方式是不可取的。Raft 提供了一種機制,可以對當前狀態進行快照並壓縮log。由於 FSM 的抽象,FSM 的狀態恢復必須與replay log的狀態相同。Raft 可以捕獲某個時刻的 FSM 狀態,然後移除用於達到該狀態的所有log。這些操作可以在沒有使用者干預的情況下自動執行,防止無限使用磁碟,同時最小化replay log所花費的時間。

Raft Consensus中所有的操作都必須經過Leader,因此需要保證所有的請求都能夠傳送到Leader節點,然後由Leader將請求傳送給所有Follower,並等待大部分(仲裁數目的)節點處理完成該命令,leader通過選舉機制產生。之後每個follower會執行如下操作:

  1. 在接收到命令之後,使用WAL方式將資料儲存為Log entry
  2. 在成功寫入log entry之後,將資料發給FSM進行處理
  3. 在FSM成功處理完資料之後,返回資料。之後Leader會注意到該節點已經成功完成資料處理。

如下表述來自Raft Protocol Overview,它的意思是,如果是查詢類的請求,直接從FSM返回結果即可,如果是修改類的請求,則需要通過raft.Apply來保證變更的一致性

所有raft叢集中的成員都知道當前的leader,當一個RPC請求到達一個非leader的成員時,它會將該請求轉發給當前的leader。如果是查詢型別的RPC,意味著它是唯讀的,leader會基於FSM的當前狀態生成結果;如果是事務型別的RPC,意味著它需要修改狀態,Leader會生成一條新的log entry,並執行Raft.Apply,當log entry被提交併apply到FSM之後,事務才算執行完成。

介面和原理描述

如下是官方給出的Raft Apply的流程圖:

sequenceDiagram autonumber participant client participant leadermain as leader:main participant leaderfsm as leader:fsm participant leaderreplicate as leader:replicate (each peer) participant followermain as follower:main (each peer) participant followerfsm as follower:fsm (each peer) client-)leadermain: applyCh to dispatchLogs leadermain->>leadermain: store logs to disk leadermain-)leaderreplicate: triggerCh leaderreplicate-->>followermain: Transport.AppendEntries RPC followermain->>followermain: store logs to disk opt leader commit index is ahead of peer commit index followermain-)followerfsm: fsmMutateCh <br>apply committed logs followerfsm->>followerfsm: fsm.Apply end followermain-->>leaderreplicate: respond success=true leaderreplicate->>leaderreplicate: update commitment opt quorum commit index has increased leaderreplicate-)leadermain: commitCh leadermain-)leaderfsm: fsmMutateCh leaderfsm->>leaderfsm: fsm.Apply leaderfsm-)client: future.respond end

Apply是資料進入Raft的介面,整個Raft的主要作用是維護資料操作的一致性。在上圖中,由兩個apply:一個是Raft.Apply(內部會通過applyCh傳遞log),其也是外部資料的入口。另一個是FSM.Apply,其資料來源頭是Raft.Apply。FSM基於Raft實現了一致性讀寫。在上圖中可以看到,leader的FSM.Apply是在資料commit成功(仲裁成功)之後才執行的,這樣就能以Raft的方式保證分散式場景下應用資料的一致性,可以將FSM.Apply理解為應用資料的寫入操作。

Raft中的一條log表示一個操作。使用hashicorp/raft時應該將實現分為兩層:一層是底層的Raft,支援Raft資料的儲存、快照等,叢集的選舉和恢復等,這一部分由Raft模組自實現;另一層是應用層,需要由使用者實現FSM介面,FSM的介面並不對外,在Raft的處理過程中會呼叫FSM的介面來實現應用資料的儲存、備份和恢復等操作。這兩層都有資料的讀寫和快照實現,因此在理解上需要進行區分。

Raft節點的初始化

如果是新建的Raft節點,可以使用BootstrapCluster方法初始化該節點。為避免非新的節點被初始化,在呼叫BootstrapCluster前可以使用raft.HasExistingState來判斷範例中是否包含相關狀態(logs,當前term或snapshot):

if (s.config.Bootstrap) && !s.config.ReadReplica {
  hasState, err := raft.HasExistingState(log, stable, snap)
  if err != nil {
    return err
  }
  if !hasState {
    configuration := raft.Configuration{
      Servers: []raft.Server{
        {
          ID:      s.config.RaftConfig.LocalID,
          Address: trans.LocalAddr(),
        },
      },
    }
    if err := raft.BootstrapCluster(s.config.RaftConfig,
      log, stable, snap, trans, configuration); err != nil {
      return err
    }
  }
}

Raft節點的建立

Raft節點的建立方法如下,如果儲存非空,則Raft會嘗試恢復該節點:

func NewRaft(conf *Config,
    fsm FSM,
    logs LogStore,
    stable StableStore,
    snaps SnapshotStore,
    trans Transport) (*Raft, error) {

包括:

  • fsm:由應用實現,用於處理應用資料。FSM中的資料來自底層的Raft log

  • logsstablesnapslogs(儲存Raft log) ,stable(儲存Raft選舉資訊,如角色、term等資訊) 可以使用raftboltdb.New進行初始化, snaps用於Leader和follower之間的批次資料同步以及(手動或自動)叢集恢復,可以使用raft.NewFileSnapshotStoreraft.NewFileSnapshotStoreWithLogger進行初始化。

  • trans:Transport是raft叢集內部節點之間的資訊通道,節點之間需要通過該通道來同步log、選舉leader等。下面介面中的AppendEntriesPipelineAppendEntries方法用於log同步,RequestVote用於leader選舉,InstallSnapshot用於在follower 的log落後過多的情況下,給follower傳送snapshot(批次log)。

    可以使用raft.NewTCPTransportraft.NewTCPTransportWithLoggerraft.NewNetworkTransportWithConfig方法來初始化trans。

    type Transport interface {
      ...
      AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error)
      AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
    
      // RequestVote sends the appropriate RPC to the target node.
      RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error
      InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
      ...
    }
    

NewRaft方法中會執行如下後臺任務:

r.goFunc(r.run)           //處理角色變更和RPC請求
r.goFunc(r.runFSM)        //負責將logs apply到FSM
r.goFunc(r.runSnapshots)  //管理FSM的snapshot

監控Leader變化

為保證資料的一致性,只能通過leader寫入資料,因此需要及時瞭解leader的變更資訊,在Raft的設定中有一個變數NotifyCh chan<- bool,當Raft變為leader時會將true寫入該chan,通過讀取該chan來判斷本節點是否是leader。在初始化Raft設定的時候傳入即可:

  leaderNotifyCh := make(chan bool, 10)
  raftConfig.NotifyCh = leaderNotifyCh

還有其他方式可以獲取leader變更狀態:

如下方法可以生成一個chan,當本節點變為Leader時會傳送true,當本節點丟失Leader角色時傳送false,該方法的用途與上述方式相同,但由於該方法沒有快取,可能導致丟失變更訊號,因此推薦使用上面的方式。

func (r *Raft) LeaderCh() <-chan bool

實現FSM

至此已經完成了Raft的初始化。下面就是要實現初始化函數中要求實現的內容,主要就是實現FSM介面。其中logsstablesnapstrans已經提到,使用現成的方法初始化即可。對於儲存來說,也可以根據需要採用其他方式,如S3。

下面是LogStoreSnapshotStoreStableStore的介面定義。

type LogStore interface {//用於儲存Raft log
  // FirstIndex returns the first index written. 0 for no entries.
  FirstIndex() (uint64, error)

  // LastIndex returns the last index written. 0 for no entries.
  LastIndex() (uint64, error)

  // GetLog gets a log entry at a given index.
  GetLog(index uint64, log *Log) error

  // StoreLog stores a log entry.
  StoreLog(log *Log) error

  // StoreLogs stores multiple log entries.
  StoreLogs(logs []*Log) error

  // DeleteRange deletes a range of log entries. The range is inclusive.
  DeleteRange(min, max uint64) error
}
type SnapshotStore interface {//用於快照的生成和恢復
  // Create is used to begin a snapshot at a given index and term, and with
  // the given committed configuration. The version parameter controls
  // which snapshot version to create.
  Create(version SnapshotVersion, index, term uint64, configuration Configuration,
    configurationIndex uint64, trans Transport) (SnapshotSink, error)

  // List is used to list the available snapshots in the store.
  // It should return then in descending order, with the highest index first.
  List() ([]*SnapshotMeta, error)

  // Open takes a snapshot ID and provides a ReadCloser. Once close is
  // called it is assumed the snapshot is no longer needed.
  Open(id string) (*SnapshotMeta, io.ReadCloser, error)
}
type StableStore interface { //用於儲存叢集後設資料
  Set(key []byte, val []byte) error

  // Get returns the value for key, or an empty byte slice if key was not found.
  Get(key []byte) ([]byte, error)

  SetUint64(key []byte, val uint64) error

  // GetUint64 returns the uint64 value for key, or 0 if key was not found.
  GetUint64(key []byte) (uint64, error)
}

FSM基於Raft來實現,包含三個方法:

  • Apply:在Raft完成commit索引之後,儲存應用資料。
  • Snapshot:用於支援log壓縮,可以儲存某個時間點的FSM快照。需要注意的是,由於ApplySnapshot執行在同一個執行緒中(如runrunFSM執行緒),因此要求函數能夠快速返回,否則會阻塞Apply的執行。在實現中,該函數只需捕獲指向當前狀態的指標,而對於IO開銷較大的操作,則放到FSMSnapshot.Persist中執行。
  • Restore:用於從snapshot恢復FSM
type FSM interface {
  // Apply is called once a log entry is committed by a majority of the cluster.
  //
  // Apply should apply the log to the FSM. Apply must be deterministic and
  // produce the same result on all peers in the cluster.
  //
  // The returned value is returned to the client as the ApplyFuture.Response.
  Apply(*Log) interface{}

  // Snapshot returns an FSMSnapshot used to: support log compaction, to
  // restore the FSM to a previous state, or to bring out-of-date followers up
  // to a recent log index.
  //
  // The Snapshot implementation should return quickly, because Apply can not
  // be called while Snapshot is running. Generally this means Snapshot should
  // only capture a pointer to the state, and any expensive IO should happen
  // as part of FSMSnapshot.Persist.
  //
  // Apply and Snapshot are always called from the same thread, but Apply will
  // be called concurrently with FSMSnapshot.Persist. This means the FSM should
  // be implemented to allow for concurrent updates while a snapshot is happening.
  Snapshot() (FSMSnapshot, error)

  // Restore is used to restore an FSM from a snapshot. It is not called
  // concurrently with any other command. The FSM must discard all previous
  // state before restoring the snapshot.
  Restore(snapshot io.ReadCloser) error
}

FSMSnapshot是實現快照需要實現的另一個介面,用於儲存持久化FSM狀態,後續可以通過FSM.Restore方法恢復FSM。該介面不會阻塞Raft.Apply,但在持久化FSM的資料時需要保證不影響Raft.Apply的並行存取。

FSMSnapshot.Persist的入參sink是呼叫SnapshotStore.Creates時的返回值。如果是通過raft.NewFileSnapshotStore初始化了SnapshotStore,則入參sink的型別就是FileSnapshotStore

FSMSnapshot.Persist執行結束之後需要執行SnapshotSink.Close() ,如果出現錯誤,則執行SnapshotSink.Cancel()

// FSMSnapshot is returned by an FSM in response to a Snapshot
// It must be safe to invoke FSMSnapshot methods with concurrent
// calls to Apply.
type FSMSnapshot interface {
  // Persist should dump all necessary state to the WriteCloser 'sink',
  // and call sink.Close() when finished or call sink.Cancel() on error.
  Persist(sink SnapshotSink) error

  // Release is invoked when we are finished with the snapshot.
  Release()
}

FSM的備份和恢復

FSM的備份和恢復的邏輯比較難理解,一方面備份的資料儲存在Raft中,FSM介面是由Raft主動呼叫的,另一方面又需要由使用者實現FSM的備份和恢復邏輯,因此需要了解Raft是如何與FSM互動的。

FSM依賴snapshot來實現備份和恢復,snapshot中儲存的也都是FSM資訊。

何時備份和恢復
備份的時機
  • 當用戶執行RecoverCluster介面時會呼叫FSM.Snapshot觸發建立一個新的FSM snapshot

  • 手動呼叫如下介面也會觸發建立FSM snapshot:

    func (r *Raft) Snapshot() SnapshotFuture
    
  • Raft自動備份也會觸發建立FSM snapshot,預設時間為[120s, 240s]之間的隨機時間。

恢復的時機
  • 當用戶執行RecoverCluster介面時會呼叫FSM.Restore,用於手動恢復叢集

  • 當用戶執行Raft.Restore介面時會呼叫FSM.Restore,用於手動恢復叢集

  • 通過NewRaft建立Raft節點時會嘗試恢復snapshot(Raft.restoreSnapshot-->Raft.tryRestoreSingleSnapshot-->fsmRestoreAndMeasure-->fsm.Restore)

因此在正常情況下,Raft會不定期建立snapshot,且在建立Raft節點(新建或重啟)的時候也會嘗試通過snapshot來恢復FSM。

備份和恢復的內部邏輯

FSM的備份和恢復與SnapshotStore介面息息相關。

在備份FSM時的邏輯如下,首先通過SnapshotStore.Create建立一個snapshot,然後初始化一個FSMSnapshot範例,並通過FSMSnapshot.Persist將FSM儲存到建立出的snapshot中:

sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans) //建立一個snapshot
snapshot, err := fsm.Snapshot() //初始化一個FSMSnapshot範例
snapshot.Persist(sink)          //呼叫FSMSnapshot.Persist將FSM儲存到上面的snapshot中

恢復FSM的邏輯如下,首先通過SnapshotStore.List獲取snapshots,然後通過SnapshotStore.Open逐個開啟獲取到的snapshot,最後呼叫FSM.Restore恢復FSM,其入參可以看做是snapshot的檔案描述符:

snapshots, err = snaps.List()
for _, snapshot := range snapshots {
  _, source, err = snaps.Open(snapshot.ID)
  crc := newCountingReadCloser(source)
  err = fsm.Restore(crc)
  // Close the source after the restore has completed
  source.Close()
}

下面以consul的實現為例看下它是如何進行FSM的備份和恢復的。

備份

FSM.Snapshot()的作用就是返回一個SnapshotSink介面物件,進而呼叫SnapshotSink.Persist來持久化FSM。

下面是consul的SnapshotSink實現,邏輯比較簡單,它將FSM持久化到了一個snapshot中,注意它在寫入snapshot前做了編碼(編碼型別為ChunkingStateType):

// Persist saves the FSM snapshot out to the given sink.
func (s *snapshot) Persist(sink raft.SnapshotSink) error {
  ...
  // Write the header
  header := SnapshotHeader{
    LastIndex: s.state.LastIndex(),
  }
  encoder := codec.NewEncoder(sink, structs.MsgpackHandle)
  if err := encoder.Encode(&header); err != nil {
    sink.Cancel()
    return err
  }
  ...

  if _, err := sink.Write([]byte{byte(structs.ChunkingStateType)}); err != nil {
    return err
  }
  if err := encoder.Encode(s.chunkState); err != nil {
    return err
  }

  return nil
}

func (s *snapshot) Release() {
  s.state.Close()
}
恢復

備份時將FSM儲存在了snapshot中,恢復時讀取並解碼對應型別的snapshot即可:

// Restore streams in the snapshot and replaces the current state store with a
// new one based on the snapshot if all goes OK during the restore.
func (c *FSM) Restore(old io.ReadCloser) error {
  defer old.Close()
  ...

  handler := func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error {
    switch {
    case msg == structs.ChunkingStateType: //解碼資料
      chunkState := &raftchunking.State{
        ChunkMap: make(raftchunking.ChunkMap),
      }
      if err := dec.Decode(chunkState); err != nil {
        return err
      }
      if err := c.chunker.State(chunkState); err != nil {
        return err
      }
      ...
    default:
      if msg >= 64 {
        return fmt.Errorf("msg type <%d> is a Consul Enterprise log entry. Consul OSS cannot restore it", msg)
      } else {
        return fmt.Errorf("Unrecognized msg type %d", msg)
      }
    }
    return nil
  }
  if err := ReadSnapshot(old, handler); err != nil {
    return err
  }
  ...

  return nil
}
// ReadSnapshot decodes each message type and utilizes the handler function to
// process each message type individually
func ReadSnapshot(r io.Reader, handler func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error) error {
  // Create a decoder
  dec := codec.NewDecoder(r, structs.MsgpackHandle)

  // Read in the header
  var header SnapshotHeader
  if err := dec.Decode(&header); err != nil {
    return err
  }

  // Populate the new state
  msgType := make([]byte, 1)
  for {
    // Read the message type
    _, err := r.Read(msgType)
    if err == io.EOF {
      return nil
    } else if err != nil {
      return err
    }

    // Decode
    msg := structs.MessageType(msgType[0])

    if err := handler(&header, msg, dec); err != nil {
      return err
    }
  }
}

至此已經完成了Raft的開發介紹。需要注意的是,FSM介面都是Raft內部呼叫的,使用者並不會直接與之互動

更多參見:Raft Developer Documentation

Raft關鍵對外介面

Raft節點管理

將節點新增到叢集中,節點剛新增到叢集中時狀態是staging,當其ready之後就會被提升為voter,參與選舉。如果節點已經是voter,則該操作會更新服務地址。該方法必須在leader上呼叫

func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture

如下方法用於新增一個只接收log entry、但不參與投票或commit log的節點:該方法必須在leader上呼叫

func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture

將節點從叢集中移除,如果移除的節點是leader,則會觸發leader選舉。該方法必須在leader上呼叫

func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture

取消節點的投票權,節點不再參與投票或commit log。該方法必須在leader上呼叫

func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture

重新載入節點設定:

func (r *Raft) ReloadConfig(rc ReloadableConfig) error
Raft資料的儲存和讀取

用於阻塞等待FSM apply所有操作。該方法必須在leader上呼叫

func (r *Raft) Barrier(timeout time.Duration) Future

apply一個命令到FSM,該方法必須在leader上呼叫

func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture

從上面介面可以看到,在Raft協定中,必須通過leader才能寫入(apply)資料,在非leader的節點上執行Apply()會返回ErrNotLeader的錯誤。

Apply方法會呼叫LogStore介面的StoreLogs方法儲存log(cmd)。Raft.applyCh負責將log傳送給FSM進行處理,最後通過dispatchLogs將log分發給其他節點(dispatchLogs會呼叫Transport.AppendEntries來將log分發給對端)。

在分散式環境中,外部請求可能通過LB轉發到非leader節點上,此時非leader節點需要將請求轉發到leader節點上進行處理,在consul中會通過ForwardRPC將請求轉發給leader,再由leader執行Apply操作。

叢集恢復

當叢集中的節點少於仲裁數目時,叢集將無法正常運作,此時可以手動呼叫如下介面嘗試恢復叢集,但這樣會可能會導致原本正在複製的紀錄檔被commit。

最佳方式是停止所有節點,並在所有節點上執行RecoverCluster,當叢集重啟之後,會發生選舉,Raft也會恢復運作。

func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
  snaps SnapshotStore, trans Transport, configuration Configuration) error 

通過如下方式可以讓叢集使用外部snapshot(如備份的snapshot)。注意該操作只適用於DR,且只能在Leader上執行

func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Duration) error
狀態獲取

獲取節點的狀態資訊

func (r *Raft) Stats() map[string]string

返回當前leader的地址和叢集ID。如果當前沒有leader則返回空:

func (r *Raft) LeaderWithID() (ServerAddress, ServerID)
節點資料互動

各個節點之間主要通過RPC來互動log和選舉資訊,可以分為RPC使用者端和RPC伺服器端。

RPC使用者端通過呼叫Transport介面方法來傳遞資料(如Leader執行Raft.Apply log之後會呼叫Transport.AppendEntries來分發log)。

RPC伺服器端的實現如下,其處理了不同型別的RPC請求,如AppendEntriesRequest就是Leader執行Transport.AppendEntries傳遞的請求內容:

func (r *Raft) processRPC(rpc RPC) {
  if err := r.checkRPCHeader(rpc); err != nil {
    rpc.Respond(nil, err)
    return
  }

  switch cmd := rpc.Command.(type) {
  case *AppendEntriesRequest:
    r.appendEntries(rpc, cmd)
  case *RequestVoteRequest:
    r.requestVote(rpc, cmd)
  case *InstallSnapshotRequest:
    r.installSnapshot(rpc, cmd)
  case *TimeoutNowRequest:
    r.timeoutNow(rpc, cmd)
  default:
    r.logger.Error("got unexpected command",
      "command", hclog.Fmt("%#v", rpc.Command))
    rpc.Respond(nil, fmt.Errorf("unexpected command"))
  }
}

實現描述

實現Raft時需要考慮如下幾點:

  • 實現FSM介面,包含FSMFSMSnapshot這兩個介面
  • 如何實現Raft節點的自動發現,包含節點的加入和退出
  • 使用者端和應用的互動介面,主要用於應用資料的增刪改等查等操作,對FSM的修改必須通過Raft.Apply介面實現,以保證FSM的資料一致性,而在讀取應用資料時,如果要求資料強一致,則需要從leader的FSM讀取,否則也可以從follower的FSM讀取
  • 在非Leader節點接收到使用者端的修改類請求後,如何將請求轉發給Leader節點

在此次實現Raft的過程中,主要參考了stcacheconsul的原始碼,其中FSM的實現參考了前者,而Raft的初始化和節點發現參考了後者。

原始碼結構如下:

- src
    discovery #節點發現程式碼
    raft      #raft管理程式碼
    rpc       #請求轉發程式碼
    service   #主服務管理程式碼
  • discovery:採用serf來實現節點發現,它底層採用的還是memberlist,通過gossip來管理節點。

  • rpc:實現了非Leader節點向Leader節點轉發請求的功能,本demo僅實現了/api/v1/set介面轉發,對於/api/v1/get介面,則直接從本節點的FSM中獲取資料,因此get介面不是強一致性的。

    使用如下命令可以生成rpc模組的pb.go檔案:

    $ protoc --go_out=. --go_opt=paths=source_relative  --go-grpc_out=. --go-grpc_opt=paths=source_relative ./forward.proto
    

啟動demo

下面啟動3個節點來組成Raft叢集:

入參描述如下:

  • httpAddress:與使用者互動的服務
  • raftTCPAddress:Raft服務
  • rpcAddress:請求轉發的gRpc服務
  • serfAddress:serf節點發現服務
  • dataDir:Raft儲存路徑,建立Raft節點時會用到
  • bootstrap:該節點是否需要使用bootstrap方式啟動
  • joinAddress:加入Raft叢集的地址,為serfAddress,可以新增多個,如add1,add2
  1. 第一個節點啟動時並沒有需要加入的叢集,因此第一個節點以bootstrap方式啟動,啟動後成為leader。

    $ raft-example --httpAddress 0.0.0.0:5000 --raftTCPAddress 192.168.1.42:6000 --rpcAddress=0.0.0.0:7000 --serfAddress 192.168.1.42:8000 --dataDir /Users/charlie.liu/home/raftDatadir/node0 --bootstrap true
    

    注意:raftTCPAddress不能為0.0.0.0,否則raft會報錯誤:"local bind address is not advertisable"serfAddress的地址最好也不要使用0.0.0.0

  2. 啟動第2、3個節點,後續的節點啟動的時候需要加入叢集,啟動的時候指定第一個節點的地址:

    $ raft-example --httpAddress 0.0.0.0:5001 --raftTCPAddress 192.168.1.42:6001 --rpcAddress=0.0.0.0:7001 --serfAddress 192.168.1.42:8001 --dataDir /Users/charlie.liu/home/raftDatadir/node1 --joinAddress 192.168.1.42:8000
    
    $ raft-example --httpAddress 0.0.0.0:5002 --raftTCPAddress 192.168.1.42:6002 --rpcAddress=0.0.0.0:7002 --serfAddress 192.168.1.42:8002 --dataDir /Users/charlie.liu/home/raftDatadir/node2 --joinAddress 192.168.1.42:8000
    

在節點啟動之後,就可以在Leader的標準輸出中可以看到Raft叢集中的成員資訊:

[INFO]  raft: updating configuration: command=AddVoter server-id=192.168.1.42:6002 server-addr=192.168.1.42:6002 servers="[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]"

使用/api/maintain/stats介面可以檢視各個節點的狀態,num_peers展示了對端節點數目,state展示了當前節點的角色。

$ curl 0.0.0.0:5000/api/maintain/stats|jq   //node0為Leader
{
  "applied_index": "6",
  "commit_index": "6",
  "fsm_pending": "0",
  "last_contact": "0",
  "last_log_index": "6",
  "last_log_term": "2",
  "last_snapshot_index": "0",
  "last_snapshot_term": "0",
  "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
  "latest_configuration_index": "0",
  "num_peers": "2",
  "protocol_version": "3",
  "protocol_version_max": "3",
  "protocol_version_min": "0",
  "snapshot_version_max": "1",
  "snapshot_version_min": "0",
  "state": "Leader",
  "term": "2"
}

$ curl 0.0.0.0:5001/api/maintain/stats|jq   //node2為Follower
{
  "applied_index": "6",
  "commit_index": "6",
  "fsm_pending": "0",
  "last_contact": "15.996792ms",
  "last_log_index": "6",
  "last_log_term": "2",
  "last_snapshot_index": "0",
  "last_snapshot_term": "0",
  "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
  "latest_configuration_index": "0",
  "num_peers": "2",
  "protocol_version": "3",
  "protocol_version_max": "3",
  "protocol_version_min": "0",
  "snapshot_version_max": "1",
  "snapshot_version_min": "0",
  "state": "Follower",
  "term": "2"
}

$ curl 0.0.0.0:5002/api/maintain/stats|jq   //node2為Follower
{
  "applied_index": "6",
  "commit_index": "6",
  "fsm_pending": "0",
  "last_contact": "76.764584ms",
  "last_log_index": "6",
  "last_log_term": "2",
  "last_snapshot_index": "0",
  "last_snapshot_term": "0",
  "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
  "latest_configuration_index": "0",
  "num_peers": "2",
  "protocol_version": "3",
  "protocol_version_max": "3",
  "protocol_version_min": "0",
  "snapshot_version_max": "1",
  "snapshot_version_min": "0",
  "state": "Follower",
  "term": "2"
}

Leader切換

停掉上述Demo中的Leader節點(node0),可以看到node1稱為新的leader,且term變為4:

$ curl 0.0.0.0:5001/api/maintain/stats|jq  //新的Leader
{
  "applied_index": "15",
  "commit_index": "15",
  "fsm_pending": "0",
  "last_contact": "0",
  "last_log_index": "15",
  "last_log_term": "4",
  "last_snapshot_index": "0",
  "last_snapshot_term": "0",
  "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
  "latest_configuration_index": "0",
  "num_peers": "2",
  "protocol_version": "3",
  "protocol_version_max": "3",
  "protocol_version_min": "0",
  "snapshot_version_max": "1",
  "snapshot_version_min": "0",
  "state": "Leader",
  "term": "4"
}

$ curl 0.0.0.0:5002/api/maintain/stats|jq
{
  "applied_index": "15",
  "commit_index": "15",
  "fsm_pending": "0",
  "last_contact": "42.735ms",
  "last_log_index": "15",
  "last_log_term": "4",
  "last_snapshot_index": "0",
  "last_snapshot_term": "0",
  "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]",
  "latest_configuration_index": "0",
  "num_peers": "2",
  "protocol_version": "3",
  "protocol_version_max": "3",
  "protocol_version_min": "0",
  "snapshot_version_max": "1",
  "snapshot_version_min": "0",
  "state": "Follower",
  "term": "4"
}

在本實現中,如果停止一個Raft節點,則Leader節點會一直列印連線該節點失敗的紀錄檔,原因是在Ctrl+c停止Raft節點的時候沒有呼叫Raft.RemoveServer來移除該節點。這種處理方式是合理的,因為當一個節點重啟或故障的時候,不應該從Raft中移除,此時應該查明原因,恢復叢集。

本實現中沒有主動移除Raft節點的介面,也可以新增一個介面來呼叫Raft.RemoveServer,進而移除預期的節點,注意只能在Leader節點上執行Raft.RemoveServer

應用資料的讀寫

下面我們驗證應用資料的寫入和讀取。

  1. 向非Leader節點寫入資料,其會將寫入請求轉發給leader,由leader執行資料寫入。下面展示向非Leader節寫入資料的場景:

    $ curl 0.0.0.0:5001/api/maintain/stats|jq
    {
      "applied_index": "64",
      "commit_index": "64",
      "fsm_pending": "0",
      "last_contact": "4.312667ms",
      "last_log_index": "64",
      "last_log_term": "137",
      "last_snapshot_index": "0",
      "last_snapshot_term": "0",
      "latest_configuration": "[{Suffrage:Voter ID:0.0.0.0:7000 Address:192.168.1.42:6000} {Suffrage:Voter ID:0.0.0.0:7001 Address:192.168.1.42:6001} {Suffrage:Voter ID:0.0.0.0:7002 Address:192.168.1.42:6002}]",
      "latest_configuration_index": "0",
      "num_peers": "2",
      "protocol_version": "3",
      "protocol_version_max": "3",
      "protocol_version_min": "0",
      "snapshot_version_max": "1",
      "snapshot_version_min": "0",
      "state": "Follower",  #非Leader節點
      "term": "137"
    }
    
    $ curl -XPOST localhost:5001/api/v1/set --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
    {
        "key" : "testKey",
        "value" : "testValue"
    }'
    
  2. 向所有節點查詢寫入的資料,可以看到所有節點都可以查詢到該資料:

    $ curl -XGET localhost:5000/api/v1/get --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
    {
        "key" : "testKey"
    }'
    testValue
    
    $ curl -XGET localhost:5001/api/v1/get --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
    {
        "key" : "testKey"
    }'
    testValue
    
    $curl -XGET localhost:5002/api/v1/get --header 'Content-Type: application/json' --header 'Content-Type: application/json' -d '
    {
        "key" : "testKey"
    }'
    testValue
    

TIPS

  • 驗證場景下,如果節點IP發生變動,可以通過刪除--dataDir目錄來清除叢集後設資料
  • 如果叢集中的節點不足仲裁數目,則節點可能處理candidate狀態,無法變為Leader,因此要保證叢集中有足夠的節點,避免一次停掉過多節點。

參考