抓到 Netty 一個隱藏很深的記憶體洩露 Bug | 詳解 Recycler 物件池的精妙設計與實現

2022-07-06 21:01:02

歡迎關注公眾號:bin的技術小屋,如果大家在看文章的時候發現圖片載入不了,可以到公眾號檢視原文

本系列Netty原始碼解析文章基於 4.1.56.Final版本

最近在 Review Netty 程式碼的時候,不小心用我的肉眼抓到了一個隱藏很深很深的記憶體洩露 Bug。

於是筆者將這個故事....哦不 .....事故,詳細的闡述出來分享給大家。

這將是一篇很長很長的故事,在本文中筆者會詳細描述這個記憶體洩露 Bug 的發現,分析,修復過程。順便將物件池在 Netty 中的一些精妙的設計方案及其原始碼實現一起詳盡地展現給大家。

故事從何說起呢?讓我們回到另一個月黑風高天空還是顯得那麼深邃遙遠的夜晚,筆者再一次閒來無事捧起 Netty 物件池相關部分原始碼細細品讀的時候,突然菊花一緊,虎軀一震。意外的用肉眼盯出了一個記憶體洩露Bug出來。

於是筆者順手一個 Issue,反手一個修復 PR 提交了過去。

Issue11864 : https://github.com/netty/netty/issues/11864

PR : https://github.com/netty/netty/pull/11865

巧合的是 Netty 也意識到了物件池這塊的問題,Netty 最近也正在重構這一塊,因為 Recycler 整體設計的還是比較複雜的,這可以從我們接下來要分析的物件池原始碼實現中可以看的出來,Recycler 的複雜性在於它的使用場景混合了並行以及與 GC 相關的互動,這些相關的問題都比較難以定位,所以 Netty 決定將物件池這一塊用一種更加容易被理解的方式重構掉。

這位說話特別好聽的 chrisvest 大佬也說了 筆者發現的這個 Bug 也間接證明了 Netty 要簡化物件池設計的正確性和必要性。

隨口提一句,這個大牛 chrisvest 是大名鼎鼎的圖資料庫 Neo4j 的核心commitor,同時也是Netty Buffer相關 API 的設計者。

這裡先不詳細解釋這個 Issue,也不建議大家現在就開啟這個 Issue 檢視,筆者會在本文的介紹中隨著原始碼深入的解讀慢慢的為大家一層一層地撥開迷霧。

下面就讓我們一起帶著懷疑,審視,欣賞,崇敬,敬畏的態度來一起品讀世界頂級程式設計師編寫的程式碼。由衷的感謝他們在這一領域做出的貢獻。

1. 池化思想的應用

在我們日常開發工作中我們經常會遇到各種池化技術的設計思想,比如連線池,記憶體池,物件池,還有我們在業務開發過程中經常會快取一些業務計算結果資料這也同樣運用到了池化技術的設計思想,我們可以叫它為結果池。

池化技術的應用場景就是當一個物件的建立和銷燬需要付出比較大的效能開銷時,我們就需要將這些重量級物件放在一個池子裡管理,當需要時直接從池子裡獲取避免重複建立和銷燬的開銷從而達到了複用的效果。

比如連線池裡面儲存管理的都是一些網路連線物件,這些物件建立和銷燬的代價比較大。通過連線池將這些重量級的網路連線物件統一管理起來,業務執行緒可以直接複用,避免了重新建立,釋放連線的效能開銷以及等待時間。

還有我們在日常開發中遇到的一些計算邏輯複雜的業務,我們通常會先從資料庫中查詢資料,然後經過複雜的計算得到結果,為了避免下次在重複計算,我們會將計算結果放入快取中,我們可以稱做結果池。也是一種池化思想。

再比如我們在《Netty如何高效接收網路資料》一文中提到的記憶體池,為了避免不必要的資料拷貝以及JVM垃圾回收對效能的影響,Netty 選擇使用堆外記憶體儲存網路通訊資料。在 Netty 申請堆外記憶體之前,首先會在 JVM 堆中建立一個用於參照 native memory 的參照物件 DirectByteBuffer ,隨後會使用 native 方法 unsafe.allocateMemory 通過底層 malloc 系統呼叫申請一塊堆外記憶體。

這裡就涉及到到兩個重要開銷:

  • 在 JVM 堆中建立物件 DirectByteBuffer ,併為該物件申請分配 JVM 堆記憶體。

  • 通過 malloc 系統呼叫向作業系統申請堆外記憶體,然後被 DirectByteBuffer 參照。但是堆外記憶體的申請和釋放遠比堆內記憶體申請和釋放的開銷要大很多。

而在 Netty 面對的高並行網路通訊場景下,申請堆外記憶體是一個非常頻繁的操作,基於以上提到的兩個重要效能開銷,這種大量頻繁的記憶體申請釋放操作對程式的效能影響是巨大的,所以 Netty 就引入了記憶體池對記憶體相關的操作進行統一的管理。

2. 物件池簡介

以上內容的介紹就是池化思想的應用以及它所解決的問題,本文我們的主題是介紹物件池,物件池的引入是為了在需要大量建立物件以及銷燬物件的場景下,將物件進行池化以達到複用池中物件,避免大量地重複建立物件以及銷燬物件的效能開銷,

前邊我們在提到記憶體池的時候說到,在 Netty 所要面對的高並行網路通訊場景下,需要大量的申請堆外記憶體用來儲存通訊資料。在 Netty 中,我們通過 PooledDirectByteBuf 物件來參照堆外記憶體。所以 Netty 在處理網路 IO 的時候是需要大量頻繁的建立 PooledDirectByteBuf 物件。

為了避免在高並行的場景下大量的建立物件所引來的效能開銷,我們可以引入物件池來池化建立出來的 PooledDirectByteBuf 物件,需要用的時候直接從物件池中獲取,用完之後在回收到物件池中。

另外這裡提前向大家透露一點的是我們下篇文章中即將要介紹的 Netty 傳送資料流程涉及到的物件池的應用。我們都知道 Netty 是一個非同步事件驅動的高效能網路框架,當在業務執行緒中處理完業務邏輯準備響應業務結果到使用者端的時候,我們會向對應 channel 寫入業務結果,此時業務執行緒會立即返回,這是一個非同步的過程。

原因是在底層實現中,Netty 會將使用者的響應結果資料暫時寫入到每個 Channel 特有的一個傳送緩衝佇列 ChannelOutboundBuffer 中,也就是說這個 ChannelOutboundBuffer 快取著 Channel 中的待傳送資料。最終會通過 flush 方法,將 ChannelOutboundBuffer 中的這些待傳送資料寫入到底層 Socket 中,從而傳送給使用者端。

而這個傳送緩衝佇列 ChannelOutboundBuffer 中的佇列元素是一個 Entry 型別的,每次的寫入操作需要建立一個 Entry 物件來包裹傳送資料,並將這個 Entry 物件快取在傳送緩衝佇列 ChannelOutboundBuffer 中。

這裡大家只需要知道 ChannelOutboundBuffer 是個啥,它的大概作用,以及這個緩衝佇列快取的物件是 Entry 型別的就可以了,我們會在下篇文章為大家詳細介紹,這裡引出只是為了介紹物件池的應用場景。

所以Netty在面對海量網路 IO 的場景下,必定會大量頻繁地去建立 Entry 物件,那麼每一次的網路 IO 都要重新建立這些物件,並且用完又要被垃圾回收掉這樣無疑會大量增加 JVM 的負擔以及 GC 的時間,這對於最求極致效能的 Netty 來說肯定是不可接受的。

基於以上這幾種情況,物件池被用來管理那些需要頻繁建立使用的物件,在使用完後並不立即將它們釋放,而是將它們在物件池中快取起來,以供後續的應用程式重複使用,從而減少建立物件和釋放物件的開銷,進而改善應用程式的效能。

從另一方面來看,物件池還可以將物件限制在一定的數量內從而可以有效減少應用程式在記憶體上的開銷。


通過前邊關於物件池的簡要介紹之後,我想大家現在可能比較好奇這些物件在建立和回收的過程中到底需要哪些開銷呢?

接下來筆者就為大家介紹下這些開銷方面的內容方便大家更加全面清晰地理解物件池。

3. 物件在JVM中建立和回收開銷

3.1 物件的建立開銷

在 Java 程式中我們可以通過一個 new 關鍵字來建立物件,而當JVM遇到一條 new 的位元組碼指令後,會發生什麼呢?

  1. 首先 JVM 要去檢查 new 指令後面的引數也就是建立物件所屬的 Java 類是否能夠在方法區的常數池中定位到類的符號參照,進而檢查這個符號參照所代表的類是否已經載入,解析,初始化過。如果沒有,就需要先執行類的載入過程。

  2. 當通過類載入檢查之後,就開始為物件分配記憶體,而物件所需記憶體大小其實在類載入完成後就已經確定了。JVM要做的事情就是將一塊確定大小的記憶體區域從JVM堆中劃分出來。

關於如何確定物件所需記憶體大小,對這方面細節感興趣的同學可以回看下筆者的《物件在JVM中的記憶體佈局》這篇文章。

  1. 而在為物件劃分堆中記憶體的時候又會根據JVM堆中記憶體是否規整,從而分為指標碰撞法和空閒列表法。而多執行緒同時建立物件在JVM中是非常常見的行為,所以在多執行緒並行建立物件的時候JVM又需要保證劃分記憶體時的執行緒安全性。JVM需要對劃分記憶體空間的動作進行同步處理(CAS + 失敗重試)。

  2. 而為了避免這種劃分記憶體時的同步鎖定,JVM提供了另外一種方式就是每個執行緒先預先向JVM堆申請一塊記憶體(本地執行緒分配快取-TLAB),這樣當執行緒建立物件的時候,先是從自己的TLAB中為物件分配記憶體,當自己的TLAB用完時,才會去JVM堆中同步分配。 我們可以通過虛擬機器器引數-XX:+UseTLAB開啟TLAB(預設)。-XX:-UseTLAB關閉TLAB。

大家這裡需要記住這種利用TLAB的分配方式,因為Netty中的物件池Recycler也是利用這種思想避免多執行緒獲取物件的同步開銷。

  1. 在為物件分配好記憶體之後,JVM會將這塊記憶體初始化為零值。這樣就可以保證物件中的範例欄位不賦初始值就可以直接使用,其值為欄位對應資料型別的零值。

  2. 設定物件頭。包括設定MarkWord中的物件執行時資訊。以及通過型別指標參照關聯到類的後設資料資訊。這些內容我們在《物件在JVM中的記憶體佈局》一文中都有提到過,大家還記得嗎?

  3. 執行建構函式。這樣一個真正可用的物件就被建立出來了。

3.2 物件的回收開銷

  • JVM中的垃圾回收器通過可達性分析來探索所有Java存活物件,從GC ROOTS出發邊標記邊探索所有物件的參照鏈,以判斷物件是否存活。

  • 垃圾回收器在垃圾回收的過程中發生的GC PAUSE也就是STOP THE WORLD。這裡詳細的垃圾回收過程我們就不展開了,主要是為了指明在物件回收時最主要的兩個開銷點。


然而在高並行的網路IO處理場景下,這些單個物件的建立和回收開銷會被無限放大,於是Netty引入了一個輕量級的物件池 Recycler 來負責將這些需要頻繁建立的物件進行池化,統一分配,回收管理。

在為大家詳細介紹物件池 Recycler 的實現之前,筆者想先從物件池的使用上先讓大家可以直觀地感受一下 Recycler 對外提供的功能入口。

4. 物件池Recycler的使用

這裡我們直接看下Netty原始碼中是如何使用Recycler物件池的,首先我們來看下物件池在 PooledDirectByteBuf 類中是如何使用的。

大家這裡先不用去管這個PooledDirectByteBuf類是幹嗎的,只需要明白這個類是會被頻繁建立的,我們這裡主要是演示物件池的使用。

4.1 物件池在PooledDirectByteBuf類中的使用

final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
    //建立物件池
    private static final ObjectPool<PooledDirectByteBuf> RECYCLER = ObjectPool.newPool(
            new ObjectCreator<PooledDirectByteBuf>() {
        @Override
        public PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
            return new PooledDirectByteBuf(handle, 0);
        }
    });

    //物件在物件池中的回收控制程式碼
    private final Handle<PooledByteBuf<T>> recyclerHandle;

    static PooledDirectByteBuf newInstance(int maxCapacity) {
        //從物件池中獲取物件
        PooledDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }
    
    private void recycle() {
         //回收物件
        recyclerHandle.recycle(this);
    }

    ................省略和物件池無關的程式碼..................
}

前邊我們提到在Netty中需要大量頻繁的建立PooledDirectByteBuf物件,為了避免在高並行場景下頻繁建立物件的開銷從而引入了物件池來統一管理PooledDirectByteBuf物件。

Netty中每個被池化的物件中都會參照物件池的範例ObjectPool RECYCLER ,這個物件池的範例就是專門用來分配和管理被池化物件的。

這裡我們建立出來的物件池是專門用來管理PooledDirectByteBuf物件的(通過泛型指定物件池需要管理的具體物件)。泛型類ObjectPool<T>是Netty為物件池設計的一個頂層抽象。物件池的行為功能均定義在這個泛型抽象類中。我們可以通過 ObjectPool#newPool 方法建立指定的物件池。其引數 ObjectCreator 介面用來定義建立池化物件的行為。當物件池中需要建立新物件時,就會呼叫該介面方法 ObjectCreator#newObject 來建立物件。

其中每個池化物件中都會包含一個recyclerHandle,這個recyclerHandle是池化物件在物件池中的控制程式碼。裡邊封裝了和物件池相關的一些行為和資訊,recyclerHandle是由物件池在建立物件後傳遞進來的。

當我們需要PooledDirectByteBuf物件時,我們直接通過RECYCLER.get()從PooledDirectByteBuf物件池中獲取物件即可。

當我們使用完畢後,直接呼叫PooledDirectByteBuf物件在物件池中的控制程式碼recyclerHandle.recycle(this) 把物件回收到物件池中。

4.2 物件池在Channel寫入緩衝佇列中的使用

前邊提到,每個Channel都會有一個獨立的寫入緩衝佇列ChannelOutboundBuffer,用來暫時儲存使用者的待傳送資料。這樣使用者可以在呼叫channel的write方法之後立馬返回,實現非同步傳送流程。

在傳送資料時,Channel首先會將使用者要傳送的資料快取在自己的寫快取佇列ChannelOutboundBuffer中。而ChannelOutboundBuffer中的元素型別為Entry。在Netty中會大量頻繁的建立Entry物件。所以Entry物件同樣也需要被物件池管理起來。

在上小節介紹PooledDirectByteBuf物件池的過程中,我想大家已經對物件池的使用套路已經有了大概的瞭解。這裡我們藉助Entry物件池將使用步驟總結如下:

建立物件池

   static final class Entry {

        private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
            @Override
            public Entry newObject(Handle<Entry> handle) {
                return new Entry(handle);
            }
        });

        //recyclerHandle用於回收物件
        private  Handle<Entry> handle;
        
        private Entry(Handle<Entry> handle) {
            this.handle = handle;
        }
   }

前邊我們介紹到每一個要被池化的物件都需要一個靜態變數來參照其對應的物件池。

static final ObjectPool<Entry> RECYCLER 

匿名實現 ObjectCreator 介面來定義物件建立的行為方法。

    public interface ObjectCreator<T> {
        T newObject(Handle<T> handle);
    }

通過ObjectPool#newPool 建立用於管理Entry物件的物件池。

在物件池建立物件時,會為池化物件建立其在物件池中的控制程式碼Handler,隨後將Handler傳入建立好的池化物件中。當物件使用完畢後,我們可以通過Handler來將物件回收至物件池中等待下次繼續使用。

從物件池中獲取物件

由於Entry物件在設計上是被物件池管理的,所以不能對外提供public建構函式,無法在外面直接建立Entry物件。

所以池化物件都會提供一個獲取物件範例的 static 方法 newInstance。在該方法中通過RECYCLER.get()從物件池中獲取物件範例。

      static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
            Entry entry = RECYCLER.get();
            
            .........省略無關程式碼..............

            return entry;
        }

使用完畢回收物件

池化物件都會提供一個 recycle 方法,當物件使用完畢後,呼叫該方法將物件回收進物件池中。

        void recycle() {
            next = null;
            bufs = null;
            buf = null;
            msg = null;
            promise = null;
            progress = 0;
            total = 0;
            pendingSize = 0;
            count = -1;
            cancelled = false;
            handle.recycle(this);
        }
  • 清空物件中的所有屬性。

  • 通過物件中持有的物件池控制程式碼Handler,將物件回收進物件池中。


從上邊所列舉的Netty中使用物件池的例子,我們可以直觀的感受到物件池的使用非常簡單。無非就是從物件池獲取物件,將物件回收至物件池這兩個核心步驟。

同時我們也注意到池化物件的設計和普通物件是有所不同的,不過,我們只需要遵循本小節中所列舉的幾個步驟進行設計即可。

5. Recycler總體設計

Recycler物件池的設計還是比較複雜的但是卻很精妙,所以筆者這裡繼續採用總 - 分 - 總的結構來為大家介紹物件池的設計與實現。

一開始我們先不要去追求太過細節的內容,先要從總體上摸清楚物件池的設計架構,以及各個功能模組之間的關聯。

當我們從整體上理解了物件池的設計架構後,筆者後面會分模組來各個擊破它的實現細節。

在理清楚各個模組的實現細節之後,筆者將在從細節著手再次將物件池的整體設計架構為大家串聯起來。

我們按照這個思路先來看一下Recycler物件池的總體架構設計圖,從整體直觀上來感受下它的設計,以及包含的一些重要模組。

5.1 多執行緒獲取物件無鎖化設計

首先我們從外部整體來看,物件池對於我們來說它就是一個儲存物件的池子,當我們需要物件時會從這個池子裡直接獲取,用完物件時在把物件歸還回池子中方便下一次重複使用。

但我們俯瞰整個物件池的設計架構時,我們發現整個設計還是比較複雜其中蘊含了不少精妙的細節。

物件池中最重要的兩個結構分別是 Stack 和 WeakOrderQueue。

Stack 中包含一個用陣列實現的棧結構(圖中綠色部分),這個棧結構正是物件池中真正用於儲存池化物件的地方,我們每次從物件池中獲取物件都會從這個棧結構中彈出棧頂元素。同樣我們每次將使用完的物件歸還到物件池中也是將物件壓入這個棧結構中。

這裡有一個精妙的設計,我們從圖中可以看到每個執行緒都會擁有一個屬於自己的Stack。在我們介紹《物件建立的開銷》這一小節內容時,提到為了避免多執行緒並行申請記憶體時的同步鎖定開銷,JVM為每個執行緒預先申請了一塊記憶體(TLAB),這樣當執行緒建立物件時都是從自己的TLAB中為物件分配記憶體。從而避免了多執行緒之間的同步競爭。

同樣當多執行緒並行從物件池中獲取物件時, 如果整個物件池只有一個Stack結構的話,為了保證多執行緒獲取物件的執行緒安全性,我們只能同步地來存取這個Stack,這樣就為物件池的設計引入了多執行緒同步競爭的開銷。

為了避免這種不必要的同步競爭,Netty也採用了類似TLAB分配記憶體的方式,每個執行緒擁有一個獨立Stack,這樣當多個執行緒並行從物件池中獲取物件時,都是從自己執行緒中的Stack中獲取,全程無鎖化執行。大大提高了多執行緒從物件池中獲取物件的效率

這種多執行緒並行無鎖化的設計思想,在Netty中比比皆是

5.2 Stack的設計

從Recycler物件池的整體設計架構圖中我們可以看到,Stack的設計主要分為兩個重要的部分:

  • 一個是我們前邊提到的陣列實現的棧結構用來存放物件池中的物件,每個執行緒繫結一個獨立的Stack用來儲存由該執行緒建立出來並回收到物件池中的物件。

  • 另一個重要的結構是WeakOrderQueue連結串列,head 指標指向WeakOrderQueue連結串列的頭結點,cursor 指標指向連結串列的當前節點,prev 指標指向當前節點的前一個節點。WeakOrderQueue連結串列是用來儲存其他執行緒幫助本執行緒回收的物件(我們稱之為待回收物件)。其中WeakOrderQueue連結串列中的每一個節點對應一個其他執行緒,這個其他執行緒為本執行緒回收的物件儲存在對應的WeakOrderQueue節點中。

這裡我們先不需要管WeakOrderQueue的具體結構

那麼Stack結構在設計上為什麼要引入這個WeakOrderQueue連結串列呢

讓我們考慮一種多執行緒回收物件的場景,我們還是以Recycler物件池的整體設計架構圖為例。thread1 為當前執行緒,剩下的thread2 , thread3 , thread4為其他執行緒。讓我們把視角先聚焦在當前執行緒上。

我們先假設Stack結構中只有一個陣列棧,並沒有WeakOrderQueue連結串列。看看這樣會產生什麼後果?

當前執行緒 thread1 在處理業務邏輯時,建立了一個物件(注意:這個物件是由thread1建立的)如果這是一個單執行緒處理業務的場景,那麼物件會在thread1處理完業務邏輯後被回收至thread1對應的stack1中的陣列棧中。當`hread1再次需要建立物件時,會直接從其對應的stack1中的陣列棧(圖中綠色部分)中直接獲取上次回收的物件。

由這一點可以看出Stack中的陣列棧(綠色部分)存放的是真正被回收的物件,是可以直接被再次獲取使用的。

但如果這是一個多執行緒處理業務場景的話,很可能由thread1建立出來的物件,會被交給thread2或者thread3去處理剩下的業務邏輯,那麼當thread2或者thread3這些其他執行緒處理完業務邏輯時,此時物件的釋放並不是在thread1中,而是在其他執行緒中。

其他執行緒現在面對的任務就是要將由thread1建立出來的物件,釋放回收至thread1對應的stack1中的陣列棧中。如果此時多個其他執行緒並行的向stack1釋放回收物件,勢必會導致多執行緒之前的同步競爭,Netty將不得不把Stack結構中的陣列棧的存取設計成一個同步過程

那麼如果此時更不巧的是當前執行緒thread1又要同時向自己的Stack1獲取物件,thread1就只能同步等待,因為此時其他執行緒正在向Stack1釋放物件。

本來我們引入物件池的目的就是為了抵消建立物件的開銷加快獲取物件的速度,減少GC的壓力。結果由於Stack的同步存取設計又引入了同步開銷。這個同步的開銷甚至會比建立物件的開銷還要大,那麼物件池的引入就變得得不償失了。

那麼Netty該如何化解這種情況呢?答案還是之前反覆強調的無鎖化設計思想。

既然多執行緒的回收物件場景,會引入多執行緒之間的同步鎖定開銷,那麼我們就繼續採用無鎖化的設計思想,為每個執行緒(注意:這裡指的是非建立物件的執行緒也就是圖中的thead2 , thread3 ....)單獨分配一個WeakOrderQueue節點,每個執行緒在為建立執行緒回收物件時,會將這些物件暫時存放到自己對應的WeakOrderQueue節點中。

注意:存放進WeakOrderQueue中的物件我們稱為待回收物件,這些待回收物件並不在Stack結構中的陣列棧中,因此並不能被直接獲取使用。

為了方便後續描述,我們把建立物件的執行緒稱作建立執行緒(範例中的thread1),將為建立執行緒回收物件的其他執行緒稱作回收執行緒(範例中的thread2 , thread3 , thead4 .....)。

我們在將視角拉回到建立執行緒thread1對應的stack1中,每個回收執行緒將待回收物件放入與自己對應的WeakOrderQueue節點中,這樣就避免了在多執行緒回收場景中的同步競爭。當所有回收執行緒都在為stack1回收物件時,這樣在stack1中就形成了一個WeakOrderQueue連結串列。每個回收執行緒只操作與自己對應的節點。在Stack結構中通過head,prev,cursor將這些WeakOrderQueue節點組成了一個連結串列。

每一個WeakOrderQueue節點對應一個回收執行緒。

而當建立執行緒thread1再次從自己對應的Stack1中獲取物件時,只會從Stack結構的陣列棧中獲取,因為是單執行緒運算元組棧,自然是不會存在同步競爭的。

當Stack結構中的陣列棧沒有任何物件時,那麼建立執行緒就會根據 cursor 指標遍歷Stack結構中的WeakOrderQueue連結串列,將當前WeakOrderQueue節點存放的待回收物件轉移至陣列棧中。如果WeakOrderQueue連結串列中也沒有任何待回收物件可以轉移。那麼建立執行緒在物件池中就直接建立一個物件出來返回。

物件池回收物件的一個原則就是物件由誰建立的,最終就要被回收到建立執行緒對應的Stack結構中的陣列棧中。陣列棧中存放的才是真正被回收的池化物件,可以直接被取出複用。回收執行緒只能將待回收物件暫時存放至建立執行緒對應的Stack結構中的WeakOrderQueue連結串列中。當陣列棧中沒有物件時,由建立執行緒將WeakOrderQueue連結串列中的待回收物件轉移至陣列棧中。

正是由於物件池的這種無鎖化設計,物件池在多執行緒獲取物件和多執行緒回收物件的場景下,均是不需要同步的

大家在體會下這張圖中蘊含的這種無鎖化設計思想

5.3 WeakOrderQueue的設計

在我們介紹完物件池在多執行緒回收物件場景下的設計時,我們再來看下用於回收執行緒儲存待回收物件的WeakOrderQueue是如何設計的?

注意:這裡的回收執行緒,待回收物件這些概念是我們站在建立執行緒的視角提出的相對概念。

大家一開始可能從WeakOrderQueue字面意思上以為它的結構是一個佇列,但實際上從圖中我們可以看出WeakOrderQueue的結構其實是一個連結串列結構。

其中包含了連結串列的頭結點 Head,以及連結串列尾結點指標 Tail。連結串列中的元素型別為 Link 型別。

Link 型別中包含了一個 elements 陣列,該陣列用來存放回收執行緒收集的待回收物件。

除此之外Link型別中還包含了readIndex用來指示當前elements陣列中的讀取位置。writeIndex用來指示elements陣列的寫入位置。elements陣列中的容量預設為16,也就是說一個Link節點最多可以存放16個待回收物件。當回收執行緒收集的待回收物件超過16個時,就會新建立一個Link節點插入到Link連結串列的尾部。

當需要將WeakoOrderQueue節點中所存放的待回收物件回收轉移至其對應的Stack結構中的陣列棧中時,建立執行緒會遍歷當前WeakOrderQueue節點中的Link連結串列,然後從連結串列的Head節點開始,將Head節點中包裹的Link連結串列頭結點中存放的待回收物件回收至建立執行緒對應的Stack中。一次最多轉移一個Link大小的待回收物件(16個)。

當Link節點中的待回收物件全部轉移至建立執行緒對應的Stack中時,會立馬將這個Link節點從當前WeakOrderQueue節點中的Link連結串列裡刪除,隨後Head節點向後移動指向下一個Link節點。

head指標始終指向第一個未被轉移完畢的Link節點,建立執行緒從head節點處讀取轉移待回收物件,回收執行緒從Tail節點處插入待回收物件。這樣轉移操作和插入操作互不影響、沒有同步的開銷

注意這裡會存線上程可見性的問題,也就是說回收執行緒剛插入的待回收物件,在建立執行緒轉移這些待回收物件時,建立執行緒可能會看不到由回收執行緒剛剛插入的待回收物件。

Netty這裡為了不引入多執行緒同步的開銷,只會保證待回收物件的最終可見性。 因為如果要保證待回收物件的實時可見性,就要插入一些記憶體屏障指令,執行這些記憶體屏障指令也是需要開銷的。

事實上這裡也並不需要保證實時可見性,建立執行緒暫時看不到WeakOrderQueue節點中的待回收物件也是沒關係的,大不了就新建立一個物件。這裡還是遵循無鎖化的設計思想

維護執行緒之間操作的原子性,可見性都是需要開銷的,我們在日常多執行緒程式設計中一定要根據業務場景來綜合考慮,權衡取捨。儘量遵循我們這裡多次強調的多執行緒無鎖化設計思想。提高多執行緒的執行效率。避免引入不必要的同步開銷。

綜合以上 Netty Recycler 物件池的設計原理,我們看到多執行緒從物件池中獲取物件,以及多執行緒回收物件至物件池中,還有建立執行緒從WeakOrderQueue連結串列中轉移待回收物件到物件池中。這些步驟均是無鎖化進行的,沒有同步競爭。

在理解了物件池的基本設計原理後,下面就該介紹物件池在Netty中的原始碼實現環節了。

6. Recycler物件池的實現

在小節《4. 物件池Recycler的使用》中我們介紹了Recycler物件池的兩個使用案例:

  • 一個是物件池在PooledDirectByteBuf類中的運用。

  • 另一個是物件池在Channel對應的寫入緩衝佇列ChannelOutboundBuffer中的運用。

從這兩個案例中,我們看到在設計池化物件時,都需要在池化物件內部持有一個物件池的靜態參照從而可以與物件池進行互動,參照型別為 ObjectPool ,ObjectPool 是Netty物件池的頂層設計,其中定義了物件池的行為,以及各種頂層介面。

在介紹物件池的整體實現之前,我們先來看下物件池的這個頂層介面設計。

6.1 物件池的頂層設計ObjectPool

public abstract class ObjectPool<T> {

    ObjectPool() { }

    public abstract T get();

    public interface Handle<T> {
        void recycle(T self);
    }

    public interface ObjectCreator<T> {
        T newObject(Handle<T> handle);
    }

    ......................省略............

}

我們首先看到 ObjecPool 被設計成為一個泛型的抽象類,之所以使用泛型,是因為我們在建立物件池的時候需要指定物件池中被池化物件的型別。

比如《4. 物件池Recycler的使用》小節中的這兩個案例:

static final class Entry {

    private static final ObjectPool<Entry> RECYCLER

}
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {

    private static final ObjectPool<PooledDirectByteBuf> RECYCLER

}

ObjecPool 定義了從物件池中獲取物件的行為:

public abstract T get();

將池化物件回收至物件池中的行為被定義在 Handler 內部介面中:

     public interface Handle<T> {
        void recycle(T self);
    } 

Handler是池化物件在物件池中的一個模型,Handler裡面包裹了池化物件,幷包含了池化物件的一些回收資訊,以及池化物件的回收狀態。它的預設實現是DefaultHandle,後面我們會詳細介紹。

我們前邊介紹到的Stack結構中的陣列棧裡邊存放的就是DefaultHandle,以及WeakOrderQueue結構裡的Link節點中的elements陣列裡存放的也是DefaultHandle。

那麼為什麼要將池化物件的回收行為recycle定義在Handler中,而不是ObejctPool中呢

讓我們站在業務執行緒的角度來看,其實業務執行緒處理的都是物件級別這個維度,並不需要感知到物件池的存在,使用完物件,直接呼叫物件的回收方法recycle將池化物件回收掉即可。

在《4. 物件池Recycler的使用》小節我們介紹過池化物件的設計方法,其中我們提到池化物件中需要參照其在物件池中的Handler,這個Handler會在物件池建立物件的時候傳入。池化物件型別中需要定義recycle方法,recycle方法清空池化物件的所有屬性,並呼叫Handler的recycle方法將池化物件回收至物件池中。

static final class Entry {

        void recycle() {
            next = null;
            bufs = null;
            buf = null;
            msg = null;
            promise = null;
            progress = 0;
            total = 0;
            pendingSize = 0;
            count = -1;
            cancelled = false;
            handle.recycle(this);
        }

}

ObjectPool 還定義了物件池建立物件的行為介面:

    public interface ObjectCreator<T> {
        T newObject(Handle<T> handle);
    }

使用者在建立物件池的時候,需要通過ObjectCreator#newObject方法指定物件池建立物件的行為。Handler物件正是通過這個介面傳入池化物件中的。

  static final class Entry {

      private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
            @Override
            public Entry newObject(Handle<Entry> handle) {
                return new Entry(handle);
            }
        });

      //Entry物件只能通過物件池獲取,不可外部自行建立
      private Entry(Handle<Entry> handle) {
            this.handle = handle;
        }

  }

6.1.1 建立ObjectPool

public abstract class ObjectPool<T> {

    public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) {
        return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator"));
    }

    private static final class RecyclerObjectPool<T> extends ObjectPool<T> {
        //recycler物件池範例
        private final Recycler<T> recycler;

        RecyclerObjectPool(final ObjectCreator<T> creator) {
             recycler = new Recycler<T>() {
                @Override
                protected T newObject(Handle<T> handle) {
                    return creator.newObject(handle);
                }
            };
        }

        @Override
        public T get() {
            return recycler.get();
        }
    }

}
public abstract class Recycler<T> {

    protected abstract T newObject(Handle<T> handle);
  
    ........................省略.............
}

呼叫 ObjectPool#newPool 建立物件池時,返回的是 RecyclerObjectPool 範例。而真正的物件池 Recycler 被包裹在 RecyclerObjectPool 中。

物件池Recycler建立物件的行為定義在使用者在建立物件池時指定的ObjectCreator 中。

7. Recycler物件池屬性詳解

在介紹完物件池的頂層設計之後,接下來我們介紹下Recycler物件池相關的一些重要屬性。相信大家在看過前邊關於物件池設計原理的介紹之後,現在應該能夠比較容易的理解即將介紹的這些屬性概念,這裡涉及到的屬性比較多,筆者把這些屬性的介紹放到原始碼實現之前的目的也是先讓大家混個眼熟,先有一個感性的認識,等到介紹原始碼實現時,筆者還會將涉及到的屬性再次拿出來介紹。

7.1 建立執行緒,回收執行緒的Id標識

public abstract class Recycler<T> {

    //用於產生池化物件中的回收Id,主要用來標識池化物件被哪個執行緒回收
    private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
    //用於標識建立池化物件的執行緒Id 注意這裡是static final欄位 也就意味著所有的建立執行緒OWN_THREAD_ID都是相同的
    //這裡主要用來區分建立執行緒與非建立執行緒。多個非建立執行緒擁有各自不同的Id
    //這裡的視角只是針對池化物件來說的:區分建立它的執行緒,與其他回收執行緒
    private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();

}
  • AtomicInteger ID_GENERATOR :物件池中定義了一個 AtomicInteger 型別的Id生成器,主要用於為建立執行緒以及回收執行緒建立Id標識,目的是區分建立執行緒和回收執行緒。

  • int OWN_THREAD_ID:在 Recycler 類初始化的時候,會利用ID_GENERATOR 為 OWN_THREAD_ID 欄位賦值,從字面意思上我們也可以看出 OWN_THREAD_ID 是用來標識建立執行緒Id的。這裡有一點大家需要注意的是,OWN_THREAD_ID 是一個 static final 欄位,這也就意味著所有的Recycler物件池範例中的 OWN_THREAD_ID 都是一樣的。

這裡有的同學可能會有疑問了,在多執行緒從物件池中獲取物件的場景中,建立執行緒會有很多個(比如下圖中的thread1, thread2, thread3.....),既然所有的Recycler 物件池範例中的 OWN_THREAD_ID 都是一樣的,那麼如何區分不同的建立執行緒呢?

事實上在物件池中我們並不需要區分建立執行緒與建立執行緒之間的Id,因為Netty在設計物件池的時候採用了無鎖化設計,建立執行緒與建立執行緒之間並不需要互動,每個執行緒只需要關注自己執行緒內的物件管理工作即可,所以從一個執行緒的內部視角來看,只會有一個建立執行緒就是它自己本身,剩下的執行緒均是回收執行緒。所以我們物件池的設計中只需要區分建立執行緒與回收執行緒就可以了,當然每個回收執行緒的Id是不一樣的。

回收執行緒的Id是由其對應的 WeakOrderQueue 節點來分配的,一個 WeakOrderQueue 範例對應一個回收執行緒Id。

private static final class WeakOrderQueue extends WeakReference<Thread> {

    //回收執行緒回收Id,每個weakOrderQueue分配一個,同一個stack下的一個回收執行緒對應一個weakOrderQueue節點
   private final int id = ID_GENERATOR.getAndIncrement();
}

7.2 物件池中的容量控制

    //物件池中每個執行緒對應的Stack中可以儲存池化物件的預設初始最大個數 預設為4096個物件 
    private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
    // 物件池中執行緒對應的Stack可以儲存池化物件預設最大個數 4096
    private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
    // 初始容量 min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256) 初始容量不超過256個
    private static final int INITIAL_CAPACITY;

Recycler 物件池中定義了以上三個屬性用於控制物件池中可以池化的物件容量。這些屬性對應的初始化邏輯如下:

    static {

        int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
                SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
        if (maxCapacityPerThread < 0) {
            maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
        }

        DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;

        INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
    }
  • DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD:定義每個建立執行緒對應的Stack結構中的陣列棧初始預設的最大容量。預設為4096個。可由JVM啟動引數 -D io.netty.recycler.maxCapacity 指定。

  • DEFAULT_MAX_CAPACITY_PER_THREAD:定義每個建立執行緒對應的Stack結構中的陣列棧的最大容量。可由JVM啟動引數 -D io.netty.recycler.maxCapacityPerThread 指定,如無特殊指定,即採用 DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD 的值,預設為4096個。

  • INITIAL_CAPACITY : 定義每個建立執行緒對應的Stack結構中的陣列棧的初始容量。計算公式為min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256),預設為256個。當池化物件超過256個時,則對物件池進行擴容,但不能超過最大容量 DEFAULT_MAX_CAPACITY_PER_THREAD。

7.3 回收執行緒可回收物件的容量控制

   //用於計算回收執行緒可幫助回收的最大容量因子  預設為2  
    private static final int MAX_SHARED_CAPACITY_FACTOR;
    //每個回收執行緒最多可以幫助多少個建立執行緒回收物件 預設:cpu核數 * 2
    private static final int MAX_DELAYED_QUEUES_PER_THREAD;
    //回收執行緒對應的WeakOrderQueue節點中的Link連結串列中的節點儲存待回收物件的容量 預設為16
    private static final int LINK_CAPACITY;

Recycler 物件池除了對建立執行緒中的 Stack 容量進行限制外,還需要對回收執行緒可回收物件的容量進行限制。相關回收容量限制屬性初始化邏輯如下:

    static {

        MAX_SHARED_CAPACITY_FACTOR = max(2,
                SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
                        2));

        MAX_DELAYED_QUEUES_PER_THREAD = max(0,
                SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
                        // We use the same value as default EventLoop number
                        NettyRuntime.availableProcessors() * 2));

        LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
                max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));

    }
  • MAX_SHARED_CAPACITY_FACTOR : 針對建立執行緒中的 Stack,其對應的所有回收執行緒總共可幫助其回收的物件總量計算因子。預設為2。可通過JVM引數 -D io.netty.recycler.maxSharedCapacityFactor 指定,總共回收物件總量就是通過物件池的最大容量和該計算因子計算出來的。計算公式: max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY) 。由此我們可以知道建立執行緒對應的所有回收執行緒總共可幫助其回收的物件總量預設為2048個,最小回收容量為 LINK_CAPACITY 預設為16。

  • MAX_DELAYED_QUEUES_PER_THREAD : 該引數定義每個回收執行緒最多可幫助多少個建立執行緒回收物件。預設為:CPU核數 * 2。可通過JVM引數 -D io.netty.recycler.maxDelayedQueuesPerThread 指定。注意:這裡是站在回收執行緒的角度

  • LINK_CAPACITY : 在建立執行緒對應的 Stack 結構中的 WeakOrderQueue 連結串列中,回收執行緒對應的WeakOrderQueue節點中的Link連結串列中的Link節點儲存待回收物件的容量。預設為16,可通過JVM引數 -D io.netty.recycler.linkCapacity 指定。

為了方便大家理解這些容量控制的相關引數,筆者又在物件池架構設計圖的基礎上補充了容量控制相關的資訊。大家可以對照上邊介紹到的這些引數的含義形象體會下:

7.4 物件回收頻率控制

物件池不能不考慮容量的限制而無腦的進行物件的回收,而是要對回收物件的頻率進行限制。在我們日常架構設計和程式設計時,我們也一定要有託底的方案,比如限流,降級,熔斷等託底方案。這樣程式就不至於被突發的異常流量擊垮。

在物件池的設計中,Netty用以下兩個引數來控制物件回收的頻率從而避免物件池迅速膨脹不可控制。

    //建立執行緒回收物件時的回收比例,預設是8,表示只回收1/8的物件。也就是產生8個物件回收一個物件到物件池中
    private static final int RATIO;
    //回收執行緒回收物件時的回收比例,預設也是8,同樣也是為了避免回收執行緒回收佇列瘋狂增長 回收比例也是1/8
    private static final int DELAYED_QUEUE_RATIO;

物件回收頻率控制引數的初始化邏輯如下:

    static {

        RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));

        DELAYED_QUEUE_RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.delayedQueue.ratio", RATIO));

    }

通過前邊對 Recycler 物件池的設計原理介紹,我們知道,在池化物件被回收的時候分別由兩類執行緒來執行。

  • 一類是建立執行緒。池化物件在建立執行緒中被建立出來後,一直在建立執行緒中被處理,處理完畢後由建立執行緒直接進行回收。而為了避免物件池不可控制地迅速膨脹,所以需要對建立執行緒回收物件的頻率進行限制。這個回收頻率由引數 RATIO 控制,預設為8,可由JVM啟動引數 -D io.netty.recycler.ratio 指定。表示建立執行緒只回收 1 / 8 的物件,也就是每建立 8 個物件最後只回收 1個物件。

  • 另一類就是回收執行緒。池化物件在建立執行緒中被建立出來,但是業務的相關處理是在回收執行緒中,業務處理完畢後由回收執行緒負責回收。前邊提到物件回收有一個基本原則就是物件是誰建立的,就要回收到建立執行緒對應的Stack中。所以回收執行緒就需要將池化物件回收至其建立執行緒對應的Stack中的WeakOrderQueue連結串列中。並等待建立執行緒將WeakOrderQueue連結串列中的待回收物件轉移至Stack中的陣列棧中。同樣,回收執行緒也需要控制回收頻率,由引數 DELAYED_QUEUE_RATIO 進行控制,預設也是8,可由JVM啟動引數 -D io.netty.recycler.delayedQueue.ratio 指定,表示回收執行緒每處理完 8 個物件才回收 1 個物件。

8. Recycler物件池的建立

    private static final class RecyclerObjectPool<T> extends ObjectPool<T> {
        //recycler物件池範例
        private final Recycler<T> recycler;

        RecyclerObjectPool(final ObjectCreator<T> creator) {
             recycler = new Recycler<T>() {
                @Override
                protected T newObject(Handle<T> handle) {
                    return creator.newObject(handle);
                }
            };
        }
      
        ..................省略............
      }

Netty 中的 Recycler 物件池是一個抽象類,裡面封裝了物件池的核心結構以及核心方法。在建立物件池的時候,我們往往會使用Recycler的匿名類來實現抽象方法 newObject 從而來定義物件池建立物件的行為。

public abstract class Recycler<T> {

   protected abstract T newObject(Handle<T> handle);

   protected Recycler() {
        this(DEFAULT_MAX_CAPACITY_PER_THREAD);
    }

    protected Recycler(int maxCapacityPerThread) {
        this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
    }

    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
        this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
    }

    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
                       int ratio, int maxDelayedQueuesPerThread) {
        this(maxCapacityPerThread, maxSharedCapacityFactor, ratio, maxDelayedQueuesPerThread,
                DELAYED_QUEUE_RATIO);
    }

    //建立執行緒持有物件池的最大容量
    private final int maxCapacityPerThread;
    //所有回收執行緒可回收物件的總量(計算因子)
    private final int maxSharedCapacityFactor;
    //建立執行緒的回收比例
    private final int interval;
    //一個回收執行緒可幫助多少個建立執行緒回收物件
    private final int maxDelayedQueuesPerThread;
    //回收執行緒回收比例
    private final int delayedQueueInterval;

    protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
                       int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
        interval = max(0, ratio);
        delayedQueueInterval = max(0, delayedQueueRatio);
        if (maxCapacityPerThread <= 0) {
            this.maxCapacityPerThread = 0;
            this.maxSharedCapacityFactor = 1;
            this.maxDelayedQueuesPerThread = 0;
        } else {
            this.maxCapacityPerThread = maxCapacityPerThread;
            this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
            this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
        }
    }

}

關於Recycler物件池中相關的重要屬性我們在上一小節已經詳細介紹過了,這裡只是將這些重要引數賦值於Recycler物件池中定義的對應屬性上。還是那句話,大家這裡只需要對這些屬性有一個感性的認識即可,並不需要強行完全理解,後面我們在介紹物件池的功能實現時還會結合具體場景來介紹這些屬性。

9. 多執行緒獲取物件無鎖化實現

我們在介紹Netty物件池多執行緒獲取物件的設計時提到,為了避免多執行緒並行獲取物件時引入的同步開銷,Netty採用了類似 TLAB 分配記憶體的思想,為每一個執行緒分配了一個獨立的Stack結構,池化物件就儲存在這個Stack結構中。當執行緒需要從物件池中獲取物件時,Recycler就會從執行緒對應的Stakc結構中獲取池化物件。各個執行緒獨立執行,沒有任何同步開銷。

    //threadlocal儲存每個執行緒對應的 stack結構
    private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
        @Override
        protected Stack<T> initialValue() {
            return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                    interval, maxDelayedQueuesPerThread, delayedQueueInterval);
        }
        
        ..............省略..........
    };

物件池中採用一個 FastThreadLocal 型別的欄位 threadLocal 為每個執行緒維護一個獨立的Stack結構。從而達到多執行緒無鎖化獲取物件的目的。

FastThreadLocal是Netty基於JDK的ThreadLocal做的一個優化版本,擁有更快的存取效能。詳細實現筆者後面會有一篇專門講解FastThreadLocal的文章,這裡大家只需要把它當成JDK的ThreadLocal來看待即可。

當執行緒第一次從物件池中獲取物件時會觸發其對應的Stack結構的建立。

9.1 Stack結構的建立

本小節我們來介紹一下物件池中Stack結構的設計實現。在前邊《5.2 Stack的設計》小節中我們介紹了Stack結構中的一些核心屬性,包括:陣列棧以及WeakOrderQueue連結串列的Head指標,Prev指標,Cursor指標。

本小節筆者會把Stack結構中的剩餘屬性介紹給大家,通過這一小節的介紹,相信大家就會對Stack的設計實現有了一個整體的瞭解。還是那句話,這裡大家只需要對這些屬性有一個感性的認識,先混個眼熟,後面筆者還會結合具體場景詳細講解。

private static final class Stack<T> {

        // 建立執行緒儲存池化物件的stack結構所屬物件池recycler範例
        final Recycler<T> parent;

        //用弱參照來關聯當前stack對應的建立執行緒 因為使用者可能在某個地方參照了defaultHandler -> stack -> thread,可能存在這個參照鏈
        //當建立執行緒死掉之後 可能因為這個參照鏈的存在而導致thread無法被回收掉
        final WeakReference<Thread> threadRef;

        //所有回收執行緒能夠幫助當前建立執行緒回收物件的總容量
        final AtomicInteger availableSharedCapacity;

        //當前Stack對應的建立執行緒作為其他建立執行緒的回收執行緒時可以幫助多少個執行緒回收其池化物件
        private final int maxDelayedQueues;

        //當前建立執行緒對應的stack結構中的最大容量。 預設4096個物件
        private final int maxCapacity;

        //當前建立執行緒回收物件時的回收比例
        private final int interval;

        //當前建立執行緒作為其他執行緒的回收執行緒時回收其他執行緒的池化物件比例
        private final int delayedQueueInterval;

        // 當前Stack中的陣列棧 預設初始容量256,最大容量為4096
        DefaultHandle<?>[] elements;

        //陣列棧 棧頂指標
        int size;

        //回收物件計數 與 interval配合 實現只回收一定比例的池化物件
        private int handleRecycleCount;

        //多執行緒回收的設計,核心還是無鎖化,避免多執行緒回收相互競爭
        //Stack結構中的WeakOrderQueue連結串列
        private WeakOrderQueue cursor, prev;
        private volatile WeakOrderQueue head;
}

  • Recycler<T> parent:Stack所屬Recycler物件池範例,一個物件池可被多個執行緒存取獲取物件,所以一個物件池對應多個Stack,每個Stack的parent屬性指向所屬的Recycler範例。比如圖中的 stack1 , stack2 , stack3 , stack4 中的parent屬性均指向同一個Recycler物件池範例。

  • WeakReference<Thread> threadRef :Stack會通過弱參照的方式參照到其對應的建立執行緒。這裡使用弱參照來持有對應建立執行緒的原因是因為物件池的設計中存在這樣一個參照關係:池化物件 -> DefaultHandler -> stack -> threadRef。而池化物件是暴露給使用者的,如果使用者在某個地方持有了池化物件的強參照忘記清理,而Stack持有建立執行緒的強參照的話,當建立執行緒死掉的之後,因為這樣一個強參照鏈的存在從而導致建立執行緒一直不能被GC回收。

  • AtomicInteger availableSharedCapacity:當前建立執行緒對應的所有回收執行緒可以幫助當前建立執行緒回收的物件總量。比如圖中thread2 , thread3 , thread4 這三個回收執行緒總共可以幫助 thread1 回收物件的總量。availableSharedCapacity 在多個回收執行緒中是共用的,回收執行緒每回收一個物件它的值就會減1,當小於 LINK_CAPACITY(回收執行緒對應WeakOrderQueue節點的最小儲存單元Link)時,回收執行緒將不能在為該stack回收物件了。該值的計算公式為前邊介紹的 max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)

當建立執行緒從Stack結構中的WeakOrderQueue連結串列中轉移待回收物件到陣列棧中後,availableSharedCapacity 的值也會相應增加。說白了這個值就是用來指示回收執行緒還能繼續回收多少物件。已達到控制回收執行緒回收物件的總體容量。

  • int maxDelayedQueues: 一個執行緒對於物件池來說,它可以是建立執行緒,也可以是回收執行緒,當該建立執行緒作為回收執行緒時,該值定義了最多可以為多少個建立執行緒回收物件。預設值為 CPU * 2。比如圖中 thread2 作為回收執行緒既可以幫 thread1 回收物件也可以幫助 thread3 , thread4 回收物件。那麼maxDelayedQueues 的值就是 3 。

  • int maxCapacity:定義當前Stack結構中的陣列棧的最大容量。預設為4096。

  • int interval:建立執行緒的回收比例,預設是8。

  • int delayedQueueInterval:建立執行緒作為回收執行緒時的回收比例。預設是8。

  • DefaultHandle<?>[] elements:這個就是我們前邊反覆提到的Stack結構中的陣列棧。用於存放物件池中的池化物件。當執行緒從物件池中獲取物件時就是從這裡獲取。

  • int size:陣列棧中的棧頂指標。

  • int handleRecycleCount:回收物件計數。與 interval 配合達到控制回收物件比例的目的。從 0 開始每遇到一個回收物件就 +1 ,同時把物件丟棄。直到handleRecycleCount == interval時回收物件,然後歸零。也就是前邊我們說到的每建立8個物件才回收1個。避免 Stack 不可控制的迅速增長。

  • WeakOrderQueue cursor, prev,head:這三個指標就是前邊我們在講Stack設計的時候介紹到的用於多執行緒無鎖化回收的 WeakOrderQueue 連結串列中的頭結點指標,當前節點指標,前一個節點指標(用於刪除節點)。

介紹完Stack結構中的這些重要屬性,建立的過程就很簡單了。就是利用前邊介紹過的已經初始化好的Recycler屬性對Stack結構中的這些屬性進行賦值。

    private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
        @Override
        protected Stack<T> initialValue() {
            return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                    interval, maxDelayedQueuesPerThread, delayedQueueInterval);
        }

      ..............省略............
    }
       Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
              int interval, int maxDelayedQueues, int delayedQueueInterval) {
            this.parent = parent;
            threadRef = new WeakReference<Thread>(thread);
            this.maxCapacity = maxCapacity;
            availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
            elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
            this.interval = interval;
            this.delayedQueueInterval = delayedQueueInterval;
            handleRecycleCount = interval; 
            this.maxDelayedQueues = maxDelayedQueues;
        }

9.2 從物件池中獲取物件

public abstract class Recycler<T> {
      //一個空的Handler,表示該物件不會被池化
     private static final Handle NOOP_HANDLE = new Handle() {
        @Override
        public void recycle(Object object) {
            // NOOP
        }
    };

    public final T get() {
        //如果物件池容量為0,則立馬新建立一個物件返回,但是該物件不會回收進物件池
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        //獲取當前執行緒 儲存池化物件的stack
        Stack<T> stack = threadLocal.get();
        //從stack中pop出物件,handler是池化物件在物件池中的模型,包裝了一些池化物件的回收資訊和回收狀態
        DefaultHandle<T> handle = stack.pop();
        //如果當前執行緒的stack中沒有池化物件 則直接建立物件
        if (handle == null) {
            //初始化的handler物件recycleId和lastRecyclerId均為0
            handle = stack.newHandle();
            //newObject為物件池recycler的抽象方法,由使用者初始化記憶體池的時候 匿名提供
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

}

Recycler對外表現為一個整體的物件池,但是物件池內部是按照執行緒的維度來池化物件的,每個執行緒所池化的物件儲存在對應的Stack結構中。

  1. 當物件池的最大容量maxCapacityPerThread == 0時,物件池會立馬建立一個物件出來,並將一個空的Handler傳遞進物件中。表示該物件在使用完畢後不會被回收進物件池中。

  2. 從threadLocal中獲取當前執行緒對應的Stack,隨後從Stack結構中的陣列棧中彈出棧頂物件的DefaultHandler。

  3. 如果彈出的DefaultHandler為空,說明當前Stack中並沒有回收的池化物件。直接建立一個新的DefaultHandler並建立一個新的物件,然後將DefaultHandler傳入到新建立的物件中,並用DefaultHandler包裹新建立的物件。這樣池化物件就與DefaultHandler關聯起來了。

static final class Entry {

     private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
            @Override
            public Entry newObject(Handle<Entry> handle) {
                return new Entry(handle);
            }
        });

     private Entry(Handle<Entry> handle) {
            this.handle = handle;
     }
}

9.3 DefaultHandler

前邊我們在介紹物件池的設計原理時提到,池化物件在物件池中的儲存模型為 Handler。

public abstract class ObjectPool<T> {

    public interface Handle<T> {
        void recycle(T self);
    }

}

在Recycler物件池中的預設實現是 DefaultHandler ,DefaultHandler 裡面包裹了池化物件以及池化物件在物件池中的一些相關資訊,(比如:池化物件的相關回收資訊和回收狀態)。

從結構設計角度上來說,池化物件是隸屬於其建立執行緒對應的Stack結構的,由於這層結構關係的存在,池化物件的DefaultHandler應該由Stack來進行建立。

 private static final class Stack<T> {

        DefaultHandle<T> newHandle() {
            return new DefaultHandle<T>(this);
        }
 }

我們來看下 DefaultHandler 的具體結構:

   private static final class DefaultHandle<T> implements Handle<T> {
        //用於標識最近被哪個執行緒回收,被回收之前均是0
        int lastRecycledId;
        //用於標識最終被哪個執行緒回收,在沒被回收前是0
        int recycleId;

        //是否已經被回收
        boolean hasBeenRecycled;
        //強參照關聯建立handler的stack
        Stack<?> stack;
        //池化物件
        Object value;

        DefaultHandle(Stack<?> stack) {
            this.stack = stack;
        }

        @Override
        public void recycle(Object object) {

          ...................省略.............
        }
    }

DefaultHandler屬性的第一部分資訊,首先就是池化物件在物件池中的回收資訊。

  • int lastRecycledId:用於標識最近被哪個執行緒回收,被回收之前均是0。

  • int recycleId:用於標識最終被哪個執行緒回收,在沒被回收前是0。

  • boolean hasBeenRecycled:該池化物件是否已經被回收至建立執行緒對應的Stack中。

這裡可能大家有疑問了,為什麼池化物件的回收還要分最近回收和最終回收呢

因為物件池中的池化物件回收可以分為兩種情況:

  • 由建立執行緒直接進行回收:這種回收情況就是一步到位,直接回收至建立執行緒對應的Stack中。所以這種情況下是不分階段的。recycleId = lastRecycledId = OWN_THREAD_ID

  • 由回收執行緒幫助回收:這種回收情況下就要分步進行了,首先由回收執行緒將池化物件暫時儲存在其建立執行緒對應Stack中的WeakOrderQueue連結串列中。此時並沒有完成真正的物件回收。recycleId = 0,lastRecycledId = 回收執行緒Id(WeakOrderQueue#id)。當建立執行緒將WeakOrderQueue連結串列中的待回收物件轉移至Stack結構中的陣列棧之後,這時池化物件才算真正完成了回收動作。recycleId = lastRecycledId = 回收執行緒Id(WeakOrderQueue#id)

這兩個欄位 lastRecycledId ,recycleId 主要是用來標記池化物件所處的回收階段,以及在這些回收階段具體被哪個執行緒進行回收。

最後兩個屬性就比較容易理解了,一個是 Object value 用來包裹真正的池化物件。另一個是 Stack<?> stack 用來強參照關聯池化物件的Handler所屬的Stack結構。

記不記得我們在介紹Stack結構的時候提到,Stack中持有其對應建立執行緒的弱參照。筆者在解釋為什麼持有建立執行緒的弱參照時,提到過這樣一個參照鏈關係:池化物件 -> DefaultHandler -> Stack -> threadRef。這裡大家明白了嗎?

static final class Entry {
    //池化物件Entry強參照它的DefaultHandler
    private  Handle<Entry> handle;
  
}


private static final class DefaultHandle<T> implements Handle<T> {
    // DefaultHandler強參照其所屬的Stack
    Stack<?> stack;

}

private static final class Stack<T> {
    // Stack弱參照其對應的建立執行緒
    final WeakReference<Thread> threadRef;

}

9.4 從Stack中獲取池化物件

        DefaultHandle<T> pop() {
            //普通出棧操作,從棧頂彈出一個回收物件
            int size = this.size;
            if (size == 0) {
                //如果當前執行緒所屬stack已經沒有物件可用,則遍歷stack中的weakOrderQueue連結串列(其他執行緒幫助回收的物件存放在這裡)將這些待回收物件回收進stack
                if (!scavenge()) {
                    return null;
                }
                size = this.size;
                if (size <= 0) {
                    // 如果WeakOrderQueue連結串列中也沒有待回收物件可轉移
                    // 直接返回null 新建立一個物件
                    return null;
                }
            }
            size --;
            DefaultHandle ret = elements[size];
            elements[size] = null;
            this.size = size;

            if (ret.lastRecycledId != ret.recycleId) {
                // 這種情況表示物件至少被一個執行緒回收了,要麼是建立執行緒,要麼是回收執行緒
                throw new IllegalStateException("recycled multiple times");
            }

            //物件初次建立以及回收物件再次使用時  它的 recycleId = lastRecycleId = 0
            ret.recycleId = 0;
            ret.lastRecycledId = 0;
            return ret;
        }

這裡就是業務執行緒從物件池中真正獲取池化物件的地方。從Stack結構中的陣列棧的棧頂位置彈出池化物件。

  • 首先判斷陣列棧中是否有回收的池化物件。棧頂指標 size == 0 說明當前陣列棧中是空的。隨後就會呼叫 scavenge 方法,從Stack結構中的WeakOrderQueue連結串列中轉移最多一個Link大小的待回收物件到陣列棧中。如果WeakOrderQueue連結串列中也沒有待回收物件,說明當前Stack結構就是空的沒有任何回收的池化物件,物件池直接返回 null ,並建立一個新的池化物件返回給業務執行緒。

  • 如果陣列棧不為空,則將棧頂元素 DefaultHandler 彈出,初始化池化物件DefaultHandler的回收資訊。recycleId = lastRecycledId = 0表示該池化物件剛剛從物件池中取出。

recycleId 與 lastRecycledId 之間的關係分為以下幾種情況:

  • recycleId = lastRecycledId = 0:表示池化物件剛剛被建立或者剛剛從物件池中取出即將被再次複用。這是池化物件的初始狀態。

  • recycleId = lastRecycledId != 0:表示當前池化物件已經被回收至對應Stack結構裡的陣列棧中。可以直接被取出複用。可能是被其建立執行緒直接回收,也可能是被回收執行緒回收。

  • recycleId != lastRecycledId:表示當前池化物件處於半回收狀態。池化物件已經被業務執行緒處理完畢,並被回收執行緒回收至對應的WeakOrderQueue節點中。並等待建立執行緒將其最終轉移至Stack結構中的陣列棧中。

9.4 轉移回收執行緒回收的物件到Stack中

通過前邊介紹Stack結構的設計原理我們知道,物件池中池化物件的回收儲存分為兩個部分:

  • 一個是池化物件直接被建立執行緒回收,直接儲存在建立執行緒對應Stack結構中的陣列棧中。

  • 另一個是池化物件被回收執行緒回收,臨時間接儲存在建立執行緒對應Stack結構中的WeakOrderQueue連結串列中。每個回收執行緒對應一個WeakOrderQueue節點。

當Stack結構中的陣列棧為空時,建立執行緒會遍歷WeakOrderQueue連結串列,從而將回收執行緒為其回收的物件從WeakOrderQueue節點中轉移至陣列棧中。多執行緒回收物件無鎖化設計

這個轉移的動作就是由 scavenge 方法來完成的。

       private boolean scavenge() {
            //從其他執行緒回收的weakOrderQueue裡 轉移 待回收對像 到當前執行緒的stack中
            if (scavengeSome()) {
                return true;
            }

            // 如果weakOrderQueue中沒有待回收物件可轉移,那麼就重置stack中的cursor.prev
            // 因為在掃描weakOrderQueue連結串列的過程中,cursor已經發生變化了
            prev = null;
            cursor = head;
            return false;
        }

scavengeSome() 執行具體的轉移邏輯。如果WeakOrderQueue連結串列中還有待回收物件並轉移成功則返回 true 。如果WeakOrderQueue連結串列為空沒有任何待回收物件可轉移,則重置連結串列相關的指標,cursor重新指向head節點,prev指向null。因為在遍歷WeakOrderQueue連結串列搜尋可轉移物件時,cursor指標已經發生變化了,這裡需要重置。

9.5 轉移回收物件

下面建立執行緒就開始遍歷Stack結構中的WeakOrderQueue連結串列,將其中儲存的回收執行緒回收進來的物件轉移到陣列棧中。

為了讓大家更清晰的理解遍歷WeakOrderQueue連結串列的過程,我們先來了解下Stack中WeakOrderQueue連結串列的狀態結構如下圖所示:

在Stack結構剛剛建立的初始狀態,WeakOrderQueue連結串列是空的,所以 prev = head = cursor = null 。

後面當回收執行緒在回收物件時會加入自己對應的WeakOrderQueue節點到連結串列中。注意:WeakOrderQueue節點的插入都是在連結串列的頭結點進行插入

後面我們在講到多執行緒回收物件時還會再次詳細講解WeakOrderQueue連結串列的操作,這裡大家只需要先理解連結串列的狀態結構即可。

head指標始終指向連結串列的頭結點,cursor指標指向當前遍歷的節點。在沒有開始遍歷連結串列前,cursor指標指向頭結點。表示從頭結點開始遍歷。prev指標指向cursor前一個節點。當前遍歷節點為頭結點時,prev指標指向空。

在理解了WeakOrderQueue連結串列的狀態結構後,我們來看一下連結串列的遍歷轉移過程邏輯:

        private boolean scavengeSome() {
            WeakOrderQueue prev;
            //獲取當前執行緒stack 的weakOrderQueue連結串列指標(本次掃描起始節點)
            WeakOrderQueue cursor = this.cursor;
            //在stack初始化完成後,cursor,prev,head等指標全部是null,這裡如果cursor == null 意味著當前stack第一次開始掃描weakOrderQueue連結串列
            if (cursor == null) {
                prev = null;
                cursor = head;
                if (cursor == null) {
                    //說明目前weakOrderQueue連結串列裡還沒有節點,並沒有其他執行緒幫助回收的池化物件
                    return false;
                }
            } else {
                //獲取prev指標,用於操作連結串列(刪除當前cursor節點)
                prev = this.prev;
            }

            boolean success = false;
            //迴圈遍歷weakOrderQueue連結串列 轉移待回收物件
            do {
                //將weakOrderQueue連結串列中當前節點中包含的待回收物件,轉移到當前stack中,一次轉移一個link
                if (cursor.transfer(this)) {
                    success = true;
                    break;
                }
                //如果當前cursor節點沒有待回收物件可轉移,那麼就繼續遍歷連結串列獲取下一個weakOrderQueue節點
                WeakOrderQueue next = cursor.getNext();
                //如果當前weakOrderQueue對應的回收執行緒已經掛掉了,則
                if (cursor.get() == null) {
                    // 判斷當前weakOrderQueue節點是否還有可回收物件
                    if (cursor.hasFinalData()) {
                        //回收weakOrderQueue中最後一點可回收物件,因為對應的回收執行緒已經死掉了,這個weakOrderQueue不會再有任何物件了
                        for (;;) {

                            if (cursor.transfer(this)) {
                                success = true;
                            } else {
                                break;
                            }
                        }
                    }

                    //回收執行緒以死,對應的weaoOrderQueue節點中的最後一點待回收物件也已經回收完畢,就需要將當前節點從連結串列中刪除。unlink當前cursor節點
                    //這裡需要注意的是,netty永遠不會刪除第一個節點,因為更新頭結點是一個同步方法,避免更新頭結點而導致的競爭開銷
                    // prev == null 說明當前cursor節點是頭結點。不用unlink,如果不是頭結點 就將其從連結串列中刪除,因為這個節點不會再有執行緒來收集池化物件了
                    if (prev != null) {
                        //確保當前weakOrderQueue節點在被GC之前,我們已經回收掉它所有的佔用空間
                        cursor.reclaimAllSpaceAndUnlink();
                        //利用prev指標刪除cursor節點
                        prev.setNext(next);
                    }
                } else {
                    prev = cursor;
                }
                //向後移動prev,cursor指標繼續遍歷weakOrderQueue連結串列
                cursor = next;

            } while (cursor != null && !success);

            this.prev = prev;
            this.cursor = cursor;
            return success;
        }
  1. 再開始遍歷WeakOrderQueue連結串列之前,首先需要檢查cursor指標是否為空,如果為空說明當前Stack是第一次開始遍歷WeakOrderQueue連結串列。隨後讓cursor指標指向head指標,如果head指標指向為空,說明當前WeakOrderQueue連結串列是空的,此時沒有任何回收執行緒在回收物件。如果head指標不為空,則從head指標指向的頭結點開始遍歷WeakOrderQueue連結串列。

  2. 首先會從cursor指標指向的當前遍歷節點開始,將當前WeakOrderQueue節點中儲存的待回收物件轉移到Stack結構中的陣列棧中。一次最多轉移一個Link大小的物件。轉移成功後退出。如果當前WeakOrderQueue節點此時沒有任何待回收物件可被轉移則轉移失敗,繼續遍歷下一個WeakOrderQueue節點。

        if (cursor.transfer(this)) {
            success = true;
            break;
        }

        WeakOrderQueue next = cursor.getNext();
  1. 為了多執行緒能夠無鎖化回收物件,一個回收執行緒對應一個WeakOrderQueue節點,在WeakOrderQueue節點中持有對應回收執行緒的弱參照,目的也是為了當回收執行緒掛掉的時候,能夠保證回收執行緒被GC及時的回收掉。如果cursor.get() == null說明當前WeakOrderQueue節點對應的回收執行緒已經掛掉了,此時如果當前節點還有待回收物件,則需要將節點中的所有待回收物件全部轉移至Stack中的陣列棧中。注意這裡是轉移節點所有的待回收物件而不是隻轉移一個Link。因為對應的回收執行緒已經掛掉了,該執行緒後續將不再會幫助建立執行緒回收物件了,所以要清理其對應的WeakOrderQueue節點。
private static final class WeakOrderQueue extends WeakReference<Thread> {

    ............WeakOrderQueue本身就是一個弱參照,參照對應的回收執行緒.........

}
  1. 當清理完已經掛掉的回收執行緒對應的WeakOrderQueue節點後,就需要將該節點從Stack結構裡的WeakOrderQueue連結串列中刪除。保證被清理後的WeakOrderQueue節點可以被GC回收。當然刪除節點之前需要通過cursor.reclaimAllSpaceAndUnlink()釋放回收執行緒回收物件的availableSharedCapacity容量。釋放的容量的大小為被刪除WeakOrderQueue節點中儲存的待回收物件容量。
        if (prev != null) {
              cursor.reclaimAllSpaceAndUnlink();
              //利用prev指標刪除cursor節點
              prev.setNext(next);
        }

這裡需要注意的是,Netty不會對WeakOrderQueue連結串列的頭結點進行刪除。如果prev == null說明當前節點是頭結點,即使對應的回收執行緒已經掛掉了,但在本次遍歷中不會對其進行刪除。因為操作連結串列頭結點的方法是一個同步方法,Netty這裡是為了避免不必要的同步開銷。

以上邏輯就是建立執行緒遍歷WeakOrderQueue連結串列轉移回收物件的處理邏輯,如果本次遍歷的當前節點中並沒有物件可轉移,那麼就繼續從下一個節點開始遍歷。迴圈執行轉移邏輯直到遍歷完連結串列或者中途轉移成功。退出迴圈時要記錄更新cursor指標記錄當前遍歷到的節點。

這裡大家可能會有兩個問題:

第一個問題:如果頭結點對應的回收執行緒已經掛掉,這個頭結點不在本次遍歷中刪除,那麼會在什麼時候被刪除呢

首先當回收執行緒第一次開始幫助建立執行緒回收物件時,會將自己對應的WeakOrderQueue節點插入到建立執行緒對應Stack結構中的WeakOrderQueue連結串列的頭結點位置。節點始終在連結串列的頭結點位置插入

如圖所示,當本次遍歷發現頭結點對應的回收執行緒 thread4 已經掛掉後,清理完頭結點中儲存的待回收物件後,讓其繼續呆在連結串列中,並不著急將其刪除。隨後cursor指標指向thread3對應的節點,下一次遍歷就會從thread3對應的節點開始遍歷。

當有一個新的回收執行緒 thread5 加入後,此時thread5對應的WeakOrderQueue節點變成了連結串列中的頭結點,當經過多次遍歷之後,cursor指標最終會再次指向死亡執行緒thread4對應的節點時,會再次進入cursor.get() == null的處理邏輯,而此時thread4對應的節點已經不是頭結點了,所以在這次遍歷中就將該節點從連結串列中刪除。

這就是多執行緒並行程式碼和單執行緒程式碼設計上的不同,在多執行緒程式設計中,我們一定要時刻警惕同步操作的開銷。能避免就要儘量避免。

第二個問題:操作WeakOrderQueue連結串列的頭結點為什麼是同步方法呢?

我們都知道一個回收執行緒對應一個WeakOrderQueue節點,當一個回收執行緒第一次為該建立執行緒回收物件時,都會新建立一個WeakOrderQueue節點並將節點插入到建立執行緒對應Stack中的WeakOrderQueue連結串列中的頭結點位置。

在多執行緒回收場景下,可能會有多個回收執行緒同時向建立執行緒對應Stack中的WeakOrderQueue連結串列的頭結點插入自己對應的節點。

那麼此時對於連結串列頭結點的操作就必須做同步處理了。當節點同步插入到連結串列的頭結點後,以後該回收執行緒回收物件就是無鎖化了。只不過就是在一開始插入節點的時候會有一點同步的開銷,但是這是無法避免的

        //整個recycler物件池唯一的一個同步方法,而且同步塊非常小,邏輯簡單,執行迅速
        synchronized void setHead(WeakOrderQueue queue) {
            //始終在weakOrderQueue連結串列頭結點插入新的節點
            queue.setNext(head);
            head = queue;
        }

縱觀整個Recycler的設計實現,這個方法是唯一一個同步的方法,而且同步塊非常的短,裡面的邏輯非常簡單。

在多執行緒程式設計中,如果遇到無法避免的同步情況,那麼也必須使同步塊內的程式碼邏輯儘量簡單。

10. WeakOrderQueue的設計實現

之前我們在介紹WeakOrderQueue結構設計原理的時候講到,雖然該結構命名的字尾是一個Queue,但其實是一個連結串列,連結串列中的元素型別為Link,頭結點指標Head永遠指向第一個未被轉移完畢的Link,當一個Link裡的待回收物件被全部轉移完畢後,head指標隨即指向下一個節點,但是該Link節點並不會從連結串列中刪除。尾指標Tail指向連結串列中最後一個Link節點。節點的插入是從連結串列的尾部開始插入。

10.1 Link結構

    private static final class WeakOrderQueue extends WeakReference<Thread> {

        // link結構是用於真正儲存待回收物件的結構,繼承AtomicInteger 本身可以用來當做writeindex使用
        static final class Link extends AtomicInteger {
            //陣列用來儲存待回收物件,容量為16
            final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];

            int readIndex;
            //weakOrderQueue中的儲存結構時由link結構節點元素組成的連結串列結構
            Link next;
        }
}

首先我們從WeakOrderQueue的繼承結構上來看,它繼承於 WeakReference < Thread > , 表示該結構持有一個執行緒的弱參照,一個回收執行緒對應於一個WeakOrderQueue節點,很明顯是持有其對應回收執行緒的弱參照,方便當回收執行緒掛掉的時候被GC回收。

  • DefaultHandle<?>[] elements : Link結構中包含一個容量為LINK_CAPACITY ,預設為16大小的DefaultHandle陣列,用來儲存回收執行緒回收的物件。

  • int readIndex:建立執行緒在轉移Link節點中的待回收物件時,通過這個readIndex來讀取未被轉移的物件。由於readIndex只會被建立執行緒使用,所以這裡並不需要保證原子性和可見性。用一個普通的int變數儲存就好。

  • writeIndex:Link結構繼承於AtomicInteger型別,這就意味著Link結構本身就可以被當做一個writeIndex來使用,由於回收執行緒在向Link節點新增回收物件的時候需要修改writeIndex,於此同時建立執行緒在轉移Link節點的時候需要讀取writeIndex,所以writeIndex需要保證執行緒安全性,故採用AtomicInteger型別儲存。

  • Link next:Link節點的next指標,用於指向連結串列中的下一個節點。

10.2 Head結構

        // weakOrderQueue內部link連結串列的頭結點
        private static final class Head {
            //所有回收執行緒能夠幫助建立執行緒回收物件的總容量 reserveSpaceForLink方法中會多執行緒操作該欄位
            //用於指示當前回收執行緒是否繼續為建立執行緒回收物件,所有回收執行緒都可以看到,這個值是所有回收執行緒共用的。以便可以保證所有回收執行緒回收的物件總量不能超過availableSharedCapacity
            private final AtomicInteger availableSharedCapacity;
            //link連結串列的頭結點
            Link link;

            Head(AtomicInteger availableSharedCapacity) {
                this.availableSharedCapacity = availableSharedCapacity;
            }

            void reclaimAllSpaceAndUnlink() {
                    ....回收head節點的所有空間,並從連結串列中刪除head節點,head指標指向下一節點....
            }

            private void reclaimSpace(int space) {
                //所有回收執行緒都可以看到,這個值是所有回收執行緒共用的。以便可以保證所有回收執行緒回收的物件總量不能超過availableSharedCapacity
                availableSharedCapacity.addAndGet(space);
            }

            //引數link為新的head節點,當前head指標指向的節點已經被回收完畢
            void relink(Link link) {
                  ...回收當前頭結點的容量,更新head節點為指定的Link節點...
            }

            Link newLink() {
                  ....建立新的Link節點...
            }

            //此處目的是為接下來要建立的link預留空間容量
            static boolean reserveSpaceForLink(AtomicInteger availableSharedCapacity) {               
                  ...在建立新的Link節點之前需要呼叫該方法預訂容量空間...
            }
        }

從程式碼結構上我們可以看出,Head結構的設計不只是作為頭結點指標那麼簡單,其中還封裝了很多連結串列操作以及回收的邏輯。

  • AtomicInteger availableSharedCapacity:這個欄位前邊已經介紹過多次了,它是多執行緒共用的一個欄位,可以被多個回收執行緒進行操作,表達的語意是所有回收執行緒總共可以幫助建立執行緒一共可以回收多少物件。對所有回收執行緒回收物件的總量進行限制。每建立一個Link節點,它的值就減少一個LINK_CAPACITY ,每釋放一個Link節點,它的值就增加一個LINK_CAPACITY 。

  • Link link:Head結構封裝的Link連結串列中的頭結點。

剩下Head結構中封裝的相關邏輯處理方法,等到介紹到具體應用場景的時候,筆者在拿出來為大家介紹,這裡先混個眼熟就行。先看懂個大概,腦海裡朦朦朧朧有個粗淺的認識即可。

10.3 WeakOrderQueue中的重要屬性

 private static final class WeakOrderQueue extends WeakReference<Thread> {

        //link連結串列的頭結點,head指標始終指向第一個未被轉移完畢的LinK節點
        private final Head head;
        //尾結點
        private Link tail;
        //站在stack的視角中,stack中包含一個weakOrderQueue的連結串列,每個回收執行緒為當前stack回收的物件存放在回收執行緒對應的weakOrderQueue中
        //這樣通過stack中的這個weakOrderQueue連結串列,就可以找到其他執行緒為該建立執行緒回收的物件
        private WeakOrderQueue next;
        //回收執行緒回收Id,每個weakOrderQueue分配一個,同一個stack下的一個回收執行緒對應一個weakOrderQueue節點
        private final int id = ID_GENERATOR.getAndIncrement();
        //回收執行緒回收比例 預設是8
        private final int interval;
        //回收執行緒回收計數 回收1/8的物件
        private int handleRecycleCount;

}
  • Head head:用於指向WeakOrderQueue中Link連結串列的頭結點。

  • Link tail:指向Link連結串列中的尾結點。

  • WeakOrderQueue next:站在Stack結構的視角上,Stack包含一個WeakOrderQueue連結串列,用來存放回收執行緒回收過來的池化物件。該欄位為WeakOrderQueue節點的next指標,用於指向下一個回收執行緒對應的WeakOrderQueue節點。

  • int id :對應回收執行緒的回收Id,同一個Stack結構下,不同的回收執行緒對應不同的Id。

  • int interval:回收執行緒對應的回收頻率,預設只回收 1 / 8 的池化物件。

  • int handleRecycleCount:回收物件計數,前邊我們多次講過了。用於控制回收頻率。

10.4 WeakOrderQueue結構的建立

private static final class WeakOrderQueue extends WeakReference<Thread> {
        //為了使stack能夠被GC,這裡不會持有其所屬stack的參照
        private WeakOrderQueue(Stack<?> stack, Thread thread) {
            //weakOrderQueue持有對應回收執行緒的弱參照
            super(thread);
            //建立尾結點
            tail = new Link();

            // 建立頭結點  availableSharedCapacity = maxCapacity / maxSharedCapacityFactor
            head = new Head(stack.availableSharedCapacity);
            head.link = tail;
            interval = stack.delayedQueueInterval;
            handleRecycleCount = interval; 
        }
}

在建立WeakOrderQueue結構的時候,首先會呼叫父類別 WeakReference<Thread> 的構造方法持有當前回收執行緒的弱應用。

然後建立第一個Link節點,head指標和tail指標同時指向這第一個節點。

用建立執行緒對應的Stack中的屬性初始化WeakOrderQueue結構中的相關屬性。

大家這裡可能會問了,既然這裡用Stack中的屬性去初始化WeakOrderQueue結構中的相關屬性,那為什麼WeakOrderQueue不直接持有Stack的參照呢

之前我們提到,一個回收執行緒對應一個WeakOrderQueue節點,當回收執行緒掛掉的時候,需要清理WeakOrderQueue節點並將其從Stack結構中的WeakOrderQueue連結串列(頭結點除外)中刪除。使得WeakOrderQueue節點可以被GC回收掉。

如果Stack結構對應的建立執行緒掛掉,而此時WeakOrderQueue又持有了Stack的參照,這樣就使得Stack結構無法被GC掉。

所以這裡只會用Stack結構的相關屬性去初始化WeakOrderQueue結構,在WeakOrderQueue中並不會持有Stack的參照。

在複雜程式結構的設計中,我們要時刻對物件之間的參照關係保持清晰的認識。防止記憶體洩露。

10.5 從WeakOrderQueue中轉移回收物件

WeakOrderQueue的transfer方法用於將當前WeakOrderQueue節點中的待回收物件轉移至建立執行緒對應的Stack中。

開始轉移回收物件時會從WeakOrderQueue節點中的Link連結串列的頭結點開始遍歷,如果頭結點中還有未被轉移的物件,則將頭結點剩餘的未轉移物件轉移至Stack中。所以建立執行緒每次最多轉移一個LINK_CAPACITY大小的物件至Stack中。只要成功轉移了哪怕一個物件,transfer方法就會返回true。

如果頭結點中儲存的物件已經全部轉移完畢,則更新head指標指向下一個Link節點,開始轉移下一個Link節點。建立執行緒每次只會轉移一個Link節點。如果Link連結串列是空的,沒有轉移成功一個物件,則transfer方法返回false。

由於transfer方法體比較大,筆者將其按照上述邏輯步驟拆分開來為大家講解:

10.5.1 判斷頭結點中的待回收物件是否轉移完畢

            //獲取當前weakOrderQueue節點中的link連結串列頭結點
            Link head = this.head.link;
            //頭結點為null說明還沒有待回收物件
            if (head == null) {
                return false;
            }

            //如果頭結點中的待回收物件已經被轉移完畢
            if (head.readIndex == LINK_CAPACITY) {
                //判斷是否有後續Link節點
                if (head.next == null) {
                    //整個link連結串列沒有待回收物件了已經
                    return false;
                }
                head = head.next;
                //當前Head節點已經被轉移完畢,head指標向後移動,head指標始終指向第一個未被轉移完畢的LinK節點
                this.head.relink(head);
            }

首先從Link連結串列的頭結點開始轉移,head == null 說明當前Link連結串列是空的並沒有物件可被轉移,直接返回false。

head.readIndex == LINK_CAPACITY 判斷當前頭結點中的物件是否已經被轉移完畢,如果當前頭結點中的物件已經被全部轉移完畢,則將head指標更新 relink 為下一個節點,開始從下一個節點開始轉移物件。如果此時Link連結串列已經為空了,直接返回false。

 private static final class Head {

            //引數link為新的head節點,當前head指標指向的節點已經被回收完畢
            void relink(Link link) {
                //更新availableSharedCapacity,因為當前link節點中的待回收物件已經被轉移完畢,所以需要增加availableSharedCapacity的值
                reclaimSpace(LINK_CAPACITY);
                //head指標指向新的頭結點(第一個未被回收完畢的link節點)
                this.link = link;
            }
            private void reclaimSpace(int space) {
                //所有回收執行緒都可以看到,這個值是所有回收執行緒共用的。以便可以保證所有回收執行緒回收的物件總量不能超過availableSharedCapacity
                availableSharedCapacity.addAndGet(space);
            }
}

10.5.2 根據本次轉移物件容量評估是否應該對Stack進行擴容

此時Head節點已經校驗完畢,可以執行正常的轉移邏輯了。但在轉移邏輯正式開始之前,還需要對本次轉移物件的容量進行計算,並評估Stack的當前容量是否可以容納的下,如果Stack的當前容量不夠,則需要對Stack進行擴容。

            final int srcStart = head.readIndex;
            //writeIndex
            int srcEnd = head.get();
            //該link節點可被轉移的物件容量
            final int srcSize = srcEnd - srcStart;
            if (srcSize == 0) {
                return false;
            }

            // 獲取建立執行緒stack中的當前回收物件數量總量
            final int dstSize = dst.size;
            // 待回收物件從weakOrderQueue中轉移到stack後,stack的新容量 = 轉移前stack容量 + 轉移的待回收物件個數
            final int expectedCapacity = dstSize + srcSize;

            if (expectedCapacity > dst.elements.length) {
                //如果轉移後的stack容量超過當前stack的容量 則對stack進行擴容
                final int actualCapacity = dst.increaseCapacity(expectedCapacity);
                //每次轉移最多一個Link的容量
                //actualCapacity - dstSize表示擴容後的stack還有多少剩餘空間
                srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
            }

獲取Link連結串列頭結點的readIndex和writeIndex,通過 writeIndex - readIndex 計算出當前頭結點有多少可被轉移的物件。

Stack的最終容量為: expectedCapacity = stack當前容量 + 轉移物件的容量

如果計算得出轉移後Stack的最終容量 expectedCapacity 超過了Stack的當前容量則需要對Stack進行擴容。根據擴容後的容量最終決定本次轉移多少物件: min(srcStart + actualCapacity - dstSize, srcEnd) ,確保不能超過Stack可容納的空間。

private static final class Stack<T> {

        int increaseCapacity(int expectedCapacity) {
            int newCapacity = elements.length;
            int maxCapacity = this.maxCapacity;
            do {
                newCapacity <<= 1;
            } while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
            //擴容後的新容量為最接近指定容量expectedCapacity的最大2的次冪
            newCapacity = min(newCapacity, maxCapacity);
            if (newCapacity != elements.length) {
                elements = Arrays.copyOf(elements, newCapacity);
            }

            return newCapacity;
        }

}

如果當前Stack已經達到最大容量,無法再繼續擴容:actualCapacity - dstSize = 0,則停止本次轉移操作,直接返回false

if (srcStart != srcEnd) {
   .....具體的轉移邏輯.......
}else {
    // The destination stack is full already.
    return false;
}

如果Stack的容量可以容納頭結點中儲存的待轉移物件,則開始正式的轉移邏輯

10.5.3 轉移回收物件

                //待轉移物件集合 也就是Link節點中儲存的元素
                final DefaultHandle[] srcElems = head.elements;
                //stack中儲存轉移物件陣列
                final DefaultHandle[] dstElems = dst.elements;
                int newDstSize = dstSize;
                for (int i = srcStart; i < srcEnd; i++) {
                    DefaultHandle<?> element = srcElems[i];
                    //recycleId == 0 表示物件還沒有被真正的回收到stack中
                    if (element.recycleId == 0) {
                        //設定recycleId 表明是被哪個weakOrderQueue回收的
                        element.recycleId = element.lastRecycledId;
                    } else if (element.recycleId != element.lastRecycledId) {
                        //既被建立執行緒回收 同時也被回收執行緒回收  回收多次 則停止轉移
                        throw new IllegalStateException("recycled already");
                    }
                    //物件轉移後需要置空Link節點對應的位置
                    srcElems[i] = null;

                    //這裡從weakOrderQueue將待回收物件真正回收到所屬stack之前 需要進行回收頻率控制
                    if (dst.dropHandle(element)) {
                        // Drop the object.
                        continue;
                    }
                    //重新為defaultHandler設定其所屬stack(初始建立該handler的執行緒對應的stack)
                    //該defaultHandler在被回收物件回收的時候,會將其stack置為null,防止極端情況下,建立執行緒掛掉,對應stack無法被GC
                    element.stack = dst;
                    //此刻,handler才真正的被回收到所屬stack中
                    dstElems[newDstSize ++] = element;
                }

將當前Link節點中的elements陣列裡儲存的物件轉移至Stack中的陣列棧elements中。轉移範圍 srcStart -> srcEnd

如果當前轉移物件 element.recycleId == 0 說明當前物件還沒有被真正的回收至建立執行緒對應的Stack中,符合轉移條件(不能被多次回收)。還記不記得我們前邊在《9.3 從Stack中獲取池化物件》小節介紹的:

  • recycleId = lastRecycledId = 0:表示池化物件剛剛被建立或者剛剛從物件池中取出即將被再次複用。這是池化物件的初始狀態。

隨後設定回收Id element.recycleId = element.lastRecycledId。此處的lastRecycledId為當前WeakOrderQueue節點對應的回收執行緒Id。

element.recycleId != element.lastRecycledId 此處表示當前物件可能被建立執行緒回收了,也可能被回收執行緒回收了。

如果當前轉移物件已經被回收至Stack中,則不能被再次回收,停止轉移。

10.5.4 控制物件回收頻率

符合轉移條件的物件,需要再次經過回收頻率的控制,即前邊介紹的只回收 1 / 8 的物件,也就是每 8 個物件回收 1 個。

        boolean dropHandle(DefaultHandle<?> handle) {
            if (!handle.hasBeenRecycled) {
                //回收計數handleRecycleCount 初始值為8 這樣可以保證建立的第一個物件可以被池化回收
                //interval控制回收頻率 8個物件回收一個
                if (handleRecycleCount < interval) {
                    handleRecycleCount++;
                    // Drop the object.
                    return true;
                }
                //回收一個物件後,回收計數清零
                handleRecycleCount = 0;
                //設定defaultHandler的回收標識為true
                handle.hasBeenRecycled = true;
            }
            return false;
        }

當物件通過了回收頻率的驗證之後,最後將回收物件的DefaultHandler中持有的Stack參照再次設定為其建立執行緒對應的Stack。因為在回收執行緒將池化物件回收至WeakOrderQueue節點時,會將其DefaultHandler中對Stack的參照置為null。所以這裡需要重置回來。

具體為什麼在回收執行緒回收時會將回收物件的Stack參照置為null,大家這裡可以自己先思考下,等到後面我們講解多執行緒回收時,筆者在為大家揭開謎底。

隨後會將物件壓入Stack結構中的陣列棧中,到這裡,回收執行緒幫助建立執行緒回收的物件才算真正的被回收了,業務執行緒可以直接從物件池中取出使用了。

當物件轉移完畢後,更新當前Link節點的readIndex,更新Stack中陣列棧的棧頂指標。如果當前Link節點已經被轉移完畢,則Head指標指向連結串列中的下一個節點,開始等待下一次的轉移。

             if (srcEnd == LINK_CAPACITY && head.next != null) {
                    // Add capacity back as the Link is GCed.
                    // 如果當前Link已經被回收完畢,且link連結串列還有後續節點,則更新head指標
                    this.head.relink(head.next);
                }

                //更新當前回收Link的readIndex
                head.readIndex = srcEnd;
                //如果沒有轉移任何資料 return false
                if (dst.size == newDstSize) {
                    return false;
                }
                dst.size = newDstSize;
                return true;

到現在為止,多執行緒從Recycler物件池中無鎖化獲取物件的完整流程,筆者就為大家介紹完了,下面我們來繼續剖析下多執行緒回收物件的場景。

11. 多執行緒回收物件無鎖化實現

之前我們在介紹池化物件的設計時,提到業務執行緒在使用物件的時候不應該感受到物件池的存在,所以將池化物件的回收,封裝在其DefaultHandler中。在業務執行緒使用完物件時,直接呼叫池化物件的recycle方法進行回收即可。

static final class Entry {

       private  Handle<Entry> handle;

       void recycle() {
            next = null;
            bufs = null;
            buf = null;
            msg = null;
            promise = null;
            progress = 0;
            total = 0;
            pendingSize = 0;
            count = -1;
            cancelled = false;
            handle.recycle(this);
        }

}
private static final class DefaultHandle<T> implements Handle<T> {
        
        ..................省略............

        //強參照關聯建立handler的stack
        Stack<?> stack;
        //池化物件
        Object value;

        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }

            Stack<?> stack = this.stack;
            //handler初次建立以及從物件池中獲取到時  recycleId = lastRecycledId = 0(物件被回收之前)
            //建立執行緒回收物件後recycleId = lastRecycledId = OWN_THREAD_ID
            //回收執行緒回收物件後lastRecycledId = 回收執行緒Id,當物件被轉移到stack中後 recycleId = lastRecycledId = 回收執行緒Id
            if (lastRecycledId != recycleId || stack == null) {
                throw new IllegalStateException("recycled already");
            }

            stack.push(this);
        }

}

DefaultHandler中的 recycle 方法邏輯比較簡單,唯一不好理解的地方在於判斷物件是否已經被回收的 if 條件語句。

  • lastRecycledId != recycleId :此時物件的狀態正處於已經被回收執行緒回收至對應 WeakOrderQueue 節點的半回收狀態,但還未被轉移至其建立執行緒對應的Stack中。所以這個條件要控制的事情就是如果物件已經被回收執行緒回收,那麼就停止本次的回收操作

忘記的同學可以在回看下《9.3 從Stack中獲取池化物件》小節,那裡詳細介紹了 recycleId 和 lastRecycledId 之間各種關係的變化及其含義

  • stack == null :這種情況其實前邊我們也有提到過,就是當池化物件對應的建立執行緒掛掉的時候,對應的Stack隨後也被GC回收掉。那麼這時就不需要在回收該池化物件了。

11.1 回收物件至Stack中——啊哈!Bug!

private static final class Stack<T> {
        //持有對應建立執行緒的弱參照
        final WeakReference<Thread> threadRef;

        void push(DefaultHandle<?> item) {
            Thread currentThread = Thread.currentThread();
            //判斷當前執行緒是否為建立執行緒  物件池的回收原則是誰建立,最終由誰回收。其他執行緒只是將回收物件放入weakOrderQueue中
            //最終是要回收到建立執行緒對應的stack中的
            if (threadRef.get() == currentThread) {
                // 如果當前執行緒正是建立物件的執行緒,則直接進行回收 直接放入與建立執行緒關聯的stack中
                pushNow(item);
            } else {
                // 當前執行緒不是建立執行緒,則將回收物件放入建立執行緒對應的stack中的weakOrderQueue連結串列相應節點中(currentThread對應的節點)
                pushLater(item, currentThread);
            }
        }
}

這裡會進入到池化物件DefaultHandler中持有的Stack中,在Stack中進行物件的回收。

大家這裡先不要看筆者下面的解釋,試著自己著重分析下這個 if...else...邏輯判斷,有沒有發現什麼問題??Bug就在這裡!!

這裡首先會判斷當前回收執行緒是否為池化物件的建立執行緒:threadRef.get() == currentThread)。如果是,則由建立執行緒直接回收 pushNow(item) 。

如果 threadRef.get() != currentThread) 這裡有兩種情況:

  1. currentThread是回收執行緒,那麼就按多執行緒回收的邏輯 pushLater(item, currentThread) ,由回收執行緒將物件回收至其對應的WeakOrderQueue節點中,這裡沒什麼毛病。

  2. Bug就出現在第二種情況,還有一種情況是 threadRef.get() == null 也會走到 else 分支裡。表示該回收物件的建立執行緒已經掛掉,並被GC回收。那麼在這種情況下已經沒有必要在對該物件進行回收了,因為建立執行緒已經掛掉,隨後對應的Stack也遲早被GC掉,這個物件即使被回收進Stack也永遠不會在被使用到。但是Netty的做法還是會讓回收執行緒將其回收至Stack中的WeakOrderQueue連結串列中,筆者認為這裡根本就沒必要在新增至WeakOrderQueue連結串列中了。

Bug產生的場景如下如所示:

在第二種情況下,Netty還有一個重要的場景沒有考慮到,會導致記憶體洩露!!

什麼場景呢?大家再來回顧下池化物件與物件池之間的參照關係圖:

這裡我們看到池化物件會參照DefaultHandler,而DefaultHandler又強參照了Stack。於是就形成了這樣一條參照鏈:

而池化物件是對外暴露的,使用者可能在某個地方一直參照著這個池化物件,如果建立執行緒掛掉,並被GC回收之後,那麼其在物件池中對應的Stack也應該被回收,因為Stack裡儲存的回收物件將再也不會被用到了。但是因為這條參照鏈的存在,導致Stack無法被GC回收從而造成記憶體洩露!

11.2 筆者反手一個PR,修復這個Bug!

現在Bug產生的原因和造成的影響,筆者為大家已經分析清楚了,那麼接下來的解決方案就變得很簡單了。

筆者先向Netty社群提了一個 Issue11864 來說明這個問題。

Issue11864 : https://github.com/netty/netty/issues/11864

然後直接提了 PR11865 來修復這個Bug。

PR : https://github.com/netty/netty/pull/11865

PR中主要的修改點分為以下兩點:

  1. 筆者在修復方案中覺得在這裡應該儘早處理掉 threadRef.get() == null 的情況,因為建立執行緒已經死掉,此時在為建立執行緒回收物件已經沒有任何意義了,這種情況直接 return 掉就好。

  2. 由於池化物件強參照到了其建立執行緒對應的Stack,當建立執行緒掛掉之後,我們需要解除這個參照鏈 item.stack = null,保證Stack最終可以被GC回收。

以下程式碼為筆者提交的PR中的修復方案,主要增加了對 threadRef.get() == null 情況的處理,並新增了詳細註釋。

        void push(DefaultHandle<?> item) {
            Thread currentThread = Thread.currentThread();
            if (threadRef.get() == currentThread) {
                pushNow(item);
            } else if (threadRef.get() == null) {
                // when the thread that belonged to the Stack was died or GC'ed,
                // There is no need to add this item to WeakOrderQueue-linked-list which belonged to the Stack any more
                item.stack = null;
            } else {
                pushLater(item, currentThread);
            }
        }

11.3 PR的後續

當筆者提交了 PR11865之後,得到了相關作者如下回復。

巧合的是Netty也意識到了物件池這塊的問題,Netty最近也正在重構 Recycler 這一塊,因為Recycler整體設計的還是比較複雜的,這從我們這篇原始碼解析的文章中也可以看的出來,Recycler的複雜性在於它的使用場景混合了並行以及與GC相關的互動,這些相關的問題都比較難以定位,所以Netty決定將物件池這一塊用一種更加容易被理解的方式重構掉。

相關的重構內容大家可以看作者的這個commit。

重構commit:https://github.com/netty/netty/commit/28b9834612638ffec4948c0c650d04f766f20690

重構後的Recycler物件池在4.1.71.Final版本已經發布。筆者後續也會為大家安排一篇重構後的Recycler物件池原始碼解析,但是本文還是聚焦於4.1.71.Final之前版本的物件池介紹,雖然被重構了,但是這裡也有很多的設計思想和多執行緒程式設計細節非常值得我們學習!

4.1.71.Final版本釋出之後,筆者想的是後面抽空看下重構後的物件池實現,哈哈,只要謂語動詞出現—— 」想的是.....「 類似這樣的句式,估計就沒有以後了,哈哈。筆者還是大意了,這個 Issue11864 : https://github.com/netty/netty/issues/11864 在過了幾個月之後在社群裡又被討論了起來。有人發現在4.1.71.Final物件池重構後的版本中筆者提到的這些問題還是存在的。

於是作者 chrisvest 又 提了一個 PR11996 最終在 4.1.74.Final版本中修復了筆者提的這個 Issue11864。

PR11996 :https://github.com/netty/netty/pull/11996

隨口提一句,這個大牛 chrisvest 是大名鼎鼎的圖資料庫 Neo4j 的核心commitor,同時也是Netty Buffer相關API的設計者。

這裡筆者將這個Bug在 4.1.74.Final 版本中的最終修復方案和大家說明一下,收個尾。

  1. 首先 chrisvest 大牛 認為 當建立執行緒掛掉的時候,我們可以在threadLocal的
    onRemoval方法中將建立執行緒對應的LocalPool裡邊用於存放回收物件的pooledHandles 直接置為 null。這裡的語意是標記LocalPool已經死掉了,不會再繼續使用。

在重構後的版本中引入了 LocalPool 來代替我們前邊介紹的Stack。LocalPool中的pooledHandles大家可以簡單認為類似Stack中陣列棧的功能。

public abstract class Recycler<T> {

    private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
        @Override
        protected LocalPool<T> initialValue() {
            return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
        }

        @Override
        protected void onRemoval(LocalPool<T> value) throws Exception {
            //刪除LocalPool
            super.onRemoval(value);
            MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
            //pooledHandles 置為 null,取消參照
            value.pooledHandles = null;
            //清除LocalPool中儲存的回收物件
            handles.clear();
        }
    };

}
  1. 在多執行緒回收物件的時候,會首先判斷該回收物件對應的LocalPool裡的pooledHandles是否已經被清理變為不可用狀態。如果是的話就停止回收。
private static final class LocalPool<T> {
    //保證可見性
    private volatile MessagePassingQueue<DefaultHandle<T>> pooledHandles;

     void release(DefaultHandle<T> handle) {
            MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
            handle.toAvailable();
            if (handles != null) {
                handles.relaxedOffer(handle);
            }
        }
}

通過以上兩個措施 就保證了 當建立執行緒被GC掉之後,它對應的 在物件池中的回收快取LocalPool(類比Stack)不會出現記憶體洩露,同時保證了多執行緒不在將回收物件至已經被清理的LocalPool中。

好了,這一塊的Bug修改我們介紹完了,我們繼續多執行緒回收物件主流程的介紹:

11.4 建立執行緒直接回收物件

       private void pushNow(DefaultHandle<?> item) {
            //池化物件被回收前 recycleId = lastRecycleId = 0
            //如果其中之一不為0 說明已經被回收了
            if ((item.recycleId | item.lastRecycledId) != 0) {
                throw new IllegalStateException("recycled already");
            }

            //此處是由建立執行緒回收,則將池化物件的recycleId與lastRecycleId設定為建立執行緒Id-OWN_THREAD_ID
            //注意這裡的OWN_THREAD_ID是一個固定的值,是因為這裡的視角是池化物件的視角,只需要區分建立執行緒和非建立執行緒即可。
            //對於一個池化物件來說建立執行緒只有一個 所以用一個固定的OWN_THREAD_ID來表示建立執行緒Id
            item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

            int size = this.size;
            //如果當前池化物件的容量已經超過最大容量 則丟棄物件
            //為了避免池化物件的急速膨脹,這裡只會回收1/8的物件,剩下的物件都需要丟棄
            if (size >= maxCapacity || dropHandle(item)) {
                // Hit the maximum capacity or should drop - drop the possibly youngest object.
                //丟棄物件
                return;
            }

            //當前執行緒對應的stack容量已滿但是還沒超過最大容量限制,則對stack進行擴容
            if (size == elements.length) {
                //容量擴大兩倍
                elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
            }
            //將物件回收至當前stack中
            elements[size] = item;
            //更新當前stack的棧頂指標
            this.size = size + 1;
        }
  • 首先需要判斷該回收物件是否已經被回收了。item.recycleId | item.lastRecycledId) != 0,這裡任意Id只要不為0,說明該物件已經對回收了,則停止本次回收操作。

  • 當物件被建立執行緒回收時,設定回收Id:item.recycleId = item.lastRecycledId = OWN_THREAD_ID

  • 如果當前Stack已經達到最大容量則將物件丟棄。

  • 為了避免物件池不可控制的迅速膨脹,這裡只會回收 1 / 8 的物件,剩下的物件都需要丟棄 dropHandle 。

  • 如果當前Stack容量已滿但是還沒超過最大容量限制,則對stack進行擴容。一次性擴容兩倍但不能超過最大容量。

  • 最後將物件壓入Stack結構中的陣列棧中,完成物件的回收。

11.5 回收執行緒間接回收物件

在Recycler物件池中,一個執行緒既可以是建立執行緒也可以是回收執行緒。

比如上圖中的 thread2 , thread3 , thread4 ... 這裡的每一個執行緒既可以在物件池中建立物件,並將物件回收至自己對應的Stack結構裡的陣列棧中,此刻它們的角色為建立執行緒。比如圖中的thread1。

同時其他執行緒 比如圖中的 thread2 , thread3 , thread4 ... 也可以為thread1回收由thread1建立的物件,將這些物件回收至thread1對應的Stack結構裡的WeakOrderQueue連結串列中。此刻 thread2 , thread3 , thread4 ... 為回收執行緒。

在之前介紹Recycler物件池的重要屬性時,我們提到過 maxDelayedQueuesPerThread 屬性。

public abstract class Recycler<T> {

      //每個回收執行緒最多可以幫助多少個建立執行緒回收物件 預設:cpu核數 * 2
      private static final int MAX_DELAYED_QUEUES_PER_THREAD;

     //一個回收執行緒可幫助多少個建立執行緒回收物件
      private final int maxDelayedQueuesPerThread;

      private static final class Stack<T> {

            // 當前執行緒可以幫助多少個執行緒回收其池化物件
            private final int maxDelayedQueues;

      }

}

在Recycler物件池中,一個回收執行緒能夠幫助多少個建立執行緒回收物件是有限制的,通過 maxDelayedQueuesPerThread屬性 控制。

那麼在物件池中,一個回收執行緒如何儲存為其他建立執行緒回收到的物件呢

如圖中所示,我們站在回收執行緒的視角來看,在物件池中有一個 FastThreadLocal 型別的 DELAYED_RECYCLED 欄位, DELAYED_RECYCLED 為每個回收執行緒儲存了一個 WeakHashMap,正是這個回收執行緒持有的 WeakHashMap 結構中儲存了該回收執行緒為每個建立執行緒回收的物件。

WeakHashMap 結構中的 key 表示建立執行緒對應的 Stack 結構。意思是該回收執行緒為哪個建立執行緒回收物件。value 表示這個回收執行緒在建立執行緒中對應Stack結構裡的WeakOrderQueue連結串列中對應的節點。大家在結合 《Recycler物件池.png》 這副圖仔細體會下這個結構設計。

public abstract class Recycler<T> {

    //實現跨執行緒回收的核心,這裡儲存的是當前執行緒為其他執行緒回收的物件(由其他執行緒建立的池化物件)
    //key: 池化物件對應的建立執行緒stack  value: 當前執行緒代替該建立執行緒回收的池化物件 存放在weakOrderQueue中
    //這裡的value即是 建立執行緒對應stack中的weakOrderQueue連結串列中的節點(每個節點表示其他執行緒為當前建立執行緒回收的物件)
    private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
            new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
        @Override
        protected Map<Stack<?>, WeakOrderQueue> initialValue() {
            return new WeakHashMap<Stack<?>, WeakOrderQueue>();
        }
    };

}

而這個WeakHashMap 的size即表示當前回收執行緒已經在為多少個建立執行緒回收物件了,size的值不能超過 maxDelayedQueuesPerThread 。

這裡為什麼要用WeakHashMap呢?

其實我們前邊多少也提到過了,考慮到一種極端的情況就是當建立執行緒掛掉並且被GC回收之後,其實這個建立執行緒對應的Stack結構已經沒有用了,儲存在Stack結構中的池化物件永遠不會再被使用到,此時回收執行緒完全就沒有必要在為掛掉的建立執行緒回收物件了。而這個Stack結構如果沒有任何參照鏈存在的話,隨後也會被GC回收。那麼這個Stack結構在WeakHashMap中對應的Entry也會被自動刪除。如果這裡不採用WeakHashMap,那麼回收執行緒為該Stack回收的物件就會一直停留在回收執行緒中。

介紹完這些背景知識,下面我們就來正式介紹下回收執行緒到底是如何幫助建立執行緒回收物件的:

      private void pushLater(DefaultHandle<?> item, Thread thread) {
            //maxDelayQueues == 0 表示不支援物件的跨執行緒回收
            if (maxDelayedQueues == 0) {
                //直接丟棄
                return;
            }
            
            //注意這裡的視角切換,當前執行緒為回收執行緒
            Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
            //獲取當前回收物件屬於的stack 由當前執行緒幫助其回收  注意這裡是跨執行緒回收 當前執行緒並不是建立執行緒
            WeakOrderQueue queue = delayedRecycled.get(this);
            //queue == null 表示當前執行緒是第一次為該stack回收物件
            if (queue == null) {
                //maxDelayedQueues指示一個執行緒最多可以幫助多少個執行緒回收其建立的物件
                //delayedRecycled.size()表示當前執行緒已經幫助多少個執行緒回收物件
                if (delayedRecycled.size() >= maxDelayedQueues) {
                
                    //如果超過指定幫助執行緒個數,則停止為其建立WeakOrderQueue,停止為其回收物件
                    //WeakOrderQueue.DUMMY這裡是一個標識,後邊遇到這個標識  就不會為其回收物件了
                    delayedRecycled.put(this, WeakOrderQueue.DUMMY);
                    return;
                }
 
                // 建立為回收執行緒對應的WeakOrderQueue節點以便儲存當前執行緒為其回收的物件
                if ((queue = newWeakOrderQueue(thread)) == null) {
                    // 建立失敗則丟棄物件
                    return;
                }
                //在當前執行緒的threadLocal中建立 回收物件對應的stack 與 weakOrderQueue的對應關係
                delayedRecycled.put(this, queue);
            } else if (queue == WeakOrderQueue.DUMMY) {
                // drop object
                // 如果queue的值是WeakOrderQueue.DUMMY 表示當前已經超過了允許幫助的執行緒數 直接丟棄物件
                return;
            }

            //當前執行緒為物件的建立執行緒回收物件  放入對應的weakOrderQueue中
            queue.add(item);
        }
  1. 首先需要判斷當前Recycler物件池是否支援跨執行緒回收。 maxDelayedQueues == 0 表示不支援物件的跨執行緒回收。

  2. 如果當前回收執行緒是第一次為該回收物件的建立執行緒進行回收,則需要為當前回收執行緒在物件的建立執行緒對應Stack結構中建立對應的WeakOrderQueue節點。(這裡正是多執行緒無鎖化回收物件的核心所在)。當然建立之前需要判斷是否超過了可幫助建立執行緒的個數 maxDelayedQueues 。

  3. 如果當前回收執行緒幫助的建立執行緒個數已經超過了 maxDelayedQueues 限制,則向對應的 WeakHashMap 塞入一個空的 WeakOrderQueue節點 DUMMY,後續如果遇到 WeakOrderQueue 節點是 DUMMY 範例則丟棄物件,放棄回收。

 private static final class WeakOrderQueue extends WeakReference<Thread> {
        //作為一個標識,遇到DUMMY範例,則直接丟棄回收物件
        static final WeakOrderQueue DUMMY = new WeakOrderQueue();

}
  1. 如果當前回收執行緒幫助的建立執行緒個數還沒有超過 maxDelayedQueues 限制,則通過 stack#newWeakOrderQueue 為當前回收執行緒在回收物件對應Stack結構中建立相應的WeakOrderQueue節點。並在回收執行緒持有的WeakHashMap中建立Stack與回收執行緒對應的WeakOrderQueue節點的關聯關係。

  2. 最終由回收執行緒將物件回收至其建立執行緒對應的Stack結構中。(將回收物件新增至回收執行緒對應的WeakOrderQueue節點中,完成多執行緒無鎖化回收)

11.6 為回收執行緒建立對應的WeakOrderQueue節點

上小節提到,當回收執行緒第一次為建立執行緒回收物件的時候,需要在建立執行緒對應Stack結構中的WeakOrderQueue連結串列中建立與回收執行緒對應的WeakOrderQueue節點。

   private static final class Stack<T> {

         private WeakOrderQueue newWeakOrderQueue(Thread thread) {
              return WeakOrderQueue.newQueue(this, thread);
        }
   }

private static final class WeakOrderQueue extends WeakReference<Thread> {

        static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
    
            // link是weakOrderQueue中儲存回收物件的最小結構,此處是為接下來要建立的Link預訂空間容量
            // 如果stack指定的availableSharedCapacity 小於 LINK_CAPACITY大小,則分配失敗
            if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {
                return null;
            }

            //如果還夠容量來分配一個link那麼就建立weakOrderQueue
            final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);

            // 向stack中的weakOrderQueue連結串列中新增當前回收執行緒對應的weakOrderQueue節點(始終在頭結點處新增節點 )
            // 此處向stack中新增weakOrderQueue節點的操作被移到WeakOrderQueue構造器之外的目的是防止WeakOrderQueue.this指標
            // 逃逸避免被其他執行緒在其構造的過程中存取
            stack.setHead(queue);

            return queue;
        }

}

在前邊介紹WeakOrderQueue的結構的時候,我們提到WeakOrderQueue結構內部其實一個由Link節點組成的連結串列。WeakOrderQueue在初始狀態下是隻包含一個Link節點的連結串列。

所有在建立WeakOrderQueue結構的時候需要同時為其建立一個Link節點。而這些Link節點正是真正儲存回收執行緒所回收到的物件的地方。

而對於一個建立執行緒來說它的所有回收執行緒能夠為其回收物件的總量是被availableSharedCapacity 限制的,每建立一個Link節點,它的值就減少一個LINK_CAPACITY ,每釋放一個Link節點,它的值就增加一個LINK_CAPACITY 。這樣就能保證所有回收執行緒的回收總量不會超過 availableSharedCapacity 的限制。

所以在為WeakOrderQueue結構建立首個Link節點時,需要判斷當前所有回收執行緒回收的物件總量是否已經超過了 availableSharedCapacity 。如果容量還夠回收一個Link大小的物件,則開始建立WeakOrderQueue結構。

如果當前回收容量已經超過availableSharedCapacity或者不足回收一個Link大小的物件,則停止建立WeakOrderQueue節點,回收流程終止。不在對該回收物件進行回收。

            //此處目的是為接下來要建立的link預留空間容量
            static boolean reserveSpaceForLink(AtomicInteger availableSharedCapacity) {
                for (;;) {
                    //獲取stack中允許異執行緒回收物件的總容量(異執行緒還能為該stack收集多少物件)
                    int available = availableSharedCapacity.get();
                    //當availbale可供回收容量小於一個Link時,說明異執行緒回收物件已經達到上限,不能在為stack回收物件了
                    if (available < LINK_CAPACITY) {
                        return false;
                    }
                    //為Link預留到一個Link的空間容量,更新availableSharedCapacity
                    if (availableSharedCapacity.compareAndSet(available, available - LINK_CAPACITY)) {
                        return true;
                    }
                }
            }

這裡的預訂容量其實就是將 availableSharedCapacity 的值減去一個 LINK_CAPACITY 大小。其他回收執行緒會看到這個 availableSharedCapacity 容量的變化,方便決定是否繼續為建立執行緒回收物件。

當為WeakOrderQueue結構的首個Link節點預訂容量成功後,就開始建立WeakOrderQueue節點。

        //為了使stack進行GC,這裡不會持有其所屬stack的參照
        private WeakOrderQueue(Stack<?> stack, Thread thread) {
            //weakOrderQueue持有對應跨執行緒的弱參照
            super(thread);
            //建立尾結點
            tail = new Link();

            // 建立頭結點  availableSharedCapacity = maxCapacity / maxSharedCapacityFactor
            // 此時availableSharedCapacity的值已經變化了,減去了一個link的大小
            head = new Head(stack.availableSharedCapacity);
            head.link = tail;
            interval = stack.delayedQueueInterval;
            handleRecycleCount = interval; 
        }

當回收執行緒對應的WeakOrderQueue節點建立成功後,就將其插入到回收物件對應的Stack結構裡的WeakOrderQueue連結串列中的頭結點處。因為這裡可能會涉及多個回收執行緒並行向WeakOrderQueue連結串列頭結點處新增節點,所以更新Stack結構中WeakOrderQueue連結串列頭結點的方法被設計成同步方法。這也是整個Recycler 物件池設計中,唯一的一個同步方法。

       synchronized void setHead(WeakOrderQueue queue) {
            //始終在weakOrderQueue連結串列頭結點插入新的queue(其他執行緒收集的由本執行緒建立的物件)
            queue.setNext(head);
            head = queue;
        }

11.7 向WeakOrderQueue節點中新增回收物件

終於的終於我們到了多執行緒回收物件的最後一步了,本篇文章到這裡也接近尾聲了,大家在堅持一下。

這裡要做的事情就是,將回收物件新增到回收執行緒對應的WeakOrderQueue節點中,Netty會在Link連結串列的尾結點處新增回收物件,如果尾結點容量已滿,就繼續新建立一個Link。將回收物件新增到新的Link節點中。

      void add(DefaultHandle<?> handle) {
            //將handler中的lastRecycledId標記為當前weakOrderQueue中的Id,一個stack和一個回收執行緒對應一個weakOrderQueue節點
            //表示該池化物件 最近的一次是被當前回收執行緒回收的。
            handle.lastRecycledId = id;

            // 控制異執行緒回收頻率 只回收1/8的物件
            // 這裡需要關注的細節是其實在scavengeSome方法中將weakOrderQueue中的待回收物件轉移到建立執行緒的stack中時,Netty也會做回收頻率的限制
            // 這裡在回收執行緒回收的時候也會控制回收頻率(總體控制兩次)netty認為越早的做回收頻率控制越好 這樣可以避免weakOrderQueue中的容量迅速的增長從而失去控制
            if (handleRecycleCount < interval) {
                handleRecycleCount++;
                // Drop the item to prevent recycling to aggressive.
                return;
            }
            handleRecycleCount = 0;

            //從尾部link節點開始新增新的回收物件
            Link tail = this.tail;
            int writeIndex;

            //如果當前尾部link節點容量已滿,就需要建立新的link節點
            if ((writeIndex = tail.get()) == LINK_CAPACITY) {
                //建立新的Link節點
                Link link = head.newLink();
                //如果availableSharedCapacity的容量不夠了,則無法建立Link。丟棄待回收物件
                if (link == null) {
                    // 丟棄物件
                    return;
                }
                // We allocate a Link so reserve the space
                //更新尾結點
                this.tail = tail = tail.next = link;

                writeIndex = tail.get();
            }

            //將回收物件handler放入尾部link節點中
            tail.elements[writeIndex] = handle;
            //這裡將stack置為null,是為了方便stack被回收。
            //如果Stack不再使用,期望被GC回收,發現handle中還持有stack的參照,那麼就無法被GC回收,從而造成記憶體漏失
            //在從物件池中再次取出該物件時,stack還會被重新賦予
            handle.stack = null;
            //注意這裡用lazySet來延遲更新writeIndex。只有當writeIndex更新之後,在建立執行緒中才可以看到該待回收物件
            //保證執行緒最終可見而不保證立即可見的原因就是 其實這裡Netty還是為了效能考慮避免執行記憶體屏障指令的開銷。
            //況且這裡也並不需要考慮執行緒的可見性,當建立執行緒呼叫scavengeSome從weakOrderQueue連結串列中回收物件時,看不到當前節點weakOrderQueue
            //新新增的物件也沒關係,因為是多執行緒一起回收,所以繼續找下一個節點就好。及時全沒看到,大不了就在建立一個物件。主要還是為了提高weakOrderQueue的寫入效能
            tail.lazySet(writeIndex + 1);
        }
  1. 首先第一步就要設定回收物件DefaultHandler中的lastRecycledId ,將其設定為該回收執行緒Id,表示該回收物件最近一次是由當前回收執行緒回收的。此時的DefaultHandler中 recycleId != lastRecycledId ,物件處於半回收狀態。

  2. 控制回收執行緒的回收頻率(只回收 1 / 8 的物件),大家是否還記得我們在《9.5 轉移回收物件》小節中介紹 stack#scavengeSome方法 的時候,在建立執行緒從Stack中的WeakOrderQueue連結串列中轉移物件到陣列棧中的時候,也會被回收頻率進行控制,只轉移 1 / 8 的物件。所以這裡我們可以看到回收頻率的控制在多執行緒回收物件的時候會控制兩次,netty認為越早做回收頻率控制越好這樣可以避免weakOrderQueue中的容量迅速的增長從而失去控制。

  3. 在WeakOrderQueue結構中,當我們向Link連結串列新增回收物件時,都會向Link連結串列的尾結點中新增回收物件,如果當前尾結點容量已經滿了 writeIndex = tail.get()) == LINK_CAPACITY ,我們就需要新建立一個Link節點,並將tail指標指向新的Link節點更新尾結點。最後將回收物件回收至新的尾結點中。當然我們要考慮到 availableSharedCapacity 容量的限制,如果容量不夠了,就不能在新建Link節點,直接將回收物件丟棄,停止回收。

    private static final class Head {

             Link newLink() {
                  //此處的availableSharedCapacity可能已經被多個回收執行緒改變,因為availableSharedCapacity是用來控制回收執行緒回收的總容量限制
                  //每個回收執行緒再回收物件時都需要更新availableSharedCapacity
                  return reserveSpaceForLink(availableSharedCapacity) ? new Link() : null;
             }

            //此處目的是為接下來要建立的link預留空間容量
            static boolean reserveSpaceForLink(AtomicInteger availableSharedCapacity) {
                for (;;) {
                    //獲取stack中允許異執行緒回收物件的總容量(異執行緒還能為該stack收集多少物件)
                    int available = availableSharedCapacity.get();
                    //當availbale可供回收容量小於一個Link時,說明異執行緒回收物件已經達到上限,不能在為stack回收物件了
                    if (available < LINK_CAPACITY) {
                        return false;
                    }
                    //為Link預留到一個Link的空間容量,更新availableSharedCapacity
                    if (availableSharedCapacity.compareAndSet(available, available - LINK_CAPACITY)) {
                        return true;
                    }
                }
            }
    }

到這裡Recycler物件池的整個多執行緒無鎖化回收物件的流程筆者就為大家介紹完了。

但是這裡還有兩個點,筆者想要和大家再強調一下:

第一:為什麼這裡會將handle.stack設定為null?

不知大家還記不記得我們在介紹 stack#scavengeSome方法 的時候專門提到,在建立執行緒遍歷WeakOrderQueue連結串列將連結串列中的待回收物件轉移至stack中的陣列棧時,會將待回收物件的DefaultHandler持有的stack重新設定為其建立執行緒對應的stack。

boolean transfer(Stack<?> dst) {

      .................省略..............

      //重新為defaultHandler設定其所屬stack(初始建立該handler的執行緒對應的stack)
      //該defaultHandler在被回收物件回收的時候,會將其stack置為null,防止極端情況下,建立執行緒掛掉,對應stack無法被GC
      element.stack = dst;

      .................省略..............
}

而這裡在回收執行緒向WeakOrderQueue節點新增回收物件時先將 handle.stack設定為 null,而在轉移回收物件時又將 handle.stack 設定回來,這不是多此一舉嗎?

其實並不是多此一舉,這樣設計是非常有必要的,我們假設一種極端的情況,當建立執行緒掛掉並被GC回收之後,其實stack中儲存的回收物件已經不可能在被使用到了,stack應該也被回收掉。但是如果這裡回收執行緒在回收的時候不將物件持有的stack設定為null的話,直接新增到了WeakOrderQueue節點中,當建立被GC掉的時候,由於這條參照鏈的存在導致對應stack永遠不會被GC掉,造成記憶體洩露。

所以筆者在本文中多次強調,當我們在設計比較複雜的程式結構時,對於物件之間的參照關係,一定要時刻保持清晰的認識,防止記憶體洩露。

第二:為什麼最後使用lazySet來更新尾結點的writeIndex

當我們向Link連結串列的尾結點新增完回收物件之後,在更新尾結點的writeIndex時,使用到了延時更新,而延時更新並不會保證多執行緒的可見性,如果此時建立執行緒正在轉移物件,那麼將不會看到新新增進來的回收物件了。

而事實上,我們這裡並不需要保證執行緒之間的實時可見性,只需要保證最終可見性即可。

確實在當建立執行緒轉移物件的時候可能並不會看到剛剛被回收執行緒新新增進來的回收物件,看不到沒關係,建立執行緒大不了在本次轉移中不回收它不就完了麼。因為只要建立執行緒Stack結構中的陣列棧為空,建立執行緒就會從WeakOrderQueue連結串列中轉移物件,以後會有很多次機會來WeakOrderQueu連結串列中轉移物件,什麼時候看見了,什麼時候轉移它。並不需要實時性。退一萬步講,即使全部看不到,大不了建立執行緒直接建立一個物件返回就行了。

而如果這裡要保證執行緒之間的實時可見性,在更新尾結點的writeIndex的時候就不得不插入 LOCK 字首記憶體屏障指令保證多執行緒之間的實時可見性,而執行記憶體屏障指令是需要開銷的,所以為了保證WeakOrderQueue的寫入效能,Netty這裡選擇了只保證最終可見性而不保證實時可見性。


總結

到這裡關於Recycler物件池的整個設計與原始碼實現,筆者就為大家詳細的剖析完畢了,在剖析的過程中,我們提煉出了很多多執行緒並行程式的設計要點和注意事項。大家可以在日常開發工作中多多體會並實踐。

雖然本文介紹的Recycler物件池整體設計將會在4.1.71.Final版本被重構,但是在當前版本Recycler物件池的設計和實現中,我們還是可以學習到很多東西的。

筆者真心十分佩服能夠耐心看到這裡的大家,不知不覺已經嘮叨了三萬多字了,謝謝大家的觀看~~,大家記得晚餐時給自己加餐個雞腿獎勵一下自己,哈哈!!

閱讀原文

歡迎關注公眾號:bin的技術小屋