實現一個極簡的位元組陣列物件池

2023-11-03 09:00:35

.NET利用ArrayPoolPool<T>和MemoryPool<T>提供了針對Array/Memory<T>的物件池功能。最近在一個專案中需要使用到針對位元組陣列的物件池,由於這些池化的位元組陣列相當龐大,我希望將它們分配到POH上以降低GC的壓力。由於ArrayPoolPool<T>沒法提供支援,所以我提供了一個極簡的實現。

目錄
一、Bucket
二、ByteArrayOwner
三、ByteArrayPool
四、測試

一、Bucket

和大部分實現方案一樣,我需要限制池化陣列的最大尺寸,同時設定最小長度為16。我們將[16-MaxLength]劃分為N個區間,每個區間對應一個Bucket,該Bucket用來管理「所在長度區間」的位元組陣列。如下所示的就是這個Bucket型別的定義:我們利用一個ConcurrentBag<byte[]>來維護池化的位元組陣列,陣列的「借」與「還」由TryTake和Add方法來實現。

internal sealed class Bucket
{
    private readonly ConcurrentBag<byte[]> _byteArrays = new();
    public void Add(byte[] array) => _byteArrays.Add(array);
    public bool TryTake([MaybeNullWhen(false)] out byte[] array) => _byteArrays.TryTake(out array);
}

二、ByteArrayOwner

從物件池「借出」的是一個ByteArrayOwner 物件,它是對位元組陣列和所在Bucket的封裝。如果指定的陣列長度超過設定的閾值,意味著Bucket不存在,借出的位元組陣列也不需要還回去,這一邏輯體現在IsPooled屬性上。ByteArrayOwner 實現了IDisposable介面,實現Dispose方法呼叫Bucket的Add方法完成了針對位元組陣列的「歸還」,該方法利用針對_isReleased欄位的CompareExchange操作解決「重複歸還」的問題。

public sealed class ByteArrayOwner : IDisposable
{
    private readonly byte[] _bytes;
    private readonly Bucket? _bucket;
    private volatile int _isReleased;
    public bool IsPooled => _bucket is not null;
    internal ByteArrayOwner(byte[] bytes, Bucket? bucket)
    {
        _bytes = bytes;
        _bucket = bucket;
    }
    public byte[] Bytes => _isReleased == 0 ? _bytes : throw new ObjectDisposedException("The ByteArrayOwner has been released.");
    public void Dispose()
    {
        if (Interlocked.CompareExchange(ref _isReleased, 1, 0) == 0)
        {
            _bucket?.Add(_bytes);
        }
    }
}

三、ByteArrayPool

具體的物件池實現體現在如下所示的ByteArrayPool型別上,池化陣列的最大長度在建構函式中指定,ByteArrayPool據此劃分長度區間並建立一組通過_buckets欄位表示的Bucket陣列。具體的區間劃分實現在靜態方法SelectBucketIndex方法中,當我們根據指定的陣列長度確定具體Bucket的時候(對於Bucket在_buckets陣列中的索引)同樣呼叫此方法。另一個靜態方法GetMaxSizeForBucket執行相反的操作,它根據指定的Bucket索引計算長度區間的最大值。當某個Bucket確定後,得到的陣列都具有這個長度。作為ArrayPoolPool<T>的預設實現,ConfigurableArrayPool<T>也採用一樣的演演算法。

public sealed class ByteArrayPool
{
    private readonly Bucket[] _buckets;
    public int MaxArrayLength { get; }
    public static ByteArrayPool Create(int maxLength) => new(maxLength);
    private ByteArrayPool(int maxLength)
    {
        var bucketCount = SelectBucketIndex(maxLength) + 1;
        _buckets = new Bucket[bucketCount];
        MaxArrayLength = GetMaxSizeForBucket(bucketCount - 1);
        for (int index = 0; index < bucketCount; index++)
        {
            _buckets[index] = new Bucket();
        }
    }

    public ByteArrayOwner Rent(int minimumLength)
    {
        if (minimumLength < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(minimumLength));
        }
        if (minimumLength > MaxArrayLength)
        {
            return new ByteArrayOwner(bytes: new byte[minimumLength], bucket: null);
        }

        var bucketIndex = SelectBucketIndex(minimumLength);
        for (int index = bucketIndex; index < _buckets.Length; index++)
        {
            var bucket = _buckets[index];
            if (bucket.TryTake(out var array))
            {
                return new ByteArrayOwner(array, bucket: bucket);
            }
        }

        return new ByteArrayOwner(bytes: GC.AllocateUninitializedArray<byte>(GetMaxSizeForBucket(bucketIndex), pinned: true), bucket: _buckets[bucketIndex]);
    }

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static int GetMaxSizeForBucket(int index) => 16 << index;

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static int SelectBucketIndex(int length) => BitOperations.Log2((uint)(length - 1) | 0xFu) - 3;
}

在核心方法Rent中,如果指定的長度超過閾值,該方法會直接建立一個位元組陣列,並封裝成返回的ByteArrayOwner 物件。由於這樣的ByteArrayOwner 不具有對應的Bucket,所以不需要「歸還」。如果指定的陣列長度在允許的範圍內,該方法會根據此長度確定對應Bucket的索引,並確定此索引對應的Bucket以及後續的Bucket是否保留著池化的陣列,如果存在,直接將其封裝成返回的ByteArrayOwner 物件。

果所有符合長度要求的Bucket都是「空」的,那麼我們會根據指定長度對應Bucket建立一個位元組陣列(長度為該Bucket對應長度區間的最大值),並封裝成返回的ByteArrayOwner 物件。上面介紹的針對POH的分配體現在針對GC.AllocateUninitializedArray<byte>方法的呼叫,我們將pinned引數設定為True。

四、測試

ByteArrayPool針對位元組陣列的池化通過如下的程式來演示。

var pool = ByteArrayPool.Create(maxLength: 1000);
var length = 100;
var minLength = 65;
var maxLength = 128;

// 在允許的最大長度內,被池化
var owner = pool.Rent(minimumLength: length);
Debug.Assert(owner.IsPooled);
var bytes = owner.Bytes;
Debug.Assert(bytes.Length == maxLength);
owner.Dispose();
for (int len = minLength; len <= maxLength; len++)
{
    using (owner = pool.Rent(len))
    {
        Debug.Assert(owner.IsPooled);
        Debug.Assert(ReferenceEquals(owner.Bytes, bytes));
    }
}

// 只有被釋放的陣列才會被複用
owner = pool.Rent(minimumLength: length);
Debug.Assert(!ReferenceEquals(owner.Bytes, pool.Rent(minimumLength: length).Bytes));

// 超出最大長度,不會被池化
owner = pool.Rent(minimumLength: pool.MaxArrayLength + 1);
Debug.Assert(!owner.IsPooled);
bytes = owner.Bytes;
owner.Dispose();
Debug.Assert(!ReferenceEquals (pool.Rent(minimumLength: pool.MaxArrayLength + 1).Bytes, bytes));