Microservice Components----- Spring Cloud Alibaba Registry Nacos: Analysis of the Raft Protocol Behind Its CP Mode

2022-12-15 12:01:13

 Preface

  This article is the second half of the earlier Nacos registry source-code analysis.

 

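 Why it matters

【1】When choosing a registry, most of us go for an AP architecture. Why? Because it performs better than a CP architecture and needs less waiting. [A CP architecture commits in two phases, while an AP architecture writes the data to disk directly and then propagates it to reach eventual consistency, so the client gets its response sooner.]

【2】Next, does an AP architecture risk losing data? Certainly. So should we choose CP instead? That raises the real question: is such data loss a problem? For a registry, clearly not. Clients of an AP registry have a retry mechanism: when one server in the cluster goes down, the client automatically reconnects to another one. With that compensating mechanism in place, CP is usually not needed.

【3】For distributed systems in general, however, CP is a very important piece. ZooKeeper, the forerunner in this area, uses the strongly consistent ZAB protocol, and Raft is also a strongly consistent protocol [the two look similar but differ in several ways]. Raft is the more popular of the two today, so it is well worth studying.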

 

 The Raft protocol in detail

【1】Interactive demo: http://thesecretlivesofdata.com/raft/

【2】Raft defines three node states

1) Follower state
2) Candidate state
3) Leader state

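【3】Election flow

【3.1】All nodes start in the follower state.

【3.2】If followers do not hear from a leader, they can become candidates. [Note: exactly how a candidate arises, and whether several can appear at once, is analyzed later; a successful election, however, always ends with a single leader.] This differs from ZAB, where every node takes part in the election.

【3.3】The candidate sends vote requests to the other nodes, and the nodes reply to them.

【3.4】A candidate that receives votes from a majority of nodes becomes the leader.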
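The election steps above can be sketched in a few lines of Java. This is a minimal illustration, not Nacos code: the names `Role`, `ElectionSketch` and `runElection` are hypothetical, and real Raft additionally tracks terms and retries after split votes.

```java
import java.util.List;

// The three Raft roles (hypothetical enum, not a Nacos type).
enum Role { FOLLOWER, CANDIDATE, LEADER }

class ElectionSketch {
    // Majority of a cluster of n nodes: n / 2 + 1 (3 of 5, 3 of 4).
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // A follower that hears no leader becomes a candidate, votes for itself,
    // and becomes leader only if the granted votes reach a majority.
    // Otherwise it stays a candidate; real Raft would then wait for another
    // timeout and retry with a higher term.
    static Role runElection(int clusterSize, List<Boolean> votesFromOthers) {
        int granted = 1; // the candidate's own vote
        for (boolean vote : votesFromOthers) {
            if (vote) {
                granted++;
            }
        }
        return granted >= majority(clusterSize) ? Role.LEADER : Role.CANDIDATE;
    }
}
```

With five nodes, two granted votes from peers plus the candidate's own vote make three, which is enough; one granted vote is not.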

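【4】Data consistency flow

【4.1】Every change to the system now goes through the leader [even if the request reaches a follower, the follower forwards it to the leader]. The leader appends each change as an entry in its log, but the entry is not yet committed, so the node's value is not updated. [Note: at this point the client has sent its request but has not received a reply.]

【4.2】To commit the entry, the leader first replicates it to the follower nodes [the leader sends requests to the followers, usually by attaching the data to its heartbeat].

【4.3】The leader waits until a majority of nodes have written the entry and replied; only then is the entry committed on the leader [and only then does the leader answer the client].

【4.4】The leader then notifies the followers that the entry is committed [again through the heartbeat, as a second notification].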
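A minimal sketch of steps 4.1 to 4.3 from the leader's point of view, assuming a fixed cluster size and that follower acknowledgements are simply counted (no networking, no terms). `ReplicationSketch` and `LogEntry` are hypothetical names; step 4.4, announcing the commit on the next heartbeat, is not modelled.

```java
import java.util.ArrayList;
import java.util.List;

class LogEntry {
    final String command;
    boolean committed; // false until a majority has written the entry

    LogEntry(String command) {
        this.command = command;
    }
}

class ReplicationSketch {
    final List<LogEntry> leaderLog = new ArrayList<>();
    final int clusterSize;

    ReplicationSketch(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    // 4.1: the leader appends the change as an uncommitted entry;
    // the node's visible value is not updated yet.
    LogEntry append(String command) {
        LogEntry entry = new LogEntry(command);
        leaderLog.add(entry);
        return entry;
    }

    // 4.2-4.3: followerAcks = number of followers that wrote the entry.
    // The leader's own copy counts toward the majority; once committed,
    // the leader may answer the client.
    boolean tryCommit(LogEntry entry, int followerAcks) {
        int written = followerAcks + 1;
        if (written >= clusterSize / 2 + 1) {
            entry.committed = true;
        }
        return entry.committed;
    }
}
```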

 

 

 

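【4.5】Standard Raft uses QUORUM writes: the result is returned to the client only after at least a majority of nodes have received the write. Redis, by contrast, effectively uses ANY/ONE by default: as soon as the master has written the data it replies to the client, and the write command is then sent asynchronously to all slaves to keep them as consistent with the master as possible.

Write type    Description
ZERO          Returns immediately after receiving the write; success is not guaranteed
ANY/ONE       Ensures the value has been written to at least one node
QUORUM        Returns only after at least a majority of nodes (total nodes / 2 + 1) have received the write
ALL           Returns only after all replica nodes have received the write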
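The table boils down to how many acknowledgements a write waits for before replying. A small sketch, with the hypothetical enum `WriteLevel` standing in for the four rows (`ONE` covers ANY/ONE):

```java
// Hypothetical enum mirroring the rows of the table above.
enum WriteLevel { ZERO, ONE, QUORUM, ALL }

class WriteLevels {
    // Number of node acknowledgements required before replying to the client.
    static int requiredAcks(WriteLevel level, int nodes) {
        switch (level) {
            case ZERO:
                return 0;              // reply immediately, nothing guaranteed
            case ONE:
                return 1;              // at least one node has the value
            case QUORUM:
                return nodes / 2 + 1;  // standard Raft
            case ALL:
                return nodes;          // every replica
            default:
                throw new IllegalArgumentException("unknown level: " + level);
        }
    }
}
```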

【5】Split brain with multiple nodes [how Raft stays consistent]

【5.1】Suppose there are five nodes, A to E.

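【5.2】Simulate a network failure that splits the cluster into two partitions: A and B in one, C, D and E in the other. [After the split, C, D and E no longer have a leader. Once they stop receiving heartbeats, their election timeouts expire; C times out first and becomes a candidate.]

【5.3】As a result, both B and C are now leaders.

【5.4】Now consider data updates.

【5.4.1】A write received by the smaller partition cannot succeed [a write counts as successful only once a majority of nodes has written it, so the client's request eventually times out].

【5.4.2】The larger partition, on the other hand, succeeds [after C writes the entry, it passes the data to D and E via the heartbeat; once they reply, the number of nodes that wrote it is clearly at least (total nodes / 2 + 1)]. C therefore replies to the client and has D and E commit on the next heartbeat.

【5.5】When the network partition heals [the term decides which leader remains: the leader with the higher term stays, while the other nodes roll back their uncommitted entries and match the new leader's log].

【5.6】An even split is also possible [e.g. ABCD split into AB and CD; then neither partition is usable, since neither can reach the 4/2+1 = 3 nodes required for a write].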
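The split-brain rules above come down to simple arithmetic. In this hypothetical sketch (`PartitionSketch` is not a Nacos class), a partition can commit only if it holds a majority of the whole cluster, and on healing the leader with the higher term wins.

```java
class PartitionSketch {
    // A partition needs a majority of the WHOLE cluster, not of itself.
    static boolean canCommit(int partitionSize, int clusterSize) {
        return partitionSize >= clusterSize / 2 + 1;
    }

    // On heal, the leader with the higher term keeps leadership; the other
    // steps down and rolls back its uncommitted entries. Raft elects at most
    // one leader per term, so two leaders never share the same term.
    static String survivingLeader(String leaderA, long termA, String leaderB, long termB) {
        return termA > termB ? leaderA : leaderB;
    }
}
```

For the scenarios in 5.2 to 5.6: `canCommit(2, 5)` is false for A and B, `canCommit(3, 5)` is true for C, D and E, and `canCommit(2, 4)` is false for both halves of an even split.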

 

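 Verifying the write path against the source code

【1】Building on section 【1.1.2】 of the previous article, the addInstance registration method checks the ephemeral attribute to decide whether the instance is a persistent or an ephemeral node; persistent data goes through the Raft-based consistency service below.

【1.1】RaftConsistencyServiceImpl#put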

@Override
public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        // delegate to Nacos's own RaftCore implementation
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(...);
    }
}

【1.2】How RaftCore#signalPublish sends and processes the data [key part]

/**
 * Signal publish new record. If not leader, signal to leader. If leader, try to commit publish.
 *
 * @param key   key
 * @param value value
 * @throws Exception any exception during publish
 */
public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    // only the leader is allowed to write
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        
        final RaftPeer leader = getLeader();
        // a non-leader forwards the request to the leader and returns once the leader replies
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    
    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        // publish the data locally (see 【1.3】)
        onPublish(datum, peers.local());
        
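        // The block below implements data consistency, roughly:
        // serialize the datum to JSON, then send async requests (non-blocking, handled in callbacks);
        // a CountDownLatch of peers.majorityCount() (peers.size() / 2 + 1) waits for a Raft majority to respond.
        // If a majority responds, the client gets its reply; otherwise the call blocks until the timeout and fails.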
        final String content = json.toString();
        
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB);
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT
                                .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, result.getCode());
                        return;
                    }
                    latch.countDown();
                }
                
                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }
                
                @Override
                public void onCancel() {
                
                }
            });
            
        }
        
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }
        
        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}
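The latch pattern in signalPublish can be reproduced in isolation. This sketch assumes each peer's HTTP call is simulated by a boolean (`true` = success) executed on a thread pool; `QuorumLatchSketch` is a hypothetical name. As in the original, the leader counts itself, and a failed peer simply does not count down.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class QuorumLatchSketch {
    // peerOk: simulated outcome of the async publish to each non-leader peer.
    // Returns true if a majority (leader included) acknowledged before the timeout.
    static boolean publish(List<Boolean> peerOk, long timeoutMs) throws InterruptedException {
        int clusterSize = peerOk.size() + 1;                 // peers + leader
        CountDownLatch latch = new CountDownLatch(clusterSize / 2 + 1);
        latch.countDown();                                   // the leader itself
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, peerOk.size()));
        try {
            for (boolean ok : peerOk) {
                pool.submit(() -> {
                    if (ok) {
                        latch.countDown();                   // success callback
                    }                                        // failure: just log, no count
                });
            }
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With five nodes, two successful peers plus the leader reach the majority of three. With only one successful peer, `await` returns `false` after the timeout, which is the point where signalPublish throws.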

【1.3】The onPublish method that publishes the data

/**
 * Do publish. If leader, commit publish to store. If not leader, stop publish because should signal to leader.
 *
 * @param datum  datum
 * @param source source raft peer
 * @throws Exception any exception during publish
 */
public void onPublish(Datum datum, RaftPeer source) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    RaftPeer local = peers.local();
    if (datum.value == null) {
        Loggers.RAFT.warn("received empty datum");
        throw new IllegalStateException("received empty datum");
    }
    
    if (!peers.isLeader(source.ip)) {
        Loggers.RAFT
                .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                        JacksonUtils.toJson(getLeader()));
        throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
    }
    
    if (source.term.get() < local.term.get()) {
        Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                JacksonUtils.toJson(local));
        throw new IllegalStateException(
                "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
    }
    
    local.resetLeaderDue();
    
    // if data should be persisted, usually this is true:
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);  // write to disk
    }
    
    datums.put(datum.key, datum);
    
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());
    NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());  // publish an event; the in-memory update happens asynchronously
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
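The term bookkeeping at the end of onPublish can be isolated as a pure function. This sketch assumes PUBLISH_TERM_INCREASE_COUNT is 100, the same literal that receivedBeat uses for its identical logic; `TermSketch` is hypothetical.

```java
class TermSketch {
    // Assumed value of PUBLISH_TERM_INCREASE_COUNT (matches the literal 100 in receivedBeat).
    static final long INCREASE = 100;

    // New local term after applying a publish whose source (the leader) has sourceTerm.
    static long afterPublish(boolean isLeader, long localTerm, long sourceTerm) {
        if (isLeader) {
            return localTerm + INCREASE;      // the leader bumps its own term
        }
        if (localTerm + INCREASE > sourceTerm) {
            return sourceTerm;                // follower close behind: adopt the leader's term
        }
        return localTerm + INCREASE;          // follower far behind: catch up one step
    }
}
```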

 

【1.4】Writing to disk [synchronous]

/**
 * Write datum to cache file.
 *
 * @param datum datum
 * @throws Exception any exception during writing
 */
public synchronized void write(final Datum datum) throws Exception {
    
    String namespaceId = KeyBuilder.getNamespace(datum.key);
    
    File cacheFile = new File(cacheFileName(namespaceId, datum.key));
    
    if (!cacheFile.exists() && !cacheFile.getParentFile().mkdirs() && !cacheFile.createNewFile()) {
        MetricsMonitor.getDiskException().increment();
        
        throw new IllegalStateException("can not make cache file: " + cacheFile.getName());
    }
    
    ByteBuffer data;
    
    data = ByteBuffer.wrap(JacksonUtils.toJson(datum).getBytes(StandardCharsets.UTF_8));
    
    try (FileChannel fc = new FileOutputStream(cacheFile, false).getChannel()) {
        fc.write(data, data.position());
        fc.force(true);
    } catch (Exception e) {
        MetricsMonitor.getDiskException().increment();
        throw e;
    }
    
    // remove old format file:
    if (StringUtils.isNoneBlank(namespaceId)) {
        if (datum.key.contains(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER)) {
            String oldDatumKey = datum.key
                    .replace(Constants.DEFAULT_GROUP + Constants.SERVICE_INFO_SPLITER, StringUtils.EMPTY);
            
            cacheFile = new File(cacheFileName(namespaceId, oldDatumKey));
            if (cacheFile.exists() && !cacheFile.delete()) {
                Loggers.RAFT.error("[RAFT-DELETE] failed to delete old format datum: {}, value: {}", datum.key,
                        datum.value);
                throw new IllegalStateException("failed to delete old format datum: " + datum.key);
            }
        }
    }
}

private String cacheFileName(String namespaceId, String datumKey) {
    String fileName;
    if (StringUtils.isNotBlank(namespaceId)) {
        // e.g. data/naming/data/public under the cluster node's directory for the public namespace
        fileName = CACHE_DIR + File.separator + namespaceId + File.separator + encodeDatumKey(datumKey);
    } else {
        fileName = CACHE_DIR + File.separator + encodeDatumKey(datumKey);
    }
    return fileName;
}
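The write in RaftStore#write is a plain overwrite followed by `FileChannel.force(true)`, which is what makes it synchronous and durable. A simplified sketch, assuming keys are already safe file names (Nacos encodes them with encodeDatumKey, omitted here); `DurableWriteSketch` is hypothetical. Unlike the original, it loops on `write`, because `FileChannel.write` may write fewer bytes than requested.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

class DurableWriteSketch {
    // One file per key under <cacheDir>/<namespaceId>/, overwritten on every write.
    static File write(File cacheDir, String namespaceId, String key, String json) throws IOException {
        File dir = (namespaceId == null || namespaceId.isEmpty()) ? cacheDir : new File(cacheDir, namespaceId);
        if (!dir.isDirectory() && !dir.mkdirs()) {
            throw new IOException("can not make cache dir: " + dir);
        }
        File file = new File(dir, key);   // the real code encodes the key first
        ByteBuffer data = ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
        try (FileChannel fc = new FileOutputStream(file, false).getChannel()) {
            while (data.hasRemaining()) {
                fc.write(data);           // may be partial, so loop until done
            }
            fc.force(true);               // flush content and metadata to the device
        }
        return file;
    }
}
```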

【1.5】Publishing the event to update memory [asynchronous]

// Use Ctrl+Shift+F (or Edit -> Find -> Find in Path) to search for ValueChangeEvent and locate the matching onEvent handler
PersistentNotifier#onEvent
@Override
public void onEvent(ValueChangeEvent event) {
    notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
}

public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
    if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
        if (KeyBuilder.matchServiceMetaKey(key) && !KeyBuilder.matchSwitchKey(key)) {
            for (RecordListener listener : listenerMap.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                try {
                    if (action == DataOperation.CHANGE) {
                        listener.onChange(key, value);
                    }
                    if (action == DataOperation.DELETE) {
                        listener.onDelete(key);
                    }
                } catch (Throwable e) {
                    Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
                }
            }
        }
    }
    
    if (!listenerMap.containsKey(key)) {
        return;
    }
    
    for (RecordListener listener : listenerMap.get(key)) {
        try {
            if (action == DataOperation.CHANGE) {
                listener.onChange(key, value);
                continue;
            }
            if (action == DataOperation.DELETE) {
                listener.onDelete(key);
            }
        } catch (Throwable e) {
            Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
        }
    }
}
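The dispatch rule in notify (prefix listeners for service metadata plus exact-key listeners, each call isolated by its own try/catch) can be sketched with plain callbacks. `NotifierSketch` and the use of `BiConsumer` as the listener type are assumptions for illustration, not the Nacos `RecordListener` interface.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class NotifierSketch {
    // keyOrPrefix -> listeners receiving (key, value)
    private final Map<String, List<BiConsumer<String, String>>> listenerMap = new HashMap<>();

    void listen(String keyOrPrefix, BiConsumer<String, String> listener) {
        listenerMap.computeIfAbsent(keyOrPrefix, k -> new ArrayList<>()).add(listener);
    }

    void notifyChange(String metaPrefix, String key, String value) {
        // Prefix listeners see every key that matches the prefix.
        if (key.startsWith(metaPrefix)) {
            for (BiConsumer<String, String> l : listenerMap.getOrDefault(metaPrefix, List.of())) {
                safeCall(l, key, value);
            }
        }
        // Exact-key listeners see only their own key.
        for (BiConsumer<String, String> l : listenerMap.getOrDefault(key, List.of())) {
            safeCall(l, key, value);
        }
    }

    // One failing listener must not stop the others.
    private static void safeCall(BiConsumer<String, String> l, String key, String value) {
        try {
            l.accept(key, value);
        } catch (RuntimeException e) {
            System.err.println("error while notifying listener of key: " + key);
        }
    }
}
```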

Service#onChange
@Override
public void onChange(String key, Instances value) throws Exception {
    
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    
    for (Instance instance : value.getInstanceList()) {
        
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    
    recalculateChecksum();
}
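The weight normalization in onChange is easy to isolate. A hypothetical helper (`WeightSketch` is not a Nacos class) with the same bounds:

```java
class WeightSketch {
    // Same bounds as Service#onChange: cap at 10000, lift tiny positive weights to 0.01.
    static double normalize(double weight) {
        if (weight > 10000.0D) {
            return 10000.0D;
        }
        if (weight < 0.01D && weight > 0.0D) {
            return 0.01D;
        }
        return weight;   // 0 and ordinary values are left unchanged
    }
}
```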

 

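 Verifying the election logic against the source code

【1】Core class: RaftCore#init()

【1.1】Key point: the method is annotated with @PostConstruct, so when Spring initializes the bean it automatically starts two important scheduled tasks: the election task (MasterElection) and the heartbeat task (HeartBeat).

【1.2】Code listing

【2】The election task

【2.1】The election timer

// register the election task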
public static ScheduledFuture registerMasterElection(Runnable runnable) {
    return NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
}

// As GlobalExecutor shows, the default thread count is twice the number of processors available to the JVM
private static final ScheduledExecutorService NAMING_TIMER_EXECUTOR = ExecutorFactory.Managed
            .newScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
                    Runtime.getRuntime().availableProcessors() * 2,
                    new NameThreadFactory("com.alibaba.nacos.naming.timer"));
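Each tick of the election task subtracts TICK_PERIOD_MS from leaderDueMs and starts a vote once it reaches zero. A sketch of that countdown, assuming the 500 ms tick and the 15 s + random(0, 5 s) reset described in the MasterElection comments; `ElectionTimerSketch` is hypothetical and is driven by calling `tick()` directly instead of by the scheduler.

```java
import java.util.concurrent.ThreadLocalRandom;

class ElectionTimerSketch {
    static final long TICK_PERIOD_MS = 500;      // assumed scheduler period
    static final long LEADER_TIMEOUT_MS = 15_000; // assumed base timeout
    static final long RANDOM_MS = 5_000;          // assumed random spread

    long leaderDueMs;

    ElectionTimerSketch(long initialDueMs) {
        this.leaderDueMs = initialDueMs;
    }

    // Called once per tick; returns true when this node should start an election.
    boolean tick() {
        leaderDueMs -= TICK_PERIOD_MS;
        if (leaderDueMs > 0) {
            return false;
        }
        resetLeaderDue();
        return true;
    }

    // Randomized reset (15-20 s) so nodes rarely time out at the same moment.
    void resetLeaderDue() {
        leaderDueMs = LEADER_TIMEOUT_MS + ThreadLocalRandom.current().nextLong(RANDOM_MS);
    }
}
```

Starting from 1200 ms, the first two ticks leave 700 and 200 ms; the third drops below zero and triggers a vote.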

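【2.2】The election task [Note a flaw here: if the node that wakes up first is missing data, it can still become leader and that data will be lost. The problem is not serious, though: just as with an AP architecture, clients re-establish their connections and fill the data back in.]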

public class MasterElection implements Runnable {
    
    @Override
    public void run() {
        try {
            if (stopWork) {
                return;
            }
            if (!peers.isReady()) {
                return;
            }
            
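            // This value is set at election time;
            // the initial wait is random (0-15 s) and unrelated to the amount of data, so the elected leader is not necessarily the node with the most complete data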
            RaftPeer local = peers.local();
            // the scheduled task runs every 500 ms by default
            local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
            
            if (local.leaderDueMs > 0) {
                return;
            }
            
            // reset timeout
            // reset the election timeout (15-20 s)
            local.resetLeaderDue();
            // reset the heartbeat timeout (5 s)
            local.resetHeartbeatDue();
            // start a vote
            sendVote();
        } catch (Exception e) {
            Loggers.RAFT.warn("[RAFT] error while master election {}", e);
        }
        
    }
    
    private void sendVote() {
        
        RaftPeer local = peers.get(NetUtils.localServer());
        Loggers.RAFT.info(...);
        
        // clear the previous vote information
        peers.reset();
        // increment the term
        local.term.incrementAndGet();
        local.voteFor = local.ip;
        // make this node a candidate
        local.state = RaftPeer.State.CANDIDATE;
        
        Map<String, String> params = new HashMap<>(1);
        params.put("vote", JacksonUtils.toJson(local));
        // iterate over the other cluster members
        for (final String server : peers.allServersWithoutMySelf()) {
            final String url = buildUrl(server, API_VOTE);
            try {
                // send an async request
                HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                            return;
                        }
                        
                        RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
                        
                        Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                        
                        peers.decideLeader(peer);
                        
                    }
                    
                    @Override
                    public void onError(Throwable throwable) {...}
                    
                    @Override
                    public void onCancel() {}
                });
            } catch (Exception e) {...}
        }
    }
}

// Called asynchronously whenever a vote reply arrives
public RaftPeer decideLeader(RaftPeer candidate) {
    peers.put(candidate.ip, candidate);
    
    SortedBag ips = new TreeBag();
    int maxApproveCount = 0;
    String maxApprovePeer = null;
    for (RaftPeer peer : peers.values()) {
        if (StringUtils.isEmpty(peer.voteFor)) {
            continue;
        }
        
        ips.add(peer.voteFor);
        if (ips.getCount(peer.voteFor) > maxApproveCount) {
            maxApproveCount = ips.getCount(peer.voteFor);
            maxApprovePeer = peer.voteFor;
        }
    }
    
    if (maxApproveCount >= majorityCount()) {
        RaftPeer peer = peers.get(maxApprovePeer);
        peer.state = RaftPeer.State.LEADER;
        
        if (!Objects.equals(leader, peer)) {
            leader = peer;
            ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
            Loggers.RAFT.info("{} has become the LEADER", leader.ip);
        }
    }
    
    return leader;
}
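decideLeader counts how many peers name each candidate in voteFor. Nacos uses a TreeBag from Apache Commons Collections for this; the sketch below swaps in a plain HashMap with `Map.merge`. `VoteCountSketch` is hypothetical, and `peerVotes` (peer ip to voteFor) stands in for the peer table.

```java
import java.util.HashMap;
import java.util.Map;

class VoteCountSketch {
    // Returns the candidate holding a majority of votes, or null if none does yet.
    static String decideLeader(Map<String, String> peerVotes, int clusterSize) {
        Map<String, Integer> counts = new HashMap<>();
        String best = null;
        int bestCount = 0;
        for (String voteFor : peerVotes.values()) {
            if (voteFor == null || voteFor.isEmpty()) {
                continue;                                   // this peer has not voted
            }
            int c = counts.merge(voteFor, 1, Integer::sum); // running tally per candidate
            if (c > bestCount) {
                bestCount = c;
                best = voteFor;
            }
        }
        return bestCount >= clusterSize / 2 + 1 ? best : null;
    }
}
```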

 

 

 

【2.3】Handling an incoming vote request

// RaftController
@PostMapping("/vote")
public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
    if (versionJudgement.allMemberIsNewVersion()) {
        throw new IllegalStateException("old raft protocol already stop");
    }
    RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class));
    
    return JacksonUtils.transferToJsonNode(peer);
}

public synchronized RaftPeer receivedVote(RaftPeer remote) {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    if (!peers.contains(remote)) {
        throw new IllegalStateException("can not find peer: " + remote.ip);
    }
    
    RaftPeer local = peers.get(NetUtils.localServer());
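    // Reject when the requester's term is not newer. This covers the case where two nodes both sent vote requests: neither will vote for the other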
    if (remote.term.get() <= local.term.get()) {
        String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
        
        Loggers.RAFT.info(msg);
        if (StringUtils.isEmpty(local.voteFor)) {
            local.voteFor = local.ip;
        }
        
        return local;
    }
    // reset the election timeout so that, if this election fails, this node waits for the next round
    local.resetLeaderDue();
    
    local.state = RaftPeer.State.FOLLOWER;
    local.voteFor = remote.ip;
    // adopt the requester's term: if this round fails and this node wakes first next round, its term is guaranteed to be the largest
    local.term.set(remote.term.get());
    
    Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
    
    return local;
}
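The vote-granting rule in receivedVote reduces to a term comparison. A hypothetical sketch (`VoteRuleSketch` is not a Nacos class) that returns the ip the node votes for:

```java
class VoteRuleSketch {
    final String ip;
    long term;
    String voteFor;

    VoteRuleSketch(String ip, long term) {
        this.ip = ip;
        this.term = term;
    }

    String receivedVote(String remoteIp, long remoteTerm) {
        if (remoteTerm <= term) {
            // Stale or same-term request: refuse; vote for itself if it has not voted yet.
            if (voteFor == null || voteFor.isEmpty()) {
                voteFor = ip;
            }
            return voteFor;
        }
        // Newer term: become a follower of the requester and adopt its term.
        term = remoteTerm;
        voteFor = remoteIp;
        return voteFor;
    }
}
```

Standard Raft also refuses a candidate whose log is less up to date than the voter's. Neither this sketch nor the Nacos method checks that, which is why the node that wakes first can win even when it is missing data, as noted in 【2.2】.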

 

 

 

【3】The heartbeat task

【3.1】The heartbeat timer

// the heartbeat task runs every 500 ms on the same executor as the election task
public static ScheduledFuture registerHeartbeat(Runnable runnable) {
    return NAMING_TIMER_EXECUTOR.scheduleWithFixedDelay(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
}

 

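【3.2】The heartbeat task [in short, it sends only the list of keys, using the timestamp as a version number; this keeps the payload small, and followers pull whatever they are missing]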

public class HeartBeat implements Runnable {
    
    // only the leader sends heartbeats (checked in sendBeat)
    @Override
    public void run() {
        try {
            if (stopWork) {
                return;
            }
            if (!peers.isReady()) {
                return;
            }
            
            RaftPeer local = peers.local();
            // heartbeatDueMs starts as a random value within 5 s
            local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
            if (local.heartbeatDueMs > 0) {
                return;
            }
            
            local.resetHeartbeatDue();
            
            sendBeat();
        } catch (Exception e) {...}
        
    }
    
    private void sendBeat() throws IOException, InterruptedException {
        RaftPeer local = peers.local();
        if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
            return;
        }
        if (Loggers.RAFT.isDebugEnabled()) { /* logging omitted */ }
        
        local.resetLeaderDue();
        
        // build data
        ObjectNode packet = JacksonUtils.createEmptyJsonNode();
        packet.replace("peer", JacksonUtils.transferToJsonNode(local));
        
        ArrayNode array = JacksonUtils.createEmptyArrayNode();
        
        if (switchDomain.isSendBeatOnly()) {
            Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
        }
        
        if (!switchDomain.isSendBeatOnly()) {
            for (Datum datum : datums.values()) {
                
                ObjectNode element = JacksonUtils.createEmptyJsonNode();
                
                if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                    element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                    element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                }
                // use the timestamp as the version number
                element.put("timestamp", datum.timestamp.get());
                
                array.add(element);
            }
        }
        
        packet.replace("datums", array);
        // broadcast
        Map<String, String> params = new HashMap<String, String>(1);
        params.put("beat", JacksonUtils.toJson(packet));
        
        String content = JacksonUtils.toJson(params);
        
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(out);
        gzip.write(content.getBytes(StandardCharsets.UTF_8));
        gzip.close();
        
        byte[] compressedBytes = out.toByteArray();
        String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
        
        if (Loggers.RAFT.isDebugEnabled()) {
            Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
                    compressedContent.length());
        }
        
        for (final String server : peers.allServersWithoutMySelf()) {
            try {
                final String url = buildUrl(server, API_BEAT);
                if (Loggers.RAFT.isDebugEnabled()) {
                    Loggers.RAFT.debug("send beat to server " + server);
                }
                HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
                            MetricsMonitor.getLeaderSendBeatFailedException().increment();
                            return;
                        }
                        
                        peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                        if (Loggers.RAFT.isDebugEnabled()) {
                            Loggers.RAFT.debug("receive beat response from: {}", url);
                        }
                    }
                    
                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,
                                throwable);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                    }
                    
                    @Override
                    public void onCancel() {
                    
                    }
                });
            } catch (Exception e) {
                Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                MetricsMonitor.getLeaderSendBeatFailedException().increment();
            }
        }
        
    }
}
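sendBeat gzips the JSON and the /beat endpoint decompresses it. A round-trip sketch with the JDK's `java.util.zip` classes (Java 9+ for `readAllBytes`); `BeatCodecSketch` is hypothetical. The compressed bytes are binary: sendBeat converts them to a UTF-8 String only to log its length, and such a String could not in general be turned back into the original bytes, which is why the request body is sent as `compressedBytes`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class BeatCodecSketch {
    // Leader side: gzip the JSON beat packet.
    static byte[] compress(String json) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(json.getBytes(StandardCharsets.UTF_8));
        } // closing the stream writes the gzip trailer
        return out.toByteArray();
    }

    // Follower side: decompress the request body back into the JSON text.
    static String decompress(byte[] body) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}
```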

 

 

 

【3.3】Handling an incoming heartbeat

// RaftController
@PostMapping("/beat")
public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
    if (versionJudgement.allMemberIsNewVersion()) {
        throw new IllegalStateException("old raft protocol already stop");
    }
    // decompress, then URL-decode
    String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
    String value = URLDecoder.decode(entity, "UTF-8");
    value = URLDecoder.decode(value, "UTF-8");
    
    JsonNode json = JacksonUtils.toObj(value);
    
    RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
    
    return JacksonUtils.transferToJsonNode(peer);
}
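On the follower side, receivedBeat compares the leader's key timestamps with its own, fetches missing or older keys in batches of 50, and deletes keys the leader no longer has. A simplified sketch of that diff, assuming key-to-timestamp maps on both sides; `BeatDiffSketch` is hypothetical and leaves out the HTTP fetch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BeatDiffSketch {
    static final int BATCH_SIZE = 50;

    // Keys the follower must pull from the leader, grouped into batches.
    static List<List<String>> keysToFetch(Map<String, Long> local, Map<String, Long> leader) {
        List<List<String>> batches = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> e : leader.entrySet()) {
            Long mine = local.get(e.getKey());
            if (mine != null && mine >= e.getValue()) {
                continue;                         // already up to date
            }
            batch.add(e.getKey());
            if (batch.size() == BATCH_SIZE) {     // full batch: fetch it
                batches.add(batch);
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            batches.add(batch);                   // final partial batch
        }
        return batches;
    }

    // Keys the leader no longer has (still "0" in receivedKeysMap).
    static List<String> keysToDelete(Map<String, Long> local, Map<String, Long> leader) {
        List<String> dead = new ArrayList<>();
        for (String key : local.keySet()) {
            if (!leader.containsKey(key)) {
                dead.add(key);
            }
        }
        return dead;
    }
}
```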

// handle the heartbeat request
public RaftPeer receivedBeat(JsonNode beat) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    final RaftPeer local = peers.local();
    final RaftPeer remote = new RaftPeer();
    JsonNode peer = beat.get("peer");
    remote.ip = peer.get("ip").asText();
    remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
    remote.term.set(peer.get("term").asLong());
    remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
    remote.leaderDueMs = peer.get("leaderDueMs").asLong();
    remote.voteFor = peer.get("voteFor").asText();
    
    if (remote.state != RaftPeer.State.LEADER) {
        Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
                JacksonUtils.toJson(remote));
        throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
    }
    
    if (local.term.get() > remote.term.get()) {
        Loggers.RAFT
                .info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
                        remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
        throw new IllegalArgumentException(
                "out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
    }
    
    if (local.state != RaftPeer.State.FOLLOWER) {
        
        Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
        // mk follower
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
    }
    
    final JsonNode beatDatums = beat.get("datums");
    // reset the heartbeat and election timeouts; if the leader dies, followers stop receiving its heartbeat and start a new election
    local.resetLeaderDue();
    local.resetHeartbeatDue();
    
    peers.makeLeader(remote);
    
    if (!switchDomain.isSendBeatOnly()) {
        
        Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
        // mark every local key 0, then mark each key the leader sends as 1; keys still at 0 at the end are the ones to delete
        for (Map.Entry<String, Datum> entry : datums.entrySet()) {
            receivedKeysMap.put(entry.getKey(), 0);
        }
        
        // now check datums
        List<String> batch = new ArrayList<>();
        
        int processedCount = 0;
        if (Loggers.RAFT.isDebugEnabled()) {
            Loggers.RAFT
                    .debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                            beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
        }
        for (Object object : beatDatums) {
            processedCount = processedCount + 1;
            
            JsonNode entry = (JsonNode) object;
            String key = entry.get("key").asText();
            final String datumKey;
            
            if (KeyBuilder.matchServiceMetaKey(key)) {
                datumKey = KeyBuilder.detailServiceMetaKey(key);
            } else if (KeyBuilder.matchInstanceListKey(key)) {
                datumKey = KeyBuilder.detailInstanceListkey(key);
            } else {
                // ignore corrupted key:
                continue;
            }
            
            long timestamp = entry.get("timestamp").asLong();
            
            receivedKeysMap.put(datumKey, 1);
            
            try {
                if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
                        && processedCount < beatDatums.size()) {
                    continue;
                }
                
                if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                    batch.add(datumKey);
                }
                
                // batch for performance: process once 50 keys have accumulated,
                // or process whatever is left once the last key has been reached
                if (batch.size() < 50 && processedCount < beatDatums.size()) {
                    continue;
                }
                
                String keys = StringUtils.join(batch, ",");
                
                // skip if the batch is empty
                if (batch.size() <= 0) {
                    continue;
                }
                
                Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"
                                + ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),
                        processedCount, beatDatums.size(), datums.size());
                
                // update datum entry
                String url = buildUrl(remote.ip, API_GET);
                Map<String, String> queryParam = new HashMap<>(1);
                queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
                HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            return;
                        }
                        
                        List<JsonNode> datumList = JacksonUtils
                                .toObj(result.getData(), new TypeReference<List<JsonNode>>() {
                                });
                        
                        for (JsonNode datumJson : datumList) {
                            Datum newDatum = null;
                            OPERATE_LOCK.lock();
                            try {
                                
                                Datum oldDatum = getDatum(datumJson.get("key").asText());
                                
                                if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
                                        .get()) {
                                    Loggers.RAFT
                                            .info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                                    datumJson.get("key").asText(),
                                                    datumJson.get("timestamp").asLong(), oldDatum.timestamp);
                                    continue;
                                }
                                
                                if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
                                    Datum<Service> serviceDatum = new Datum<>();
                                    serviceDatum.key = datumJson.get("key").asText();
                                    serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                    serviceDatum.value = JacksonUtils
                                            .toObj(datumJson.get("value").toString(), Service.class);
                                    newDatum = serviceDatum;
                                }
                                
                                if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
                                    Datum<Instances> instancesDatum = new Datum<>();
                                    instancesDatum.key = datumJson.get("key").asText();
                                    instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                    instancesDatum.value = JacksonUtils
                                            .toObj(datumJson.get("value").toString(), Instances.class);
                                    newDatum = instancesDatum;
                                }
                                
                                if (newDatum == null || newDatum.value == null) {
                                    Loggers.RAFT.error("receive null datum: {}", datumJson);
                                    continue;
                                }
                                
                                // Persist to local disk, refresh the in-memory cache, then notify listeners of the change
                                raftStore.write(newDatum);
                                
                                datums.put(newDatum.key, newDatum);
                                notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);
                                
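                                // A valid beat from the leader arrived, so reset this follower's election timeout
                                local.resetLeaderDue();
                                
                                // Term sync: adopt the remote term if it is below local term + 100, otherwise advance the local term by 100
                                if (local.term.get() + 100 > remote.term.get()) {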
                                    getLeader().term.set(remote.term.get());
                                    local.term.set(getLeader().term.get());
                                } else {
                                    local.term.addAndGet(100);
                                }
                                
                                raftStore.updateTerm(local.term.get());
                                
                                Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                        newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);
                                
                            } catch (Throwable e) {
                                Loggers.RAFT
                                        .error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,
                                                e);
                            } finally {
                                OPERATE_LOCK.unlock();
                            }
                        }
                        try {
                            TimeUnit.MILLISECONDS.sleep(200);
                        } catch (InterruptedException e) {
                            Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);
                        }
                        return;
                    }
                    
                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);
                    }
                    
                    @Override
                    public void onCancel() {
                    
                    }
                    
                });
                
                batch.clear();
                
            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
            }
            
        }
        
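        // Keys still mapped to 0 in receivedKeysMap did not appear in the leader's beat, so they are stale locally
        List<String> deadKeys = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
            if (entry.getValue() == 0) {
                deadKeys.add(entry.getKey());
            }
        }
        // Delete the stale keys in a batch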
        for (String deadKey : deadKeys) {
            try {
                deleteDatum(deadKey);
            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
            }
        }
        
    }
    
    return local;
}
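
The follower-side reconciliation in the code above (compare timestamps, pull newer datums from the leader, drop local keys the leader no longer reports, align the term) can be modeled without any networking. The following is a minimal sketch under simplifying assumptions: `BeatReconcileSketch`, `Plan`, `reconcile` and `nextLocalTerm` are hypothetical names, not Nacos APIs; a datum is reduced to a key mapped to its timestamp, `batchSize` is an assumed parameter, and the HTTP fetch, disk writes and `OPERATE_LOCK` locking are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the follower-side beat handling shown above.
// BeatReconcileSketch, Plan, reconcile and nextLocalTerm are illustrative names, not Nacos APIs.
final class BeatReconcileSketch {

    // Result of comparing a leader beat with the local store.
    static final class Plan {
        final List<List<String>> fetchBatches; // keys to pull from the leader, grouped into batches
        final List<String> deadKeys;            // local keys the leader did not announce

        Plan(List<List<String>> fetchBatches, List<String> deadKeys) {
            this.fetchBatches = fetchBatches;
            this.deadKeys = deadKeys;
        }
    }

    // localTimestamps: key -> timestamp of the locally stored datum
    // beatTimestamps:  key -> timestamp announced by the leader in its beat
    // batchSize:       maximum number of keys per fetch request (assumed parameter)
    static Plan reconcile(Map<String, Long> localTimestamps,
                          Map<String, Long> beatTimestamps,
                          int batchSize) {
        // Mark every local key as "not seen" (0), mirroring receivedKeysMap.
        Map<String, Integer> received = new HashMap<>();
        for (String key : localTimestamps.keySet()) {
            received.put(key, 0);
        }

        List<List<String>> batches = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> e : beatTimestamps.entrySet()) {
            String key = e.getKey();
            received.put(key, 1); // the leader still has this key
            Long localTs = localTimestamps.get(key);
            // Same rule as the timestamp check above: only a strictly newer leader copy is taken.
            if (localTs == null || localTs < e.getValue()) {
                batch.add(key);
            }
            if (batch.size() >= batchSize) {
                batches.add(batch);
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            batches.add(batch);
        }

        // Keys still marked 0 were absent from the beat and would be deleted locally.
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Integer> e : received.entrySet()) {
            if (e.getValue() == 0) {
                dead.add(e.getKey());
            }
        }
        return new Plan(batches, dead);
    }

    // Term rule from the code above: adopt the remote term when it is below localTerm + 100,
    // otherwise move the local term forward by 100.
    static long nextLocalTerm(long localTerm, long remoteTerm) {
        return localTerm + 100 > remoteTerm ? remoteTerm : localTerm + 100;
    }

    public static void main(String[] args) {
        Map<String, Long> local = new HashMap<>();
        local.put("svc-a", 5L);
        local.put("svc-old", 1L);
        Map<String, Long> beat = new HashMap<>();
        beat.put("svc-a", 8L); // the leader has a newer copy of svc-a
        Plan plan = reconcile(local, beat, 50);
        System.out.println("fetch=" + plan.fetchBatches + " delete=" + plan.deadKeys);
        System.out.println("next term=" + nextLocalTerm(10, 50));
    }
}
```

Returning a plan instead of mutating shared state keeps the comparison testable in isolation. In the real flow the fetch is asynchronous (an HTTP callback), and each fetched datum is checked against the local timestamp again under `OPERATE_LOCK` before it is written, presumably because the local copy may have changed since the beat was compared.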

 

 
