1、背景
字串按照分隔符拆成多個子串在日常工作中很常見,譬如:後臺服務對設定的解析、模型服務對輸入特徵的拆分、磁碟格式索引檔案轉記憶體格式等等,通常最簡單的實現是使用 boost 庫:std::vector<std::string> output_tokens;
boost::split(output_tokens, input_sentence, boost::is_any_of(" "), boost::token_compress_off);
這是最簡單也是效能最差的寫法。在一些頻繁做字串拆分的場景下,部分開發者會發現用 string 會觸發字串拷貝的代價,於是改為自己使用 string_view,這會顯著提高效能,除此之外,在使用 string_view 的基礎上仍然有兩個點需要特別注意:
本文將對此做細緻分析。
2、目標
針對單字元分割和任意字元分割兩類場景,本文以程式碼案例為主,講解如何寫出高效能的字串分割函數。在考慮高效能的情況下,最基本的要求是字串分割不應觸發拷貝,因此本文實現的多個字串分割函數,都基於 string_view 消除了拷貝,在此基礎上再進行效能優化分析。本文兩類場景的函數簽名如下:
std::vector<std::string_view> SplitString(std::string_view input, char delimiter); std::vector<std::string_view> SplitString(std::string_view input, std::string_view delimiters);
3、高效能實現
單字元分割和任意字元分割這兩類場景分開介紹。3.1、單字元分割字串的 5 個版本
下面提供 5 種單字元分割字串的實現,並對其效能做總結分析。(1)版本1-簡單版本
單字元分割是最常見的場景,譬如用於紀錄檔解析、特徵解析等,首先我們考慮基於遍歷自行實現,實現如下:std::vector<std::string_view> SplitStringV1(std::string_view input, char delimiter) { std::vector<std::string_view> tokens; int start_pos = 0; int size = 0; for (size_t i = 0; i < input.size(); ++i) { if (input[i] == delimiter) { if (size != 0) { tokens.emplace_back(input.data() + start_pos, size); size = 0; } start_pos = i + 1; } else { ++size; } } if (size > 0) { tokens.emplace_back(input.data() + start_pos, size); } return tokens; }
這樣的實現很好理解:遍歷 input 字串,發現有分隔符 delimiter 時,考慮生成結果子字串,生成子字串需要知道起點和長度,因此定義了 token_start 和 size 兩個臨時變數,並在遍歷過程中維護這兩臨時變數。但仔細觀察這個函數的實現,可以發現有三個變數:token_start、size、i,但通常來說,定位一個子字串只需要兩個變數(或者叫遊標):start、end 即可,這裡存在效能優化空間。
(2)版本2-優化版本
基於兩個遊標的實現程式碼:
std::vector<std::string_view> SplitStringV2(std::string_view input, char delimiter) { std::vector<std::string_view> tokens; const char* token_start = input.data(); const char* p = token_start; const char* end_pos = input.data() + input.size(); for (; p != end_pos; ++p) { if (*p == delimiter) { if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } token_start = p + 1; continue; } } if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } return tokens; }
這裡 token_start 作為子串的起始位置,p 作為遞增遊標,上一份程式碼的 size 可以通過 p - token_start 獲得。這個實現少維護 size 變數,因此效能更好。
(3)版本3-STL 版本
有時候,我們也會考慮使用標準庫的 find_first_of 函數實現,程式碼量更少,程式碼如下:
std::vector<std::string_view> SplitStringV3(std::string_view input, char delimiter) { std::vector<std::string_view> tokens; size_t token_start = 0; while (token_start < input.size()) { auto token_end = input.find_first_of(delimiter, token_start); if (token_end > token_start) { tokens.emplace_back(input.substr(token_start, token_end - token_start)); } if (token_end == std::string_view::npos) { break; } token_start = token_end + 1; } return tokens; }
但上述實現,效能比我們自己實現遍歷的 SplitStringV2 版本效能要差,畢竟 find_first_of 的每次查詢都要重新初始化其實位置,相比自己實現遍歷有效能浪費。
(4)版本4-SIMD 最佳版本
字串分割很重要的一步是逐個字元對字串做比較,是否可以並行比較呢?是可以的,SIMD 指令可以加速這裡的比較,當前大多數機器都已支援 AVX2,但還未普遍支援 AVX512,下面以 AVX2 為例。程式碼如下:
// 編譯:g++ split_string_by_char.cc -mavx2 -o split_string_by_char
std::vector<std::string_view> SplitStrintV4(std::string_view input, char delimiter) { if (input.size() < 32) { return SplitStringV2(input, delimiter); } std::vector<std::string_view> tokens; uint32_t end_pos = input.size() >> 5 << 5; __m256i cmp_a = _mm256_set1_epi8(delimiter); // 8bit的分隔重複32次擴充到256bit const char* p = input.data(); const char* end = p + end_pos; uint32_t last_lead_zero = 0; // 上一輪256bit(32個字元)處理後剩下的未拷貝進結果集的字串個數 while (p < end) { __m256i cmp_b = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p)); // 32個字元載入進記憶體 __m256i cmp = _mm256_cmpeq_epi8(cmp_a, cmp_b); // 256 bit 一次比較 uint32_t mask = _mm256_movemask_epi8(cmp); if (mask == 0) { last_lead_zero += 32; p += 32; continue; } // 記錄本次的頭部0個數,注:mask的序和字串序是相反的,所以這裡頭部的0對應字串尾部的不匹配字元 uint32_t lead_zero = __builtin_clz(mask); // 補上一次未拷貝的字串 uint32_t tail_zero = __builtin_ctz(mask); if (last_lead_zero != 0 || tail_zero != 0) { tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; // 補完,繼續處理 while (mask != 0) { uint32_t tail_zero = __builtin_ctz(mask); if (tail_zero != 0) { tokens.emplace_back(p, tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; } last_lead_zero = lead_zero; p += lead_zero; } // 256 bit(32位元組) 對齊之後剩下的部分 const char* token_start = input.data() + end_pos - last_lead_zero; const char* pp = token_start; const char* sentence_end = input.data() + input.size(); for (; pp != sentence_end; ++pp) { if (*pp == delimiter) { if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } token_start = pp + 1; continue; } } if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } return tokens; }
這裡使用了 5 個關鍵的函數:
同時在程式碼實現上需要考慮多個細節點:
要考慮好這些細節點,程式碼很複雜,考慮到 SIMD 指令是單條指令,而我們程式碼中對 SIMD 比較完成後的 32 bit(4位元組) 的逐個比較是由多個單條指令祖傳,因此嘗試讓 SIMD 指令多算一些,而減少對 32 bit 的輪詢判斷。
(5)版本5- SIMD 較差版本
減少 SIMD 比較指令執行完後的 32 bit 遍歷處理,改成每次 SIMD 比較指令完成後,只取頭一個分隔符的結果。程式碼如下:
// 編譯:g++ split_string_by_char.cc -mavx2 -o split_string_by_char
std::vector<std::string_view> SplitStrintV5(std::string_view input, char delimiter) { if (input.size() < 32) { return SplitStringV2(input, delimiter); } std::vector<std::string_view> tokens; __m256i cmp_a = _mm256_set1_epi8(delimiter); // 8bit的分隔重複32次擴充到256bit const char* p = input.data(); uint32_t last_lead_zero = 0; // 上一輪256bit(32個字元)處理後剩下的未拷貝進結果集的字串個數 while (p + 32 < input.data() + input.size()) { __m256i cmp_b = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p)); // 32個字元載入進記憶體 __m256i cmp = _mm256_cmpeq_epi8(cmp_a, cmp_b); // 256 bit 一次比較 uint32_t mask = _mm256_movemask_epi8(cmp); if (mask == 0) { last_lead_zero += 32; p += 32; continue; } // 補上一次未拷貝的字串 uint32_t tail_zero = __builtin_ctz(mask); if (last_lead_zero != 0 || tail_zero != 0) { tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero); last_lead_zero = 0; } p += tail_zero + 1; } // 不足 256 bit(32位元組)部分 const char* token_start = p - last_lead_zero; const char* sentence_end = input.data() + input.size(); for (; p != sentence_end; ++p) { if (*p == delimiter) { if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } token_start = p + 1; continue; } } if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } return tokens; }
(6)效能對比
以上 5 個版本的程式碼,再加上 google 開源的 absl 基礎庫 absl::StrSplit 函數,對 2K+ 的長文字迴圈 10000 次做壓測,效能對比如下:
效能對比 | V1-簡單版 | V2-優化版 | V3-stl版本 | V4-SIMD最佳版本 | V5-SIMD較差版本 | absl |
耗時(ms) | 552 | 426 | 508 | 413 | 501 | 709 |
3.2、任意字元分割字串
任意字元分割字串也很常見,譬如有些紀錄檔支援空格、tab、逗號任意一個作為分隔符。和 3.1 章的單字元分割類似,提供三種實現,並對其效能做總結。
(1)STL 版
// 不使用 SIMD,使用標準庫的查詢,是效能最差的方式 std::vector<std::string_view> SplitString(std::string_view input, std::string_view delimiters) { if (delimiters.empty()) { return {input}; } std::vector<std::string_view> tokens; std::string_view::size_type token_start = input.find_first_not_of(delimiters, 0); std::string_view::size_type token_end = input.find_first_of(delimiters, token_start); while (token_start != std::string_view::npos || token_end != std::string_view::npos) { tokens.emplace_back(input.substr(token_start, token_end - token_start)); token_start = input.find_first_not_of(delimiters, token_end); token_end = input.find_first_of(delimiters, token_start); } return tokens; }
這個實現和 3.1 裡的 stl 版本類似,程式碼量比較少,效能比較差。
(2)自行遍歷版
// 不使用 SIMD,使用遍歷,是非 SIMD 模式下效能最好的方式 std::vector<std::string_view> SplitStringV2(std::string_view input, std::string_view delimiters) { if (delimiters.empty()) { return {input}; } std::vector<std::string_view> tokens; const char* token_start = input.data(); const char* p = token_start; const char* end_pos = input.data() + input.size(); for (; p != end_pos; ++p) { bool match_delimiter = false; for (auto delimiter : delimiters) { if (*p == delimiter) { match_delimiter = true; break; } } if (match_delimiter) { if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } token_start = p + 1; continue; } } if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } return tokens; }
自行遍歷版本,和 3.1 裡的版本 2 程式碼很像,效能也是非 SIMD 模式下最好的。
(3)SIMD 版本
std::vector<std::string_view> SplitStringWithSimd256(std::string_view input, std::string_view delimiters) { if (delimiters.empty()) { return {input}; } if (input.size() < 32) { return SplitStringV2(input, delimiters); } std::vector<std::string_view> tokens; uint32_t end_pos = input.size() >> 5 << 5; const char* p = input.data(); const char* end = p + end_pos; uint32_t last_lead_zero = 0; // 上一輪256bit(32個字元)處理後剩下的未拷貝進結果集的字串個數 while (p < end) { __m256i cmp_a = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p)); // 32個字元載入進記憶體 __m256i cmp_result_a = _mm256_cmpeq_epi8(cmp_a, _mm256_set1_epi8(delimiters[0])); for (int i = 1; i != delimiters.size(); ++i) { __m256i cmp_result_b = _mm256_cmpeq_epi8(cmp_a, _mm256_set1_epi8(delimiters[i])); cmp_result_a = _mm256_or_si256(cmp_result_a, cmp_result_b); } uint32_t mask = _mm256_movemask_epi8(cmp_result_a); if (mask == 0) { last_lead_zero += 32; p += 32; continue; } // 記錄本次的頭部0個數,注:mask的序和字串序是相反的,所以這裡頭部的0對應字串尾部的不匹配字元 uint32_t lead_zero = __builtin_clz(mask); // 補上一次未拷貝的字串 uint32_t tail_zero = __builtin_ctz(mask); if (last_lead_zero != 0 || tail_zero != 0) { tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; // 補完,繼續處理 while (mask != 0) { uint32_t tail_zero = __builtin_ctz(mask); if (tail_zero != 0) { tokens.emplace_back(p, tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; } last_lead_zero = lead_zero; p += lead_zero; } // 256 bit(32位元組) 對齊之後剩下的部分 const char* token_start = input.data() + end_pos - last_lead_zero; const char* pp = token_start; const char* sentence_end = input.data() + input.size(); for (; pp != sentence_end; ++pp) { bool match_delimiter = false; for (auto delimiter : delimiters) { if (*pp == delimiter) { match_delimiter = true; break; } } if (match_delimiter) { if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } token_start = pp + 1; continue; } } if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } return tokens; }
處理任意分隔符和處理單分隔符大部分程式碼類似,只有兩部分有變化:
(4)效能對比
以上 3 個版本的程式碼,再加上 google 開源的 absl 基礎庫 absl::StrSplit 函數,對 2K+ 的長文字迴圈 10000 次做壓測,效能對比如下:
效能對比 | stl版 | 自行遍歷版 | SIMD版 | absl |
耗時(ms) | 768 | 658 | 480 | 1054 |
4、思考
字串分割是很常見的功能,通常其實現程式碼也很簡潔,這就使得開發者容易忽略其效能,寫出非最佳效能的程式碼,譬如:沒有使用現代 C++ 中的 string_view、對遍歷過程沒有精細考慮。通過精細的控制計算量以及應用 SIMD 指令可以獲得比較好的收益,特別是 SIMD 指令在任意多分隔符場景下效能優化效果非常明顯。
注:本文部分內容也放在公司內部知識庫,方便直接 Copy 原始碼使用。
本文地址:https://www.cnblogs.com/cswuyg/p/17794816.html