關於並行程式設計與執行緒安全的思考與實踐

2023-05-09 18:02:21

作者:京東健康 張娜

一、並行程式設計的意義與挑戰

並行程式設計的意義是充分的利用處理器的每一個核,以達到最高的處理效能,可以讓程式執行的更快。而處理器也為了提高計算速率,作出了一系列優化,比如:

1、硬體升級:為平衡CPU 內高速記憶體和記憶體之間數量級的速率差,提升整體效能,引入了多級快取記憶體的傳統硬體記憶體架構來解決,帶來的問題是,資料同時存在於快取記憶體和主記憶體中,需要解決快取一致性問題。

2、處理器優化:主要包含,編譯器重排序、指令級重排序、記憶體系統重排序。通過單執行緒語意、指令級並行重疊執行、快取區載入儲存3種級別的重排序,減少執行指令,從而提高整體執行速度。帶來的問題是,多執行緒環境裡,編譯器和CPU指令無法識別多個執行緒之間存在的資料依賴性,影響程式執行結果。

並行程式設計的好處是巨大的,然而要編寫一個執行緒安全並且執行高效的程式碼,需要管理可變共用狀態的操作存取,考慮記憶體一致性、處理器優化、指令重排序問題。比如我們使用多執行緒對同一個物件的值進行操作時會出現值被更改、值不同步的情況,得到的結果和理論值可能會天差地別,此時該物件就不是執行緒安全的。而當多個執行緒存取某個資料時,不管執行時環境採用何種排程方式或者這些執行緒如何交替執行,這個計算邏輯始終都表現出正確的行為,那麼稱這個物件是執行緒安全的。因此如何在並行程式設計中保證執行緒安全是一個容易忽略的問題,也是一個不小的挑戰。

所以,為什麼會有執行緒安全的問題,首先要明白兩個關鍵問題:

1、執行緒之間是如何通訊的,即執行緒之間以何種機制來交換資訊。

2、執行緒之間是如何同步的,即程式如何控制不同執行緒間的發生順序。

二、Java並行程式設計

Java並行採用了共用記憶體模型,Java執行緒之間的通訊總是隱式進行的,整個通訊過程對程式設計師完全透明。

2.1 Java記憶體模型

為了平衡程式設計師對記憶體可見性儘可能高(對編譯器和處理的約束就多)和提高計算效能(儘可能少約束編譯器處理器)之間的關係,JAVA定義了Java記憶體模型(Java Memory Model,JMM),約定只要不改變程式執行結果,編譯器和處理器怎麼優化都行。所以,JMM主要解決的問題是,通過制定執行緒間通訊規範,提供記憶體可見性保證。

JMM結構如下圖所示:

以此看來,執行緒內建立的區域性變數、方法定義引數等只線上程內使用不會有並行問題,對於共用變數,JMM規定了一個執行緒如何和何時可以看到由其他執行緒修改過後的共用變數的值,以及在必須時如何同步的存取共用變數。

為控制工作記憶體和主記憶體的互動,定義了以下規範:

•所有的變數都儲存在主記憶體(Main Memory)中。

•每個執行緒都有一個私有的本地記憶體(Local Memory),本地記憶體中儲存了該執行緒以讀/寫共用變數的拷貝副本。

•執行緒對變數的所有操作都必須在本地記憶體中進行,而不能直接讀寫主記憶體。

•不同的執行緒之間無法直接存取對方本地記憶體中的變數。

具體實現上定義了八種操作:

1.lock:作用於主記憶體,把變數標識為執行緒獨佔狀態。

2.unlock:作用於主記憶體,解除獨佔狀態。

3.read:作用主記憶體,把一個變數的值從主記憶體傳輸到執行緒的工作記憶體。

4.load:作用於工作記憶體,把read操作傳過來的變數值放入工作記憶體的變數副本中。

5.use:作用工作記憶體,把工作記憶體當中的一個變數值傳給執行引擎。

6.assign:作用工作記憶體,把一個從執行引擎接收到的值賦值給工作記憶體的變數。

7.store:作用於工作記憶體的變數,把工作記憶體的一個變數的值傳送到主記憶體中。

8.write:作用於主記憶體的變數,把store操作傳來的變數的值放入主記憶體的變數中。

這些操作都滿足以下原則:

•不允許read和load、store和write操作之一單獨出現。

•對一個變數執行unlock操作之前,必須先把此變數同步到主記憶體中(執行store和write操作)。

2.2 Java中的並行關鍵字

Java基於以上規則提供了volatile、synchronized等關鍵字來保證執行緒安全,基本原理是從限制處理器優化和使用記憶體屏障兩方面解決並行問題。如果是變數級別,使用volatile宣告任何型別變數,同基本資料型別變數、參照型別變數一樣具備原子性;如果應用場景需要一個更大範圍的原子性保證,需要使用同步塊技術。Java記憶體模型提供了lock和unlock操作來滿足這種需求。虛擬機器器提供了位元組碼指令monitorenter和monitorexist來隱式地使用這兩個操作,這兩個位元組碼指令反映到Java程式碼中就是同步塊-synchronized關鍵字。

這兩個字的作用:volatile僅保證對單個volatile變數的讀/寫具有原子性,而鎖的互斥執行的特性可以確保整個臨界區程式碼的執行具有原子性。在功能上,鎖比volatile更強大,在可伸縮性和執行效能上,volatile更有優勢。

2.3 Java中的並行容器與工具類

2.3.1 CopyOnWriteArrayList

CopyOnWriteArrayList在操作元素時會加可重入鎖,一次來保證寫操作是執行緒安全的,但是每次新增刪除元素就需要複製一份新陣列,對空間有較大的浪費。

    public E get(int index) {
        return get(getArray(), index);
    }

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

2.3.2 Collections.synchronizedList(new ArrayList<>());

這種方式是在 List的操作外包加了一層synchronize同步控制。需要注意的是在遍歷List是還得再手動做整體的同步控制。

    public void add(int index, E element) {
        // SynchronizedList 就是在 List的操作外包加了一層synchronize同步控制
        synchronized (mutex) {list.add(index, element);}
    }
    public E remove(int index) {
        synchronized (mutex) {return list.remove(index);}
    }

2.3.3 ConcurrentLinkedQueue

通過迴圈CAS操作非阻塞的給佇列新增節點,

    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p是尾節點,CAS 將p的next指向newNode.
                if (p.casNext(null, newNode)) {
                    if (p != t) 
                        //tail指向真正尾節點
                        casTail(t, newNode);
                    return true;
                }
            }
            else if (p == q)
                // 說明p節點和p的next節點都等於空,表示這個佇列剛初始化,正準備新增節點,所以返回head節點
                p = (t != (t = tail)) ? t : head;
            else
                // 向後查詢尾節點
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

三、線上案例

3.1 問題發現

在網際網路醫院醫生端,醫生開啟問診IM聊天頁,需要載入幾十個功能按鈕。在2022年12月抗疫期間,QPS全天都很高,高峰時是平日的12倍,偶現報警提示按鈕顯示不全,問題出現概率大概在百萬分之一。

3.2 排查問題的詳細過程

醫生問診IM頁面的載入屬於業務黃金流程,上面的每一個按鈕就是一個業務線的入口,所以處在核心邏輯的上的報警均使用自定義報警,該類報警不設定收斂,無論何種異常包括按鈕個數異常就會立即報警。

1. 根據報警資訊,開始排查,卻發現以下問題:

(1)沒有異常紀錄檔:順著異常紀錄檔的logId排查,過程中竟然沒有異常紀錄檔,按鈕莫名其妙的變少了。

(2)不能復現:在預發環境,使用相同入參,介面正常返回,無法復現。

2. 程式碼分析,縮小異常範圍:

醫生問診IM按鈕處理分組進行:

    // 多個執行緒結果集合
    List<DoctorDiagImButtonInfoDTO> multiButtonList = new ArrayList<>();
    // 多執行緒並行處理
    Future<List<DoctorDiagImButtonInfoDTO>> multiButtonFuture = joyThreadPoolTaskExecutor.submit(() -> {
        List<DoctorDiagImButtonInfoDTO> multiButtonListTemp = new ArrayList<>();
        buttonTypes.forEach(buttonType -> {
            multiButtonListTemp.add(appButtonInfoMap.get(buttonType));
        });
        multiButtonList.addAll(multiButtonListTemp);
        return multiButtonListTemp;
    });

3. 增加紀錄檔線上觀察

由於並行場景容易引發子執行緒失敗的情況,對各子執行緒分支增加必要節點紀錄檔上線後觀察:

(1)發生異常的請求處理過程中,所有子執行緒正常處理完成

(2)按鈕缺少個數隨機等於子執行緒中處理的按鈕個數

(3)初步判斷是ArrayList並行addAll操作異常

4. 模擬復現

使用ArrayList原始碼模擬復現問題:

(1)ArrayList原始碼分析:


     public boolean addAll(Collection<? extends E> c) {
         Object[] a = c.toArray();
         int numNew = a.length;
         ensureCapacityInternal(size + numNew); // Increments modCount
 
         //以當前size為起點,向陣列中追加本次新增物件
         System.arraycopy(a, 0, elementData, size, numNew);
 
         //更新全域性變數size的值,和上一步是非原子操作,引發並行問題的根源
         size += numNew;
         return numNew != 0;
     }
 
     private void ensureCapacityInternal(int minCapacity) {
         if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
             minCapacity = Math.max(DEFAULT_CAPACITY, minCapacity);
         }
 
         ensureExplicitCapacity(minCapacity);
     }
 
     private void ensureExplicitCapacity(int minCapacity) {
         modCount++;
 
         // overflow-conscious code
         if (minCapacity - elementData.length > 0)
             grow(minCapacity);
     }
 
     private void grow(int minCapacity) {
         // overflow-conscious code
         int oldCapacity = elementData.length;
         int newCapacity = oldCapacity + (oldCapacity >> 1);
         if (newCapacity - minCapacity < 0)
             newCapacity = minCapacity;
         if (newCapacity - MAX_ARRAY_SIZE > 0)
             newCapacity = hugeCapacity(minCapacity);
         // minCapacity is usually close to size, so this is a win:
         elementData = Arrays.copyOf(elementData, newCapacity);
     }
 

(2) 理論分析

在ArrayList的add操作中,變更size和增加資料操作,不是原子操作。

(3)問題復現

複製原始碼建立自定義類,為方便復現並行問題,增加停頓

     public boolean addAll(Collection<? extends E> c) {
         Object[] a = c.toArray();
         int numNew = a.length;
         //第1次停頓,獲取當前size
         try {
             Thread.sleep(1000*timeout1);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         ensureCapacityInternal(size + numNew); // Increments modCount
 
         //第2次停頓,等待copy
         try {
             Thread.sleep(1000*timeout2);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.arraycopy(a, 0, elementData, size, numNew);
 
         //第3次停頓,等待size+=
         try {
             Thread.sleep(1000*timeout3);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         size += numNew;
         return numNew != 0;
     }

3.3 解決問題

使用執行緒安全工具 Collections.synchronizedList 建立 ArrayList :

    List<DoctorDiagImButtonInfoDTO> multiButtonList = Collections.synchronizedList(new ArrayList<>()); 

上線觀察後正常。

3.4 總結反思

使用多執行緒處理問題已經變得很普遍,但是對於多執行緒共同操作的物件必須使用執行緒安全的類。

另外,還要搞清楚幾個靈魂問題:

(1)JMM的靈魂:Happens-before 原則

(2)並行工具類的靈魂:volatile變數的讀/寫 和 CAS