ULID規範解讀與實現原理

2022-10-18 06:00:11

前提

最近發現各個頻道推薦了很多ULID相關文章,這裡對ULID的規範檔案進行解讀,並且基於Java語言自行實現ULID,通過此實現過程展示ULID的底層原理。

ULID出現的背景

ULID全稱是Universally Unique Lexicographically Sortable Identifier,直譯過來就是通用唯一按字典排序的識別符號,它的原始倉庫是https://github.com/ulid/javascript,該專案由前端開發者alizain發起,基於JavaScript語言編寫。從專案中的commit歷史來看已經超過了5年,理論上得到充分的實踐驗證ULID出現的原因是一些開發者認為主流的UUID方案在許多場景下可能不是最優的,存在下面的原因:

  • UUID不是128 bit隨機編碼(由128 bit亂數通過編碼生成字串)的最高效實現方式
  • UUIDv1/v2實現在許多環境中是不切實際的,因為這兩個版本的的實現需要存取唯一的、穩定的MAC地址
  • UUIDv3/v5實現需要唯一的種子,並且產生隨機分佈的ID,這可能會導致在許多資料結構中出現碎片
  • UUIDv4除了隨機性之外不需要提供其他資訊,隨機性可能會在許多資料結構中導致碎片

這裡概括一下就是:UUIDv1/v2實現依賴唯一穩定MAC地址不現實,v3/v4/v5實現因為隨機性產生的ID會"碎片化"。

基於此提出了ULID,它用起來像這樣:

ulid() // 01ARZ3NDEKTSV4RRFFQ69G5FAV

ULID的特點如下:

  • 設計為128 bit大小,與UUID相容
  • 每毫秒生成1.21e+24個唯一的ULID(高效能)
  • 按字典順序(字母順序)排序
  • 標準編碼為26個字元的字串,而不是像UUID那樣需要36個字元
  • 使用Crockfordbase32演演算法來提高效率和可讀性(每個字元5 bit
  • 不區分大小寫
  • 沒有特殊字串(URL安全,不需要進行二次URL編碼)
  • 單調排序(正確地檢測並處理相同的毫秒,所謂單調性,就是毫秒數相同的情況下,能夠確保新的ULID隨機部分的在最低有效位上加1位)

ULID規範

下面的ULID規範在ULID/javascript類庫中實現,此二進位制格式目前沒有在JavaScript中實現:

 01AN4Z07BY      79KA1307SR9X4MV3
|----------|    |----------------|
 Timestamp          Randomness
   48bits             80bits

組成

時間戳(Timestamp)

  • 佔據48 bit(high)
  • 本質是UNIX-time,單位為毫秒
  • 直到公元10889年才會用完

亂數(Randomness)

  • 佔據80 bit(low)
  • 如果可能的話,使用加密安全的隨機源

排序

"最左邊"的字元必須排在最前面,"最右邊"的字元排在最後(詞法順序,或者俗稱的字典排序),並且所有字元必須使用預設的ASCII字元集。在相同的毫秒(時間戳)內,無法保證排序順序。

規範的表示形式

ULID規範的字串表示形式如下:

ttttttttttrrrrrrrrrrrrrrrr

where
t is Timestamp (10 characters)
r is Randomness (16 characters)

也就是:

  • 時間戳佔據高(左邊)10個(編碼後的)字元
  • 亂數佔據低(右邊)16個(編碼後的)字元

ULID規範的字串表示形式的長度是確定的,共佔據26個字元

編碼

使用Crockford Base32編碼演演算法,這個編碼演演算法的字母表如下:

0123456789ABCDEFGHJKMNPQRSTVWXYZ

該字母表排除了ILOU字母,目的是避免混淆和濫用。此演演算法實現不難,它的官網有詳細的演演算法說明(見https://www.crockford.com/base32.html):

單調性

(如果啟用了單調性這個特性為前提下)當在相同的毫秒內生成多個ULID時,可以保證排序的順序。也就是說,如果檢測到相同的毫秒,則隨機分量在最低有效位上加1位(帶進位)。例如:

monotonicUlid()  // 01BX5ZZKBKACTAV9WEVGEMMVRZ
monotonicUlid()  // 01BX5ZZKBKACTAV9WEVGEMMVS0

溢位錯誤處理

從技術實現上來看,26個字元的Base32編碼字串可以包含130 bit資訊,而ULID只包含128 bit資訊,所以該編碼演演算法是能完全滿足ULID的需要。基於Base32編碼能夠生成的最大的合法ULID其實就是7ZZZZZZZZZZZZZZZZZZZZZZZZZ,並且使用的時間戳為epoch time281474976710655或者說2 ^ 48 - 1。對於任何對大於此值的ULID進行解碼或編碼的嘗試都應該被所有實現拒絕,以防止溢位錯誤。

二進位制佈局

二進位制佈局的多個部分被編碼為16 byte,每個部分都以最高位元組優先(網路位元組序,也就是big-endian)進行編碼,佈局如下:

0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                      32_bit_uint_time_high                    |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|     16_bit_uint_time_low      |       16_bit_uint_random      |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                       32_bit_uint_random                      |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                       32_bit_uint_random                      |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

ULID使用

對於script標籤參照:

<script src="https://unpkg.com/ulid@latest/dist/index.umd.js"></script>
<script>
    ULID.ulid()
</script>

NPM安裝:

npm install --save ulid

TypeScript, ES6+, Babel, Webpack, Rollup等等下使用:

// import
import { ulid } from 'ulid'

ulid()

// CommonJS env
const ULID = require('ulid')

ULID.ulid()

後端Maven專案中使用需要引入依賴,這裡選用ulid-creator實現:

<dependency>
  <groupId>com.github.f4b6a3</groupId>
  <artifactId>ulid-creator</artifactId>
  <version>5.0.2</version>
</dependency>

然後呼叫UlidCreator#getUlid()系列方法:

// 常規
Ulid ulid = UlidCreator.getUlid();

// 單調排序
Ulid ulid = UlidCreator.getMonotonicUlid();

實現ULID

前面已經提到ULID的規範,其實具體實現ULID就是對著規範裡面的每一個小節進行編碼實現。先看二進位制佈局,由於使用128 bit去儲存,可以借鑑UUID那樣,使用兩個long類似的成員變數儲存ULID的資訊,看起來像這樣:

public final class ULID {

    /*
     * The most significant 64 bits of this ULID.
     *
     */
    private final long msb;

    /*
     * The least significant 64 bits of this ULID.
     *
     */
    private final long lsb;

    public ULID(long msb, long lsb) {
        this.msb = msb;
        this.lsb = lsb;
    }
}

按照ULID的組成來看,可以提供一個入參為時間戳和亂數位元組陣列的構造:

public ULID(long timestamp, byte[] randomness) {
    if ((timestamp & TIMESTAMP_MASK) != 0) {
        throw new IllegalArgumentException("Invalid timestamp");
    }
    if (Objects.isNull(randomness) || RANDOMNESS_BYTE_LEN != randomness.length) {
        throw new IllegalArgumentException("Invalid randomness");
    }
    long msb = 0;
    long lsb = 0;
    // 時間戳左移16位元,低位補零準備填入部分亂數位,即16_bit_uint_random
    msb |= timestamp << 16;
    // randomness[0]左移0位填充到16_bit_uint_random的高8位元,randomness[1]填充到16_bit_uint_random的低8位元
    msb |= (long) (randomness[0x0] & 0xff) << 8;
    // randomness[1]填充到16_bit_uint_random的低8位元
    msb |= randomness[0x1] & 0xff;
    // randomness[2] ~ randomness[9]填充到剩餘的bit_uint_random中,要左移相應的位
    lsb |= (long) (randomness[0x2] & 0xff) << 56;
    lsb |= (long) (randomness[0x3] & 0xff) << 48;
    lsb |= (long) (randomness[0x4] & 0xff) << 40;
    lsb |= (long) (randomness[0x5] & 0xff) << 32;
    lsb |= (long) (randomness[0x6] & 0xff) << 24;
    lsb |= (long) (randomness[0x7] & 0xff) << 16;
    lsb |= (long) (randomness[0x8] & 0xff) << 8;
    lsb |= (randomness[0x9] & 0xff);
    this.msb = msb;
    this.lsb = lsb;
}

這是完全按照規範的二進位制佈局編寫程式碼,可以像UUID的構造那樣精簡一下:

long msb = 0;
long lsb = 0;
byte[] data = new byte[16];
byte[] ts = ByteBuffer.allocate(8).putLong(0, timestamp << 16).array();
System.arraycopy(ts, 0, data, 0, 6);
System.arraycopy(randomness, 0, data, 6, 10);
for (int i = 0; i < 8; i++)
    msb = (msb << 8) | (data[i] & 0xff);
for (int i = 8; i < 16; i++)
    lsb = (lsb << 8) | (data[i] & 0xff);

接著可以簡單新增下面幾個方法:

public long getMostSignificantBits() {
    return this.msb;
}

public long getLeastSignificantBits() {
    return this.lsb;
}

// 靜態工廠方法,由UUID範例生成ULID範例
public static ULID fromUUID(UUID uuid) {
    return new ULID(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
}

// 實體方法,當前ULID範例轉換為UUID範例
public UUID toUUID() {
    return new UUID(this.msb, this.lsb);
}

接著需要覆蓋toString()方法,這是ULID的核心方法,需要通過Crockford Base32編碼生成規範的字串表示形式。由於128 bit要對映為26 char,這裡可以考慮分三段進行對映,也就是48 bit時間戳對映為10 char,剩下的兩部分亂數分別做40 bit8 char的對映,加起來就是26 char

 |----------|      |----------------|
  Timestamp     Randomness[split to 2 part]
48bit => 10char    80bit => 16char

編寫方法:

/**
  * Default alphabet of ULID
  */
private static final char[] DEFAULT_ALPHABET = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C',
        'D', 'E', 'F', 'G', 'H', 'J', 'K', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'V', 'W', 'X', 'Y', 'Z'};

/**
  * Default alphabet mask
  */
private static final int DEFAULT_ALPHABET_MASK = 0b11111;

/**
  * Character num of ULID
  */
private static final int ULID_CHAR_LEN = 0x1a;

@Override
public String toString() {
    return toCanonicalString(DEFAULT_ALPHABET);
}

public String toCanonicalString(char[] alphabet) {
    char[] chars = new char[ULID_CHAR_LEN];
    long timestamp = this.msb >> 16;
    // 第一部分亂數取msb的低16位元+lsb的高24位元,這裡(msb & 0xffff) << 24作為第一部分亂數的高16位元,所以要左移24位元
    long randMost = ((this.msb & 0xffffL) << 24) | (this.lsb >>> 40);
    // 第二部分亂數取lsb的低40位,0xffffffffffL是2^40-1
    long randLeast = (this.lsb & 0xffffffffffL);
    // 接著每個部分的偏移量和DEFAULT_ALPHABET_MASK(31)做一次或運算就行,就是char[index] = alphabet[(part >> (step * index)) & 31]
    chars[0x00] = alphabet[(int) (timestamp >>> 45 & DEFAULT_ALPHABET_MASK)];
    chars[0x01] = alphabet[(int) (timestamp >>> 40 & DEFAULT_ALPHABET_MASK)];
    chars[0x02] = alphabet[(int) (timestamp >>> 35 & DEFAULT_ALPHABET_MASK)];
    chars[0x03] = alphabet[(int) (timestamp >>> 30 & DEFAULT_ALPHABET_MASK)];
    chars[0x04] = alphabet[(int) (timestamp >>> 25 & DEFAULT_ALPHABET_MASK)];
    chars[0x05] = alphabet[(int) (timestamp >>> 20 & DEFAULT_ALPHABET_MASK)];
    chars[0x06] = alphabet[(int) (timestamp >>> 15 & DEFAULT_ALPHABET_MASK)];
    chars[0x07] = alphabet[(int) (timestamp >>> 10 & DEFAULT_ALPHABET_MASK)];
    chars[0x08] = alphabet[(int) (timestamp >>> 5 & DEFAULT_ALPHABET_MASK)];
    chars[0x09] = alphabet[(int) (timestamp & DEFAULT_ALPHABET_MASK)];
    chars[0x0a] = alphabet[(int) (randMost >>> 35 & DEFAULT_ALPHABET_MASK)];
    chars[0x0b] = alphabet[(int) (randMost >>> 30 & DEFAULT_ALPHABET_MASK)];
    chars[0x0c] = alphabet[(int) (randMost >>> 25 & DEFAULT_ALPHABET_MASK)];
    chars[0x0d] = alphabet[(int) (randMost >>> 20 & DEFAULT_ALPHABET_MASK)];
    chars[0x0e] = alphabet[(int) (randMost >>> 15 & DEFAULT_ALPHABET_MASK)];
    chars[0x0f] = alphabet[(int) (randMost >>> 10 & DEFAULT_ALPHABET_MASK)];
    chars[0x10] = alphabet[(int) (randMost >>> 5 & DEFAULT_ALPHABET_MASK)];
    chars[0x11] = alphabet[(int) (randMost & DEFAULT_ALPHABET_MASK)];
    chars[0x12] = alphabet[(int) (randLeast >>> 35 & DEFAULT_ALPHABET_MASK)];
    chars[0x13] = alphabet[(int) (randLeast >>> 30 & DEFAULT_ALPHABET_MASK)];
    chars[0x14] = alphabet[(int) (randLeast >>> 25 & DEFAULT_ALPHABET_MASK)];
    chars[0x15] = alphabet[(int) (randLeast >>> 20 & DEFAULT_ALPHABET_MASK)];
    chars[0x16] = alphabet[(int) (randLeast >>> 15 & DEFAULT_ALPHABET_MASK)];
    chars[0x17] = alphabet[(int) (randLeast >>> 10 & DEFAULT_ALPHABET_MASK)];
    chars[0x18] = alphabet[(int) (randLeast >>> 5 & DEFAULT_ALPHABET_MASK)];
    chars[0x19] = alphabet[(int) (randLeast & DEFAULT_ALPHABET_MASK)];
    return new String(chars);
}

上面的方法toCanonicalString()看起來很"臃腫",但是能保證效能比較高,實現思路來自於Long#fastUUID(),也就是UUID的五段格式化方法。借鑑並且簡化一下可以抽取一個toCanonicalString0()方法:

public String toCanonicalString0() {
    byte[] bytes = new byte[ULID_CHAR_LEN];
    formatUnsignedLong0(this.lsb & 0xffffffffffL, 5, bytes, 18, 8);
    formatUnsignedLong0(((this.msb & 0xffffL) << 24) | (this.lsb >>> 40), 5, bytes, 10, 8);
    formatUnsignedLong0(this.msb >> 16, 5, bytes, 0, 10);
    return new String(bytes, StandardCharsets.US_ASCII);
}

private static void formatUnsignedLong0(long val, int shift, byte[] buf, int offset, int len) {
    int charPos = offset + len;
    long radix = 1L << shift;
    long mask = radix - 1;
    do {
        buf[--charPos] = (byte) DEFAULT_ALPHABET[(int) (val & mask)];
        val >>>= shift;
    } while (charPos > offset);
}

toCanonicalString0()方法和toString()方法會得到相同的ULID格式化字串。接著新增常用的工廠方法:

public static ULID ulid() {
    return ulid(System::currentTimeMillis, len -> {
        byte[] bytes = new byte[len];
        ThreadLocalRandom.current().nextBytes(bytes);
        return bytes;
    });
}

public static ULID ulid(Supplier<Long> timestampProvider,
                        IntFunction<byte[]> randomnessProvider) {
    return new ULID(timestampProvider.get(), randomnessProvider.apply(RANDOMNESS_BYTE_LEN));
}

預設使用ThreadLocalRandom生成亂數,如果是JDK17以上,還可以選用更高效能的新型PRNG實現,對應介面是RandomGenerator,預設實現是L32X64MixRandom。編寫一個main方法執行一下:

public static void main(String[] args) {
    System.out.println(ULID.ulid());
}
// 某次執行結果
01GFGGMBFGB5WKXBN7S84ATRDG

最後實現"單調遞增"的ULID構造,先提供一個"增長"方法:

/**
  * The least significant 64 bits increase overflow, 0xffffffffffffffffL + 1
  */
private static final long OVERFLOW = 0x0000000000000000L;

public ULID increment() {
    long msb = this.msb;
    long lsb = this.lsb + 1;
    if (lsb == OVERFLOW) {
        msb += 1;
    }
    return new ULID(msb, lsb);
}

其實就是低位加1,溢位後高位加1。接著新增一個靜態內部子類和響應方法如下:

// 建構函式
public ULID(ULID other) {
    this.msb = other.msb;
    this.lsb = other.lsb;
}


public static byte[] defaultRandomBytes(int len) {
    byte[] bytes = new byte[len];
    ThreadLocalRandom.current().nextBytes(bytes);
    return bytes;
}

public static MonotonicULIDSpi monotonicUlid() {
    return monotonicUlid(System::currentTimeMillis, ULID::defaultRandomBytes);
}

public static MonotonicULIDSpi monotonicUlid(Supplier<Long> timestampProvider,
                                              IntFunction<byte[]> randomnessProvider) {
    return new MonotonicULID(timestampProvider, randomnessProvider, timestampProvider.get(),
            randomnessProvider.apply(RANDOMNESS_BYTE_LEN));
}

// @SPI MonotonicULID
public interface MonotonicULIDSpi {

  ULID next();
}

private static class MonotonicULID extends ULID implements MonotonicULIDSpi {

    @Serial
    private static final long serialVersionUID = -9158161806889605101L;

    private volatile ULID lastULID;

    private final Supplier<Long> timestampProvider;

    private final IntFunction<byte[]> randomnessProvider;

    public MonotonicULID(Supplier<Long> timestampProvider,
                          IntFunction<byte[]> randomnessProvider,
                          long timestamp,
                          byte[] randomness) {
        super(timestamp, randomness);
        this.timestampProvider = timestampProvider;
        this.randomnessProvider = randomnessProvider;
        this.lastULID = new ULID(timestamp, randomness);
    }
    
    // 這裡沒設計好,子類快取了上一個節點,需要重寫一下increment方法,父類別可以移除此方法 
    @Override
    public ULID increment() {
        long newMsb = lastULID.msb;
        long newLsb = lastULID.lsb + 1;
        if (newLsb == OVERFLOW) {
            newMsb += 1;
        }
        return new ULID(newMsb, newLsb);
    }

    @Override
    public synchronized ULID next() {
        long lastTimestamp = lastULID.getTimestamp();
        long timestamp = getTimestamp();
        // 這裡做了一個恆為true的判斷,後面再研讀其他程式碼進行修改
        if (lastTimestamp >= timestamp || timestamp - lastTimestamp <= 1000) {
            this.lastULID = this.increment();
        } else {
            this.lastULID = new ULID(timestampProvider.get(), randomnessProvider.apply(RANDOMNESS_BYTE_LEN));
        }
        return new ULID(this.lastULID);
    }
}

關於上一個ULID和下一個ULID之間的時間戳判斷,這裡從規範檔案沒看出細節實現,先簡單做一個永遠為true的分支判斷,後面再深入研究然後修改。使用方式如下:

public static void main(String[] args) {
    MonotonicULIDSpi spi = ULID.monotonicUlid();
    System.out.println(spi.next());
    System.out.println(spi.next());
    System.out.println(spi.next());
    System.out.println(spi.next());
}
// 某次執行輸出
01GFGASXXQXD5ZJ26PKSCFGNPF
01GFGASXXQXD5ZJ26PKSCFGNPG
01GFGASXXQXD5ZJ26PKSCFGNPH
01GFGASXXQXD5ZJ26PKSCFGNPJ

這裡為了更加靈活,沒有采用全域性靜態屬性快取上一個ULID範例,而是簡單使用繼承方式實現。

ULID效能評估

引入JMH做了一個簡單的效能測試,程式碼如下:

@Fork(1)
@Threads(10)
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 3)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class BenchmarkRunner {

    private static ULID.MonotonicULIDSpi SPI;

    @Setup
    public void setup() {
        SPI = ULID.monotonicUlid();
    }


    @Benchmark
    public UUID createUUID() {
        return UUID.randomUUID();
    }

    @Benchmark
    public String createUUIDToString() {
        return UUID.randomUUID().toString();
    }

    @Benchmark
    public ULID createULID() {
        return ULID.ulid();
    }

    @Benchmark
    public String createULIDToString() {
        return ULID.ulid().toString();
    }
    
    @Benchmark
    public String createULIDToCanonicalString0() {
        return ULID.ulid().toCanonicalString0();
    }

    @Benchmark
    public ULID createMonotonicULID() {
        return SPI.next();
    }

    @Benchmark
    public String createMonotonicULIDToString() {
        return SPI.next().toString();
    }

    public static void main(String[] args) throws Exception {
        new Runner(new OptionsBuilder().build()).run();
    }
}

某次測試報告如下(開發環境Intel 6700K 4C8T 32G,使用OpenJDK-19):

Benchmark                                      Mode  Cnt       Score      Error   Units
BenchmarkRunner.createMonotonicULID           thrpt    5   18529.565 ± 3432.113  ops/ms
BenchmarkRunner.createMonotonicULIDToString   thrpt    5   12308.443 ± 1729.675  ops/ms
BenchmarkRunner.createULID                    thrpt    5  122347.702 ± 3183.734  ops/ms
BenchmarkRunner.createULIDToCanonicalString0  thrpt    5   50848.135 ± 3699.334  ops/ms
BenchmarkRunner.createULIDToString            thrpt    5   37346.891 ± 1029.809  ops/ms
BenchmarkRunner.createUUID                    thrpt    5     806.134 ±  218.622  ops/ms
BenchmarkRunner.createUUIDToString            thrpt    5     813.380 ±   46.333  ops/ms

小結

本文就ULID的規範進行解讀,通過規範和參考現有類庫進行ULIDJava實現。ULID適用於一些"排序ID"生成或者需要"單調ID"生成的場景,可以考慮用於資料庫鍵設計、順序號設計等等場景。從實現上看它效能會優於UUID(特別是單調ULID,因為不需要重新獲取亂數部分,吞吐量會提升一個數量級)。

Demo專案倉庫:

  • framework-mesh/ulid4jhttps://github.com/zjcscut/framework-mesh/tree/master/ulid4j

參考資料: