cache2go-原始碼閱讀

2022-08-02 18:01:41

簡介

cache2go 是非常簡短的 go 開源專案了,很適合作為第一個讀原始碼專案。

如果你有一定的 go 開發經驗,讀起來會感覺到比較容易。

如果你剛剛接觸 go 語音,基礎知識還不完全瞭解,希望閱讀本文時,遇到一個不會的知識點,去攻克一個,帶著為了看懂本文原始碼的目的去學習基礎知識。比如:

  • time.Timer
  • defer
  • sync.RWMutex

作者這樣介紹:Concurrency-safe golang caching library with expiration capabilities,簡單來說就是具有過期功能的並行安全 golang 快取庫,因此它具有兩大特性:

  • 並行安全
  • 自動過期

專案結構

該專案非常簡單,全部邏輯由三個檔案實現:

  • cache.go:快取多個表。
  • cachetable.go:快取一個表。
  • cacheitem.go:快取表中的一個條目。

資料結構圖:

接下來會自下而上地分析原始碼。

cacheitem.go

該檔案中包含兩塊重要內容:

  • 結構體 CacheItem,用來快取表中的一個條目。
  • 函數 NewCacheItem,用來建立 CacheItem 範例。

CacheItem

CacheItem 用來快取表中的一個條目,屬性解釋:

  • sync.RWMutex:讀寫鎖,保證並行讀寫安全。
  • key:鍵。
  • value:值,即資料。
  • lifeSpan:該條目的存活週期,即過期時間。
  • createdOn:建立時間。
  • accessedOn:上次存取時間。
  • accessCount:存取次數。
  • aboutToExpire:從快取中刪除專案之前觸發的回撥方法,可以在刪除之前做一些自定義操作。

原始碼如下:

// CacheItem is an individual cache item
// Parameter data contains the user-set value in the cache.
type CacheItem struct {
	sync.RWMutex

	// The item's key.
	key interface{}
	// The item's data.
	data interface{}
	// How long will the item live in the cache when not being accessed/kept alive.
	lifeSpan time.Duration

	// Creation timestamp.
	createdOn time.Time
	// Last access timestamp.
	accessedOn time.Time
	// How often the item was accessed.
	accessCount int64

	// Callback method triggered right before removing the item from the cache
	aboutToExpire []func(key interface{})
}

Get 方法

下面是一些比較簡單的 Get 方法,一些有寫場景的屬性會多兩行獲取鎖與釋放鎖的程式碼。

// LifeSpan returns this item's expiration duration.
func (item *CacheItem) LifeSpan() time.Duration {
	// immutable
	return item.lifeSpan
}

// AccessedOn returns when this item was last accessed.
func (item *CacheItem) AccessedOn() time.Time {
	item.RLock()
	defer item.RUnlock()
	return item.accessedOn
}

// CreatedOn returns when this item was added to the cache.
func (item *CacheItem) CreatedOn() time.Time {
	// immutable
	return item.createdOn
}

// AccessCount returns how often this item has been accessed.
func (item *CacheItem) AccessCount() int64 {
	item.RLock()
	defer item.RUnlock()
	return item.accessCount
}

// Key returns the key of this cached item.
func (item *CacheItem) Key() interface{} {
	// immutable
	return item.key
}

// Data returns the value of this cached item.
func (item *CacheItem) Data() interface{} {
	// immutable
	return item.data
}

KeepAlive

保活函數:

  • 前兩行程式碼錶示:加鎖保證並行安全讀寫。
  • 後兩行程式碼錶示:當被存取時,更新存取時間,同時存取次數加 1。
// KeepAlive marks an item to be kept for another expireDuration period.
func (item *CacheItem) KeepAlive() {
	item.Lock()
	defer item.Unlock()
	item.accessedOn = time.Now()
	item.accessCount++
}

AddAboutToExpireCallback

新增回撥函數,回撥函數無返回值,僅有一個引數 interface{},即支援任意的引數。

// AddAboutToExpireCallback appends a new callback to the AboutToExpire queue
func (item *CacheItem) AddAboutToExpireCallback(f func(interface{})) {
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = append(item.aboutToExpire, f)
}

SetAboutToExpireCallback

設定回撥函數需要完全替代,不同於新增,需要先清空,再覆蓋。

// SetAboutToExpireCallback configures a callback, which will be called right
// before the item is about to be removed from the cache.
func (item *CacheItem) SetAboutToExpireCallback(f func(interface{})) {
	if len(item.aboutToExpire) > 0 {
		item.RemoveAboutToExpireCallback()
	}
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = append(item.aboutToExpire, f)
}

RemoveAboutToExpireCallback

通過直接置空,刪除所有的回撥函數。

// RemoveAboutToExpireCallback empties the about to expire callback queue
func (item *CacheItem) RemoveAboutToExpireCallback() {
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = nil
}

NewCacheItem

建立 CacheItem 範例

func NewCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
	t := time.Now()
	return &CacheItem{
		key:           key,
		lifeSpan:      lifeSpan,
		createdOn:     t,
		accessedOn:    t,
		accessCount:   0,
		aboutToExpire: nil,
		data:          data,
	}
}

cachetable.go

該檔案中總共有 3 個類:CacheTable、CacheItemPair 和 CacheItemPairList。

下面由簡單到複雜逐個分析。

CacheItemPair

CacheItemPair 用來記錄快取存取的次數。

// CacheItemPair maps key to access counter
type CacheItemPair struct {
	Key         interface{}
	AccessCount int64
}

CacheItemPairList

CacheItemPairList 是 CacheItemPair 的切片,通過實現方法 Swap、Len 和 Less 實現了 sort.Interface,支援排序。

需要注意方法 Less 的實現,是元素 i 大於元素 j,這種實現是為了降序排序。降序排序是為了方法 CacheTable.MostAccessed 返回存取次數最多的條目列表。

// CacheItemPairList is a slice of CacheItemPairs that implements sort.
// Interface to sort by AccessCount.
type CacheItemPairList []CacheItemPair

func (p CacheItemPairList) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p CacheItemPairList) Len() int           { return len(p) }
func (p CacheItemPairList) Less(i, j int) bool { return p[i].AccessCount > p[j].AccessCount }

CacheTable

CacheTable 用來快取一個表,屬性解釋:

  • sync.RWMutex:讀寫鎖,保證並行讀寫安全。
  • name:表名。
  • items:表中的條目列表。
  • cleanupTimer:過期清除定時器。
  • cleanupInterval:過期清除的時間。
  • logger:列印紀錄檔的物件。
  • loadData:讀取不存在 key 的回撥函數,可以用來做初始化快取的邏輯。
  • addedItem:新增條目時的回撥函數,增加靈活性。
  • aboutToDeleteItem:刪除條目前的回撥函數,增加靈活性。

原始碼如下:

// CacheTable is a table within the cache
type CacheTable struct {
	sync.RWMutex

	// The table's name.
	name string
	// All cached items.
	items map[interface{}]*CacheItem

	// Timer responsible for triggering cleanup.
	cleanupTimer *time.Timer
	// Current timer duration.
	cleanupInterval time.Duration

	// The logger used for this table.
	logger *log.Logger

	// Callback method triggered when trying to load a non-existing key.
	loadData func(key interface{}, args ...interface{}) *CacheItem
	// Callback method triggered when adding a new item to the cache.
	addedItem []func(item *CacheItem)
	// Callback method triggered before deleting an item from the cache.
	aboutToDeleteItem []func(item *CacheItem)
}

下面會先介紹核心方法,再看簡單的方法。

Add 新增條目

程式碼邏輯通過流程圖描述了一下,其中的「過期檢查」單獨抽出來後面分析。

NotFoundAdd 和 Add 核心邏輯是一樣的,具體區別不做額外描述,原始碼如下:

// Add adds a key/value pair to the cache.
// Parameter key is the item's cache-key.
// Parameter lifeSpan determines after which time period without an access the item
// will get removed from the cache.
// Parameter data is the item's value.
func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
	item := NewCacheItem(key, lifeSpan, data)

	// Add item to cache.
	table.Lock()
	table.addInternal(item)

	return item
}

func (table *CacheTable) addInternal(item *CacheItem) {
	// Careful: do not run this method unless the table-mutex is locked!
	// It will unlock it for the caller before running the callbacks and checks
	table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name)
	table.items[item.key] = item

	// Cache values so we don't keep blocking the mutex.
	expDur := table.cleanupInterval
	addedItem := table.addedItem
	table.Unlock()

	// Trigger callback after adding an item to cache.
	if addedItem != nil {
		for _, callback := range addedItem {
			callback(item)
		}
	}

	// If we haven't set up any expiration check timer or found a more imminent item.
	if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
		table.expirationCheck()
	}
}

// NotFoundAdd checks whether an item is not yet cached. Unlike the Exists
// method this also adds data if the key could not be found.
func (table *CacheTable) NotFoundAdd(key interface{}, lifeSpan time.Duration, data interface{}) bool {
	table.Lock()

	if _, ok := table.items[key]; ok {
		table.Unlock()
		return false
	}

	item := NewCacheItem(key, lifeSpan, data)
	table.addInternal(item)

	return true
}

expirationCheck 過期檢查

過期檢查的處理,是一個值得學習的點,這裡並不是我們印象中用迴圈定期掃描哪些 key 過期了,也不是給每個條目分別定義一個定時器。

每次新增條目時,掃描得到最近過期條目的過期時間,僅定義一個定時器。該定時器觸發時清除快取,並生成下一個定時器,如此接力處理。

過期檢查中會呼叫方法 table.deleteInternal 來清除過期的 key,這塊兒在講 Delete 方法時會再詳細分析。

// Expiration check loop, triggered by a self-adjusting timer.
func (table *CacheTable) expirationCheck() {
	table.Lock()
	if table.cleanupTimer != nil {
		table.cleanupTimer.Stop()
	}
	if table.cleanupInterval > 0 {
		table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
	} else {
		table.log("Expiration check installed for table", table.name)
	}

	// To be more accurate with timers, we would need to update 'now' on every
	// loop iteration. Not sure it's really efficient though.
	now := time.Now()
	smallestDuration := 0 * time.Second
	for key, item := range table.items {
		// Cache values so we don't keep blocking the mutex.
		item.RLock()
		lifeSpan := item.lifeSpan
		accessedOn := item.accessedOn
		item.RUnlock()

		if lifeSpan == 0 {
			continue
		}
		if now.Sub(accessedOn) >= lifeSpan {
			// Item has excessed its lifespan.
			table.deleteInternal(key)
		} else {
			// Find the item chronologically closest to its end-of-lifespan.
			if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
				smallestDuration = lifeSpan - now.Sub(accessedOn)
			}
		}
	}

	// Setup the interval for the next cleanup run.
	table.cleanupInterval = smallestDuration
	if smallestDuration > 0 {
		table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
			go table.expirationCheck()
		})
	}
	table.Unlock()
}

Delete 方法

從流程圖可以看出,這塊兒大部分邏輯是在加鎖、釋放鎖,有這麼多鎖主要是有如下幾個原因:

  • 一部分是表級別的,一部分是條目級別的;
  • 表級別鎖出現兩次獲取與釋放,這種實現主要是考慮到 deleteInternal 的複用性,同時支援 Delete 和 expirationCheck 的呼叫,做了一些鎖回溯的邏輯。思考:假如 Mutex 是可重入鎖,是不是不需要回溯處理了?

// Delete an item from the cache.
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
	table.Lock()
	defer table.Unlock()

	return table.deleteInternal(key)
}

func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
	r, ok := table.items[key]
	if !ok {
		return nil, ErrKeyNotFound
	}

	// Cache value so we don't keep blocking the mutex.
	aboutToDeleteItem := table.aboutToDeleteItem
	table.Unlock()

	// Trigger callbacks before deleting an item from cache.
	if aboutToDeleteItem != nil {
		for _, callback := range aboutToDeleteItem {
			callback(r)
		}
	}

	r.RLock()
	defer r.RUnlock()
	if r.aboutToExpire != nil {
		for _, callback := range r.aboutToExpire {
			callback(key)
		}
	}

	table.Lock()
	table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
	delete(table.items, key)

	return r, nil
}

Value 取值

取值本身是比較簡單的,只不過這裡要進行一些額外處理:

  • 取不值時,是否有自定義邏輯,比如降級查詢後快取進去。
  • 取到值時,更新存取時間,達到保活的目的。

// Value returns an item from the cache and marks it to be kept alive. You can
// pass additional arguments to your DataLoader callback function.
func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
	table.RLock()
	r, ok := table.items[key]
	loadData := table.loadData
	table.RUnlock()

	if ok {
		// Update access counter and timestamp.
		r.KeepAlive()
		return r, nil
	}

	// Item doesn't exist in cache. Try and fetch it with a data-loader.
	if loadData != nil {
		item := loadData(key, args...)
		if item != nil {
			table.Add(key, item.lifeSpan, item.data)
			return item, nil
		}

		return nil, ErrKeyNotFoundOrLoadable
	}

	return nil, ErrKeyNotFound
}

MostAccessed 最常存取的條目

這個方法用到了前文提到的 CacheItemPair 和 CacheItemPairList。

  • 首先遍歷條目,取出 key 和 accessCount 儲存到 p 中,用來排序;
  • 接著用有序的 p 對映出所有條目的順序,返回有序的條目。
// MostAccessed returns the most accessed items in this cache table
func (table *CacheTable) MostAccessed(count int64) []*CacheItem {
	table.RLock()
	defer table.RUnlock()

	p := make(CacheItemPairList, len(table.items))
	i := 0
	for k, v := range table.items {
		p[i] = CacheItemPair{k, v.accessCount}
		i++
	}
	sort.Sort(p)

	var r []*CacheItem
	c := int64(0)
	for _, v := range p {
		if c >= count {
			break
		}

		item, ok := table.items[v.Key]
		if ok {
			r = append(r, item)
		}
		c++
	}

	return r
}

Foreach 方法

為開發者提供更加豐富的自定義操作。

// Foreach all items
func (table *CacheTable) Foreach(trans func(key interface{}, item *CacheItem)) {
	table.RLock()
	defer table.RUnlock()

	for k, v := range table.items {
		trans(k, v)
	}
}

清空快取

清空快取的方法比較簡單,一方面是資料的清空,另一方面是定時器的清空。

// Flush deletes all items from this cache table.
func (table *CacheTable) Flush() {
	table.Lock()
	defer table.Unlock()

	table.log("Flushing table", table.name)

	table.items = make(map[interface{}]*CacheItem)
	table.cleanupInterval = 0
	if table.cleanupTimer != nil {
		table.cleanupTimer.Stop()
	}
}

查詢相關方法

Count 和 Exists 方法是比較簡單的,不用多說。

// Count returns how many items are currently stored in the cache.
func (table *CacheTable) Count() int {
	table.RLock()
	defer table.RUnlock()
	return len(table.items)
}

// Exists returns whether an item exists in the cache. Unlike the Value method
// Exists neither tries to fetch data via the loadData callback nor does it
// keep the item alive in the cache.
func (table *CacheTable) Exists(key interface{}) bool {
	table.RLock()
	defer table.RUnlock()
	_, ok := table.items[key]

	return ok
}

Set 相關方法

下面這些 Set 方法比較簡單,也不多做贅述。

// SetDataLoader configures a data-loader callback, which will be called when
// trying to access a non-existing key. The key and 0...n additional arguments
// are passed to the callback function.
func (table *CacheTable) SetDataLoader(f func(interface{}, ...interface{}) *CacheItem) {
	table.Lock()
	defer table.Unlock()
	table.loadData = f
}

// SetAddedItemCallback configures a callback, which will be called every time
// a new item is added to the cache.
func (table *CacheTable) SetAddedItemCallback(f func(*CacheItem)) {
	if len(table.addedItem) > 0 {
		table.RemoveAddedItemCallbacks()
	}
	table.Lock()
	defer table.Unlock()
	table.addedItem = append(table.addedItem, f)
}

//AddAddedItemCallback appends a new callback to the addedItem queue
func (table *CacheTable) AddAddedItemCallback(f func(*CacheItem)) {
	table.Lock()
	defer table.Unlock()
	table.addedItem = append(table.addedItem, f)
}

// SetAboutToDeleteItemCallback configures a callback, which will be called
// every time an item is about to be removed from the cache.
func (table *CacheTable) SetAboutToDeleteItemCallback(f func(*CacheItem)) {
	if len(table.aboutToDeleteItem) > 0 {
		table.RemoveAboutToDeleteItemCallback()
	}
	table.Lock()
	defer table.Unlock()
	table.aboutToDeleteItem = append(table.aboutToDeleteItem, f)
}

// AddAboutToDeleteItemCallback appends a new callback to the AboutToDeleteItem queue
func (table *CacheTable) AddAboutToDeleteItemCallback(f func(*CacheItem)) {
	table.Lock()
	defer table.Unlock()
	table.aboutToDeleteItem = append(table.aboutToDeleteItem, f)
}

// SetLogger sets the logger to be used by this cache table.
func (table *CacheTable) SetLogger(logger *log.Logger) {
	table.Lock()
	defer table.Unlock()
	table.logger = logger
}

刪除相關方法

過於簡單,不做贅述

// RemoveAddedItemCallbacks empties the added item callback queue
func (table *CacheTable) RemoveAddedItemCallbacks() {
	table.Lock()
	defer table.Unlock()
	table.addedItem = nil
}

// RemoveAboutToDeleteItemCallback empties the about to delete item callback queue
func (table *CacheTable) RemoveAboutToDeleteItemCallback() {
	table.Lock()
	defer table.Unlock()
	table.aboutToDeleteItem = nil
}

cache.go

Cache 函數是該快取庫的入口函數,該函數存在一段雙檢邏輯,需要特別瞭解下原因:

  • mutex.Lock() 獲取鎖過程中,可能另一協程已經完成了初始化。因此,需要再次校驗。
// Cache returns the existing cache table with given name or creates a new one
// if the table does not exist yet.
func Cache(table string) *CacheTable {
	mutex.RLock()
	t, ok := cache[table]
	mutex.RUnlock()

	if !ok {
		mutex.Lock()
		t, ok = cache[table]
		// Double check whether the table exists or not.
		if !ok {
			t = &CacheTable{
				name:  table,
				items: make(map[interface{}]*CacheItem),
			}
			cache[table] = t
		}
		mutex.Unlock()
	}

	return t
}

examples

樣例也比較簡單,讀者可以自行閱讀下。

參照

  1. https://github.com/muesli/cache2go
  2. https://mp.weixin.qq.com/s/6JjL0KVccW7nAQiKuDAl-w
  3. https://mp.weixin.qq.com/s/gIvNjn7GdOQUwg1pKtDTEQ
  4. https://mp.weixin.qq.com/s/898HtDyFTykvMu2-vvMy-A