前言
在分散式系統中經常會遇到某個業務僅需要單個節點執行的場景,通常這樣做是為了解決並行引起的狀態不一致問題。但是為了防止出現單點故障,又需要為這些節點做故障轉移的實現。簡單的方案是同時起多個節點,但是隻有一個節點作為主節點執行業務,其他的作為備份節點需要實時跟蹤主節點執行狀態,一旦發現主節點掛掉就將自己轉變為主節點進行業務處理,這也就是所謂的「多節點搶注(主)」。
實現
實現一個簡單的多節點搶注功能並不複雜,只需要藉助一些中介軟體進行狀態維護就可以做到,這裡使用大家常用的redis作為實現方案。這個最小實現方案中僅涉及三個參與角色,分別是:
WorkNode -- 工作節點
Redis -- Redis快取
CallbackHandler -- 節點註冊成功回撥(代表業務處理)
具體實現程式碼如下(僅供參考,相關細節需要自己根據業務進一步實現)
WorkNode
package me.kongdl.ezwok.work; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * @author: kongdl * @date: 2022/6/7 14:55 * @description: 分散式節點搶佔註冊模型 **/ public class WorkNode extends Thread { // 業務程式碼 private String code; // 節點ID private String nodeId; // redis private Redis redis; // 主節點標誌 private AtomicBoolean master; // 註冊成功回撥介面 private CallbackHandler callbackHandler; public WorkNode(String code, String nodeId, CallbackHandler callbackHandler) { this.code = code; this.nodeId = nodeId; this.redis = new Redis(); this.master = new AtomicBoolean(false); this.callbackHandler = callbackHandler; } @Override public void run() { // master:event-handler => <NODE_ID> String key = "master:" + code; while (!registerAsMaster(key)) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return; } } redis.expire(key, 5, TimeUnit.SECONDS); while (true) { // 節點續期 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); break; } redis.expire(key, 5, TimeUnit.SECONDS); } } /** * 嘗試註冊為主節點 * @param key * @return true-成功,false-失敗 */ private boolean registerAsMaster(String key) { boolean result = redis.setnx(key, nodeId); if (result) { master.set(true); // callback in async mode new Thread(() -> callbackHandler.handle(this)).start(); } return result; } /** * 當前節點是否為主節點 * @return */ public boolean isMaster() { return master.get(); } /** * 業務程式碼 * @return */ public String getCode() { return code; } /** * 節點ID * @return */ public String getNodeId() { return nodeId; } }
Redis
package me.kongdl.ezwok.work; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; import org.springframework.data.redis.connection.jedis.JedisClientConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import redis.clients.jedis.JedisPoolConfig; import java.util.concurrent.TimeUnit; /** * @author: kongdl * @date: 2022/6/7 14:58 * @description: Redis **/ public class Redis { private final StringRedisTemplate redisTemplate; public Redis() { this.redisTemplate = initRedisTemplate(); } private StringRedisTemplate initRedisTemplate() { RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration(); standaloneConfig.setHostName("localhost"); standaloneConfig.setPort(6379); standaloneConfig.setPassword("Redis#321"); standaloneConfig.setDatabase(0); JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMinIdle(2); poolConfig.setMaxIdle(10); poolConfig.setMaxTotal(100); JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder().usePooling().poolConfig(poolConfig).build(); JedisConnectionFactory connectionFactory = new JedisConnectionFactory(standaloneConfig, clientConfiguration); connectionFactory.afterPropertiesSet(); StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory); redisTemplate.afterPropertiesSet(); return redisTemplate; } public boolean setnx(String key, String value) { return redisTemplate.opsForValue().setIfAbsent(key, value); } public boolean expire(String key, long timeout, TimeUnit unit) { return redisTemplate.expire(key, timeout, unit); } }
CallbackHandler
package me.kongdl.ezwok.work; /** * @author: kongdl * @date: 2022/6/7 15:58 * @description: 回撥處理介面 **/ public interface CallbackHandler { /** * 回撥處理 */ void handle(WorkNode node); }
測試
/** * 模擬多節點下的執行狀況 **/ public class Demo { public static void main(String[] args) { // 業務程式碼,用以區分不同的業務 String code = "event-handler"; // 節點註冊成功後的回撥處理 final CallbackHandler callbackHandler = node -> { // 執行業務操作 System.out.println(node.getCode() + "<" + node.getNodeId() + "> registered as master ok!"); Random random = new Random(System.currentTimeMillis()); try { // 模擬執行緒隨機掛掉 TimeUnit.SECONDS.sleep(random.nextInt(10)); node.interrupt(); } catch (InterruptedException e) { e.printStackTrace(); } }; // 啟動多個節點 for (int i = 1; i <= 10; i++) { String nodeId = "node-" + i; new WorkNode(code, nodeId, callbackHandler).start(); } } }
結論
測試程式碼執行後會發現某個節點註冊成功,執行一段時間(幾秒)後掛掉,後續備份節點會自動註冊成為主節點並接替執行業務,從而證明了該模型具備了基本的節點搶注和故障轉移功能。當然,實際生產環境具有更復雜的場景和業務需求,但是可以認為都是在這個基礎上進行了相關擴充套件。