今年在公司內部主導了兩個的行情資料系統的構建,兩者均使用到了常見的時序資料壓縮演演算法。
這裡簡單總結一下過程中積累的一些經驗。
讓我們先來思考一個問題:壓縮演演算法生效的前提是什麼?
資料本身至少要符合以下兩種特性其一:
在時序資料領域,資料冗餘度與相似度較高,因此天生適合進行壓縮。
但對於不同型別的資料,其所適用的壓縮演演算法也大相徑庭。
下面我們逐一介紹這些資料相應的壓縮演演算法。
整型資料是構建各種應用的基石,時序型應用也不例外。
在行情資料中,存在大量的整型資料,例如:逐筆成交中的時間戳、成交量。
根據壓縮演演算法的不同,可以將整型資料分為以下 3 類:
一個 32 位的無符號整型能表達 0 - 4294967295 之間的任意數位
但這些數位在日常生活中出現的概率並不是均勻分佈的,一個著名的例子是本福特定律,該定律常被用於辨別資料的真偽。
通常情況下,較小的數位出現的概率會高於極大的資料。
以年齡為例,無論人口如何分佈,大部分人的年齡都位於 0 ~ 100 之間。
表示 128 僅需要 7bit 足矣,如果使用 32bit 的無符號整型進行儲存,意味著至少浪費了 24bit。
幸運的是,我們能通過一種自適應編碼方式來減少這種浪費 —— Varint。
public class VarIntCodec {
static int encodeInt(int v, byte[] bytes, int offset) {
if (v < 0) {
throw new IllegalStateException();
} else if (v < 128) {
bytes[offset++] = (byte) v;
} else if (v < 16384) {
bytes[offset++] = (byte) (v | 0x80);
bytes[offset++] = (byte) ((v >>> 7) & 0x7F);
} else if (v < 2097152) {
bytes[offset++] = (byte) (v | 0x80);
bytes[offset++] = (byte) ((v >>> 7) | 0x80);
bytes[offset++] = (byte) (v >>> 14);
} else if (v < 268435456) {
bytes[offset++] = (byte) (v | 0x80);
bytes[offset++] = (byte) ((v >>> 7) | 0x80);
bytes[offset++] = (byte) ((v >>> 14) | 0x80);
bytes[offset++] = (byte) (v >>> 21);
} else {
bytes[offset++] = (byte) (v | 0x80);
bytes[offset++] = (byte) ((v >>> 7) | 0x80);
bytes[offset++] = (byte) ((v >>> 14) | 0x80);
bytes[offset++] = (byte) ((v >>> 21) | 0x80);
bytes[offset++] = (byte) (v >>> 28);
}
return offset;
}
static int decodeInt(byte[] bytes, int[] offset) {
int val;
int off = offset[0];
byte b0, b1, b2, b3;
if ((b0 = bytes[off++]) >= 0) {
val = b0;
} else if ((b1 = bytes[off++]) >= 0) {
val = (b0 & 0x7F) + (b1 << 7);
} else if ((b2 = bytes[off++]) >= 0) {
val = (b0 & 0x7F) + ((b1 & 0x7F) << 7) + (b2 << 14);
} else if ((b3 = bytes[off++]) >= 0) {
val = (b0 & 0x7F) + ((b1 & 0x7F) << 7) + ((b2 & 0x7F) << 14) + (b3 << 21);;
} else {
val = (b0 & 0x7F) + ((b1 & 0x7F) << 7) + ((b2 & 0x7F) << 14) + ((b3 & 0x7F) << 21) + (bytes[off++] << 28);
}
offset[0] = off;
return val;
}
}
通過觀察程式碼可以得知,Varint 編碼並不是沒有代價的:每 8bit 需要需要犧牲 1bit 作為標記位。
對於值較大的資料,Varint 需要額外的空間進行編碼,從而導致更大的空間消耗。
因此使用 Varint 的前提有兩個:
Varint 編碼負數的效率很低:
對於 big-endian 資料來說,Varint 是通過削減 leading-zero 來實現的壓縮。
而負數的首個 bit 永遠非零,因此不但無法壓縮資料,反而會引入不必要的空間開銷。
ZigZag 通過以下方式來彌補這一缺陷:
增加小負數的 leading-zero 數量,然後再進行 Varint 編碼。
其原理很簡單,增加一個 ZigZag 對映:
(n << 1) ^ (n >> 31)
(n >>> 1) ^ -(n & 1)
當 n = -1 時,Varint 編碼前對映:
n = -1 -> 11111111111111111111111111111111
a = n << 1 -> 11111111111111111111111111111110
b = n >> 31 -> 11111111111111111111111111111111
a ^ b -> 00000000000000000000000000000001
當 n = -1 時,Varint 解碼後對映:
m = a ^ b -> 00000000000000000000000000000001
a = m >>> 1 -> 00000000000000000000000000000000
b = -(m & 1) -> 11111111111111111111111111111111
a ^ b -> 11111111111111111111111111111111
ZigZag 對映能夠有效增加小負數的 leading-zero 數量,進而提高編碼效率。
ZigZag 本身也並不是完美的的:
時間戳是時序資料中的一類特殊的資料,其值往往比較大,因此不適用於 Varint 編碼。
但是時間戳本身具有以下兩個特性:
前面提到過:提高整型資料壓縮率的一個重要手段是增加 leading-zero 數量。
說人話就是:儘可能儲存較小的數位。
因此,相較於儲存時間戳,儲存前後兩條資料的時間間隔是一個更好的選擇。
第一種方式是使用 Delta 編碼:
但是 Delta 編碼的資料冗餘仍然較多,因此可以改進為 Delta2 編碼:
編碼後的 int64 的時間戳,可以用 int32 甚至更小的資料型別進行儲存。
朋友,你覺得 Varint 與 Delta2 已經是整型壓縮的極限了嗎?
某些特殊的時序事件,可能以很高的頻率出現,比如行情盤口報價。
這類資料的時間戳進行 Delta2 編碼後,可能會出現以下兩種情況:
對於場景1,遊程編碼(RLE)是個不錯的選擇,但普適性較差。
對於場景2,每個值僅需 6bit 儲存即可,使用 Varint 編碼仍有空間浪費。
Simple8b 演演算法能較好的處理這一問題,其核心思想是:
將盡可能多數位打包在一個 64bit 長整型中。
Simple8b 將 64 位整數分為兩部分:
selector(4bit)
用於指定剩餘 60bit 中儲存的整數的個數與有效位長度payload(60bit)
則是用於儲存多個定長的整數Simple8b 維護了一個查詢表,可以將資料模式匹配到最優的 selector,然後將多個資料編碼至 payload。編碼演演算法可以參考這個Go實現。
需要指出的是,這個開源實現使用回溯法進行編碼,其複雜度為 \(O(n^2)\)( n 為輸入資料長度)。該實現對於離線壓縮來說已經足夠,但對於實時壓縮來說稍顯不足。
我們在此程式碼的基礎上,使用查表法維護了一個狀態轉移陣列,將編碼速度提升至 \(O(n)\),使其能夠應用於實時壓縮。
浮點數是一類較難壓縮的資料,IEEE 705 在編碼上幾乎沒有浪費任何一個 bit,因此無法使用類似 Varint 的方式進行壓縮:
目前常用的壓縮方式可以分為兩類:
有失真壓縮需要配合業務領域知識來使用,因為首先要確定需要保留的精度。
當精度確定之後,就可以將浮點數轉換為整數,然後使用 Varint 進行編碼。
public static ScaledResult scaleDecimal(float[] floats, final int scaleLimit) throws CodecException {
BigDecimal[] decimals = new BigDecimal[floats.length];
for (int i=0; i<floats.length; i++) {
decimals[i] = new BigDecimal(Float.toString(floats[i]));
}
BigDecimal base = null;
int maxScale = 0;
for (BigDecimal decimal : decimals) {
Preconditions.checkState(decimal.signum() >= 0);
int scale = decimal.scale();
if (scale == 1 && maxScale == 0 && decimal.stripTrailingZeros().scale() <= 0) {
scale = 0;
}
maxScale = Math.max(maxScale, scale);
base = base == null || decimal.compareTo(base) < 0 ? decimal : base;
}
final int scale = Math.min(maxScale, scaleLimit);
final long scaledBase = base.scaleByPowerOfTen(scale).setScale(0, RoundingMode.HALF_UP).longValueExact();
long[] data = new long[decimals.length];
for (int i=0; i<decimals.length; i++) {
long scaledValue = decimals[i].scaleByPowerOfTen(scale).setScale(0, RoundingMode.HALF_UP).longValueExact();
data[i] = scaledValue - scaledBase;
}
return new ScaledResult(scale, scaledBase, data);
}
這個壓縮演演算法的優點主要是直觀,並且天然跨語言,比較適合前後端互動。
不過也存在以下侷限性:
編碼速度慢很大的原因,是在 float 轉換為 BigDecimal 這一步使用Float.toString
保留資料精度。該方法不僅複雜,還生成了許多中間物件,因此效率自然不高。
一個 workaround 方式是使用高效的解析庫 Ryu ,並將其返回值改為 BigDecimal,減少中間物件的生成。這一優化能夠將編碼速度提高約 5 倍。
有失真壓縮的一個缺點是泛用性較差,並且無法處理小數位較大的情況。
然而高冗餘的浮點資料,在時序資料中比比皆是,比如某些機器監控指標,其值往往只在 0 ~ 1 之間波動。
2015 年 Fackbook 發表了一篇關於記憶體時序資料庫的 論文,其中提出了一種基於互斥或演演算法的浮點數壓縮演演算法。
Facebook 的研究人員在時序浮點數中發現個規律:
在同個時間序列中,大部分浮點數的佔用的有效 bit 位是類似的,並且往往只有中間的一個連續區塊儲存著不同的資料。
因此他們想了個辦法提取並只儲存這部分資料:
由於大部分資料的 leading-zero 與連續區塊長度 block-size 均相等,往往只需要儲存 flag 與 block 資料,因此大多數情況都能有效的減少資料儲存空間。
這個演演算法主要的難點是實現高效的 BitWriter,這個可以通過繼承 ByteArrayOutputStream 並增加一個 long 型別的 buffer 實現。
abstract class BlockMeta<T extends BlockMeta<T>> {
short leadingZero;
short tailingZero;
int blockSize;
BlockMeta<T> withMeta(int leadingZero, int blockSize) {
this.leadingZero = (short) leadingZero;
this.tailingZero = (short) (length() - leadingZero - blockSize);
this.blockSize = blockSize;
return this;
}
abstract long value();
abstract int length();
boolean fallInSameBlock(BlockMeta<? extends BlockMeta<?>> block) {
return block != null && block.leadingZero == leadingZero && block.tailingZero == tailingZero;
}
}
static void encodeBlock(BitsWriter buffer, BlockMeta<?> block, BlockMeta<?> prevBlock) {
if (block.value() == 0) {
buffer.writeBit(false);
} else {
boolean ctrlBit;
buffer.writeBit(true);
buffer.writeBit(ctrlBit = ! block.fallInSameBlock(prevBlock));
if (ctrlBit) {
buffer.writeBits(block.leadingZero, 6);
buffer.writeBits(block.blockSize, 7);
}
buffer.writeBits(block.value(), block.blockSize);
}
}
static long decodeBlock(BitsReader reader, BlockMeta<?> blockMeta) {
if (reader.readBit()) {
boolean ctrlBit = reader.readBit();
if (ctrlBit) {
int leadingZero = (int) reader.readBits(6);
int blockSize = (int) reader.readBits(7);
blockMeta.withMeta(leadingZero, blockSize);
}
CodecException.malformedData(blockMeta == null);
long value = reader.readBits(blockMeta.blockSize);
return value << blockMeta.tailingZero;
}
return 0;
}
Facebook 聲稱,使用該演演算法壓縮 2 小時的時序資料,每個資料點僅僅需 1.37 byte:
A two-hour block allows us to achieve a compression ratio of
1.37 bytes per data point.
經測試,該演演算法能夠將分時資料壓縮為原來的 33%,壓縮率可達 60% 以上,效果顯著。
時序資料中的字串大致可以分為兩類:
標籤型字串在時時序資料中更為常見,比如監控資料中的 IP 與 Metric 名稱便是此類資料。
該類資料通常作為查詢索引使用,其值也比較固定。因此通常使用資料字典進行壓縮,將變長字串轉換為定長的整型索引。
當轉換成整型後,還能進一步的利用 BloomFilter 與 Bitmap 等資料結構,進一步提升查詢效能。
非標籤型的字串較為少見,較少時序資料引擎支援該型別。
這類資料的值較為不可控,因此只能使用通用的壓縮演演算法進行壓縮。
為了兼顧效率,通常情況下會使用 snappy 或 lz4,兩者不相伯仲。
通常情況下兩者的壓縮比較為接近,這方面 snappy 有微弱的優勢。
不夠 lz4 提供了更為靈活的壓縮策略:
https://www.vldb.org/pvldb/vol8/p1816-teller.pdf
https://github.com/jwilder/encoding/blob/master/simple8b/encoding.go
https://kkc.github.io/2021/01/30/integer-compression-note/