本文分享自華為雲社群《釋放無鎖佇列的力量:探索用迴圈陣列實現無鎖佇列》,作者: Lion Long 。
在電腦科學領域,佇列是一種常見的資料結構,用於在多執行緒或多程序環境中進行有效的訊息傳遞和任務排程。然而,傳統的佇列實現通常使用鎖來保護共用資源,這可能導致效能瓶頸和可伸縮性問題。
為了克服這些限制,無鎖佇列應運而生。無鎖佇列通過採用特殊的演演算法和資料結構,使多個執行緒可以並行地存取佇列,而無需使用鎖來保護共用資源。其中,基於迴圈陣列的無鎖佇列是一種經典的實現方式。
本文將深入探討基於迴圈陣列的無鎖佇列的原理和優勢。我們將介紹迴圈陣列的基本概念,並解釋如何通過適當的演演算法和技術實現無鎖性。通過對比傳統的鎖保護佇列和無鎖佇列,我們將揭示無鎖佇列的效能提升和可伸縮性優勢。
此外,我們還將探討基於迴圈陣列的無鎖佇列在實際應用中的挑戰和注意事項。我們將分享一些實際案例和經驗教訓,幫助讀者更好地理解和應用無鎖佇列。
通過閱讀本文,您將深入瞭解基於迴圈陣列的無鎖佇列的強大功能和潛力,以及如何利用它們來提升系統效能和可伸縮性。無論您是系統設計師、開發人員還是對並行程式設計感興趣的研究人員,本文都將為您帶來有價值的見解和啟發。
#ifndef _ARRAYLOCKFREEQUEUE_H___ #define _ARRAYLOCKFREEQUEUE_H___ #include <stdint.h> #ifdef _WIN64 #define QUEUE_INT int64_t #else #define QUEUE_INT unsigned long #endif #define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16 template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE> class ArrayLockFreeQueue { public: ArrayLockFreeQueue(); virtual ~ArrayLockFreeQueue(); QUEUE_INT size(); bool enqueue(const ELEM_T &a_data);//入隊 bool dequeue(ELEM_T &a_data);// 出隊 bool try_dequeue(ELEM_T &a_data);//嘗試入隊 private: ELEM_T m_thequeue[Q_SIZE]; volatile QUEUE_INT m_count;//佇列的元素個數 volatile QUEUE_INT m_writeIndex;//新元素入隊時存放位置在陣列中的下標 volatile QUEUE_INT m_readIndex;//下一個出隊元素在陣列中的下標 volatile QUEUE_INT m_maximumReadIndex;// 最後一個已經完成入隊操作的元素在陣列中的下標 inline QUEUE_INT countToIndex(QUEUE_INT a_count); }; #include "ArrayLockFreeQueueImp.h" #endif
m_maximumReadIndex: 最後一個已經完成入列操作的元素在陣列中的下標。如果它的值跟m_writeIndex不一致,表明有寫請求尚未完成。這意味著,有寫請求成功申請了空間但資料還沒完全寫進佇列。所以如果有執行緒要讀取,必須要等到寫執行緒將資料完全寫入到佇列之後。
必須指明的是使用3種不同的下標都是必須的,因為佇列允許任意數量的生產者和消費者圍繞著它工作。
陣列環形圖:
使用gcc內建的syn_bool_compare_and_swap,但重新做了宏定義封裝。
#ifndef _ATOM_OPT_H___ #define _ATOM_OPT_H___ #ifdef __GNUC__ #define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal) #define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count) #define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count) #include <sched.h> // sched_yield() #else #include <Windows.h> #ifdef _WIN64 #define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange64(a_ptr, a_newVal, a_oldVal)) #define sched_yield() SwitchToThread() #define AtomicAdd(a_ptr, num) InterlockedIncrement64(a_ptr) #define AtomicSub(a_ptr, num) InterlockedDecrement64(a_ptr) #else #define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange(a_ptr, a_newVal, a_oldVal)) #define sched_yield() SwitchToThread() #define AtomicAdd(a_ptr, num) InterlockedIncrement(a_ptr) #define AtomicSub(a_ptr, num) InterlockedDecrement(a_ptr) #endif #endif #endif
template <typename ELEM_T, QUEUE_INT Q_SIZE> inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count) { return (a_count % Q_SIZE); // 取餘的時候 } template <typename ELEM_T, QUEUE_INT Q_SIZE> bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data) { QUEUE_INT currentWriteIndex; // 獲取寫指標的位置 QUEUE_INT currentReadIndex; // 1. 獲取可寫入的位置 do { currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; if(countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) { return false; // 佇列已經滿了 } // 目的是為了獲取一個能寫入的位置 } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1))); // 獲取寫入位置後 currentWriteIndex 是一個臨時變數,儲存我們寫入的位置 // We know now that this index is reserved for us. Use it to save the data m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把資料更新到對應的位置 // 2. 更新可讀的位置,按著m_maximumReadIndex+1的操作 // update the maximum read index after saving the data. It wouldn't fail if there is only one thread // inserting in the queue. It might fail if there are more than 1 producer threads because this // operation has to be done in the same order as the previous CAS while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { // this is a good place to yield the thread in case there are more // software threads than hardware processors and you have more // than 1 producer thread // have a look at sched_yield (POSIX.1b) sched_yield(); // 當執行緒超過cpu核數的時候如果不讓出cpu導致一直迴圈在此。 } AtomicAdd(&m_count, 1); return true; }
分析:
(1)對於下圖,佇列中存放了兩個元素。WriteIndex指示的位置是新元素將會被插入的位置。ReadIndex指向的位置中的元素將會在下一次pop操作中被彈出。
(2)當生產者準備將資料插入到佇列中,它首先通過增加WriteIndex的值來申請空間。MaximumReadIndex指向最後一個存放有效資料的位置(也就是實際的佇列尾)。
(3)一旦空間的申請完成,生產者就可以將資料拷貝到剛剛申請到的位置中。完成之後增加MaximumReadIndex使得它與WriteIndex的一致。
(4)現在佇列中有3個元素,接著又有一個生產者嘗試向佇列中插入元素。
(5)在第一個生產者完成資料拷貝之前,又有另外一個生產者申請了一個新的空間準備拷貝資料。現在有兩個生產者同時向佇列插入資料。
(6)現在生產者開始拷貝資料,在完成拷貝之後,對MaximumReadIndex的遞增操作必須嚴格遵循一個順序:**第一個生產者執行緒首先遞增MaximumReadIndex,接著才輪到第二個生產者。** 這個順序必須被嚴格遵守的原因是,我們必須**保證資料被完全拷貝到佇列之後才允許消費者執行緒將其出列**。
第一個生產者完成了資料拷貝,並對MaximumReadIndex完成了遞增。
(7)現在第二個生產者可以遞增MaximumReadIndex了;第二個生產者完成了對MaximumReadIndex的遞增,現在佇列中有5個元素。
template <typename ELEM_T, QUEUE_INT Q_SIZE> bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data) { QUEUE_INT currentMaximumReadIndex; QUEUE_INT currentReadIndex; do { // to ensure thread-safety when there is more than 1 producer thread // a second index is defined (m_maximumReadIndex) currentReadIndex = m_readIndex; currentMaximumReadIndex = m_maximumReadIndex; if(countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) // 如果不為空,獲取到讀索引的位置 { // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it return false; } // retrieve the data from the queue a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時位置讀取的 // try to perfrom now the CAS operation on the read index. If we succeed // a_data already contains what m_readIndex pointed to before we // increased it if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { AtomicSub(&m_count, 1); // 真正讀取到了資料,元素-1 return true; } } while(true); assert(0); // Add this return statement to avoid compiler warnings return false; }
分析:
(1)以下插圖展示了元素出列的時候各種下標是如何變化的,佇列中初始有2個元素。WriteIndex指示的位置是新元素將會被插入的位置。ReadIndex指向的位置中的元素將會在下一次pop操作中被彈出。
(2)消費者執行緒拷貝陣列ReadIndex位置的元素,然後嘗試用CAS操作將ReadIndex加1。如果操作成功消費者成功的將資料出列。因為CAS操作是原子的,所以只有唯一的執行緒可以在同一時刻更新ReadIndex的值。如果操作失敗,讀取新的ReadIndex值,以重複以上操作(copy資料,CAS)。
(3)現在又有一個消費者將元素出列,佇列變成空。
(4)現在有一個生產者正在向佇列中新增元素。它已經成功的申請了空間,但尚未完成資料拷貝。任何其它企圖從佇列中移除元素的消費者都會發現佇列非空(因為writeIndex不等於readIndex)。但它不能讀取readIndex所指向位置中的資料,因為readIndex與MaximumReadIndex相等。消費者將會在do迴圈中不斷的反覆嘗試,直到生產者完成資料拷貝增加MaximumReadIndex的值,或者佇列變成空(這在多個消費者的場景下會發生)。
(5)當生產者完成資料拷貝,佇列的大小是1,消費者執行緒可以讀取這個資料了。
enqueue函數中使用了sched_yiedld()來主動的讓出CPU,對於一個無鎖的演演算法而言,這個呼叫看起來有點奇怪。
多執行緒環境下影響效能的其中一個因素就是Cache損壞。而產生Cache損壞的一種情況就是一個執行緒被搶佔,作業系統需要儲存被搶佔執行緒的上下文,然後將被選中作為下一個排程執行緒的上下文載入。此時Cache中快取的資料都會失效,因為它是被搶佔執行緒的資料而不是新執行緒的資料。
所以,當此演演算法呼叫sched_yield()意味著告訴作業系統:「我要把處理器時間讓給其它執行緒,因為我要等待某件事情的發生」。無鎖演演算法和通過阻塞機制同步的演演算法的一個主要區別在於無鎖演演算法不會阻塞線上程同步上。
那麼為什麼在這裡我們要主動請求作業系統搶佔自己呢? 它與有多少個生產者執行緒在並行的往佇列中存放資料有關:每個生產者執行緒所執行的CAS操作都必須嚴格遵循FIFO次序,一個用於申請空間,另一個用於通知消費者資料已經寫入完成可以被讀取了。
如果應用程式只有唯一的生產者操作這個佇列,sche_yield()將永遠沒有機會被呼叫,第2個CAS操作永遠不會失敗。因為在一個生產者的情況下沒有人能破壞生產者執行這兩個CAS操作的FIFO順序。
而當多於一個生產者執行緒往佇列中存放資料的時候,問題就出現了。概括來說,一個生產者通過第1個
CAS操作申請空間,然後將資料寫入到申請到的空間中,然後執行第2個CAS操作通知消費者資料準備完畢可供讀取了。這第2個CAS操作必須遵循FIFO順序,也就是說,如果A執行緒第首先執行完第一個CAS操作,那麼它也要第1個執行完第2個CAS操作,如果A執行緒在執行完第一個CAS操作之後停止,然後B執行緒執行完第1個CAS操作,那麼B執行緒將無法完成第2個CAS操作,因為它要等待A先完成第2個CAS操作。而這就是問題產生的根源。
#ifndef _ARRAYLOCKFREEQUEUEIMP_H___ #define _ARRAYLOCKFREEQUEUEIMP_H___ #include "ArrayLockFreeQueue.h" #include <assert.h> #include "atom_opt.h" template <typename ELEM_T, QUEUE_INT Q_SIZE> ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() : m_writeIndex(0), m_readIndex(0), m_maximumReadIndex(0) { m_count = 0; } template <typename ELEM_T, QUEUE_INT Q_SIZE> ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue() { } template <typename ELEM_T, QUEUE_INT Q_SIZE> inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count) { return (a_count % Q_SIZE); // 取餘的時候 } template <typename ELEM_T, QUEUE_INT Q_SIZE> QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size() { QUEUE_INT currentWriteIndex = m_writeIndex; QUEUE_INT currentReadIndex = m_readIndex; if(currentWriteIndex>=currentReadIndex) return currentWriteIndex - currentReadIndex; else return Q_SIZE + currentWriteIndex - currentReadIndex; } template <typename ELEM_T, QUEUE_INT Q_SIZE> bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data) { QUEUE_INT currentWriteIndex; // 獲取寫指標的位置 QUEUE_INT currentReadIndex; // 1. 獲取可寫入的位置 do { currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; if(countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) { return false; // 佇列已經滿了 } // 目的是為了獲取一個能寫入的位置 } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1))); // 獲取寫入位置後 currentWriteIndex 是一個臨時變數,儲存我們寫入的位置 // We know now that this index is reserved for us. Use it to save the data m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把資料更新到對應的位置 // 2. 更新可讀的位置,按著m_maximumReadIndex+1的操作 // update the maximum read index after saving the data. It wouldn't fail if there is only one thread // inserting in the queue. It might fail if there are more than 1 producer threads because this // operation has to be done in the same order as the previous CAS while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) { // this is a good place to yield the thread in case there are more // software threads than hardware processors and you have more // than 1 producer thread // have a look at sched_yield (POSIX.1b) sched_yield(); // 當執行緒超過cpu核數的時候如果不讓出cpu導致一直迴圈在此。 } AtomicAdd(&m_count, 1); return true; } template <typename ELEM_T, QUEUE_INT Q_SIZE> bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::try_dequeue(ELEM_T &a_data) { return dequeue(a_data); } template <typename ELEM_T, QUEUE_INT Q_SIZE> bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data) { QUEUE_INT currentMaximumReadIndex; QUEUE_INT currentReadIndex; do { // to ensure thread-safety when there is more than 1 producer thread // a second index is defined (m_maximumReadIndex) currentReadIndex = m_readIndex; currentMaximumReadIndex = m_maximumReadIndex; if(countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) // 如果不為空,獲取到讀索引的位置 { // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it return false; } // retrieve the data from the queue a_data = m_thequeue[countToIndex(currentReadIndex)]; // 從臨時位置讀取的 // try to perfrom now the CAS operation on the read index. If we succeed // a_data already contains what m_readIndex pointed to before we // increased it if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { AtomicSub(&m_count, 1); // 真正讀取到了資料,元素-1 return true; } } while(true); assert(0); // Add this return statement to avoid compiler warnings return false; } #endif
#include "ArrayLockFreeQueue.h" ArrayLockFreeQueue<int> arraylockfreequeue; void *arraylockfreequeue_producer_thread(void *argv) { PRINT_THREAD_INTO(); int count = 0; int write_failed_count = 0; for (int i = 0; i < s_queue_item_num;) { if (arraylockfreequeue.enqueue(count)) // enqueue的順序是無法保證的,我們只能計算enqueue的個數 { count = lxx_atomic_add(&s_count_push, 1); i++; } else { write_failed_count++; // printf("%s %lu enqueue failed, q:%d\n", __FUNCTION__, pthread_self(), arraylockfreequeue.size()); sched_yield(); // usleep(10000); } } // printf("%s %lu write_failed_count:%d\n", __FUNCTION__, pthread_self(), write_failed_count) PRINT_THREAD_LEAVE(); return NULL; } void *arraylockfreequeue_consumer_thread(void *argv) { int last_value = 0; PRINT_THREAD_INTO(); int value = 0; int read_failed_count = 0; while (true) { if (arraylockfreequeue.dequeue(value)) { if (s_consumer_thread_num == 1 && s_producer_thread_num == 1 && (last_value + 1) != value) // 只有一入一出的情況下才有對比意義 { // printf("pid:%lu, -> value:%d, expected:%d\n", pthread_self(), value, last_value); } lxx_atomic_add(&s_count_pop, 1); last_value = value; } else { read_failed_count++; // printf("%s %lu no data, s_count_pop:%d, value:%d\n", __FUNCTION__, pthread_self(), s_count_pop, value); // usleep(100); sched_yield(); } if (s_count_pop >= s_queue_item_num * s_producer_thread_num) { // printf("%s dequeue:%d, s_count_pop:%d, %d, %d\n", __FUNCTION__, last_value, s_count_pop, s_queue_item_num, s_consumer_thread_num); break; } } // printf("%s %lu read_failed_count:%d\n", __FUNCTION__, pthread_self(), read_failed_count) PRINT_THREAD_LEAVE(); return NULL; }