disruptor是英國著名的金融交易所lmax旗下技術團隊開發的一款java實現的高效能記憶體佇列框架
其發明disruptor的主要目的是為了改進傳統的記憶體佇列實現如jdk的ArrayBlockingQueue、LinkedBlockingQueue等在現代CPU硬體上的一些缺陷
現代的CPU都是多核的,每個核心都擁有獨立的快取記憶體。快取記憶體由固定大小的快取行組成(通常為32個位元組或64個位元組)。
CPU以快取行作為最小單位讀寫,且一個快取行通常會被多個變數佔據(例如32位元的參照指標占4位元組,64位元的參照指標占8個位元組)。
這樣的設計導致了一個問題:即使快取行上的變數是無關聯的(比如不屬於同一個物件),但只要快取行上的某一個共用變數發生了變化,則整個快取行都會進行快取一致性的同步。
而CPU間快取一致性的同步是有一定效能損耗的,能避免則儘量避免。這就是所謂的「偽共用」問題。
disruptor通過對佇列中一些關鍵變數進行了快取行的填充,避免其因為不相干的變數讀寫而無謂的重新整理快取,解決了偽共用的問題。
關於CPU間快取一致性相關的內容可以參考下我以前的部落格:
快取記憶體一致性協定MESI與記憶體屏障
傳統的記憶體佇列由於生產者、消費者都會並行的讀寫佇列頭、佇列尾的參照和更新佇列size,
因此被迫使用瞭如ReentrantLock等基於上下文切換的悲觀鎖或是CAS機制的樂觀鎖等互斥機制來保證佇列關鍵資料的並行安全,但即使是CAS這樣非阻塞的機制,由於存在失敗重試機制和快取記憶體間強一致地同步操作,其效能損耗在追求極限效能的高並行佇列中介軟體上也是不容忽視的。
disruptor在實現過程中巧妙的通過全域性有序增長的序列號機制代替了顯式的佇列頭、佇列尾更新,極大的減少了需要並行更新共用變數的場合,從而提高了高並行場景下佇列的吞吐量。
juc包下的阻塞佇列佇列元素會在入隊時被建立、出隊被消費後就不再被參照而產生大量的垃圾。
disruptor通過基於陣列的環形佇列,在開始執行前用空的事件物件填充好整個佇列,後續的生產與消費則不新增或者刪除佇列元素,而是配合序列號機制,修改佇列元素中的屬性進行生產者和消費者的互動。
通過固定佇列中的物件,disruptor避免了入隊、出隊時產生不必要的垃圾。
除此之外,disruptor還允許設定消費者間消費的依賴關係(例如A、B消費者消費完畢後,C才能消費),構造高效的事件傳輸管道,實現1對1,1對多,多對1等模式的組合。
更詳細的內容可以參考disruptor的官方檔案:https://lmax-exchange.github.io/disruptor/disruptor.html
上面雖然介紹了有關disruptor的各種特點,但只有詳細的研究原始碼後才能更好地理解disruptor的原理,體會其整體設計思路以及程式碼層面微觀實現的精妙之處。
程式設計和畫畫很類似,比起對著已經完工的畫作進行分析,我更喜歡參考著原畫從設計者的角度出發自己臨摹出一副屬於自己的畫。在這個過程中,可以看到程式從簡單到複雜的全過程,能更清楚得知道哪些是核心功能而哪些是相對邊緣的邏輯,從而獲得一條平滑的學習曲線。
MyDisruptor就是我按照上述學習方式自己臨摹出來的結果,按照功能模組大致分為六個迭代版本逐步完成,最終實現了一個和disruptor相差無幾的佇列。
在這個過程中,低版本的程式碼是相對精簡的,可以讓讀者更容易理解當前功能的實現原理,不會被其餘旁路程式碼的複雜度給繞暈。
v1版本是整個專案的基石,所以在這裡先介紹disruptor的核心設計思想和各關鍵元件的整體關聯以幫助大家更好地理解。
我們知道基於阻塞/喚醒的悲觀鎖和基於CAS的樂觀鎖都是並行程式設計中常見地同步機制,但是其在高並行場景下都有一定的效能損耗。那麼有沒有開銷更低地執行緒間同步機制呢?
答案是有的,即單純依靠記憶體屏障提供的多執行緒間的記憶體可見效能力。
這裡糾正一個部分人理解上的誤區:java中volatile修飾的變數具備多執行緒間的可見效能力,但不提供原子性更新的功能,所以無法保證執行緒安全。這段概述是不全面的,確實在多執行緒並行讀寫時,由於缺少原子性的更新機制,單靠volatile是無法做到執行緒安全的。
但在單寫者多讀者這一更為特殊的場景下,僅靠volatile提供的記憶體可見效能力就可以做到並行場景下的執行緒安全,且其效能開銷比CAS更低。
為了解決上述傳統佇列中共用變數高並行時過多爭搶的問題,disruptor從設計一開始就引入了單調遞增的序列號機制,每個生產者、消費者執行緒都有自己獨立所屬的序列號變數(volatile修飾),其只能由序列號所屬的執行緒寫入,其它執行緒只能去讀取,做到一寫多讀。
disruptor拆分了傳統佇列中多寫多讀的佇列頭、尾等多讀多寫的變數,僅憑藉記憶體可見性就完成了生產者和消費者間的通訊
下面我們基於原始碼分析MyDisruptor,為了和lmax-Disruptor作區分MyDisruptor內各個元件都在disruptor對應元件名稱的基礎上加了My字首。
/**
* 序列號物件(仿Disruptor.Sequence)
* 由於需要被生產者、消費者執行緒同時存取,因此內部是一個volatile修飾的long值
* */
public class MySequence {
/**
* 序列起始值預設為-1,保證下一序列恰好是0(即第一個合法的序列號)
* */
private volatile long value = -1;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static {
try {
// 由於提供給cas記憶體中欄位偏移量的unsafe類只能在被jdk信任的類中直接使用,這裡使用反射來繞過這一限制
Field getUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
getUnsafe.setAccessible(true);
UNSAFE = (Unsafe) getUnsafe.get(null);
VALUE_OFFSET = UNSAFE.objectFieldOffset(MySequence.class.getDeclaredField("value"));
}
catch (final Exception e) {
throw new RuntimeException(e);
}
}
public MySequence() {
}
public MySequence(long value) {
this.value = value;
}
public long get() {
return value;
}
public void set(long value) {
this.value = value;
}
public void lazySet(long value) {
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
}
看完MySequence的實現後你可能會有一個疑問,這不就是一個簡易版的AtomicLong嗎,為什麼disruptor還要自己造一個出來呢?
確實在v1版本中,MySequence類比起juc的AtomicLong只是名字上更加貼合業務場景而已,能被AtomicLong完全的代替。
但Disruptor通過填充多餘的欄位解決了Sequence中value變數的偽共用問題,MyDisruptor中偽共用的實現放在了後面的版本,所以v1版本在這裡提前進行了抽象,目的是方便大家後續的理解。
/**
* 單執行緒生產者序列器(仿Disruptor.SingleProducerSequencer)
* 只支援單消費者的簡易版本(只有一個consumerSequence)
*
* 因為是單執行緒序列器,因此在設計上就是執行緒不安全的
* */
public class MySingleProducerSequencer {
/**
* 生產者序列器所屬ringBuffer的大小
* */
private final int ringBufferSize;
/**
* 當前已釋出的生產者序列號
* (區別於nextValue)
* */
private final MySequence currentProducerSequence = new MySequence();
/**
* 生產者序列器所屬ringBuffer的消費者的序列(後續多消費者版本會改為用陣列儲存多個消費者序列)
* */
private MySequence consumerSequence;
private final MyWaitStrategy myWaitStrategy;
/**
* 當前已申請的序列(但是是否釋出了,要看currentProducerSequence)
* 單執行緒生產者內部使用,所以就是普通的long,不考慮並行
* */
private long nextValue = -1;
/**
* 當前已快取的消費者序列
* 單執行緒生產者內部使用,所以就是普通的long,不考慮並行
* */
private long cachedConsumerSequenceValue = -1;
public MySingleProducerSequencer(int ringBufferSize, MyWaitStrategy myWaitStrategy) {
this.ringBufferSize = ringBufferSize;
this.myWaitStrategy = myWaitStrategy;
}
/**
* 申請可用的1個生產者序列號
* */
public long next(){
return next(1);
}
/**
* 一次性申請可用的n個生產者序列號
* */
public long next(int n){
// 申請的下一個生產者位點
long nextProducerSequence = this.nextValue + n;
// 新申請的位點下,生產者恰好超過消費者一圈的環繞臨界點序列
long wrapPoint = nextProducerSequence - this.ringBufferSize;
// 獲得當前已快取的消費者位點
long cachedGatingSequence = this.cachedConsumerSequenceValue;
// 消費者位點cachedValue並不是實時獲取的(因為在沒有超過環繞點一圈時,生產者是可以放心生產的)
// 每次釋出都實時獲取反而會觸發對消費者sequence強一致的讀,迫使消費者執行緒所在的CPU重新整理快取(而這是不需要的)
if(wrapPoint > cachedGatingSequence){
// 比起disruptor省略了if中的cachedGatingSequence > nextProducerSequence邏輯
// 原因請見:https://github.com/LMAX-Exchange/disruptor/issues/76
// 比起disruptor省略了currentProducerSequence.set(nextProducerSequence);
// 原因請見:https://github.com/LMAX-Exchange/disruptor/issues/291
long minSequence;
// 當生產者發現確實當前已經超過了一圈,則必須去讀最新的消費者序列了,看看消費者的消費進度是否推進了
// 這裡的consumerSequence.get是對volatile變數的讀,是實時的、強一致的讀
while(wrapPoint > (minSequence = consumerSequence.get())){
// 如果確實超過了一圈,則生產者無法獲取可用的佇列空間,迴圈的間歇性park阻塞
LockSupport.parkNanos(1L);
}
// 滿足條件了,則快取獲得最新的消費者序列
// 因為不是實時獲取消費者序列,可能cachedValue比上一次的值要大很多
// 這種情況下,待到下一次next申請時就可以不用去強一致的讀consumerSequence了
this.cachedConsumerSequenceValue = minSequence;
}
// 記錄本次申請後的,已申請的生產者位點
this.nextValue = nextProducerSequence;
return nextProducerSequence;
}
public void publish(long publishIndex){
// 釋出時,更新生產者佇列
// lazySet,由於消費者可以批次的拉取資料,所以不必每次釋出時都volatile的更新,允許消費者晚一點感知到,這樣效能會更好
// 設定寫屏障
this.currentProducerSequence.lazySet(publishIndex);
// 釋出完成後,喚醒可能阻塞等待的消費者執行緒
this.myWaitStrategy.signalWhenBlocking();
}
public MySequenceBarrier newBarrier(){
return new MySequenceBarrier(this.currentProducerSequence,this.myWaitStrategy);
}
public void setConsumerSequence(MySequence consumerSequence){
this.consumerSequence = consumerSequence;
}
public int getRingBufferSize() {
return ringBufferSize;
}
}
上述MySingleProducerSequencer的實現中,生產者是通過park(1L)自旋來等待消費者的。如果消費者消費速度比較慢,那麼生產者執行緒將長時間的處於自旋狀態,嚴重浪費CPU資源。因此使用next方式獲取生產者序列號時,使用者必須保證消費者有足夠的消費速度。
disruptor和juc下很多並行工具類一樣,除了提供內部自動阻塞的next方法外,還提供了非阻塞的tryNext方法。tryNext在消費者速度偏慢無法獲得可用的生產序列時直接丟擲特定的異常,使用者在捕獲異常後可以靈活的控制重試的間隔。tryNext原理和next是相同的,限於篇幅在v1版本中就先不實現該方法了。
/**
* 單執行緒消費者(仿Disruptor.BatchEventProcessor)
* */
public class MyBatchEventProcessor<T> implements Runnable{
private final MySequence currentConsumeSequence = new MySequence(-1);
private final MyRingBuffer<T> myRingBuffer;
private final MyEventHandler<T> myEventConsumer;
private final MySequenceBarrier mySequenceBarrier;
public MyBatchEventProcessor(MyRingBuffer<T> myRingBuffer,
MyEventHandler<T> myEventConsumer,
MySequenceBarrier mySequenceBarrier) {
this.myRingBuffer = myRingBuffer;
this.myEventConsumer = myEventConsumer;
this.mySequenceBarrier = mySequenceBarrier;
}
@Override
public void run() {
// 下一個需要消費的下標
long nextConsumerIndex = currentConsumeSequence.get() + 1;
// 消費者執行緒主迴圈邏輯,不斷的嘗試獲取事件並進行消費(為了讓程式碼更簡單,暫不考慮優雅停止消費者執行緒的功能)
while(true) {
try {
long availableConsumeIndex = this.mySequenceBarrier.getAvailableConsumeSequence(nextConsumerIndex);
while (nextConsumerIndex <= availableConsumeIndex) {
// 取出可以消費的下標對應的事件,交給eventConsumer消費
T event = myRingBuffer.get(nextConsumerIndex);
this.myEventConsumer.consume(event, nextConsumerIndex, nextConsumerIndex == availableConsumeIndex);
// 批次處理,一次主迴圈消費N個事件(下標加1,獲取下一個)
nextConsumerIndex++;
}
// 更新當前消費者的消費的序列(lazySet,不需要生產者實時的強感知刷快取效能更好,因為生產者自己也不是實時的讀消費者序列的)
this.currentConsumeSequence.lazySet(availableConsumeIndex);
LogUtil.logWithThreadName("更新當前消費者的消費的序列:" + availableConsumeIndex);
} catch (final Throwable ex) {
// 發生異常,消費進度依然推進(跳過這一批拉取的資料)(lazySet 原理同上)
this.currentConsumeSequence.lazySet(nextConsumerIndex);
nextConsumerIndex++;
}
}
}
public MySequence getCurrentConsumeSequence() {
return this.currentConsumeSequence;
}
}
/**
* 事件處理器介面
* */
public interface MyEventHandler<T> {
/**
* 消費者消費事件
* @param event 事件物件本身
* @param sequence 事件物件在佇列裡的序列
* @param endOfBatch 當前事件是否是這一批次處理事件中的最後一個
* */
void consume(T event, long sequence, boolean endOfBatch);
}
disruptor中對入隊元素物件是沒有任何要求的,那麼disruptor是如何保證生產者對新入隊物件的改動對消費者執行緒是可見的,且不會由於快取記憶體的重新整理延遲而讀到舊值呢?
答案是通過生產者的publish方法中對生產者Sequence物件lazySet操作中設定的寫屏障。lazySet設定了一個store-store的屏障禁止了寫操作的重排序,保證了publish方法執行前生產者對事件物件更新的寫操作一定先於對生產者Sequence的更新。因此當消費者執行緒volatile強一致的讀取到新的序列號時,就一定能正確的讀取到序列號對應的事件物件。
/**
* 序列柵欄(仿Disruptor.SequenceBarrier)
* */
public class MySequenceBarrier {
private final MySequence currentProducerSequence;
private final MyWaitStrategy myWaitStrategy;
public MySequenceBarrier(MySequence currentProducerSequence, MyWaitStrategy myWaitStrategy) {
this.currentProducerSequence = currentProducerSequence;
this.myWaitStrategy = myWaitStrategy;
}
/**
* 獲得可用的消費者下標
* */
public long getAvailableConsumeSequence(long currentConsumeSequence) throws InterruptedException {
// v1版本只是簡單的呼叫waitFor,等待其返回即可
return this.myWaitStrategy.waitFor(currentConsumeSequence,currentProducerSequence,this);
}
}
/**
* 消費者等待策略(仿Disruptor.WaitStrategy)
* */
public interface MyWaitStrategy {
/**
* 類似jdk Condition的await,如果不滿足條件就會阻塞在該方法內,不返回
* */
long waitFor(long currentConsumeSequence, MySequence currentProducerSequence) throws InterruptedException;
/**
* 類似jdk Condition的signal,喚醒waitFor阻塞在該等待策略物件上的消費者執行緒
* */
void signalWhenBlocking();
}
限於篇幅,v1版本只實現了具有代表性的,基於條件變數阻塞/喚醒的等待策略來展示等待策略具體是如何工作的。
/**
* 阻塞等待策略
* */
public class MyBlockingWaitStrategy implements MyWaitStrategy{
private final Lock lock = new ReentrantLock();
private final Condition processorNotifyCondition = lock.newCondition();
@Override
public long waitFor(long currentConsumeSequence, MySequence currentProducerSequence) throws InterruptedException {
// 強一致的讀生產者序列號
if (currentProducerSequence.get() < currentConsumeSequence) {
// 如果ringBuffer的生產者下標小於當前消費者所需的下標,說明目前消費者消費速度大於生產者生產速度
lock.lock();
try {
//
while (currentProducerSequence.get() < currentConsumeSequence) {
// 消費者的消費速度比生產者的生產速度快,阻塞等待
processorNotifyCondition.await();
}
}
finally {
lock.unlock();
}
}
// 跳出了上面的迴圈,說明生產者序列已經超過了當前所要消費的位點(currentProducerSequence > currentConsumeSequence)
return currentConsumeSequence;
}
@Override
public void signalWhenBlocking() {
lock.lock();
try {
// signal喚醒所有阻塞在條件變數上的消費者執行緒(後續支援多消費者時,會改為signalAll)
processorNotifyCondition.signal();
}
finally {
lock.unlock();
}
}
}
需要注意的是,並不是所有的等待策略都需要去實現signalWhenBlocking方法。
例如在disruptor內建的基於自旋的等待策略BusySpinWaitStrategy中,消費者執行緒並沒有陷入阻塞態,自己能夠及時的發現生產者新發布時序列的變化,所以其signalWhenBlocking是一個空實現。
/**
* 環形佇列(仿Disruptor.RingBuffer)
* */
public class MyRingBuffer<T> {
private final T[] elementList;
private final MySingleProducerSequencer mySingleProducerSequencer;
private final int ringBufferSize;
private final int mask;
public MyRingBuffer(MySingleProducerSequencer mySingleProducerSequencer, MyEventFactory<T> myEventFactory) {
int bufferSize = mySingleProducerSequencer.getRingBufferSize();
if (Integer.bitCount(bufferSize) != 1) {
// ringBufferSize需要是2的倍數,類似hashMap,求餘數時效率更高
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.mySingleProducerSequencer = mySingleProducerSequencer;
this.ringBufferSize = bufferSize;
this.elementList = (T[]) new Object[bufferSize];
// 迴環掩碼
this.mask = ringBufferSize;
// 預填充事件物件(後續生產者/消費者都只會更新事件物件,不會發生插入、刪除等操作,避免GC)
for(int i=0; i<this.elementList.length; i++){
this.elementList[i] = myEventFactory.newInstance();
}
}
public T get(long sequence){
// 由於ringBuffer的長度是2次冪,mask為2次冪-1,因此可以將求餘運算優化為位運算
int index = (int) (sequence & mask);
return elementList[index];
}
public long next(){
return this.mySingleProducerSequencer.next();
}
public long next(int n){
return this.mySingleProducerSequencer.next(n);
}
public void publish(Long index){
this.mySingleProducerSequencer.publish(index);
}
public void setConsumerSequence(MySequence consumerSequence){
this.mySingleProducerSequencer.setConsumerSequence(consumerSequence);
}
public MySequenceBarrier newBarrier() {
return this.mySingleProducerSequencer.newBarrier();
}
public static <E> MyRingBuffer<E> createSingleProducer(MyEventFactory<E> factory, int bufferSize, MyWaitStrategy waitStrategy) {
MySingleProducerSequencer sequencer = new MySingleProducerSequencer(bufferSize, waitStrategy);
return new MyRingBuffer<>(sequencer,factory);
}
}
Disruptor的各個元件設計的較為獨立,需要以特定的方式將其組合起來實現我們的業務。這裡展示一個簡單的v1版本的MyDisruptor使用demo,希望通過對demo的分析加深讀者對disruptor整體的理解。
public class MyRingBufferV1Demo {
public static void main(String[] args) {
// 環形佇列容量為16(2的4次方)
int ringBufferSize = 16;
// 建立環形佇列
MyRingBuffer<OrderModel> myRingBuffer = MyRingBuffer.createSingleProducer(
new OrderEventProducer(), ringBufferSize, new MyBlockingWaitStrategy());
// 獲得ringBuffer的序列屏障(v1版本的序列屏障內只維護生產者的序列)
MySequenceBarrier mySequenceBarrier = myRingBuffer.newBarrier();
// 基於序列屏障,建立消費者
MyBatchEventProcessor<OrderModel> eventProcessor =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventConsumerDemo(), mySequenceBarrier);
// RingBuffer設定消費者的序列,用於控制生產速度
MySequence consumeSequence = eventProcessor.getCurrentConsumeSequence();
myRingBuffer.setConsumerSequence(consumeSequence);
// 啟動消費者執行緒
new Thread(eventProcessor).start();
// 生產者釋出100個事件
for(int i=0; i<100; i++) {
long nextIndex = myRingBuffer.next();
OrderModel orderEvent = myRingBuffer.get(nextIndex);
orderEvent.setMessage("message-"+i);
orderEvent.setPrice(i * 10);
System.out.println("生產者釋出事件:" + orderEvent);
myRingBuffer.publish(nextIndex);
}
}
}
/**
* 訂單事件物件
* */
public class OrderEventModel {
private String message;
private int price;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
}
/**
* 訂單事件工廠
* */
public class OrderEventFactoryDemo implements MyEventFactory<OrderEventModel> {
@Override
public OrderEventModel newInstance() {
return new OrderEventModel();
}
}
/**
* 訂單事件處理器
* */
public class OrderEventHandlerDemo implements MyEventHandler<OrderEventModel> {
@Override
public void consume(OrderEventModel event, long sequence, boolean endOfBatch) {
System.out.println("消費者消費事件" + event + " sequence=" + sequence + " endOfBatch=" + endOfBatch);
}
}
disruptor無論在整體設計還是最終程式碼實現上都有很多值得反覆琢磨和學習的細節,希望能幫助到對disruptor感興趣的小夥伴。
本篇部落格的完整程式碼在我的github上:https://github.com/1399852153/MyDisruptor 分支:feature/lab1