.NET利用ArrayPoolPool<T>和MemoryPool<T>提供了針對Array/Memory<T>的物件池功能。最近在一個專案中需要使用到針對位元組陣列的物件池,由於這些池化的位元組陣列相當龐大,我希望將它們分配到POH上以降低GC的壓力。由於ArrayPoolPool<T>沒法提供支援,所以我提供了一個極簡的實現。
目錄
一、Bucket
二、ByteArrayOwner
三、ByteArrayPool
四、測試
和大部分實現方案一樣,我需要限制池化陣列的最大尺寸,同時設定最小長度為16。我們將[16-MaxLength]劃分為N個區間,每個區間對應一個Bucket,該Bucket用來管理「所在長度區間」的位元組陣列。如下所示的就是這個Bucket型別的定義:我們利用一個ConcurrentBag<byte[]>來維護池化的位元組陣列,陣列的「借」與「還」由TryTake和Add方法來實現。
internal sealed class Bucket { private readonly ConcurrentBag<byte[]> _byteArrays = new(); public void Add(byte[] array) => _byteArrays.Add(array); public bool TryTake([MaybeNullWhen(false)] out byte[] array) => _byteArrays.TryTake(out array); }
從物件池「借出」的是一個ByteArrayOwner 物件,它是對位元組陣列和所在Bucket的封裝。如果指定的陣列長度超過設定的閾值,意味著Bucket不存在,借出的位元組陣列也不需要還回去,這一邏輯體現在IsPooled屬性上。ByteArrayOwner 實現了IDisposable介面,實現Dispose方法呼叫Bucket的Add方法完成了針對位元組陣列的「歸還」,該方法利用針對_isReleased欄位的CompareExchange操作解決「重複歸還」的問題。
public sealed class ByteArrayOwner : IDisposable { private readonly byte[] _bytes; private readonly Bucket? _bucket; private volatile int _isReleased; public bool IsPooled => _bucket is not null; internal ByteArrayOwner(byte[] bytes, Bucket? bucket) { _bytes = bytes; _bucket = bucket; } public byte[] Bytes => _isReleased == 0 ? _bytes : throw new ObjectDisposedException("The ByteArrayOwner has been released."); public void Dispose() { if (Interlocked.CompareExchange(ref _isReleased, 1, 0) == 0) { _bucket?.Add(_bytes); } } }
具體的物件池實現體現在如下所示的ByteArrayPool型別上,池化陣列的最大長度在建構函式中指定,ByteArrayPool據此劃分長度區間並建立一組通過_buckets欄位表示的Bucket陣列。具體的區間劃分實現在靜態方法SelectBucketIndex方法中,當我們根據指定的陣列長度確定具體Bucket的時候(對於Bucket在_buckets陣列中的索引)同樣呼叫此方法。另一個靜態方法GetMaxSizeForBucket執行相反的操作,它根據指定的Bucket索引計算長度區間的最大值。當某個Bucket確定後,得到的陣列都具有這個長度。作為ArrayPoolPool<T>的預設實現,ConfigurableArrayPool<T>也採用一樣的演演算法。
public sealed class ByteArrayPool { private readonly Bucket[] _buckets; public int MaxArrayLength { get; } public static ByteArrayPool Create(int maxLength) => new(maxLength); private ByteArrayPool(int maxLength) { var bucketCount = SelectBucketIndex(maxLength) + 1; _buckets = new Bucket[bucketCount]; MaxArrayLength = GetMaxSizeForBucket(bucketCount - 1); for (int index = 0; index < bucketCount; index++) { _buckets[index] = new Bucket(); } } public ByteArrayOwner Rent(int minimumLength) { if (minimumLength < 0) { throw new ArgumentOutOfRangeException(nameof(minimumLength)); } if (minimumLength > MaxArrayLength) { return new ByteArrayOwner(bytes: new byte[minimumLength], bucket: null); } var bucketIndex = SelectBucketIndex(minimumLength); for (int index = bucketIndex; index < _buckets.Length; index++) { var bucket = _buckets[index]; if (bucket.TryTake(out var array)) { return new ByteArrayOwner(array, bucket: bucket); } } return new ByteArrayOwner(bytes: GC.AllocateUninitializedArray<byte>(GetMaxSizeForBucket(bucketIndex), pinned: true), bucket: _buckets[bucketIndex]); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public static int GetMaxSizeForBucket(int index) => 16 << index; [MethodImpl(MethodImplOptions.AggressiveInlining)] public static int SelectBucketIndex(int length) => BitOperations.Log2((uint)(length - 1) | 0xFu) - 3; }
在核心方法Rent中,如果指定的長度超過閾值,該方法會直接建立一個位元組陣列,並封裝成返回的ByteArrayOwner 物件。由於這樣的ByteArrayOwner 不具有對應的Bucket,所以不需要「歸還」。如果指定的陣列長度在允許的範圍內,該方法會根據此長度確定對應Bucket的索引,並確定此索引對應的Bucket以及後續的Bucket是否保留著池化的陣列,如果存在,直接將其封裝成返回的ByteArrayOwner 物件。
如果所有符合長度要求的Bucket都是「空」的,那麼我們會根據指定長度對應Bucket建立一個位元組陣列(長度為該Bucket對應長度區間的最大值),並封裝成返回的ByteArrayOwner 物件。上面介紹的針對POH的分配體現在針對GC.AllocateUninitializedArray<byte>方法的呼叫,我們將pinned引數設定為True。
ByteArrayPool針對位元組陣列的池化通過如下的程式來演示。
var pool = ByteArrayPool.Create(maxLength: 1000); var length = 100; var minLength = 65; var maxLength = 128; // 在允許的最大長度內,被池化 var owner = pool.Rent(minimumLength: length); Debug.Assert(owner.IsPooled); var bytes = owner.Bytes; Debug.Assert(bytes.Length == maxLength); owner.Dispose(); for (int len = minLength; len <= maxLength; len++) { using (owner = pool.Rent(len)) { Debug.Assert(owner.IsPooled); Debug.Assert(ReferenceEquals(owner.Bytes, bytes)); } } // 只有被釋放的陣列才會被複用 owner = pool.Rent(minimumLength: length); Debug.Assert(!ReferenceEquals(owner.Bytes, pool.Rent(minimumLength: length).Bytes)); // 超出最大長度,不會被池化 owner = pool.Rent(minimumLength: pool.MaxArrayLength + 1); Debug.Assert(!owner.IsPooled); bytes = owner.Bytes; owner.Dispose(); Debug.Assert(!ReferenceEquals (pool.Rent(minimumLength: pool.MaxArrayLength + 1).Bytes, bytes));