深入理解 Netty FastThreadLocal

2023-10-19 12:00:20

作者:vivo 網際網路伺服器團隊- Jiang Zhu

本文以線上詭異問題為切入點,通過對比JDK ThreadLocal和Netty FastThreadLocal實現邏輯以及優缺點,並深入解讀原始碼,由淺入深理解Netty FastThreadLocal。

一、前言

最近在學習Netty相關的知識,在看到Netty FastThreadLocal章節中,回想起一起線上詭異問題。

  • 問題描述:外銷業務獲取使用者資訊判斷是否支援https場景下,獲取的使用者資訊有時候竟然是錯亂的。

  • 問題分析:使用ThreadLocal儲存使用者資訊時,未能及時進行remove()操作,而Tomcat工作執行緒是基於執行緒池的,會出現執行緒重用情況,所以獲取的使用者資訊可能是之前執行緒遺留下來的。

  • 問題修復:ThreadLocal使用完之後及時remove()、ThreadLocal使用之前也進行remove()雙重保險操作。

接下來,我們繼續深入瞭解下JDK ThreadLocal和Netty FastThreadLocal吧。

二、JDK ThreadLocal介紹

ThreadLocal是JDK提供的一個方便物件在本執行緒內不同方法中傳遞、獲取的類。用它定義的變數,僅在本執行緒中可見,不受其他執行緒的影響,與其他執行緒相互隔離

那具體是如何實現的呢?如圖1所示,每個執行緒都會有個ThreadLocalMap範例變數,其採用懶載入的方式進行建立,當執行緒第一次存取此變數時才會去建立。

ThreadLocalMap使用線性探測法儲存ThreadLocal物件及其維護的資料,具體操作邏輯如下:

  • 假設有一個新的ThreadLocal物件,通過hash計算它應儲存的位置下標為x。

  • 此時發現下標x對應位置已經儲存了其他的ThreadLocal物件,則它會往後尋找,步長為1,下標變更為x+1。

  • 接下來發現下標x+1對應位置也已經儲存了其他的ThreadLocal物件,同理則它會繼續往後尋找,下標變更為x+2。

  • 直到尋找到下標為x+3時發現是空閒的,然後將該ThreadLocal物件及其維護的資料構建一個entry物件儲存在x+3位置。

在ThreadLocalMap中資料很多的情況下,很容易出現hash衝突,解決衝突需要不斷的向下遍歷,該操作的時間複雜度為O(n),效率較低

圖片

圖1

從下面的程式碼中可以看出:

Entry 的 key 是弱參照,value 是強參照。在 JVM 垃圾回收時,只要發現弱參照的物件,不管記憶體是否充足,都會被回收。

但是當 ThreadLocal 不再使用被 GC 回收後,ThreadLocalMap 中可能出現 Entry 的 key 為 NULL,那麼 Entry 的 value 一直會強參照資料而得不到釋放,只能等待執行緒銷燬,從而造成記憶體漏失

static class ThreadLocalMap {
    // 弱參照,在資源緊張的時候可以回收部分不再參照的ThreadLocal變數
    static class Entry extends WeakReference<ThreadLocal<?>> {
        // 當前ThreadLocal物件所維護的資料
        Object value;
 
        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    // 省略其他程式碼
}

綜上所述,既然JDK提供的ThreadLocal可能存在效率較低和記憶體漏失的問題,為啥不做相應的優化和改造呢?

  1. 從ThreadLocal類註釋看,它是JDK1.2版本引入的,早期可能不太關注程式的效能。

  2. 大部分多執行緒場景下,執行緒中的ThreadLocal變數較少,因此出現hash衝突的概率相對較小,及時偶爾出現了hash衝突,對程式的效能影響也相對較小。

  3. 對於記憶體漏失問題,ThreadLocal本身已經做了一定的保護措施。作為使用者,線上程中某個ThreadLocal物件不再使用或出現異常時,立即呼叫 remove() 方法刪除 Entry 物件,養成良好的編碼習慣。

三、Netty FastThreadLocal介紹

FastThreadLocal是Netty中對JDK提供的ThreadLocal優化改造版本,從名稱上來看,它應該比ThreadLocal更快了,以應對Netty處理並行量大、資料吞吐量大的場景。

那具體是如何實現的呢?如圖2所示,每個執行緒都會有個InternalThreadLocalMap範例變數。

每個FastThreadLocal範例建立時,都會採用AtomicInteger保證順序遞增生成一個不重複的下標index,它是該FastThreadLocal物件維護的資料應該儲存的位置。

讀寫資料的時候通過FastThreadLocal的下標 index 直接定位到該FastThreadLocal的位置,時間複雜度為 O(1),效率較高。

如果該下標index遞增到特別大,InternalThreadLocalMap維護的陣列也會特別大,所以FastThreadLocal是通過空間換時間來提升讀寫效能的。

圖片

圖2

四、Netty FastThreadLocal原始碼分析

4.1 構造方法

public class FastThreadLocal<V> {
    // FastThreadLocal中的index是記錄了該它維護的資料應該儲存的位置
    // InternalThreadLocalMap陣列中的下標, 它是在建構函式中確定的
    private final int index;
 
    public InternalThreadLocal() {
        index = InternalThreadLocalMap.nextVariableIndex();
    }
    // 省略其他程式碼
}

 

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    // 自增索引, ⽤於計算下次儲存到Object陣列中的位置
    private static final AtomicInteger nextIndex = new AtomicInteger();
 
    private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;
 
    public static int nextVariableIndex() {
        int index = nextIndex.getAndIncrement();
        if (index >= ARRAY_LIST_CAPACITY_MAX_SIZE || index < 0) {
            nextIndex.set(ARRAY_LIST_CAPACITY_MAX_SIZE);
            throw new IllegalStateException("too many thread-local indexed variables");
        }
        return index;
    }
    // 省略其他程式碼
}

上面這兩段程式碼在Netty FastThreadLocal介紹中已經講解過,這邊就不再重複介紹了。

4.2 get 方法

public class FastThreadLocal<V> {
    // FastThreadLocal中的index是記錄了該它維護的資料應該儲存的位置
    private final int index;
 
    public final V get() {
        // 獲取當前執行緒的InternalThreadLocalMap
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        // 根據當前執行緒的index從InternalThreadLocalMap中獲取其繫結的資料
        Object v = threadLocalMap.indexedVariable(index);
        // 如果獲取當前執行緒繫結的資料不為預設值UNSET,則直接返回;否則進行初始化
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }
 
        return initialize(threadLocalMap);
    }
    // 省略其他程式碼
}

 

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;
 
    // 未賦值的Object變數(預設值),當⼀個與執行緒繫結的值被刪除之後,會被設定為UNSET
    public static final Object UNSET = new Object();
 
    // 儲存繫結到當前執行緒的資料的陣列
    private Object[] indexedVariables;
 
    // slowThreadLocalMap為JDK ThreadLocal儲存InternalThreadLocalMap
    private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
            new ThreadLocal<InternalThreadLocalMap>();
 
    // 從繫結到當前執行緒的資料的陣列中取出index位置的元素
    public Object indexedVariable(int index) {
        Object[] lookup = indexedVariables;
        return index < lookup.length? lookup[index] : UNSET;
    }
 
    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        // 判斷當前執行緒是否是FastThreadLocalThread型別
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }
 
    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        // 直接獲取當前執行緒的InternalThreadLocalMap
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        // 如果當前執行緒的InternalThreadLocalMap還未建立,則建立並賦值
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }
 
    private static InternalThreadLocalMap slowGet() {
        // 使用JDK ThreadLocal獲取InternalThreadLocalMap
        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }
 
    private InternalThreadLocalMap() {
        indexedVariables = newIndexedVariableTable();
    }
 
    // 初始化一個32位元長度的Object陣列,並將其元素全部設定為預設值UNSET
    private static Object[] newIndexedVariableTable() {
        Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
        Arrays.fill(array, UNSET);
        return array;
    }
    // 省略其他程式碼
}

原始碼中 get() 方法主要分為下面3個步驟處理:

  1. 通過InternalThreadLocalMap.get()方法獲取當前執行緒的InternalThreadLocalMap。

  2. 根據當前執行緒的index 從InternalThreadLocalMap中獲取其繫結的資料。

  3. 如果不是預設值UNSET,直接返回;如果是預設值,則執行initialize方法進行初始化。

下面我們繼續分析一下InternalThreadLocalMap.get()方法的實現邏輯。

  1. 首先判斷當前執行緒是否是FastThreadLocalThread型別,如果是FastThreadLocalThread型別則直接使用fastGet方法獲取InternalThreadLocalMap,如果不是FastThreadLocalThread型別則使用slowGet方法獲取InternalThreadLocalMap兜底處理。

  2. 兜底處理中的slowGet方法會退化成JDK原生的ThreadLocal獲取InternalThreadLocalMap。

  3. 獲取InternalThreadLocalMap時,如果為null,則會直接建立一個InternalThreadLocalMap返回。其建立過過程中初始化一個32位元長度的Object陣列,並將其元素全部設定為預設值UNSET。

4.3 set 方法

public class FastThreadLocal<V> {
    // FastThreadLocal初始化時variablesToRemoveIndex被賦值為0
    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
 
    public final void set(V value) {
        // 判斷value值是否是未賦值的Object變數(預設值)
        if (value != InternalThreadLocalMap.UNSET) {
            // 獲取當前執行緒對應的InternalThreadLocalMap
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            // 將InternalThreadLocalMap中資料替換為新的value
            // 並將FastThreadLocal物件儲存到待清理的Set中
            setKnownNotUnset(threadLocalMap, value);
        } else {
            remove();
        }
    }
 
    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        // 將InternalThreadLocalMap中資料替換為新的value
        if (threadLocalMap.setIndexedVariable(index, value)) {
            // 並將當前的FastThreadLocal物件儲存到待清理的Set中
            addToVariablesToRemove(threadLocalMap, this);
        }
    }
 
    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
        // 取下標index為0的資料,用於儲存待清理的FastThreadLocal物件Set集合中
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        Set<FastThreadLocal<?>> variablesToRemove;
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            // 下標index為0的資料為空,則建立FastThreadLocal物件Set集合
            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
            // 將InternalThreadLocalMap中下標為0的資料,設定成FastThreadLocal物件Set集合
            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
        } else {
            variablesToRemove = (Set<FastThreadLocal<?>>) v;
        }
        // 將FastThreadLocal物件儲存到待清理的Set中
        variablesToRemove.add(variable);
    }
    // 省略其他程式碼
}

 

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    // 未賦值的Object變數(預設值),當⼀個與執行緒繫結的值被刪除之後,會被設定為UNSET
    public static final Object UNSET = new Object();
    // 儲存繫結到當前執行緒的資料的陣列
    private Object[] indexedVariables;
    // 繫結到當前執行緒的資料的陣列能再次採用x2擴容的最大量
    private static final int ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD = 1 << 30;
    private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;
 
    // 將InternalThreadLocalMap中資料替換為新的value
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            // 直接將陣列 index 位置設定為 value,時間複雜度為 O(1)
            lookup[index] = value;
            return oldValue == UNSET;
        } else { // 繫結到當前執行緒的資料的陣列需要擴容,則擴容陣列並陣列設定新value
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }
 
    private void expandIndexedVariableTableAndSet(int index, Object value) {
        Object[] oldArray = indexedVariables;
        final int oldCapacity = oldArray.length;
        int newCapacity;
        // 判斷可進行x2方式進行擴容
        if (index < ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD) {
            newCapacity = index;
            // 位元運算,提升擴容效率
            newCapacity |= newCapacity >>>  1;
            newCapacity |= newCapacity >>>  2;
            newCapacity |= newCapacity >>>  4;
            newCapacity |= newCapacity >>>  8;
            newCapacity |= newCapacity >>> 16;
            newCapacity ++;
        } else { // 不支援x2方式擴容,則設定繫結到當前執行緒的資料的陣列容量為最大值
            newCapacity = ARRAY_LIST_CAPACITY_MAX_SIZE;
        }
        // 按擴容後的大小建立新陣列,並將老陣列資料copy到新陣列
        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
        // 新陣列擴容後的部分賦UNSET預設值
        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
        // 新陣列的index位置替換成新的value
        newArray[index] = value;
        // 繫結到當前執行緒的資料的陣列用新陣列替換
        indexedVariables = newArray;
    }
    // 省略其他程式碼
}

原始碼中 set() 方法主要分為下面3個步驟處理:

  1. 判斷value是否是預設值UNSET,如果value不等於預設值,則會通過InternalThreadLocalMap.get()方法獲取當前執行緒的InternalThreadLocalMap,具體實現3.2小節中get()方法已做講解。

  2. 通過FastThreadLocal中的setKnownNotUnset()方法將InternalThreadLocalMap中資料替換為新的value,並將當前的FastThreadLocal物件儲存到待清理的Set中。

  3. 如果等於預設值UNSET或null(else的邏輯),會呼叫remove()方法,remove()具體見後面的程式碼分析。

接下來我們看下InternalThreadLocalMap.setIndexedVariable方法的實現邏輯。

  1. 判斷index是否超出儲存繫結到當前執行緒的資料的陣列indexedVariables的長度,如果沒有超出,則獲取index位置的資料,並將該陣列index位置資料設定新value。

  2. 如果超出了,繫結到當前執行緒的資料的陣列需要擴容,則擴容該陣列並將它index位置的資料設定新value。

  3. 擴容陣列以index 為基準進行擴容,將陣列擴容後的容量向上取整為 2 的次冪。然後將原陣列內容拷貝到新的陣列中,空餘部分填充預設值UNSET,最終把新陣列賦值給 indexedVariables。

下面我們再繼續看下FastThreadLocal.addToVariablesToRemove方法的實現邏輯。

  1. 取下標index為0的資料(用於儲存待清理的FastThreadLocal物件Set集合中),如果該資料是預設值UNSET或null,則會建立FastThreadLocal物件Set集合,並將該Set集合填充到下標index為0的陣列位置。

  2. 如果該資料不是預設值UNSET,說明Set集合已金被填充,直接強轉獲取該Set集合。

  3. 最後將FastThreadLocal物件儲存到待清理的Set集合中。

4.4 remove、removeAll方法

public class FastThreadLocal<V> {
    // FastThreadLocal初始化時variablesToRemoveIndex被賦值為0
    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
 
    public final void remove() {
        // 獲取當前執行緒的InternalThreadLocalMap
        // 刪除當前的FastThreadLocal物件及其維護的資料
        remove(InternalThreadLocalMap.getIfSet());
    }
 
    public final void remove(InternalThreadLocalMap threadLocalMap) {
        if (threadLocalMap == null) {
            return;
        }
 
        // 根據當前執行緒的index,並將該陣列下標index位置對應的值設定為預設值UNSET
        Object v = threadLocalMap.removeIndexedVariable(index);
        // 儲存待清理的FastThreadLocal物件Set集合中刪除當前FastThreadLocal物件
        removeFromVariablesToRemove(threadLocalMap, this);
 
        if (v != InternalThreadLocalMap.UNSET) {
            try {
                // 空方法,使用者可以繼承實現
                onRemoval((V) v);
            } catch (Exception e) {
                PlatformDependent.throwException(e);
            }
        }
    }
 
    public static void removeAll() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
        if (threadLocalMap == null) {
            return;
        }
 
        try {
            // 取下標index為0的資料,用於儲存待清理的FastThreadLocal物件Set集合中
            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
            if (v != null && v != InternalThreadLocalMap.UNSET) {
                @SuppressWarnings("unchecked")
                Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
                // 遍歷所有的FastThreadLocal物件並刪除它們以及它們維護的資料
                FastThreadLocal<?>[] variablesToRemoveArray =
                        variablesToRemove.toArray(new FastThreadLocal[0]);
                for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                    tlv.remove(threadLocalMap);
                }
            }
        } finally {
            // 刪除InternalThreadLocalMap中threadLocalMap和slowThreadLocalMap資料
            InternalThreadLocalMap.remove();
        }
    }
 
    private static void removeFromVariablesToRemove(
            InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
        // 取下標index為0的資料,用於儲存待清理的FastThreadLocal物件Set集合中
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
 
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            return;
        }
 
        @SuppressWarnings("unchecked")
        // 儲存待清理的FastThreadLocal物件Set集合中刪除該FastThreadLocal物件
        Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
        variablesToRemove.remove(variable);
    }
 
    // 省略其他程式碼
}

 

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
 
    // 根據當前執行緒獲取InternalThreadLocalMap
       public static InternalThreadLocalMap getIfSet() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return ((FastThreadLocalThread) thread).threadLocalMap();
        }
        return slowThreadLocalMap.get();
    }
 
    // 陣列下標index位置對應的值設定為預設值UNSET
    public Object removeIndexedVariable(int index) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object v = lookup[index];
            lookup[index] = UNSET;
            return v;
        } else {
            return UNSET;
        }
    }
 
    // 刪除threadLocalMap和slowThreadLocalMap資料
    public static void remove() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            ((FastThreadLocalThread) thread).setThreadLocalMap(null);
        } else {
            slowThreadLocalMap.remove();
        }
    }
    // 省略其他程式碼
}

原始碼中 remove() 方法主要分為下面2個步驟處理:

  1. 通過InternalThreadLocalMap.getIfSet()獲取當前執行緒的InternalThreadLocalMap。具體和3.2小節get()方法裡面獲取當前執行緒的InternalThreadLocalMap相似,這裡就不再重複介紹了。

  2. 刪除當前的FastThreadLocal物件及其維護的資料。

原始碼中 removeAll() 方法主要分為下面3個步驟處理:

  1. 通過InternalThreadLocalMap.getIfSet()獲取當前執行緒的InternalThreadLocalMap。

  2. 取下標index為0的資料(用於儲存待清理的FastThreadLocal物件Set集合),然後遍歷所有的FastThreadLocal物件並刪除它們以及它們維護的資料。

  3. 最後會將InternalThreadLocalMap本身從執行緒中移除。

五、總結

那麼使用ThreadLocal時最佳實踐又如何呢?

每次使用完ThreadLocal範例,線上程執行結束之前的finally程式碼塊中主動呼叫它的remove()方法,清除Entry中的資料,避免操作不當導致的記憶體漏失。

使⽤Netty的FastThreadLocal一定比JDK原生的ThreadLocal更快嗎?

不⼀定。當執行緒是FastThreadLocalThread,則新增、獲取FastThreadLocal所維護資料的時間複雜度是 O(1),⽽使⽤ThreadLocal可能存在雜湊衝突,相對來說使⽤FastThreadLocal更⾼效。但如果是普通執行緒則可能更慢。

使⽤FastThreadLocal有哪些優點?

正如文章開頭介紹JDK原生ThreadLocal存在的缺點,FastThreadLocal全部優化了,它更⾼效、而且如果使⽤的是FastThreadLocal,它會在任務執⾏完成後主動調⽤removeAll⽅法清除資料,避免潛在的記憶體洩露。