實現簡單的多節點搶注(主)功能

2022-06-08 12:01:28

前言

  在分散式系統中經常會遇到某個業務僅需要單個節點執行的場景,通常這樣做是為了解決並行引起的狀態不一致問題。但是為了防止出現單點故障,又需要為這些節點做故障轉移的實現。簡單的方案是同時起多個節點,但是隻有一個節點作為主節點執行業務,其他的作為備份節點需要實時跟蹤主節點執行狀態,一旦發現主節點掛掉就將自己轉變為主節點進行業務處理,這也就是所謂的「多節點搶注(主)」。

 

實現

  實現一個簡單的多節點搶注功能並不複雜,只需要藉助一些中介軟體進行狀態維護就可以做到,這裡使用大家常用的redis作為實現方案。這個最小實現方案中僅涉及三個參與角色,分別是:

  WorkNode -- 工作節點

  Redis -- Redis快取

  CallbackHandler -- 節點註冊成功回撥(代表業務處理)

具體實現程式碼如下(僅供參考,相關細節需要自己根據業務進一步實現)

WorkNode

package me.kongdl.ezwok.work;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author: kongdl
 * @date: 2022/6/7 14:55
 * @description: 分散式節點搶佔註冊模型
 **/
public class WorkNode extends Thread {
    // 業務程式碼
    private String code;
    // 節點ID
    private String nodeId;
    // redis
    private Redis redis;
    // 主節點標誌
    private AtomicBoolean master;
    // 註冊成功回撥介面
    private CallbackHandler callbackHandler;

    public WorkNode(String code, String nodeId, CallbackHandler callbackHandler) {
        this.code = code;
        this.nodeId = nodeId;
        this.redis = new Redis();
        this.master = new AtomicBoolean(false);
        this.callbackHandler = callbackHandler;
    }

    @Override
    public void run() {
        // master:event-handler => <NODE_ID>
        String key = "master:" + code;
        while (!registerAsMaster(key)) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                return;
            }
        }
        redis.expire(key, 5, TimeUnit.SECONDS);
        while (true) { // 節點續期
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
            redis.expire(key, 5, TimeUnit.SECONDS);
        }
    }

    /**
     * 嘗試註冊為主節點
     * @param key
     * @return true-成功,false-失敗
     */
    private boolean registerAsMaster(String key) {
        boolean result = redis.setnx(key, nodeId);
        if (result) {
            master.set(true);
            // callback in async mode
            new Thread(() -> callbackHandler.handle(this)).start();
        }
        return result;
    }

    /**
     * 當前節點是否為主節點
     * @return
     */
    public boolean isMaster() {
        return master.get();
    }

    /**
     * 業務程式碼
     * @return
     */
    public String getCode() {
        return code;
    }

    /**
     * 節點ID
     * @return
     */
    public String getNodeId() {
        return nodeId;
    }

}
View Code

 

Redis

package me.kongdl.ezwok.work;

import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import redis.clients.jedis.JedisPoolConfig;

import java.util.concurrent.TimeUnit;

/**
 * @author: kongdl
 * @date: 2022/6/7 14:58
 * @description: Redis
 **/
public class Redis {
    private final StringRedisTemplate redisTemplate;

    public Redis() {
        this.redisTemplate = initRedisTemplate();
    }

    private StringRedisTemplate initRedisTemplate() {
        RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration();
        standaloneConfig.setHostName("localhost");
        standaloneConfig.setPort(6379);
        standaloneConfig.setPassword("Redis#321");
        standaloneConfig.setDatabase(0);
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMinIdle(2);
        poolConfig.setMaxIdle(10);
        poolConfig.setMaxTotal(100);
        JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder().usePooling().poolConfig(poolConfig).build();
        JedisConnectionFactory connectionFactory = new JedisConnectionFactory(standaloneConfig, clientConfiguration);
        connectionFactory.afterPropertiesSet();
        StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    public boolean setnx(String key, String value) {
        return redisTemplate.opsForValue().setIfAbsent(key, value);
    }

    public boolean expire(String key, long timeout, TimeUnit unit) {
        return redisTemplate.expire(key, timeout, unit);
    }
}
View Code

 

CallbackHandler

package me.kongdl.ezwok.work;

/**
 * @author: kongdl
 * @date: 2022/6/7 15:58
 * @description: 回撥處理介面
 **/
public interface CallbackHandler {

    /**
     * 回撥處理
     */
    void handle(WorkNode node);
}
View Code

 

測試

/**
* 模擬多節點下的執行狀況
**/
public class Demo {

    public static void main(String[] args) {
        // 業務程式碼,用以區分不同的業務
        String code = "event-handler";
        // 節點註冊成功後的回撥處理
        final CallbackHandler callbackHandler = node -> {
            // 執行業務操作
            System.out.println(node.getCode() + "<" + node.getNodeId() + "> registered as master ok!");
            Random random = new Random(System.currentTimeMillis());
            try { // 模擬執行緒隨機掛掉
                TimeUnit.SECONDS.sleep(random.nextInt(10));
                node.interrupt();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        // 啟動多個節點
        for (int i = 1; i <= 10; i++) {
            String nodeId = "node-" + i;
            new WorkNode(code, nodeId, callbackHandler).start();
        }
    }
}

 

結論

  測試程式碼執行後會發現某個節點註冊成功,執行一段時間(幾秒)後掛掉,後續備份節點會自動註冊成為主節點並接替執行業務,從而證明了該模型具備了基本的節點搶注和故障轉移功能。當然,實際生產環境具有更復雜的場景和業務需求,但是可以認為都是在這個基礎上進行了相關擴充套件。