原始碼地址:https://github.com/Microsoft/Microsoft.IO.RecyclableMemoryStream
小物件池和大物件池管理、RecyclableMemoryStream建立、各場景的ETW訊息\事件勾點;執行緒安全
備註:官方這張圖,只是池塊增長策略闡述,不能很好理解其內部池具體實現。小物件還好理解,大物件組織分配並不像畫的這樣。
blockSize:小物件池,塊大小;預設128KB。
largeBufferMultiple:大物件池,策略被乘數大小;預設1M。
maximumBufferSize:大物件池,塊最大大小;預設128M。
useExponentialLargeBuffer:大物件池策略,true:指數、false:線性;一個類實體只能對應一種策略,如果想使用2種策略,就要定義2個類實體;預設線性。
maximumSmallPoolFreeBytes:byte[] 歸還小物件池,小物件池最大大小;超過GC回收,否則歸還小物件池;預設不限制。
maximumLargePoolFreeBytes:byte[] 歸還大物件池,大物件池最大大小;超過GC回收,否則歸還大物件池;預設不限制。
private readonly ConcurrentStack<byte[]> smallPool; //小物件池
private long smallPoolFreeSize; //小物件池歸還(空閒)byte大小
private long smallPoolInUseSize; //小物件借出(使用)byte大小
smallPool 使用執行緒安全堆疊,每個元素bye[] 大小一樣,對應blockSize
如:blockSize = 128K,則bye[] 大小都是 128K
maximumSmallPoolFreeBytes = 128M,則代表smallPool 所有塊大小總和最大值為128M,超出則歸還時不儲存到最小池中
private readonly ConcurrentStack<byte[]>[] largePools; //大物件池
private readonly long[] largeBufferFreeSize; //大物件每個層級,歸還(空閒)byte大小
private readonly long[] largeBufferInUseSize; //大物件每個層級,借出(使用)byte大小
largePools 使用執行緒安全堆疊的陣列,陣列每個索引對應指數/線性一個層級大小,每個層級bye[] 大小一樣。
如:largeBufferMultiple = 1M、maximumBufferSize = 128M
則,指數 1M、2M、4M、8M、16M、32M、64M、128M,largePools.Length = 8;largePools[2] 下面bye[] 大小都是4M
線性 1M、2M、3M、4M、5M、6M、7M、......、128M,largePools.Length = 128;largePools[2] 下面bye[] 大小都是3M
largeBufferFreeSize.Length == largePools.Length
largeBufferInUseSize = largePools.Length + 1 ,多出一個元素儲存,借出時requiredSize > maximumBufferSize 所有byte[]大小,此byte[] 無法歸還到大物件池,會被GC直接回收。
maximumLargePoolFreeBytes = 256M,則代表大池的各個維度塊大小總和最大值,超出則歸還時不儲存到池中,各個維度如:線性有128個維度,指數8個維度,各個維度都是堆疊
線性最大空間:256M * 128 = 32G
指數最大空間:256M * 8 = 2G
借出:
internal byte[] GetBlock() //從小物件池獲取byte[],若無則直接建立 new byte[this.BlockSize]
internal byte[] GetLargeBuffer(long requiredSize, Guid id, string tag) //從大物件池獲取byte[],首先根據requiredSize計算對應大物件索引位置,若無則直接建立 new byte[requiredSize]
歸還:
internal void ReturnBlocks(List<byte[]> blocks, Guid id, string tag) //多個塊歸還小物件池,判斷是否maximumSmallPoolFreeBytes超出,不超出則歸還
internal void ReturnBlock(byte[] block, Guid id, string tag) //單個塊歸還小物件池,判斷是否maximumSmallPoolFreeBytes超出,不超出則歸還
//歸還大物件池,首先根據buffer.Length計算對應大物件索引位置,判斷索引對應層級大小是否maximumLargePoolFreeBytes超出,不超出則歸還
internal void ReturnLargeBuffer(byte[] buffer, Guid id, string tag)
public MemoryStream GetStream(Guid id, string tag, long requiredSize, bool asContiguousBuffer)
asContiguousBuffer == true && requiredSize > this.BlockSize
請求連續byte[] 且 請求位元組大於1個小物件塊大小時,則使用大象池建立RecyclableMemoryStream,否則使用小物件池建立RecyclableMemoryStream。
其他過載方法,無asContiguousBuffer引數,預設使用小物件池建立RecyclableMemoryStream。
方法中,byte[] buffer、Memory<byte> buffer、ReadOnlySpan<byte> buffer 引數,會把其中的byte 資料寫入新申請的小物件blocks裡面,不會複用這些物件。
非執行緒安全
internal RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, Guid id, string tag, long requestedSize, byte[] initialLargeBuffer)
memoryManager:RecyclableMemoryStreamManager參照,呼叫其提供借出\歸還\通知狀態等方法或者屬性。
initialLargeBuffer:不為null代表使用大物件池,否則小物件池。GetBuffer()方法也會影響小物件池轉為大物件池。
requestedSize:根據請求大小,分配1個大物件池bye[] 或者多個小物件池Block byte[]
private readonly List<byte[]> blocks = new List<byte[]>(); //如果使用小物件池,則儲存借出的多個小物件
private byte[] largeBuffer; //如果使用大物件池,則儲存借出的大物件
/*
* 如果使用大物件池,Capacity調整需要更換更大的大物件;
* 老的大物件歸還,大物件池超出暫時無法回收大物件,則儲存到此,在物件Dispose時再次嘗試歸還。
* 因為可能有多次此情況發生,所有為List<>
*/
private List<byte[]> dirtyBuffers;
/// <summary>
/// The finalizer will be called when a stream is not disposed properly.
/// </summary>
/// <remarks>Failing to dispose indicates a bug in the code using streams. Care should be taken to properly account for stream lifetime.</remarks>
~RecyclableMemoryStream()
{
// 解構方法,兜底釋放
this.Dispose(false);
}
//非公開方法
/// <summary>
/// Returns the memory used by this stream back to the pool.
/// </summary>
/// <param name="disposing">Whether we're disposing (true), or being called by the finalizer (false).</param>
//disposing 區分是否解構函式呼叫
protected override void Dispose(bool disposing)
{
if (this.disposed)
{
// 已釋放不在釋放,記錄通知事件
string doubleDisposeStack = null;
if (this.memoryManager.GenerateCallStacks)
{
doubleDisposeStack = Environment.StackTrace;
}
this.memoryManager.ReportStreamDoubleDisposed(this.id, this.tag, this.AllocationStack, this.DisposeStack, doubleDisposeStack);
return;
}
//標記已釋放
this.disposed = true;
if (this.memoryManager.GenerateCallStacks)
{
this.DisposeStack = Environment.StackTrace;
}
this.memoryManager.ReportStreamDisposed(this.id, this.tag, this.AllocationStack, this.DisposeStack);
if (disposing)
{
//已釋放,不用進入解構佇列,不會觸發解構函式。
GC.SuppressFinalize(this);
}
else
{
// We're being finalized.
this.memoryManager.ReportStreamFinalized(this.id, this.tag, this.AllocationStack);
//如果此應用程式域正在解除安裝,並且公共語言執行時已開始呼叫終止程式,則不執行歸還池邏輯。
if (AppDomain.CurrentDomain.IsFinalizingForUnload())
{
// If we're being finalized because of a shutdown, don't go any further.
// We have no idea what's already been cleaned up. Triggering events may cause
// a crash.
base.Dispose(disposing);
return;
}
}
this.memoryManager.ReportStreamLength(this.length);
if (this.largeBuffer != null)
{
//歸還大物件
this.memoryManager.ReturnLargeBuffer(this.largeBuffer, this.id, this.tag);
}
if (this.dirtyBuffers != null)
{
//再次嘗試歸還老的大物件列表
foreach (var buffer in this.dirtyBuffers)
{
this.memoryManager.ReturnLargeBuffer(buffer, this.id, this.tag);
}
}
//歸還小物件塊列表
this.memoryManager.ReturnBlocks(this.blocks, this.id, this.tag);
this.blocks.Clear();
base.Dispose(disposing);
}
//公共方法
/// <summary>
/// Equivalent to <c>Dispose</c>.
/// </summary>
public override void Close()
{
this.Dispose(true);
}
標識位置資訊,方便引數傳遞,操作blocks屬性。
private struct BlockAndOffset
{
public int Block; //小物件塊所在整體位置索引
public int Offset; //小物件塊中未使用位元組開始位置\已使用位元組結束位置
public BlockAndOffset(int block, int offset)
{
this.Block = block;
this.Offset = offset;
}
}
public ReadOnlySequence<byte> GetReadOnlySequence()
{
this.CheckDisposed();
if (this.largeBuffer != null)
{
//大物件,只有1個位元組陣列,連續的
AssertLengthIsSmall();
return new ReadOnlySequence<byte>(this.largeBuffer, 0, (int)this.length);
}
if (this.blocks.Count == 1)
{
//小物件1個塊,只有1個位元組陣列,連續的
AssertLengthIsSmall();
return new ReadOnlySequence<byte>(this.blocks[0], 0, (int)this.length);
}
//小物件多個塊,多個位元組資料,不連續的
var first = new BlockSegment(this.blocks[0]);
var last = first;
//建立關聯下一個塊物件
for (int blockIdx = 1; last.RunningIndex + last.Memory.Length < this.length; blockIdx++)
{
last = last.Append(this.blocks[blockIdx]);
}
//首尾物件
return new ReadOnlySequence<byte>(first, 0, last, (int)(this.length - last.RunningIndex));
}
private sealed class BlockSegment : ReadOnlySequenceSegment<byte>
{
public BlockSegment(Memory<byte> memory) => Memory = memory;
public BlockSegment Append(Memory<byte> memory)
{
var nextSegment = new BlockSegment(memory) { RunningIndex = RunningIndex + Memory.Length };
Next = nextSegment;
return nextSegment;
}
}
private byte[] bufferWriterTempBuffer;
private ArraySegment<byte> GetWritableBuffer(int sizeHint)
{
this.CheckDisposed();
if (sizeHint < 0)
{
throw new ArgumentOutOfRangeException(nameof(sizeHint), $"{nameof(sizeHint)} must be non-negative.");
}
var minimumBufferSize = Math.Max(sizeHint, 1);
this.EnsureCapacity(this.position + minimumBufferSize);
if (this.bufferWriterTempBuffer != null)
{
this.ReturnTempBuffer(this.bufferWriterTempBuffer);
this.bufferWriterTempBuffer = null;
}
if (this.largeBuffer != null)
{
return new ArraySegment<byte>(this.largeBuffer, (int)this.position, this.largeBuffer.Length - (int)this.position);
}
BlockAndOffset blockAndOffset = this.GetBlockAndRelativeOffset(this.position);
int remainingBytesInBlock = this.MemoryManager.BlockSize - blockAndOffset.Offset;
if (remainingBytesInBlock >= minimumBufferSize)
{
//分配小物件,範圍屬於一個block,則返回block連續段
return new ArraySegment<byte>(this.blocks[blockAndOffset.Block], blockAndOffset.Offset, this.MemoryManager.BlockSize - blockAndOffset.Offset);
}
//分配小物件,單位大於一個block塊,則通過大物件/小物件分配byte[];記錄賦值給屬性bufferWriterTempBuffer;規避不好返回多個blocks中byte
this.bufferWriterTempBuffer = minimumBufferSize > this.memoryManager.BlockSize ?
this.memoryManager.GetLargeBuffer(minimumBufferSize, this.id, this.tag) :
this.memoryManager.GetBlock();
return new ArraySegment<byte>(this.bufferWriterTempBuffer);
}
public void Advance(int count)
{
this.CheckDisposed();
if (count < 0)
{
throw new ArgumentOutOfRangeException(nameof(count), $"{nameof(count)} must be non-negative.");
}
byte[] buffer = this.bufferWriterTempBuffer;
if (buffer != null)
{
if (count > buffer.Length)
{
throw new InvalidOperationException($"Cannot advance past the end of the buffer, which has a size of {buffer.Length}.");
}
//把bufferWriterTempBuffer屬性中資料,寫回小物件blocks
this.Write(buffer, 0, count);
this.ReturnTempBuffer(buffer);
this.bufferWriterTempBuffer = null;
}
else
{
long bufferSize = this.largeBuffer == null
? this.memoryManager.BlockSize - this.GetBlockAndRelativeOffset(this.position).Offset
: this.largeBuffer.Length - this.position;
if (count > bufferSize)
{
throw new InvalidOperationException($"Cannot advance past the end of the buffer, which has a size of {bufferSize}.");
}
this.position += count;
this.length = Math.Max(this.position, this.length);
}
}
如果使用大物件,則返回大物件資料
如果使用小物件+塊1個,則直接返回這個塊。如果大於1個,則申請新大物件返回,之前小物件歸還。升級為使用大物件。
通過new byte[this.Length] 建立新byte陣列,拷貝大物件/小物件資料過來。