HBase Compaction 原理與線上調優實踐

2023-07-28 12:01:23

作者:vivo 網際網路儲存技術團隊- Hang Zhengbo

本文對 HBase Compaction 的原理、流程以及限流的策略進行了詳細的介紹,列舉了幾個線上進行調優的案例,最後對 Compaction 的相關引數進行了總結。

一、Compaction 介紹

HBase 是基於一種 LSM-Tree(Log-Structured Merge Tree)體系架構的儲存模型設計的,寫入時先寫入 WAL(Write-Ahead-Log)紀錄檔,再寫入 Memstore 快取,滿足一定條件後,會執行 Flush 操作將快取資料刷寫到磁碟,生成一個 HFile 資料檔案。隨著資料不斷寫入,HFile 檔案會越來越多,檔案太多導致查詢資料時 IO 次數增加,進而影響到 HBase 的查詢效能。為了優化讀的效能,採用合併小 HFile 的方法來減少檔案數量,這種合併 HFile 的操作就稱為 Compaction。Compaction 是從一個 Region 的一個 Store 中選擇部分 HFile 檔案進行合併的過程。合併原理是從這些待合併的資料檔案中依次讀出 KeyValue,由小到大排序後寫入一個新的檔案中。之後這個新生成的檔案就會取代之前已合併的所有檔案對外提供服務。

1.1 Compaction 的分類

HBase 根據合併規模將 Compaction 分為兩類:Minor Compaction 和 Major Compaction。

  • Minor Compaction 是指選取部分小的、相鄰的 HFile,將它們合併成一個更大的 HFile;

  • Major Compaction 是指將一個Store 中所有的 HFile 合併成一個 HFile,這個過程會清理三種無意義的資料:TTL 過期資料、被刪除的資料與版本號超過設定版本號的資料。

下圖形象的描述了2種 Compaction 的區別:

圖片

一般情況下,Major Compaction 持續時間比較長,整個過程消耗大量系統資源,因此線上資料量較大的業務通常推薦關閉自動觸發 Major Compaction 功能,改為在業務低峰期手動觸發(或設定策略自動在低峰期觸發)。

1.2 Compaction的意義

  1. 合併小檔案,減少檔案數,提升讀取效能,穩定隨機讀延遲;

  2. 合併的時候會讀取遠端 DataNode 上的檔案寫入本地 DataNode,提高資料的在地化率;

  3. 清除過期資料和被刪除的資料,減少表的儲存量。

1.3 Compaction觸發時機

HBase 中觸發 Compaction 的時機有很多種,最常見的觸發時機有三種:後臺執行緒週期性檢查時觸發、MemStore Flush 觸發以及手動觸發。

(1)後臺執行緒週期性檢查:後臺執行緒 CompactionChecker 會定期檢查是否需要執行 Compaction,檢查週期為hbase.server.thread.wakefrequency *hbase.server.compactchecker.interval.multiplier,這裡主要考慮的是一段時間內沒有寫入請求導致 Flush 觸發不了 Compaction 的情況。其中引數hbase.server.thread.wakefrequency 預設值是10s,是 HBase 伺服器端執行緒喚醒時間間隔,引數hbase.server.compactchecker.interval.multiplier 預設值1000,是 Compaction 操作週期性檢查乘數因子。10 * 1000 s 約等於 2hrs 46mins 40sec。

(2)MemStore Flush:Compaction 的根源在於 Flush,MemStore 達到一定閾值就會觸發 Flush ,將記憶體中的資料刷寫到磁碟生成 HFile 檔案,隨著 HFile 檔案越來越多就需要執行 Compaction。HBase 每次 Flush之後,都會判斷是否需要進行 Compaction,一旦滿足 Minor Compaction 或 Major Compaction 的條件便會觸發執行。

(3)手動:是指通過 HBase API、HBase Shell 或者 Master UI 介面等方式執行 compact、major_compact 等命令。

二、Compaction流程

瞭解完基本的背景後,接下來介紹 Compaction 的整個過程。

  • RegionServer 啟動一個 Compaction 檢查執行緒,定期對 Region 的 Store 進行檢查;

  • Compaction 始於特定的觸發條件。一旦觸發,HBase 會將該 Compaction 交由一個獨立的執行緒處理;

  • 從對應的 Store 中選擇合適的 HFile 檔案,這步是整個 Compaction 的核心,選取檔案時需要遵循很多條件,比如檔案數既不能太多也不能太少、檔案大小不能太大等,儘可能地選取承載 IO 負載重的檔案集。基於此,HBase 實現了多種檔案選取策略:常用的有

    RatioBasedCompactionPolicy、ExploringCompactionPolicy 和 StripeCompactionPolicy 等,也支援自定義的 Compaction 演演算法;

  • 選出待合併的檔案後,會根據這些 HFile 檔案的總大小選擇對應的執行緒池來進行處理;

  • 對這些檔案執行具體的 Compaction 操作。

下圖簡單的描述了上述流程。

圖片

下面對圖2中具體的每一步進行詳細說明。

2.1 啟動 Compaction 定時執行緒

在 RegionServer 啟動時,會初始化 CompactSplitThread 執行緒以及定時檢查的 CompactionChecker ,預設10s執行一次。

// Compaction thread
this.compactSplitThread = new CompactSplitThread(this);
// Background thread to check for compactions; needed if region has not gotten updates
// in a while. It will take care of not checking too frequently on store-by-store basis.
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
 
if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);

其中 CompactSplitThread 是用來實現 Compaction 以及 Split 流程的類,而 CompactChecker 是用來週期性檢查是否執行 Compaction 的。

CompactionChecker 是 ScheduledChore 型別,而 ScheduledChore 是 HBase定期執行的一個 Task。

2.2 觸發 Compaction

Compaction 的觸發時機在上面已經介紹過,下面對這3種觸發機制進行詳細的介紹。

2.2.1 後臺執行緒週期性檢查

後臺執行緒 CompactionChecker 定期檢查是否需要執行 Compaction,檢查週期為hbase.regionserver.compaction.check.period(預設10s)。

(1)首先檢查檔案數是否大於可執行 Compaction 的檔案數,一旦大於就會觸發 Compaction。

(2)如果不滿足,會接著檢查是否到了 Major Compaction 的執行週期。如果當前 Store 中 HFile 的最早更新時間早於某個值 mcTime,就會觸發 Major Compaction,其中 mcTime 是一個浮動值,浮動區間預設為[7-7*0.2,7+7*0.2],其中7為設定項 hbase.hregion.majorcompaction 設定,0.2為設定項 hbase.hregion.majorcompaction.jitter,所以在7天左右就會執行一次 Major Compaction。使用者如果想禁用 Major Compaction,只需要將引數hbase.hregion.majorcompaction 設為0。

(3)如果到了 Major Compaction 的執行週期:

  • 首先判斷有幾個 HFile 檔案,如果只有1個檔案,會判斷是否有過期資料、在地化率是否比較低,如果都不滿足就不做 Major Compaction;

  • 如果大於1個檔案,也會做 Major Compaction。

後臺執行緒週期性檢查的流程如圖3所示。

圖片

下面是該執行緒的關鍵程式碼:

//ScheduledChore的run方法會一直呼叫chore函數
@Override
protected void chore() {
  // 遍歷instance下的所有online的region 進行迴圈檢測
  // onlineRegions是HRegionServer上儲存的所有能夠提供有效服務的線上Region集合;
  for (HRegion r : this.instance.onlineRegions.values()) {
    if (r == null)
      continue;
    // 取出每個region的store
    for (Store s : r.getStores().values()) {
      try {
        // 檢查是否需要compact的時間間隔 hbase.server.compactchecker.interval.multiplier * hbase.server.thread.wakefrequency,multiplier預設1000;
        long multiplier = s.getCompactionCheckMultiplier();
        assert multiplier > 0;
        // 未到multiplier的倍數跳過,每當迭代因子iteration為合併檢查倍增器multiplier的整數倍時,才會發起檢查
        if (iteration % multiplier != 0) continue;
        // 需要合併的話,發起SystemCompaction請求,此處最終比較的是是否當前hfile數量減去正在compacting的檔案數大於設定的compact min值。若滿足則執行systemcompact
        if (s.needsCompaction()) {
          // Queue a compaction. Will recognize if major is needed.
          this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
              + " requests compaction");
        } else if (s.isMajorCompaction()) {
          if (majorCompactPriority == DEFAULT_PRIORITY
              || majorCompactPriority > r.getCompactPriority()) {
            this.instance.compactSplitThread.requestCompaction(r, s, getName()
                + " requests major compaction; use default priority", null);
          } else {
            this.instance.compactSplitThread.requestCompaction(r, s, getName()
                + " requests major compaction; use configured priority",
              this.majorCompactPriority, null);
          }
        }
      } catch (IOException e) {
        LOG.warn("Failed major compaction check on " + r, e);
      }
    }
  }
  iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
}

2.2.2 Memstore Flush 觸發

Memstore Flush 會產生 HFile 檔案,檔案越來越多就需要 Compaction。因此在每次執行完 Flush 操作之後,都會對當前 Store 中的檔案數進行判斷,一旦檔案數超過 Compaction 的閾值 ,就會觸發 Compaction。這裡需要強調的是,Compaction 是以 Store 為單位進行的,而在 Flush 觸發條件下,整個 Region 的所有 Store 都會執行 Compaction,所以會在短時間內可能會執行多次 Compaction。下面是 Flush 操作觸發 Compaction 的程式碼。

/**
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, when we were called
   * from the main flusher run loop and we got the entry to flush by calling
   * poll on the flush queue (which removed it).
   * @param forceFlushAllStores whether we want to flush all store.
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the region was
   * not flushed.
   */
  private boolean flushRegion(final Region region, final boolean emergencyFlush,
      boolean forceFlushAllStores) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }
  
    lock.readLock().lock();
    try {
      // flush
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flush(forceFlushAllStores);
     // 檢查是否需要compact
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      // 檢查是否需要split
      boolean shouldSplit = ((HRegion)region).checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        // 發起compact請求
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" + (region != null ? (" for region " +
          Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
    }
    return true;
  }

2.2.3 手動觸發

手動觸發就是通過命令或者 API 介面手動觸發 Compaction,手動觸發的原因有三個:

  • 很多業務擔心自動 Major Compaction 影響讀寫效能,因此會選擇低峰期手動觸發;

  • 使用者在執行完修改ttl的屬性後希望立刻生效,執行手動觸發 Major Compaction;

  • 硬碟容量不夠的情況下手動觸發 Major Compaction 刪除大量過期資料。

大多數都是基於第1點原因進行手動觸發。

2.3 選擇待合併的檔案

Compaction 的核心就是選擇合適的檔案進行合併,因為合併檔案的大小以及其當前承載的 IO 直接決定了 Compaction 的效果。希望能找到這樣的檔案:承載了大量 IO 請求但是檔案大小很小,這樣 Compaction 本身不會消耗太多 IO,而且合併完成之後對讀的效能會有顯著提升。現實情況可能大部分都不會是這樣。目前 HBase 提供了多種 Minor Compaction 檔案選擇策略,通過設定項hbase.hstore.engine.class 設定。不管哪種策略,在執行之前都要做對檔案做一些篩選操作,排除不符合條件的檔案,以減少 Compaction 的工作量,減少對讀寫的影響。

  • 排除當前正在執行 Compaction 的檔案;

  • 如果一個檔案所有的記錄都已經過期,則直接將檔案刪除;

  • 排除過大的單個檔案,如果檔案大小大於 hbase.hstore.compaction.max.size(預設Long最大值)則被排除,不排除會產生大量 IO 消耗。

排除完後剩下的檔案稱為候選檔案,接下來會再判斷是否滿足 Major Compaction 條件,如果滿足,就會選擇全部檔案進行合併。判斷條件有下面三條,只要滿足其中一條就會執行 Major Compaction:

  • 到了 Compaction 自動執行的週期且候選檔案數小於 hbase.hstore.compaction.max(預設10),如果關掉自動 Major Compaction 執行則不適用;

  • Store 中含有 Reference 檔案,Reference 檔案是 Split Region 產生的臨時參照檔案,在 Compaction 過程中刪除;

  • 使用者手動執行的 Major Compaction。

如果不滿足上述執行條件,則為 Minor compaction。Minor Compaction 的策略有很多種,下面重點介紹 RationBasedCompactionPolicy(0.98之前的版本)、ExploringCompactionPolicy(0.98之後預設的版本) 和 StripeCompactionPolicy 的執行策略。

2.3.1 Compaction檔案選擇策略的建模

所謂的 Compaction 檔案選擇策略可以建模為下面的問題:

圖片

圖中的每個數位表示了檔案的 Sequence ID,數位越大則檔案越新,很有可能剛剛Flush而成,意味著檔案 Size 也可能越小。這樣的檔案在 Compaction 時優先選擇,因此 Store下的 Storefile 檔案會依據 Sequence ID 從小到大排序,依次標記為 f[0]、f[1]。。。。f[n-1],篩選策略就是要確定一個連續範圍 [Start, End] 內的 Storefile 參與 Compaction。

Compaction 的目的是減少檔案數量和刪除無用的資料,優化讀效能,Compaction 實現是將原檔案的內容重寫到新的檔案,如果檔案過大意味著 Compaction 的時間長,Compaction 過程中產生的 IO 放大越明顯,因此檔案篩選的準則是用最小的 IO 代價去合併減少最多的檔案數。

Compaction 依賴兩個先決條件:

  • 所有 StoreFile 按照順序進行排序(此順序為:老檔案在前,新檔案在後);

  • 參與 Compaction 的檔案必須是連續的。

2.3.2 RationBasedCompactionPolicy

基本思想就是選擇在固定 End 為最後一個檔案的前提下(一般情況),從佇列頭開始滑動尋找 Start,直到 Start 滿足下面的條件之一便停止掃描:

  1. 當前檔案大小 < 比當前檔案新的所有檔案大小總和 * ratio,就是滿足公式 f[start].size <= ratio * (f[start+1].size +.......+ f[end-1].size)。其中 ration 是一個可變的比例,高峰期 ration 為1.2,非高峰期 ration 為5,非高峰期允許合併更大的檔案。可以通過引數 hbase.offpeak.start.hour 和 hbase.offpeak.end.hour 設定高峰期時間段。

  2. 當前所剩候選檔案數 >= hbase.store.compaction.min(預設為3),因為要保證本次 Compaction 的時候檔案個數要大於設定的 Compaction 最小值。

下面附上 RationBasedCompactionPolicy 的具體邏輯程式碼。

/**
  * @param candidates pre-filtrate
  * @return filtered subset
  * -- Default minor compaction selection algorithm:
  * choose CompactSelection from candidates --
  * First exclude bulk-load files if indicated in configuration.
  * Start at the oldest file and stop when you find the first file that
  * meets compaction criteria:
  * (1) a recently-flushed, small file (i.e. <= minCompactSize)
  * OR
  * (2) within the compactRatio of sum(newer_files)
  * Given normal skew, any newer files will also meet this criteria
  * <p/>
  * Additional Note:
  * If fileSizes.size() >> maxFilesToCompact, we will recurse on
  * compact().  Consider the oldest files first to avoid a
  * situation where we always compact [end-threshold,end).  Then, the
  * last file becomes an aggregate of the previous compactions.
  *
  * normal skew:
  *
  *         older ----> newer (increasing seqID)
  *     _
  *    | |   _
  *    | |  | |   _
  *  --|-|- |-|- |-|---_-------_-------  minCompactSize
  *    | |  | |  | |  | |  _  | |
  *    | |  | |  | |  | | | | | |
  *    | |  | |  | |  | | | | | |
  */
ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
  if (candidates.isEmpty()) {
    return candidates;
  }
  
  // we're doing a minor compaction, let's see what files are applicable
  int start = 0;
  // 獲取檔案合併比例:取引數hbase.hstore.compaction.ratio,預設為1.2
  double ratio = comConf.getCompactionRatio();
  if (mayUseOffPeak) {
    // 取引數hbase.hstore.compaction.ratio.offpeak,預設為5.0
    ratio = comConf.getCompactionRatioOffPeak();
    LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
  }
  
  // get store file sizes for incremental compacting selection.
  final int countOfFiles = candidates.size();
  long[] fileSizes = new long[countOfFiles];
  long[] sumSize = new long[countOfFiles];
  for (int i = countOfFiles - 1; i >= 0; --i) {
    StoreFile file = candidates.get(i);
    fileSizes[i] = file.getReader().length();
    // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
    // tooFar表示後移動最大檔案數位置的檔案大小,也就是剛剛滿足達到最大檔案數位置的那個檔案,從i至tooFar數目為合併時允許的最大檔案數
    int tooFar = i + comConf.getMaxFilesToCompact() - 1;
    sumSize[i] = fileSizes[i]
      + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
      - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
  }
  // 倒序迴圈,如果檔案數目滿足最小合併時允許的最小檔案數,且該位置的檔案大小大於合併時允許的檔案最小大小與下一個檔案視窗檔案總大小乘以一定比例中的較大者,則繼續;
  // 實際上就是選擇出一個檔案視窗內能最小能滿足的檔案大小的一組檔案
  while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
    fileSizes[start] > Math.max(comConf.getMinCompactSize(),
        (long) (sumSize[start + 1] * ratio))) {
    ++start;
  }
  if (start < countOfFiles) {
    LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
      + " files from " + countOfFiles + " candidates");
  } else if (mayBeStuck) {
    // We may be stuck. Compact the latest files if we can.保證最小檔案數目的要求
    int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
    if (filesToLeave >= 0) {
      start = filesToLeave;
    }
  }
  candidates.subList(0, start).clear();
  return candidates;
}

2.3.3 ExploringCompactionPolicy

該策略繼承自 RatioBasedCompactionPolicy,不同的是 Ration 策略在找到一個合適的檔案集合之後就停止掃描了,而 Exploring 策略會把 Storefile 列表劃分成多個子佇列,從中找出一個最優解參與 Compaction。最優解可以理解為:待合併檔案數最多或者待合併檔案數相同的情況下檔案較小,這樣有利於減少 Compaction 帶來的 IO 消耗。演演算法流程可以描述為:

  1. 從頭到尾遍歷檔案,判斷所有符合條件的組合;

  2. 選擇組合內檔案數 >= minFiles,且 <= maxFiles;

  3. 計算各組合檔案的總大小 size,選擇組合 size <= MaxCompactSize,且 >= minCompactSize;

  4. 每個組合裡面的每一個檔案大小都必須滿足 FileSize(i) <= (sum(0,N,FileSize(_)) - FileSize(i)) * ration,意義在於去掉很大的檔案,每次 Compaction 時應該儘量合併一些大小較小的檔案;

  5. 滿足以上 1-4 條件的組合裡面選擇檔案數最多,檔案數一樣多時進一步選擇檔案總 size 最小的,目的在於儘可能多地合併檔案並且 Compaction 帶來的 IO 壓力越小越好。

下面附上 ExploringCompactionPolicy 的具體邏輯程式碼。

public List<StoreFile> applyCompactionPolicy(final List<StoreFile> candidates,
       boolean mightBeStuck, boolean mayUseOffPeak, int minFiles, int maxFiles) {
  
    final double currentRatio = mayUseOffPeak
        ? comConf.getCompactionRatioOffPeak() : comConf.getCompactionRatio();
  
    // Start off choosing nothing.
    List<StoreFile> bestSelection = new ArrayList<StoreFile>(0);
    List<StoreFile> smallest = mightBeStuck ? new ArrayList<StoreFile>(0) : null;
    long bestSize = 0;
    long smallestSize = Long.MAX_VALUE;
  
    int opts = 0, optsInRatio = 0, bestStart = -1; // for debug logging
    // Consider every starting place. 從頭到尾遍歷檔案
    for (int start = 0; start < candidates.size(); start++) {
      // Consider every different sub list permutation in between start and end with min files.
      for (int currentEnd = start + minFiles - 1;
          currentEnd < candidates.size(); currentEnd++) {
        List<StoreFile> potentialMatchFiles = candidates.subList(start, currentEnd + 1);
  
        // Sanity checks
        if (potentialMatchFiles.size() < minFiles) {
          continue;
        }
        if (potentialMatchFiles.size() > maxFiles) {
          continue;
        }
  
        // Compute the total size of files that will
        // have to be read if this set of files is compacted. 計算檔案大小
        long size = getTotalStoreSize(potentialMatchFiles);
  
        // Store the smallest set of files.  This stored set of files will be used
        // if it looks like the algorithm is stuck. 總size最小的
        if (mightBeStuck && size < smallestSize) {
          smallest = potentialMatchFiles;
          smallestSize = size;
        }
  
        if (size > comConf.getMaxCompactSize(mayUseOffPeak)) {
          continue;
        }
  
        ++opts;
        if (size >= comConf.getMinCompactSize()
            && !filesInRatio(potentialMatchFiles, currentRatio)) {
          continue;
        }
  
        ++optsInRatio;
        if (isBetterSelection(bestSelection, bestSize, potentialMatchFiles, size, mightBeStuck)) {
          bestSelection = potentialMatchFiles;
          bestSize = size;
          bestStart = start;
        }
      }
    }
    if (bestSelection.size() == 0 && mightBeStuck) {
      LOG.debug("Exploring compaction algorithm has selected " + smallest.size()
          + " files of size "+ smallestSize + " because the store might be stuck");
      return new ArrayList<StoreFile>(smallest);
    }
    LOG.debug("Exploring compaction algorithm has selected " + bestSelection.size()
        + " files of size " + bestSize + " starting at candidate #" + bestStart +
        " after considering " + opts + " permutations with " + optsInRatio + " in ratio");
    return new ArrayList<StoreFile>(bestSelection);
  }

2.3.4 StripeCompactionPolicy

Stripe Compaction (HBASE-7667)還是為了減少 Major Compaction 的壓力而提出的。其思想是:減少 Major Compaction 壓力最直接辦法是減少 Region 的大小,最好整個叢集都是由很多小 Region 組成,這樣參與 Compaction 的檔案總大小就必然不會太大。可是 Region 設定過小會導致 Region 數量很多,這一方面會導致 HBase 管理 Region 的開銷很大,另一方面 Region 過多也要求 HBase 能夠分配更多的記憶體作為 Memstore 使用,否則有可能導致整個 RegionServer 級別的 Flush,進而引起長時間的寫阻塞。因此單純地通過將 Region 大小設定過小並不能本質解決問題。

(1) Level Compaction

社群開發者借鑑了 Leveldb 的 Compaction 策略 Level Compaction。Level Compaction 設計思路是將 Store 中的所有資料劃分為很多層,每一層都會有一部分資料,如下圖所示:

圖片

資料組織形式不再按照時間前後進行組織,而是按照 KeyRange 進行組織,每個 KeyRange 中會包含多個檔案,這些檔案所有資料的 Key 必須分佈在同一個範圍。比如 Key 分佈在 Key0~KeyN 之間的所有資料都會落在第一個 KeyRange 區間的檔案中,Key 分佈在 KeyN+1~KeyT 之間的所有資料會分佈在第二個區間的檔案中,以此類推。

整個資料體系會被劃分為很多層,最上層(Level 0)表示最新資料,最下層(Level 6)表示最舊資料。每一層都由大量 KeyRange 塊組成(Level 0除外),KeyRange 之間沒有 Key 重合。而且層數越大,對應層的每個 KeyRange 塊大小越大,下層 KeyRange 塊大小是上一層大小的10倍。圖中 Range 顏色越深,對應的 Range 塊越大。

資料從 Memstore 中 Flush 之後,會首先落入 Level 0,此時落入 Level 0 的資料可能包含所有可能的 Key。此時如果需要執行 Compaction,只需要將 Level 0 中的 KV 一個一個讀出來,然後按照 Key 的分佈分別插入 Level 1 中對應 KeyRange 塊的檔案中,如果此時剛好 Level 1 中的某個KeyRange 塊大小超過了一定閾值,就會繼續往下一層合併。

Level Compaction 依然會有 Major Compaction 的概念,發生 Major Compaction 只需要將 Range 塊內的檔案執行合併就可以,而不需要合併整個 Region 內的資料檔案。

可見,這種 Compaction 在合併的過程中,從上到下只需要部分檔案參與,而不需要對所有檔案執行 Compaction 操作。另外,Level Compaction 還有另外一個好處,對於很多唯讀最近寫入資料’的業務來說,大部分讀請求都會落到 Level 0,這樣可以使用 SSD 作為上層 Level 儲存媒介,進一步優化讀。然而,這種 Compaction 因為 Level 層數太多導致 Compaction 的次數明顯增多,經過測試,發現這種 Compaction 並沒有對 IO 利用率有任何提升。

(2)Stripe Compaction

雖然原生的 Level Compaction 並不適用於 HBase,但是這種 Compaction 的思想卻激發了HBase 研發者的靈感,再結合之前提到的小 Region 策略,就形成了 Stripe Compaction。

同 Level Compaction 相同,Stripe Compaction 會將整個 Store 中的檔案按照 Key 劃分為多個 Range,稱為 Stripe,Stripe 的數量可以通過引數設定,相鄰的 Stripe 之間 Key 不會重合。Stripe 類似於 Sub-Region 的概念,即將一個大 Region 切分成了很多小的 Sub-Region。

隨著資料寫入,Memstore 執行 Flush 之後形成 HFile,這些 HFile 並不會馬上寫入對應的 Stripe,而是放到一個稱為 L0 的地方,使用者可以設定 L0 可以放置 HFile 的數量。一旦 L0 放置的檔案數超過設定值,系統就會將這些 HFile 寫入對應的 Stripe:首先讀出 HFile 的 KVs,再根據每個 KV 的 Key 定位到具體的 Stripe,將該 KV 插入對應 Stripe 的檔案中即可,如圖6所示。由於 Stripe 是個小的 Region,所以 Compaction 並不會太多消耗系統資源。另外,讀取資料時,根據對應的 Key 查詢到對應的 Stripe,然後在 Stripe 內部執行查詢,因為 Stripe 內資料量相對很小,所以一定程度上也可以提升資料查詢效能。

圖片

2.4 執行 Compaction 操作

挑選好待合併檔案後,就是執行真正的合併。合併流程主要分為以下幾步:

  1. 按順序讀出待合併所有 HFile 檔案的 KV,並順序寫到位於./tmp 目錄下的臨時檔案中;

  2. 將臨時檔案移動到對應 Region 的正式資料目錄中;

  3. 將 Compaction 的輸入檔案路徑和輸出檔案路徑封裝為 KV 寫入 WAL 紀錄檔,並打上 Compaction 標記,最後強制執行 sync;

  4. 將對應 Region 資料目錄下的 Compaction 的輸入檔案全部刪除。

HBase對整個 Compaction 的考慮是非常全面的,上述4個步驟的每一步發生錯誤,都具有很強的容錯性和冪等性(執行一次和多次的結果相同)。

  • 如果 RS 在步驟2或步驟2之前發生異常,本次 Compaction 會被認為失敗,如果繼續進行同樣的 Compaction,上次異常對接下來的 Compaction不會有任何影響,也不會對讀寫有影響,唯一的影響就是多了一份冗餘的資料;

  • 如果 RS 在步驟2之後、步驟3或步驟3之前發生異常,也僅僅會多一份冗餘資料;

  • 如果在步驟3之後、步驟4之前發生異常,則 RS 在重新開啟 Region 之後就會從 WAL 中看到上次 Compaction 的紀錄檔。因為此時輸入檔案和輸出檔案已經持久化到 HDFS,因此只需要根據 WAL 紀錄檔移除掉 Compaction 的輸入檔案即可。

下面附上 Store 的 compact 方法。

public List<StoreFile> compact(CompactionContext compaction,
   CompactionThroughputController throughputController, User user) throws IOException {
   assert compaction != null;
   List<StoreFile> sfs = null;
   CompactionRequest cr = compaction.getRequest();
   try {
     // Do all sanity checking in here if we have a valid CompactionRequest
     // because we need to clean up after it on the way out in a finally
     // block below
     long compactionStartTime = EnvironmentEdgeManager.currentTime();
     assert compaction.hasSelection();
     Collection<StoreFile> filesToCompact = cr.getFiles();
     assert !filesToCompact.isEmpty();
     synchronized (filesCompacting) {
       // sanity check: we're compacting files that this store knows about
       // TODO: change this to LOG.error() after more debugging
       // 再次檢查
       Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
     }
  
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
         + this + " of " + this.getRegionInfo().getRegionNameAsString()
         + " into tmpdir=" + fs.getTempDir() + ", totalSize="
         + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
  
     // Commence the compaction.  開始compact,newFiles是合併後的新檔案
     List<Path> newFiles = compaction.compact(throughputController, user);
  
     long outputBytes = 0L;
     // TODO: get rid of this!
     if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
       LOG.warn("hbase.hstore.compaction.complete is set to false");
       sfs = new ArrayList<StoreFile>(newFiles.size());
       final boolean evictOnClose =
           cacheConf != null? cacheConf.shouldEvictOnClose(): true;
       for (Path newFile : newFiles) {
         // Create storefile around what we wrote with a reader on it.
         StoreFile sf = createStoreFileAndReader(newFile);
         sf.closeReader(evictOnClose);
         sfs.add(sf);
       }
       return sfs;
     }
     // Do the steps necessary to complete the compaction.
     // 將newFiles移動到新的位置,返回StoreFile列表
     sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
     // 在WAL中寫入Compaction記錄
     writeCompactionWalRecord(filesToCompact, sfs);
     // 將新生成的StoreFile列表替換到StoreFileManager的storefile中
     replaceStoreFiles(filesToCompact, sfs);
     // 根據compact型別,累加相應計數器
     if (cr.isMajor()) {
       majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
       majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
     } else {
       compactedCellsCount += getCompactionProgress().totalCompactingKVs;
       compactedCellsSize += getCompactionProgress().totalCompactedSize;
     }
     for (StoreFile sf : sfs) {
       outputBytes += sf.getReader().length();
     }
     // At this point the store will use new files for all new scanners.
     // 歸檔舊檔案
     completeCompaction(filesToCompact, true); // Archive old files & update store size.
     long now = EnvironmentEdgeManager.currentTime();
     if (region.getRegionServerServices() != null
         && region.getRegionServerServices().getMetrics() != null) {
       region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
         now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
         outputBytes);
     }
     // 記錄紀錄檔資訊並返回
     logCompactionEndMessage(cr, sfs, now, compactionStartTime);
     return sfs;
   } finally {
     finishCompactionRequest(cr);
   }
 }

三、Compaction 的限流

上述幾種策略都是根據不同的業務場景設定對應的檔案選擇策略,核心都是減少參與 Compaction 的檔案數,縮短整個 Compaction 執行的時間,間接降低 Compaction 的 IO 放大效應,減少對業務讀寫的延遲影響。但是,如果不對 Compaction 執行階段的讀寫吞吐量進行限制的話也會引起短時間大量系統資源消耗,影響使用者讀寫延遲。HBase 通過限制 Compaction 速度 和 Compaction 的頻寬來對 Compaction 進行限流。

3.1 Limit Compaction Speed

該優化方案通過感知 Compaction 的壓力情況自動調節系統的 Compaction 吞吐量,在壓力大的時候降低合併吞吐量,壓力小的時候增加合併吞吐量。

基本原理為:

在正常情況下,使用者需要設定吞吐量下限引數 hbase.hstore.compaction.throughput.lower.bound (預設10MB/sec) 和上限引數 hbase.hstore.compaction.throughput.higher.bound (預設20MB/sec),實際會工作時吞吐量為 lower + (higer – lower) * ratio,其中 ratio 是一個取值範圍在0到1的小數,它由當前 Store 中待參與 Compation 的 HFile 數量決定,數量越多,ratio 越小,反之越大。

如果當前 Store中 HFile 的數量太多,並且超過了引數 blockingFileCount,此時所有寫請求就會阻塞等待 Compaction 完成,這種場景下上述限制會自動失效。

3.2 Compaction BandWidth Limit

原理其實和 Limit Compaction Speed 思路基本一致,它主要涉及兩個引數:compactBwLimit 和 numOfFilesDisableCompactLimit。

作用分別如下:

  • compactBwLimit:一次 Compaction 的最大頻寬使用量,如果 Compaction 所使用的頻寬高於該值,就會強制令其 sleep 一段時間。

  • numOfFilesDisableCompactLimit:很顯然,在寫請求非常大的情況下,限制 Compaction 頻寬的使用量必然會導致 HFile 堆積,進而會影響到讀請求響應延時。因此該值意義就很明顯,一旦 Store 中 HFile 數量超過該設定值,頻寬限制就會失效。

// 該方法進行Compaction的動態限制
private void tune(double compactionPressure) {
    double maxThroughputToSet;
    // 壓力大於1,最大限速不受限制
    if (compactionPressure > 1.0) {
      // set to unlimited if some stores already reach the blocking store file count
      maxThroughputToSet = Double.MAX_VALUE;
     // 空閒時間,最大限速為設定的Compaction最大吞吐量
    } else if (offPeakHours.isOffPeakHour()) {
      maxThroughputToSet = maxThroughputOffpeak;
    } else {
      // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
      // calculate the throughput limitation.
      // lower + (higher - lower) * ratio
      maxThroughputToSet =
          maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound)
              * compactionPressure;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
          + throughputDesc(maxThroughputToSet));
    }
    this.maxThroughput = maxThroughputToSet;
  }

再來看下獲取 R S的 Compaction 壓力的 getCompactionPressure 方法,其實就是遍歷每個 Region 的每個 Store,取壓力最大的。

@Override
public double getCompactionPressure() {
  double max = 0;
  for (Region region : onlineRegions.values()) {
    for (Store store : region.getStores()) {
      double normCount = store.getCompactionPressure();
      if (normCount > max) {
        max = normCount;
      }
    }
  }
  return max;
}
@Override
public double getCompactionPressure() {
  int storefileCount = getStorefileCount();
  int minFilesToCompact = comConf.getMinFilesToCompact();
  if (storefileCount <= minFilesToCompact) {
    return 0.0;
  }
  return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact);
}

HBase 的限流方案通過感知 Compaction 的壓力情況自動調節系統的 Compaction 吞吐量,在壓力大的時候降低合併吞吐量,壓力小的時候增加合併吞吐量。

基本原理為:

在正常情況下,使用者需要設定吞吐量下限引數 hbase.hstore.compaction.throughput.lower.bound (預設10MB/sec)   和上限引數 hbase.hstore.compaction.throughput.higher.bound(預設20MB/sec),而實際會工作在吞吐量為 lower + (higer – lower) * ratio的情況下,其中 ratio 是一個取值範圍在0到1的小數,它由當前 Store 中待參與 Compation 的 HFile 數量決定,數量越多,ratio 越小,反之越大。

如果當前 Store中 HFile 的數量太多,並且超過了 blockingFileCount 的值,該值由引數 hbase.hstore.blockingStoreFiles 設定,此時所有寫請求就會阻塞等待 Compaction 完成,這種場景下,上述限制會自動失效。

四、線上遇到的問題及調優方法

由於線上環境的複雜性,對 Compaction 模組做了較多的優化,下面選取兩個典型案例進行說明。

4.1 關閉了自動觸發 Major Compaction,但是監控中 Major Compaction 佇列仍然有值進而影響讀寫效能

線上叢集都是關閉自動觸發 Major Compaction 的功能,在業務低峰期由定時任務手動觸發 Major Compaction。在某次故障中,業務反饋讀寫效能在非執行 Major Compaction 的時段延遲比較大。檢視監控發現,監控中的 Major Compaction 佇列的值比較大。

下面是當時的 Major Compaction 佇列長度和讀寫呼叫平均耗時的監控圖,從圖中可以很明顯地看出下面幾點:

  • Major Compaction 的佇列長度比較大的時候,讀寫的耗時也比較大;

  • Major Compaction 的佇列長度跟入流量有關係,入流量比較大的時候,Major Compaction 的佇列長度就比較大。

這裡就產生了疑問,關閉了自動 Major Compaction,是什麼條件觸發了 Major Compaction ?

圖片

圖片

帶著上面的疑問,我們從原始碼的層面對問題進行分析。

1)首先檢視了 Major Compaction 佇列長度這個指標的含義,該指標表示 longCompaction 執行緒池的工作佇列中等待的個數。

@Override
public int getLargeCompactionQueueSize() {
  //The thread could be zero.  if so assume there is no queue.
  if (this.regionServer.compactSplitThread == null) {
    return 0;
  }
  return this.regionServer.compactSplitThread.getLargeCompactionQueueSize();
}
public int getLargeCompactionQueueSize() {
  return longCompactions.getQueue().size();
}

2)檢視 HBase 紀錄檔,發現確實有做 Major Compaction 的行為。

圖片

3)進一步排查什麼時候會去呼叫 long Compaction 的執行緒池,檢視 Compaction 選擇 long Compaction 和 small Compaction 佇列相關的原始碼。

/**
 * @param candidateFiles candidate files, ordered from oldest to newest. All files in store.
 * @return subset copy of candidate list that meets compaction criteria
 * @throws java.io.IOException
 */
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
    final List<StoreFile> filesCompacting, final boolean isUserCompaction,
    final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
  // Preliminary compaction subject to filters
  ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
  // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
  // able to compact more if stuck and compacting, because ratio policy excludes some
  // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
  int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
  boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
      >= storeConfigInfo.getBlockingFileCount();
  candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
  LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
      filesCompacting.size() + " compacting, " + candidateSelection.size() +
      " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
 
  // If we can't have all files, we cannot do major anyway
  boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
  if (!(forceMajor && isAllFiles)) {
    // 過濾掉大檔案
    candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
    isAllFiles = candidateFiles.size() == candidateSelection.size();
  }
  ...
}

其中 skipLargeFiles 方法對待合併檔案進行過濾,去掉大檔案,該閾值是由 maxCompactSize =conf.getLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY,Long.MAX_VALUE)設定,預設是Long.MAX_VALUE。

/**
 * @param candidates pre-filtrate
 * @return filtered subset
 * exclude all files above maxCompactSize
 * Also save all references. We MUST compact them
 */
private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
  boolean mayUseOffpeak) {
  int pos = 0;
  while (pos < candidates.size() && !candidates.get(pos).isReference()
    && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
    ++pos;
  }
  if (pos > 0) {
    LOG.debug("Some files are too large. Excluding " + pos
        + " files from compaction candidates");
    candidates.subList(0, pos).clear();
  }
  return candidates;
}

之後再通過待合併檔案的大小來選擇 long Compaction 執行緒池還是 small Compaction 的執行緒池。

@Override
public boolean throttleCompaction(long compactionSize) {
  return compactionSize > comConf.getThrottlePoint();
}

這個閾值的計算方法如下,預設是2.5G,就是說如果待合併的檔案大小大於2.5G,就會放到 long Compaction 的執行緒池中去執行。

throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
          2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());

4)檢視 ReigonServer 該時間段的紀錄檔,發現有大量大於 2.5G 的檔案在 Compaction,這就解釋了為什麼RS紀錄檔中該時間段並沒有做 Major Compaction 的紀錄檔但是 long Compaction 佇列有值的問題。

圖片

至此,問題原因就找到了,入流量的增加導致單個 HFile 檔案比較大,Flush 之後做 Minor Compaction 的時候如果待合併檔案總大小大於2.5G(預設值)的時候,會將此次 Minor Compaction 放入到 long Compaction 的執行緒池中執行。待合併的檔案比較大導致磁碟 IO 消耗比較高,進而影響到讀寫效能。

5)措施

我們調整了 Compaction 的引數 hbase.hstore.compaction.max.size 將該值修改為2G,表示在 Minor Compaction 的時候大於 2G 的 HFile 將會被排除,等到業務低峰期的時候再對大於2G的檔案合併,減少 Compaction 對磁碟 IO 的影響。

6)效果

調整之後,在非手動觸發 Major Compaction 期間就很少有佔用 long Compaction 執行緒池的情況出現了,讀寫平均耗時也降到了50ms以下。

圖片

圖片

4.2 定時手動觸發的 Major Compation 任務執行時間過長

業務反饋某張表的讀寫效能最近有點慢,通過監控檢視到該表的儲存一直在增長,儲存單副本達到了578TB。檢視表資訊,該表的TTL設定的15天,該表的輸入流量也沒有明顯的增加。監控圖如下:

圖片

圖片

於是懷疑每天的 Compaction 任務沒有做完,導致過期資料未能完全刪除。檢視線上設定,Major Compaction 的執行緒池大小是1,該表的資料量又比較大。於是調整了 Compaction 執行緒池的大小為10,並且設定了叢集的空閒時間 hbase.offpeak.start.hour 與 hbase.offpeak.end.hour,在這個時間段內 Compaction 的時候可以增加待合併檔案大小。調整完成後,通過監控檢視 Compaction 的效果對比圖,可以看到 Compaction 的工作量明顯增大了。

圖片

檢視該表所佔儲存的大小,可以看到該表已經從 578T 下降到了 349T,下降幅度達到了40%。業務的讀寫耗時也恢復正常。Compaction 的引數比較重要, 在調整的時候需要考慮對業務是否有影響,調整之後要多觀察業務的耗時情況,可以循序漸進的對引數進行調整。

五、Compaction相關引數介紹

下面附上 Compaction 相關的引數,線上環境可以根據實際情況進行調整。

圖片

六、總結

Compaction 是 HBase 提升讀寫效能非常重要的手段,而 Compaction 的邏輯又比較複雜,並且使用不當,會導致寫放大,進而會影響到正常的讀寫請求。本文重點介紹了 Compaction 的觸發機制、Compaction 發展過程中出現的多種合併策略、待合併檔案的選擇演演算法、 Compaction 的限流以及 Compaction 相關的引數做了詳細的描述,最後選擇線上的2個案例,介紹了具體的分析思路和調優的方法,經調優後,效能得到了成倍的提升,保障了業務高效、穩定的執行。