快取是一種非常常見的設計,通過將資料快取到存取速度更快的儲存裝置中,來提高資料的存取速度,如記憶體、CPU快取、硬碟快取等。
但與快取的高速相對的是,快取的成本較高,因此容量往往是有限的,當快取滿了之後,就需要一種策略來決定將哪些資料移除出快取,以騰出空間來儲存新的資料。
這樣的策略被稱為快取替換策略(Cache Replacement Policy)。
常見的快取替換策略有:FIFO(First In First Out)、LRU(Least Recently Used)、LFU(Least Frequently Used)等。
今天給大家介紹的是LRU演演算法。
LRU演演算法基於這樣一個假設:如果資料最近被存取過,那麼將來被存取的機率也更高。
大部分情況下這個假設是成立的,因此LRU演演算法也是比較常用的快取替換策略。
基於這個假設,我們在實現的時候,需要維護一個有序的資料結構,來記錄資料的存取歷史,當快取滿了之後,就可以根據這個資料結構來決定將哪些資料移除出快取。
但如果資料的存取模式不符合LRU演演算法的假設,那麼LRU演演算法就會失效。
例如:資料的存取模式是週期性的,那麼LRU演演算法就會把週期性的資料淘汰掉,這樣就會導致快取命中率的下降。
換個說法比如,如果現在快取的資料只在白天被存取,晚上存取的是另一批資料,那麼在晚上,LRU演演算法就會把白天存取的資料淘汰掉,第二天白天又會把昨天晚上存取的資料淘汰掉,這樣就會導致快取命中率的下降。
後面有時間會給大家介紹LFU(Least Frequently Used)演演算法,以及LFU和LRU的結合LFRU(Least Frequently and Recently Used)演演算法,可以有效的解決這個問題。
上文提到,LRU演演算法需要維護一個有序的資料結構,來記錄資料的存取歷史。通常我們會用雙向連結串列來實現這個資料結構,因為雙向連結串列可以在O(1)的時間複雜度內往連結串列的頭部或者尾部插入資料,以及在O(1)的時間複雜度內刪除資料。
我們將資料儲存在雙向連結串列中,每次存取資料的時候,就將資料移動到連結串列的尾部,這樣就可以保證連結串列的尾部就是最近存取的資料,連結串列的頭部就是最久沒有被存取的資料。
當快取滿了之後,如果需要插入新的資料,因為連結串列的頭部就是最久沒有被存取的資料,所以我們就可以直接將連結串列的頭部刪除,然後將新的資料插入到連結串列的尾部。
如果我們要實現一個鍵值對的快取,我們可以用一個雜湊表來儲存鍵值對,這樣就可以在O(1)的時間複雜度內完成查詢操作,.NET 中我們可以使用 Dictionary。
同時我們使用 LinkedList 來作為雙向連結串列的實現,儲存快取的 key,以此記錄資料的存取歷史。
我們在每次操作 Dictionary 進行插入、刪除、查詢的時候,都需要將對應的 key 也插入、刪除、移動到連結串列的尾部。
// 實現 IEnumerable 介面,方便遍歷
public class LRUCache<TKey, TValue> : IEnumerable<KeyValuePair<TKey, TValue>>
{
private readonly LinkedList<TKey> _list;
private readonly Dictionary<TKey, TValue> _dictionary;
private readonly int _capacity;
public LRUCache(int capacity)
{
_capacity = capacity;
_list = new LinkedList<TKey>();
_dictionary = new Dictionary<TKey, TValue>();
}
public TValue Get(TKey key)
{
if (_dictionary.TryGetValue(key, out var value))
{
// 在連結串列中刪除 key,然後將 key 新增到連結串列的尾部
// 這樣就可以保證連結串列的尾部就是最近存取的資料,連結串列的頭部就是最久沒有被存取的資料
// 但是在連結串列中刪除 key 的時間複雜度是 O(n),所以這個演演算法的時間複雜度是 O(n)
_list.Remove(key);
_list.AddLast(key);
return value;
}
return default;
}
public void Put(TKey key, TValue value)
{
if (_dictionary.TryGetValue(key, out _))
{
// 如果插入的 key 已經存在,將 key 對應的值更新,然後將 key 移動到連結串列的尾部
_dictionary[key] = value;
_list.Remove(key);
_list.AddLast(key);
}
else
{
if (_list.Count == _capacity)
{
// 快取滿了,刪除連結串列的頭部,也就是最久沒有被存取的資料
_dictionary.Remove(_list.First.Value);
_list.RemoveFirst();
}
_list.AddLast(key);
_dictionary.Add(key, value);
}
}
public void Remove(TKey key)
{
if (_dictionary.TryGetValue(key, out _))
{
_dictionary.Remove(key);
_list.Remove(key);
}
}
public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
{
foreach (var key in _list)
{
yield return new KeyValuePair<TKey, TValue>(key, _dictionary[key]);
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
var lruCache = new LRUCache<int, int>(4);
lruCache.Put(1, 1);
lruCache.Put(2, 2);
lruCache.Put(3, 3);
lruCache.Put(4, 4);
Console.WriteLine(string.Join(" ", lruCache));
Console.WriteLine(lruCache.Get(2));
Console.WriteLine(string.Join(" ", lruCache));
lruCache.Put(5, 5);
Console.WriteLine(string.Join(" ", lruCache));
lruCache.Remove(3);
Console.WriteLine(string.Join(" ", lruCache));
輸出:
[1, 1] [2, 2] [3, 3] [4, 4] // 初始化
2 // 存取 2
[1, 1] [3, 3] [4, 4] [2, 2] // 2 移動到連結串列尾部
[3, 3] [4, 4] [2, 2] [5, 5] // 插入 5
[4, 4] [2, 2] [5, 5] // 刪除 3
上面的實現中,對快取的查詢、插入、刪除都會涉及到連結串列中資料的刪除(移動也是刪除再插入)。
因為我們在 LinkedList 中儲存的是 key,所以我們需要先通過 key 在連結串列中找到對應的節點,然後再進行刪除操作,這就導致了連結串列的刪除操作的時間複雜度是 O(n)。
雖然 Dictionary 的查詢、插入、刪除操作的時間複雜度都是 O(1),但因為連結串列操作的時間複雜度是 O(n),整個演演算法的最差時間複雜度是 O(n)。
演演算法優化的關鍵在於如何降低連結串列的刪除操作的時間複雜度。
優化思路:
也就是說,我們讓兩個本來不相關的資料結構之間產生聯絡。
不管是在插入、刪除、查詢快取的時候,都可以通過這種聯絡來將時間複雜度降低到 O(1)。
public class LRUCache_V2<TKey, TValue> : IEnumerable<KeyValuePair<TKey, TValue>>
{
private readonly LinkedList<KeyValuePair<TKey, TValue>> _list;
private readonly Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>> _dictionary;
private readonly int _capacity;
public LRUCache_V2(int capacity)
{
_capacity = capacity;
_list = new LinkedList<KeyValuePair<TKey, TValue>>();
_dictionary = new Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>>();
}
public TValue Get(TKey key)
{
if (_dictionary.TryGetValue(key, out var node))
{
_list.Remove(node);
_list.AddLast(node);
return node.Value.Value;
}
return default;
}
public void Put(TKey key, TValue value)
{
if (_dictionary.TryGetValue(key, out var node))
{
node.Value = new KeyValuePair<TKey, TValue>(key, value);
_list.Remove(node);
_list.AddLast(node);
}
else
{
if (_list.Count == _capacity)
{
_dictionary.Remove(_list.First.Value.Key);
_list.RemoveFirst();
}
var newNode = new LinkedListNode<KeyValuePair<TKey, TValue>>(new KeyValuePair<TKey, TValue>(key, value));
_list.AddLast(newNode);
_dictionary.Add(key, newNode);
}
}
public void Remove(TKey key)
{
if (_dictionary.TryGetValue(key, out var node))
{
_dictionary.Remove(key);
_list.Remove(node);
}
}
public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
{
return _list.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
因為我們對 雙向連結串列 的儲存需求是客製化化的,要求節點中儲存 key-value,直接使用 C# 的 LinkedList 我們就需要用 KeyValuePair 這樣的結構來間接儲存,會導致一些不必要的記憶體開銷。
我們可以自己實現一個雙向連結串列,這樣就可以直接在節點中儲存 key-value,從而減少記憶體開銷。
public class LRUCache_V3<TKey, TValue>
{
private readonly DoubleLinkedListNode<TKey, TValue> _head;
private readonly DoubleLinkedListNode<TKey, TValue> _tail;
private readonly Dictionary<TKey, DoubleLinkedListNode<TKey, TValue>> _dictionary;
private readonly int _capacity;
public LRUCache_V3(int capacity)
{
_capacity = capacity;
_head = new DoubleLinkedListNode<TKey, TValue>();
_tail = new DoubleLinkedListNode<TKey, TValue>();
_head.Next = _tail;
_tail.Previous = _head;
_dictionary = new Dictionary<TKey, DoubleLinkedListNode<TKey, TValue>>();
}
public TValue Get(TKey key)
{
if (_dictionary.TryGetValue(key, out var node))
{
RemoveNode(node);
AddLastNode(node);
return node.Value;
}
return default;
}
public void Put(TKey key, TValue value)
{
if (_dictionary.TryGetValue(key, out var node))
{
RemoveNode(node);
AddLastNode(node);
node.Value = value;
}
else
{
if (_dictionary.Count == _capacity)
{
var firstNode = RemoveFirstNode();
_dictionary.Remove(firstNode.Key);
}
var newNode = new DoubleLinkedListNode<TKey, TValue>(key, value);
AddLastNode(newNode);
_dictionary.Add(key, newNode);
}
}
public void Remove(TKey key)
{
if (_dictionary.TryGetValue(key, out var node))
{
_dictionary.Remove(key);
RemoveNode(node);
}
}
private void AddLastNode(DoubleLinkedListNode<TKey, TValue> node)
{
node.Previous = _tail.Previous;
node.Next = _tail;
_tail.Previous.Next = node;
_tail.Previous = node;
}
private DoubleLinkedListNode<TKey, TValue> RemoveFirstNode()
{
var firstNode = _head.Next;
_head.Next = firstNode.Next;
firstNode.Next.Previous = _head;
firstNode.Next = null;
firstNode.Previous = null;
return firstNode;
}
private void RemoveNode(DoubleLinkedListNode<TKey, TValue> node)
{
node.Previous.Next = node.Next;
node.Next.Previous = node.Previous;
node.Next = null;
node.Previous = null;
}
internal class DoubleLinkedListNode<TKey, TValue>
{
public DoubleLinkedListNode()
{
}
public DoubleLinkedListNode(TKey key, TValue value)
{
Key = key;
Value = value;
}
public TKey Key { get; set; }
public TValue Value { get; set; }
public DoubleLinkedListNode<TKey, TValue> Previous { get; set; }
public DoubleLinkedListNode<TKey, TValue> Next { get; set; }
}
}
使用 BenchmarkDotNet 對3個版本進行效能測試對比。
[MemoryDiagnoser]
public class WriteBenchmarks
{
// 保證寫入的資料有一定的重複性,藉此來測試LRU的最差時間複雜度
private const int Capacity = 1000;
private const int DataSize = 10_0000;
private List<int> _data;
[GlobalSetup]
public void Setup()
{
_data = new List<int>();
var shared = Random.Shared;
for (int i = 0; i < DataSize; i++)
{
_data.Add(shared.Next(0, DataSize / 10));
}
}
[Benchmark]
public void LRUCache_V1()
{
var cache = new LRUCache<int, int>(Capacity);
foreach (var item in _data)
{
cache.Put(item, item);
}
}
[Benchmark]
public void LRUCache_V2()
{
var cache = new LRUCache_V2<int, int>(Capacity);
foreach (var item in _data)
{
cache.Put(item, item);
}
}
[Benchmark]
public void LRUCache_V3()
{
var cache = new LRUCache_V3<int, int>(Capacity);
foreach (var item in _data)
{
cache.Put(item, item);
}
}
}
public class ReadBenchmarks
{
// 保證寫入的資料有一定的重複性,藉此來測試LRU的最差時間複雜度
private const int Capacity = 1000;
private const int DataSize = 10_0000;
private List<int> _data;
private LRUCache<int, int> _cacheV1;
private LRUCache_V2<int, int> _cacheV2;
private LRUCache_V3<int, int> _cacheV3;
[GlobalSetup]
public void Setup()
{
_cacheV1 = new LRUCache<int, int>(Capacity);
_cacheV2 = new LRUCache_V2<int, int>(Capacity);
_cacheV3 = new LRUCache_V3<int, int>(Capacity);
_data = new List<int>();
var shared = Random.Shared;
for (int i = 0; i < DataSize; i++)
{
int dataToPut = shared.Next(0, DataSize / 10);
int dataToGet = shared.Next(0, DataSize / 10);
_data.Add(dataToGet);
_cacheV1.Put(dataToPut, dataToPut);
_cacheV2.Put(dataToPut, dataToPut);
_cacheV3.Put(dataToPut, dataToPut);
}
}
[Benchmark]
public void LRUCache_V1()
{
foreach (var item in _data)
{
_cacheV1.Get(item);
}
}
[Benchmark]
public void LRUCache_V2()
{
foreach (var item in _data)
{
_cacheV2.Get(item);
}
}
[Benchmark]
public void LRUCache_V3()
{
foreach (var item in _data)
{
_cacheV3.Get(item);
}
}
}
寫入效能測試結果:
| Method | Mean | Error | StdDev | Median | Gen0 | Gen1 | Allocated |
|------------ |----------:|----------:|----------:|----------:|---------:|---------:|----------:|
| LRUCache_V1 | 16.890 ms | 0.3344 ms | 0.8012 ms | 16.751 ms | 750.0000 | 218.7500 | 4.65 MB |
| LRUCache_V2 | 7.193 ms | 0.1395 ms | 0.3958 ms | 7.063 ms | 703.1250 | 226.5625 | 4.22 MB |
| LRUCache_V3 | 5.761 ms | 0.1102 ms | 0.1132 ms | 5.742 ms | 585.9375 | 187.5000 | 3.53 MB |
查詢效能測試結果:
| Method | Mean | Error | StdDev | Gen0 | Allocated |
|------------ |----------:|----------:|----------:|--------:|----------:|
| LRUCache_V1 | 19.475 ms | 0.3824 ms | 0.3390 ms | 62.5000 | 474462 B |
| LRUCache_V2 | 1.994 ms | 0.0273 ms | 0.0242 ms | - | 4 B |
| LRUCache_V3 | 1.595 ms | 0.0187 ms | 0.0175 ms | - | 3 B |