elasticsearch升級和索引重建。

2023-04-15 18:00:43

1.背景描述

  2020年團隊決定對elasticsearch升級。es(elasticsearch縮寫,下同)當前版本為0.9x,升級到5.x版本。es在本公司承載三個部分的業務,站內查詢,訂單資料統計,elk紀錄檔分析。

  對於站內查詢和訂單資料統計,當前業務架構是

  mysql -> canal -> kafka -> es

  (可以考慮使用kafka connector 代替canal)

2.難點

  難點是在升級的時候如何不影響當前業務。

3.具體步驟

A.部署es新叢集

  下載5.x版本的es,在新的機器上部署新的叢集。

B.pull程式碼,升級程式碼到es新版本

  由於從0.9x到5.x版本跨度比較大,許多java api都發生了變化,需要修復。

  一個坑是alias api 發生了語意變化,在後來的自測中修復了此問題。

C.重建索引

  我們使用索引重建程式來新建索引。重建索引具體步驟如下,我們稱線上索引為online index, 新建立的索引為new index。

  1.init

    重新整理索引名對映關係,檢查當前alias只有一個物理索引。

    根據預定義的mapping,建立索引new index。

    設定線上索引記錄資料變更紀錄檔,即記錄線上索引消費kafka資料,並儲存為change log檔案.

  2.全量索引資料庫上的資料到new index

    從mysql查出資料同步到es中,如果有多個分表,就按照表順序同步。可以開啟多執行緒批次插入。

  3.對new index索引優化

    refresh, flush 索引。呼叫force-merge api,進行段合併。

  4.重放change log到new index中

    根據change log 轉換為es query,寫入到new index。    

  5.暫停線上索引的寫入

    因為online index和new index 使用的是相同的kafka consumer group,所以必須停掉online index的消費功能。

  6.關閉change log

    停止記錄線上索引記錄資料變更紀錄檔。

  7.第二階段重放change log

    根據change log 轉換為es query,寫入到new index。 

  8.刪除change log 

    刪除線索引記錄資料變更紀錄檔。

  9.設定副本數 

    new index建立索引的時候預設副本數為0,現在動態調整副本數為業務需要的值。比如對現實搜尋業務設定兩個副本,對訂單統計類索引不需要副本。

PUT /new_index/_settings
{
    "number_of_replicas": 2
}

    此階段可能會比較耗時,需要等待幾分鐘才能進行下一步操作。更好的做法是呼叫health api 檢視分片狀態。

GET _cluster/health

{
  "cluster_name" : "testcluster",
  "status" : "yellow",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 1,
  "active_shards" : 1,
  "relocating_shards" : 0, // 重新定位的分片
  "initializing_shards" : 0, // 初始化中的分片
  "unassigned_shards" : 1, // 未分配的分片
  "delayed_unassigned_shards": 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch": 0,
  "task_max_waiting_in_queue_millis": 0,
  "active_shards_percent_as_number": 50.0
}

  10.別名切換 

POST /_aliases
{
    "actions": [
        { "remove": { "index": "online_index", "alias": "my_index" }},
        { "add":    { "index": "new_index", "alias": "my_index" }}
    ]
}

  11.執行線上索引 (從kafka裡面讀取資料)

    new_index 開始從kafka裡面消費最新資料。由於之前的操作可能會有延時,需要等待幾分鐘才能同步到最新資料。

  12.刪除舊的索引

    刪除old_index

詳細程式碼步驟如下

        // 1.init
        logger.info("初始化");
        ESHighLevelFactory esHighLevelFactory = ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName());
        logger.info("重新整理索引名對映關係");
        if (!indexContext.refreshIndexName()) {
            throw new IndexException("重新整理索引對映關係失敗");
        }

        rebuildIndexName = indexContext.getPhysicalRebuildIndexName();

        logger.info("初始化重建索引環境,當前重建索引名:" + rebuildIndexName);
        logger.info("建立索引,索引名:" + rebuildIndexName);
        boolean isCreate = false;
        try {
            isCreate = indexContext.getIndex().createIndex(rebuildIndexName);
        } catch (Throwable t) {
            logger.info("建立索引失敗,本次失敗可以不處理,將會自動重試 ...");
        }

        logger.info("設定線上索引記錄資料變更紀錄檔");
        indexContext.startChangeLog();

        // 2. 重建索引
        logger.info("全量索引資料庫上的資料 ...");
        long startRebulidTime = System.currentTimeMillis();
        rebuild();
        logger.info(" ------  完成全量索引資料庫上的資料,對應索引" + rebuildIndexName + ",耗時" + ((System.currentTimeMillis() - startRebulidTime) / 1000)
            + " 秒    ------  ");

        // 3. 索引優化 -- 是否調到變更重放完畢後做優化
        logger.info("優化索引 ...");
        long startOptimizeTime = System.currentTimeMillis();
        ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, 1);
        logger.info(" ------  完成" + rebuildIndexName + "索引優化,耗時 " + ((System.currentTimeMillis() - startOptimizeTime) / 1000)
            + " 秒    ------  ");

        // TODO 字元集設定
        BufferedReader logReader = new BufferedReader(new FileReader(indexContext.getChangeLogFilePath()));

        // 4. 重放變更紀錄檔
        logger.info("重放本地資料變更紀錄檔[第一階段] ...");
        long startReplay1Time = System.currentTimeMillis();
        int replayChangeLogCount = replayChangeLogFirst(logReader);
        logger.info(" ------  完成[第一階段]的變更紀錄檔重放,行數" + replayChangeLogCount + " 耗時 "
            + ((System.currentTimeMillis() - startReplay1Time) / 1000) + " 秒    ------  ");

        // 5. 暫停線上索引
        logger.info("暫停線上索引");
        indexContext.pauseOnlineIndex();
        isPauseOnline.set(true);

        // 6. 設定 線上索引只做索引更新 以及 關閉 change log
        logger.info("停止變更紀錄檔");
        indexContext.stopChangeLog();

        // 7. 繼續重放 change log
        logger.info("重放本地資料變更紀錄檔[第二階段] ...");
        long startReplay2Time = System.currentTimeMillis();
        replayChangeLogCount = replayChangeLogCount + replayChangeLogSecond(logReader);
        if ((indexContext.getWriteChangeLogCount() - replayChangeLogCount) != 0) {
            logger.error("變更紀錄檔,處於錯誤的狀態,統計的紀錄檔行數:" + indexContext.getWriteChangeLogCount() + ", 但實際只有:" + replayChangeLogCount);
        }
        logger.info(" ------  完成[第二階段]的變更紀錄檔重放,行數" + replayChangeLogCount + " 耗時 "
            + ((System.currentTimeMillis() - startReplay2Time) / 1000) + " 秒    ------  ");

        // 8. 刪除變更紀錄檔, OnlineIndex.startChangeLog 有做環境清理,這裡不執行
        logger.info("簡單優化索引 ...");
        long startSimpleOptimizeTime = System.currentTimeMillis();
        ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, null);

        logger.info(" ------  完成" + rebuildIndexName + "索引簡單優化,耗時 " + ((System.currentTimeMillis() - startSimpleOptimizeTime) / 1000)
            + " 秒    ------  ");

        // 9. 設定副本數 (懷疑比較耗時~~~待確認)
        logger.info("設定副本數 ...");
        int replicas = 3;
        if (rebuildIndexName.startsWith(IndexNameConst.ORDER_INDEX_PREFIX)) {
            replicas = 1;
        } else if (rebuildIndexName.startsWith(IndexNameConst.IndexName.activityTicket.getIndexName())) {
            replicas = 2;
        } else {
            String replicasStr = Configuration.getInstance().loadDiamondProperty(Configuration.ES_INDEX_REPLICAS);
            if (NumberUtils.isNumber(replicasStr)) {
                replicas = NumberUtils.toInt(replicasStr);
            }
        }
        ESHighLevelFactory.getInstance(rebuildIndexName).setReplicas(rebuildIndexName, replicas);

        // 執行索引切換流程
        // 預發、線上環境阻塞等待2分鐘同步資料後,再執行索引切換和刪除舊索引邏輯
        try {
            if(IDCUtil.isBuildOrProduction()){
                Thread.sleep(120 * 1000);
            }
        } catch (InterruptedException e) {
        }
        // 10. 別名切換
        logger.info("索引切換:將" + rebuildIndexName + "設定為線上索引");
        if (!indexContext.switchIndex(rebuildIndexName)) {
            throw new IndexException("索引切換失敗:將" + rebuildIndexName + "設定為線上索引失敗");
        }

        // 11. 執行線上索引
        logger.info("執行線上索引");
        indexContext.keepRuningOnlineIndex();
        isPauseOnline.set(false);

        // 12. 刪除原有線上索引
        String oldOnlineIndexName = indexContext.getPhysicalRebuildIndexName();
        logger.info("刪除原有線上索引,索引名:" + oldOnlineIndexName);
        if (!ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName()).deleteIndex(oldOnlineIndexName)) {
            throw new IndexException("刪除索引失敗,索引名:" + oldOnlineIndexName);
        }

思考

如果只是簡單地新建索引,完全可以這樣做(使用不同的消費組) 

  1.記錄時間戳 

  2.全量索引資料的資料

  3.根據前面的時間戳找到kafka中的下標,下標得時間戳必須 < 記錄的時間戳

  4.根據上一步的下標開始索引資料

D.使用新叢集進行業務測試

  部署新的使用者端服務呼叫新的es叢集,檢查業務是否正常。對站內查詢檢查搜尋結果是否一致,對統計類查詢檢視統計結果是否一致。

E.釋出線上使用者端搜尋程式碼,修改es地址為新叢集地址

  上線,觀察業務是否穩定。

F.下線舊的es叢集

  釋放舊的es叢集的資源。

4.總結

  es升級這份工作是兩年之前做的,現在來進行總結,部分細節可能會有疏漏。但是總結起來,依然後很多收穫,從架構,程式碼細節上都有改進的空間。es重建程式碼可以做得更通用,然後開源出來。