C++小練習:字串分割的高效能實現

2023-10-30 06:00:53
字串分割是日常工作中比較常見的基礎函數,通常大家會使用現成的基礎庫,基礎庫的效能是否是最佳的?本文基於一個週末小練習,研究如何最大限度的提升字串分割的效能。   

1、背景

字串按照分隔符拆成多個子串在日常工作中很常見,譬如:後臺服務對設定的解析、模型服務對輸入特徵的拆分、磁碟格式索引檔案轉記憶體格式等等,通常最簡單的實現是使用 boost 庫:
std::vector<std::string> output_tokens;
boost::split(output_tokens, input_sentence, boost::is_any_of(" "), boost::token_compress_off); 

這是最簡單也是效能最差的寫法。在一些頻繁做字串拆分的場景下,部分開發者會發現用 string 會觸發字串拷貝的代價,於是改為自己使用 string_view,這會顯著提高效能,除此之外,在使用 string_view 的基礎上仍然有兩個點需要特別注意:

  1. 儘可能的減少分割時的計算量
  2. 使用 SIMD 指令

本文將對此做細緻分析。

2、目標

針對單字元分割和任意字元分割兩類場景,本文以程式碼案例為主,講解如何寫出高效能的字串分割函數。在考慮高效能的情況下,最基本的要求是字串分割不應觸發拷貝,因此本文實現的多個字串分割函數,都基於 string_view 消除了拷貝,在此基礎上再進行效能優化分析。本文兩類場景的函數簽名如下:

std::vector<std::string_view> SplitString(std::string_view input, char delimiter);
std::vector<std::string_view> SplitString(std::string_view input, std::string_view delimiters);

3、高效能實現

單字元分割和任意字元分割這兩類場景分開介紹。

3.1、單字元分割字串的 5 個版本

下面提供 5 種單字元分割字串的實現,並對其效能做總結分析。

(1)版本1-簡單版本

單字元分割是最常見的場景,譬如用於紀錄檔解析、特徵解析等,首先我們考慮基於遍歷自行實現,實現如下:
std::vector<std::string_view> SplitStringV1(std::string_view input, char delimiter) {
  std::vector<std::string_view> tokens;
  int start_pos = 0;
  int size = 0;
  for (size_t i = 0; i < input.size(); ++i) {
    if (input[i] == delimiter) {
      if (size != 0) {
        tokens.emplace_back(input.data() + start_pos, size);
        size = 0;
      }
      start_pos = i + 1;
    } else {
      ++size;
    }
  }
  if (size > 0) {
    tokens.emplace_back(input.data() + start_pos, size);
  }
  return tokens;
}

    這樣的實現很好理解:遍歷 input 字串,發現有分隔符 delimiter 時,考慮生成結果子字串,生成子字串需要知道起點和長度,因此定義了 token_start 和 size 兩個臨時變數,並在遍歷過程中維護這兩臨時變數。但仔細觀察這個函數的實現,可以發現有三個變數:token_start、size、i,但通常來說,定位一個子字串只需要兩個變數(或者叫遊標):start、end 即可,這裡存在效能優化空間。

(2)版本2-優化版本

    基於兩個遊標的實現程式碼:

std::vector<std::string_view> SplitStringV2(std::string_view input, char delimiter) {
  std::vector<std::string_view> tokens;
  const char* token_start = input.data();
  const char* p = token_start;
  const char* end_pos = input.data() + input.size();
  for (; p != end_pos; ++p) {
    if (*p == delimiter) {
      if (p > token_start) {
        tokens.emplace_back(token_start, p - token_start);
      }
      token_start = p + 1;
      continue;
    }
  }
  if (p > token_start) {
    tokens.emplace_back(token_start, p - token_start);
  }
  return tokens;
}

    這裡 token_start 作為子串的起始位置,p 作為遞增遊標,上一份程式碼的 size 可以通過 p - token_start 獲得。這個實現少維護 size 變數,因此效能更好。

(3)版本3-STL 版本

有時候,我們也會考慮使用標準庫的 find_first_of 函數實現,程式碼量更少,程式碼如下:
std::vector<std::string_view> SplitStringV3(std::string_view input, char delimiter) {
  std::vector<std::string_view> tokens;
  size_t token_start = 0;
  while (token_start < input.size()) {
    auto token_end = input.find_first_of(delimiter, token_start);
    if (token_end > token_start) {
      tokens.emplace_back(input.substr(token_start, token_end - token_start));
    }
    if (token_end == std::string_view::npos) {
      break;
    }
    token_start = token_end + 1;
  }
  return tokens;
}

    但上述實現,效能比我們自己實現遍歷的 SplitStringV2 版本效能要差,畢竟 find_first_of 的每次查詢都要重新初始化其實位置,相比自己實現遍歷有效能浪費。

(4)版本4-SIMD 最佳版本

字串分割很重要的一步是逐個字元對字串做比較,是否可以並行比較呢?是可以的,SIMD 指令可以加速這裡的比較,當前大多數機器都已支援 AVX2,但還未普遍支援 AVX512,下面以 AVX2 為例。程式碼如下:

// 編譯:g++ split_string_by_char.cc -mavx2 -o split_string_by_char
std::vector<std::string_view> SplitStrintV4(std::string_view input, char delimiter) { if (input.size() < 32) { return SplitStringV2(input, delimiter); } std::vector<std::string_view> tokens; uint32_t end_pos = input.size() >> 5 << 5; __m256i cmp_a = _mm256_set1_epi8(delimiter); // 8bit的分隔重複32次擴充到256bit const char* p = input.data(); const char* end = p + end_pos; uint32_t last_lead_zero = 0; // 上一輪256bit(32個字元)處理後剩下的未拷貝進結果集的字串個數 while (p < end) { __m256i cmp_b = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p)); // 32個字元載入進記憶體 __m256i cmp = _mm256_cmpeq_epi8(cmp_a, cmp_b); // 256 bit 一次比較 uint32_t mask = _mm256_movemask_epi8(cmp); if (mask == 0) { last_lead_zero += 32; p += 32; continue; } // 記錄本次的頭部0個數,注:mask的序和字串序是相反的,所以這裡頭部的0對應字串尾部的不匹配字元 uint32_t lead_zero = __builtin_clz(mask); // 補上一次未拷貝的字串 uint32_t tail_zero = __builtin_ctz(mask); if (last_lead_zero != 0 || tail_zero != 0) { tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; // 補完,繼續處理 while (mask != 0) { uint32_t tail_zero = __builtin_ctz(mask); if (tail_zero != 0) { tokens.emplace_back(p, tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; } last_lead_zero = lead_zero; p += lead_zero; } // 256 bit(32位元組) 對齊之後剩下的部分 const char* token_start = input.data() + end_pos - last_lead_zero; const char* pp = token_start; const char* sentence_end = input.data() + input.size(); for (; pp != sentence_end; ++pp) { if (*pp == delimiter) { if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } token_start = pp + 1; continue; } } if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } return tokens; }

 這裡使用了 5 個關鍵的函數:

  1. _mm256_loadu_si256,用於載入 256 位(32 位元組)資料
  2. _mm256_cmpeq_epi8,用於比較 256 位資料
  3. _mm256_movemask_epi8,用於將比較的結果壓縮到 32 位中,一位代表一個位元組
  4. __builtin_clz,獲取頭部的 0 bit 個數
  5. __builtin_ctz,獲取尾部的 0  bit 個數

同時在程式碼實現上需要考慮多個細節點:

  1. 待比較字串小於 32 位元組,此時不需要用 SIMD 指令,直接逐個字元比較
  2. 在比較過程中,
  3. 在每一次比較結果的處理時,除了逐個判斷 32 個字元中的分隔符之外,還要考慮上一輪 32 位元組比較的尾部有部分字元沒有生成結果子字串,要和本輪次的頭部字串拼成一個子字串
  4. 多輪的 SIMD 指令比較都完成後,考慮:(1)末尾輪有部分字元沒有進入結果子字串;(2)部分字元沒有對齊 32 位元組,有尾巴部分的資料需要逐個字元比較垂類

要考慮好這些細節點,程式碼很複雜,考慮到 SIMD 指令是單條指令,而我們程式碼中對 SIMD 比較完成後的 32 bit(4位元組) 的逐個比較是由多個單條指令祖傳,因此嘗試讓 SIMD 指令多算一些,而減少對 32 bit 的輪詢判斷。

(5)版本5- SIMD 較差版本

減少 SIMD 比較指令執行完後的 32 bit 遍歷處理,改成每次 SIMD 比較指令完成後,只取頭一個分隔符的結果。程式碼如下:

// 編譯:g++ split_string_by_char.cc -mavx2 -o split_string_by_char
std::vector<std::string_view> SplitStrintV5(std::string_view input, char delimiter) {
  if (input.size() < 32) {
    return SplitStringV2(input, delimiter);
  }

  std::vector<std::string_view> tokens;
  __m256i cmp_a = _mm256_set1_epi8(delimiter);  // 8bit的分隔重複32次擴充到256bit
  const char* p = input.data();
  uint32_t last_lead_zero = 0;  // 上一輪256bit(32個字元)處理後剩下的未拷貝進結果集的字串個數
  while (p + 32 < input.data() + input.size()) {
    __m256i cmp_b = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p));  // 32個字元載入進記憶體
    __m256i cmp = _mm256_cmpeq_epi8(cmp_a, cmp_b);  // 256 bit 一次比較
    uint32_t mask = _mm256_movemask_epi8(cmp);
    if (mask == 0) {
      last_lead_zero += 32;
      p += 32;
      continue;
    }

    // 補上一次未拷貝的字串
    uint32_t tail_zero = __builtin_ctz(mask);
    if (last_lead_zero != 0 || tail_zero != 0) {
      tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero);
      last_lead_zero = 0;
    }
    p += tail_zero + 1;
  }

  // 不足 256 bit(32位元組)部分
  const char* token_start = p - last_lead_zero;
  const char* sentence_end = input.data() + input.size();
  for (; p != sentence_end; ++p) {
    if (*p == delimiter) {
      if (p > token_start) {
        tokens.emplace_back(token_start, p - token_start);
      }
      token_start = p + 1;
      continue;
    }
  }
  if (p > token_start) {
    tokens.emplace_back(token_start, p - token_start);
  }
  return tokens;
}
在長文字下評測發現本程式碼效能較差。

(6)效能對比

以上 5 個版本的程式碼,再加上 google 開源的 absl 基礎庫 absl::StrSplit 函數,對 2K+ 的長文字迴圈 10000 次做壓測,效能對比如下:

效能對比 V1-簡單版 V2-優化版 V3-stl版本 V4-SIMD最佳版本 V5-SIMD較差版本 absl
耗時(ms) 552 426 508 413 501 709
可以看到,absl::StrSplit 效能最差,可能是因為其會返回空字串導致。效能最佳的是需要複雜處理的 SIMD 版本,效能次之的是採用兩個遊標的非 SIMD 版本,兩者的差異非常小。考慮到 SIMD 實現的複雜性,且效能收益較小,在實際業務場景中,可以在複雜性和效能收益之間做個權衡。

3.2、任意字元分割字串

    任意字元分割字串也很常見,譬如有些紀錄檔支援空格、tab、逗號任意一個作為分隔符。和 3.1 章的單字元分割類似,提供三種實現,並對其效能做總結。

(1)STL 版

// 不使用 SIMD,使用標準庫的查詢,是效能最差的方式
std::vector<std::string_view> SplitString(std::string_view input, std::string_view delimiters) {
  if (delimiters.empty()) {
    return {input};
  }
  std::vector<std::string_view> tokens;
  std::string_view::size_type token_start = input.find_first_not_of(delimiters, 0);
  std::string_view::size_type token_end = input.find_first_of(delimiters, token_start);
  while (token_start != std::string_view::npos || token_end != std::string_view::npos) {
    tokens.emplace_back(input.substr(token_start, token_end - token_start));
    token_start = input.find_first_not_of(delimiters, token_end);
    token_end = input.find_first_of(delimiters, token_start);
  }
  return tokens;
}

這個實現和 3.1 裡的 stl 版本類似,程式碼量比較少,效能比較差。

(2)自行遍歷版

// 不使用 SIMD,使用遍歷,是非 SIMD 模式下效能最好的方式
std::vector<std::string_view> SplitStringV2(std::string_view input, std::string_view delimiters) {
  if (delimiters.empty()) {
    return {input};
  }
  std::vector<std::string_view> tokens;
  const char* token_start = input.data();
  const char* p = token_start;
  const char* end_pos = input.data() + input.size();
  for (; p != end_pos; ++p) {
    bool match_delimiter = false;
    for (auto delimiter : delimiters) {
      if (*p == delimiter) {
        match_delimiter = true;
        break;
      }
    }
    if (match_delimiter) {
      if (p > token_start) {
        tokens.emplace_back(token_start, p - token_start);
      }
      token_start = p + 1;
      continue;
    }
  }
  if (p > token_start) {
    tokens.emplace_back(token_start, p - token_start);
  }
  return tokens;
}

自行遍歷版本,和 3.1 裡的版本 2 程式碼很像,效能也是非 SIMD 模式下最好的。

(3)SIMD 版本

std::vector<std::string_view> SplitStringWithSimd256(std::string_view input, std::string_view delimiters) {
  if (delimiters.empty()) {
    return {input};
  }

  if (input.size() < 32) {
    return SplitStringV2(input, delimiters);
  }

  std::vector<std::string_view> tokens;
  uint32_t end_pos = input.size() >> 5 << 5;
  const char* p = input.data();
  const char* end = p + end_pos;
  uint32_t last_lead_zero = 0;  // 上一輪256bit(32個字元)處理後剩下的未拷貝進結果集的字串個數
  while (p < end) {
    __m256i cmp_a = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p));  // 32個字元載入進記憶體
    __m256i cmp_result_a = _mm256_cmpeq_epi8(cmp_a, _mm256_set1_epi8(delimiters[0]));

    for (int i = 1; i != delimiters.size(); ++i) {
      __m256i cmp_result_b = _mm256_cmpeq_epi8(cmp_a, _mm256_set1_epi8(delimiters[i]));
      cmp_result_a = _mm256_or_si256(cmp_result_a, cmp_result_b);
    }
    uint32_t mask = _mm256_movemask_epi8(cmp_result_a);
    if (mask == 0) {
      last_lead_zero += 32;
      p += 32;
      continue;
    }

    // 記錄本次的頭部0個數,注:mask的序和字串序是相反的,所以這裡頭部的0對應字串尾部的不匹配字元
    uint32_t lead_zero = __builtin_clz(mask);

    // 補上一次未拷貝的字串
    uint32_t tail_zero = __builtin_ctz(mask);
    if (last_lead_zero != 0 || tail_zero != 0) {
      tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero);
    }
    mask >>= (tail_zero + 1);
    p += tail_zero + 1;

    // 補完,繼續處理
    while (mask != 0) {
      uint32_t tail_zero = __builtin_ctz(mask);
      if (tail_zero != 0) {
        tokens.emplace_back(p, tail_zero);
      }
      mask >>= (tail_zero + 1);
      p += tail_zero + 1;
    }

    last_lead_zero = lead_zero;
    p += lead_zero;
  }

  // 256 bit(32位元組) 對齊之後剩下的部分
  const char* token_start = input.data() + end_pos - last_lead_zero;
  const char* pp = token_start;
  const char* sentence_end = input.data() + input.size();
  for (; pp != sentence_end; ++pp) {
    bool match_delimiter = false;
    for (auto delimiter : delimiters) {
      if (*pp == delimiter) {
        match_delimiter = true;
        break;
      }
    }
    if (match_delimiter) {
      if (pp > token_start) {
        tokens.emplace_back(token_start, pp - token_start);
      }
      token_start = pp + 1;
      continue;
    }
  }
  if (pp > token_start) {
    tokens.emplace_back(token_start, pp - token_start);
  }
  return tokens;
}

處理任意分隔符和處理單分隔符大部分程式碼類似,只有兩部分有變化:

  1. 一個待比較字串和多個分隔符比較得到的多個 mask 碼,使用 _mm256_or_si256 做合併
  2. 非 SIMD 部分使用 for 迴圈遍歷判斷多個分隔符

(4)效能對比

以上 3 個版本的程式碼,再加上 google 開源的 absl 基礎庫 absl::StrSplit 函數,對 2K+ 的長文字迴圈 10000 次做壓測,效能對比如下:

效能對比 stl版 自行遍歷版 SIMD版 absl
耗時(ms) 768 658 480 1054
absl::StrSplit 效能最差,SIMD 版本有多個分隔符的場景下,效能提升非常明顯,SIMD 應用程式碼的高複雜度付出是值得的。

4、思考

字串分割是很常見的功能,通常其實現程式碼也很簡潔,這就使得開發者容易忽略其效能,寫出非最佳效能的程式碼,譬如:沒有使用現代 C++ 中的 string_view、對遍歷過程沒有精細考慮。通過精細的控制計算量以及應用 SIMD 指令可以獲得比較好的收益,特別是 SIMD 指令在任意多分隔符場景下效能優化效果非常明顯。

注:本文部分內容也放在公司內部知識庫,方便直接 Copy 原始碼使用。

本文地址:https://www.cnblogs.com/cswuyg/p/17794816.html