Nodejs 使用 ZooKeeper 做服務發現

2023-02-17 21:00:14
將單體服務拆分為微服務後,為了服務高可用,一般會做叢集多範例。但在分散式下,怎麼進行高效、便捷的進行服務存取問題,出現了各類服務註冊和服務發現框架。這裡使用的是Zookeeper。ZooKeeper 官網 https://zookeeper.apache.org
我們的業務系統使用的開發語言是JAVA,但是部分頁面請求是先到nodejs 做的webportal服務,進行許可權校驗,校驗通過後呼叫Java提供的API。當前階段Java端已經微服務化,使用Zookeeper作為註冊中心,目前只需要讓nodejs端,也接入到Zookeeper,作為服務消費者,就能搭建機器環境。

找輪子

通過查詢,發現npm有現成的庫 node-zookeeper-client ,避免重複造輪子,就用它了。

接入思路

由於我們只是作為服務消費者,不需要使用服務註冊的api,大部分可以直接在檔案中找到API。

編碼過程 

npm 安裝

 1 npm i node-zookeeper-client 

連線ZK

 1 const Zookeeper = require('node-zookeeper-client');
 2 const CONNECTION_STRING = "127.0.0.1:2181"; // ZK的服務地址
 3 const OPTIONS = {
 4   sessionTimeout: 5000 
 5 }
 6 const zk = Zookeeper.createClient(CONNECTION_STRING, OPTIONS);
 7   zk.on('connected', function(){
 8   console.log("zk=====", zk);
 9 });
10 //獲取根節點下的子節點資料
11 zk.getChildren('/', function(error, children, stat){
12   if(error){
13     console.log(error.stack);
14     return;
15   }
16   console.log(children);
17 })
18 zk.connect();

其他API(僅供參考)

 1 // 判斷節點是否已存在
 2 zk.exists('/phpnode',function(error,stat){
 3    if(stat){
 4         console.log("節點存在");
 5    }else{
 6        console.log("節點不存在");
 7    }
 8 })
 9  
10  // 建立/註冊節點
11 zk.create('/phpnode',new Buffer('hello'),function(error,path){
12    console.log(path);
13 })
14 
15 // 獲取節點資料
16 zk.getData('/phpnode',function(error,data,stat){
17    console.log(data.toString());
18 });
19 
20 //節點刪除
21 zk.remove('/phpnode',function(error){
22    if(!error){
23        console.log('node 節點刪除成功');
24     }
25 })

於是有了第一版本程式碼

 1 const zookeeper = require('node-zookeeper-client');
 2 
 3 
 4 // ZK基礎設定資訊,正式專案需要從環境檔案匯入
 5 export const ZK = {
 6     clientAddress: 'localhost:2181/zk/test', // ZK地址
 7     servicePath: '/test-service', // 服務路徑
 8 };
 9 
10 let zkClient = null;
11 
12 // 獲取服務ip+port
13 export const getZKServiceBaseUrl = (servicePath) => {
14     return new Promise((resolve, reject) => {
15         try {
16             // 防止重複連線
17             if (zkClient) {
18                 disconnectZKService();
19             }
20 
21             // 新建連線
22             zkClient = zookeeper.createClient(ZK.clientAddress);
23             // 連線後執行一次
24             zkClient.once('connected', async function () {
25                 // 獲取服務節點資訊
26                 const res = await listChildren(zkClient, servicePath);
27                 res.message ? reject(res) : resolve(res);
28             });
29 
30             zkClient.connect();
31         } catch (error) {
32             reject(error);
33         }
34     });
35 };
36 
37 // 斷開連結
38 export const disconnectZKService = () => {
39     if (zkClient) {
40         zkClient.close();
41     }
42 };
43 
44 // 獲取節點資訊,ip+port
45 function listChildren(client, path) {
46     return new Promise((resolve, reject) => {
47         client.getChildren(path,
48             function () {},
49             function (error, children) {
50                 if (error) {
51                     reject({
52                         ...error,
53                         message: `獲取ZK節點error,Path: ${path}`
54                     });
55                 }
56                 try {
57                     let addressPath = path + '/';
58                     if (children.length > 1) {
59                         //若存在多個地址,則隨機獲取一個地址
60                         addressPath += children[Math.floor(Math.random() * children.length)];
61                     } else {
62                         //若只有唯一地址,則獲取該地址
63                         addressPath += children[0];
64                     }
65                     //獲取服務地址
66                     client.getData(addressPath, function (err, data) {
67                         if (err) {
68                             reject({
69                                 ...error,
70                                 message: `獲取ZK服務地址error,Stack: ${err.stack}`
71                             });
72                         }
73                         if (!data) {
74                             reject({
75                                 ...error,
76                                 message: `ZK data is not exist`
77                             });
78                         }
79                         const serviceInfo = JSON.parse(data);
80 
81                         const url = serviceInfo.address + ':' + serviceInfo.port;
82                         resolve(url);
83                     });
84                 } catch (error) {
85                     reject({
86                         ...error,
87                         message: `list ZK children error`
88                     });
89                 }
90             }
91         );
92     });
93 }

通過測試程式碼,可以實現呼叫Java服務。可能一般的程式設計師實現功能了就好了,可是作為一個有點追求的,感覺程式碼哪裡有問題。具體是哪裡呢,盯著螢幕瞅了兩分鐘,發現每次獲取服務都取 ZK 註冊中心獲取,這個過程涉及到的網路請求而且還不是一次HTTP,如果只是這麼簡單的改造,程式單純在效能響應上很有可能還不如老版本。我們可以在獲取服務的真實遠端地址前,新增一個本地快取。通過ZK訂閱機制,更新本地快取資料。

思路雖然明確了,可以api掃了掃,沒有我們想要的監聽器,如下所示

 

這怎麼辦,按理說的應該會有一個,節點資料改變推播的監聽器,例如新增,刪除,修改等等。找了半天也沒找到合適的。

沒辦法,接著看原始碼吧,看了一會,忽然,看到一個似乎可用的,類

 

 這不就是我需要的類嗎,但是居然在一方法中注入監聽器,先試試吧。

 

 試了一下,嘿,真的可以了,當伺服器端節點資料發生變動後,會自動觸發監聽器 watcher 的回撥邏輯。這就好辦了,改造開始。

改進後的程式碼

const zookeeper = require('node-zookeeper-client');
var ZK = require('../config/env.js').zk;

const client = Object.freeze({
    zkClient: zookeeper.createClient(ZK.connectionString),
    serviceSet: [], //
    serviceCache: Object.freeze({
        map: new Map(),

        /**
         * 更新快取
         * @param {String} path 服務路徑
         * @param {Array<String>} arr 真實存取集合
         */
        updateCache: function (path, arr) {
            this.map.set(path, arr);
        },

        /**
         * 從快取中獲取存取地址
         * 
         * @param {String} path 服務路徑
         * @returns String 真實存取地址
         */
        getRealPath: function (path) {
            let arr = this.map.get(path);

            if (arr.length > 1) //若存在多個地址,則隨機獲取一個地址
                return arr[Math.floor(Math.random() * arr.length)];
            else //若只有唯一地址,則獲取該地址
                return arr[0];
        }
    }),

    connect: function () {
        console.info("連線 zookeeper");

        this.zkClient.once('connected', function () {
            console.info("連線成功");
        });

        this.zkClient.connect();
    },

    getRealPath: function (serviceName) {
        return new Promise(async (resolve, reject) => {
            if (this.serviceSet.includes(serviceName)) {
                resolve(this.serviceCache.getRealPath(serviceName));
            } else {
                // 載入服務節點資訊
                this.loadChildren(serviceName).then(url => resolve(url)).catch(error => reject(error));
            }
        });
    },

    loadChildren: function (path) {
        console.info("進入 loadChildren ");
        return new Promise((resolve, reject) => {
            this.zkClient.getChildren(path, (event) => {
                console.info(" loadChildren watcher ", path, event);

                this.getChildren(event.path);
            }, (error, ids) => {
                console.info(" loadChildren callback ", path, error, ids);
                if (error) {
                    reject({
                        ...error,
                        message: `獲取ZK節點error,Path: ${path}`
                    });
                } else {
                    resolve(this.getData(path, ids));
                }
            });
        });
    },

    getChildren: function (path) {
        console.info("進入 getChildren ");
        return new Promise((resolve, reject) => {
            this.zkClient.getChildren(path, (error, ids) => {
                console.info(" getChildren callback ", path, error, ids);
                if (error) {
                    reject({
                        ...error,
                        message: `獲取ZK節點error,Path: ${path}`
                    });
                }

                resolve(this.getData(path, ids));
            });
        });
    },

    getData: function (path, ids) {
        console.info("進入 getData ");

        let pros = ids.map(id => new Promise((resolve, reject) => {
            //獲取服務地址
            this.zkClient.getData(path + "/" + id, (error, data) => {
                console.info(" getData callback ", path, id);
                if (error) {
                    reject({
                        ...error,
                        message: `獲取ZK服務地址error,Stack: ${err.stack}`
                    });
                }
                if (!data) {
                    reject({
                        ...error,
                        message: `ZK data is not exist`
                    });
                }

                const node = JSON.parse(data).payload;
                const protocol = node.ssl ? "https://" : "http://";

                resolve(`${protocol}${node.host}:${node.port}`);
            });
        }));

        return Promise.all(pros).then(arr => this.serviceCache.updateCache(path, arr)).then(() => this.serviceCache.getRealPath(path));
    },

    disconnect: function () { //斷開連線
        console.info("進入 disconnect ")
        if (this.zkClient) {
            console.info("執行 close")
            this.zkClient.close();
        }
    },
});

client.connect();

module.exports = {
    getServiceUrl: (path) => client.getRealPath(path),
    disconnect: () => client.disconnect(),
}

這樣終於,好一點了。

未完待續(多節點的選擇問題)

多節點選擇策略:隨機,輪轉,粘性 等等,一般不同的專案使用的策略也不太一樣,範例中使用的是簡單隨機策略,後續再進行節點選擇的策略問題優化啦。

關機,收工!!!