從零開始實現lmax-Disruptor佇列(四)多執行緒生產者MultiProducerSequencer原理解析

2022-07-06 06:01:33

MyDisruptor V4版本介紹

在v3版本的MyDisruptor實現多執行緒消費者後。按照計劃,v4版本的MyDisruptor需要支援執行緒安全的多執行緒生產者功能。

由於該文屬於系列部落格的一部分,需要先對之前的部落格內容有所瞭解才能更好地理解本篇部落格

MyDisruptor支援多執行緒生產者

  • 之前的v3版本實現了多執行緒消費者,提供並行消費的能力以加速消費速度。同理,disruptor也提供了多執行緒生產者機制以支援更快的生產速度。
  • disruptor的多執行緒生產者機制,其本質是提供了一個執行緒安全的生產者序列器。
    執行緒安全的生產者序列器允許多個執行緒並行的通過next方法申請可用的生產序列號和publish釋出序列號,內部通過cas機制保證每個生產者執行緒拿到的序列號是獨一無二的。
  • disruptor的多執行緒生產者中通過AvailableBuffer陣列機制,巧妙地避免了多個執行緒釋出時並行的修改可用的最大生產者序列。
    不根據一個單獨的序列物件來標識,而是通過一整個陣列來標識可用的最大生產者序列,釋出時每個生產者執行緒都只會更新屬於該執行緒自己的下標值,不會出現多寫爭搶。

如何設計一個執行緒安全的多生產者?

在開始介紹disruptor的實現方式之前,可以站在設計者的角度先大致思考一下如何設計一個執行緒安全的生產者序列器(其功能、使用方法最好和單執行緒生產者序列器保持一致)。

第一個要解決的問題是:如何保證多個執行緒能夠執行緒安全的獲取序列號,不會獲取到重複的序列號而互相覆蓋?
  • 可以參考多執行緒消費者,在next方法中通過cas的爭搶來實現。
第二個問題是二階段生產+多執行緒並行的場景下,如何避免消費者消費到還未釋出完成的事件?
  • disruptor的生產者生產時是分為兩個階段的,首先通過next方法獲取可用的序列號,然後通過publish釋出序列號,令生產完成的序列號對消費者可見,消費者監聽到生產者序列號的變化便會進行對應的消費。
  • 舉個例子,當前生產者已成功釋出了序列號11,執行緒a通過next方法獲取到了序列號12,執行緒b獲取到了13,執行緒c獲取到了14。此時執行緒c生產完畢後,如果按照常規的思路直接更新當前生產者序列為14的話是不行的。
    因為這樣消費者會認為14以及之前的12、13都已經發布完成,會錯誤的消費實際還未完成生產的序列號為12、13的事件。
那麼是否需要引入一個已釋出的最小生產者序列號屬性呢?
  • 上述情況下,如果引入最小生產者序列號機制,那麼雖然執行緒c生產完了序列14的事件,但對外可見的最小生產者序列號依然是11,不會有問題。
那麼執行緒c完成了序列14的生產後,是否可以繼續next獲取新的序列15進行生產呢?
  • 如果不可以,那麼多執行緒生產者的吞吐量就會受到影響,效能大大降低。
  • 如果可以,當執行緒a、b完成了序列12、13的序列號生產後,又該如何知道序列14對應地事件已經生產完成可以釋出,使得最小生產者序列號能正確的變為14呢?

可以看到從設計者的角度出發,可以想到非常多的方案。其中有的可行,有的不可行;可行的方案中有的效能更好,有的更簡潔優雅,讀者可以嘗試著發散一下思維,這裡限於篇幅就不再展開了。

多執行緒生產者MyMultiProducerSequencer介紹

disruptor的設計者當初肯定也對各種方案進行了評估,下面我們就來看看disruptor開發團隊認為的最好的多執行緒生產者設計方案吧。
disruptor多執行緒生產者的next方法實現和單執行緒生產者原理差不多,但為了實現執行緒安全在幾處關鍵地方有所不同。

如何保證多個執行緒能夠執行緒安全的獲取序列號,不會獲取到重複的序列號而互相覆蓋?
  • 單執行緒消費者中最新的消費者序列值是一個普通的long型別變數,而多執行緒消費者中則通過一個Sequence物件來儲存快取的最新消費者序列值實現執行緒間的可見。
  • 同時Sequence類中還提供了一個CAS方法compareAndSet(MySequence的v4版本新增該方法)。
    通過Sequence提供的cas方法,多個生產者執行緒並行呼叫next方法時,每個執行緒通過對currentProducerSequence進行cas爭搶可以保證返回獨一無二的序列號。
二階段生產+多執行緒並行的場景下,如何避免消費者消費到還未釋出完成的事件?
  • 多執行緒生產者中新增了一個和佇列長度保持一致的陣列availableBuffer,用於維護對應序列號的釋出狀態。
    每個生產者釋出對應序列號時,也會通過按照availableBuffer長度求餘數的方式,更新對應位置的資料。這樣一來就能記錄一個序列號段區間內,到底哪些序列號已釋出哪些還未釋出(例如序列號11已釋出、12、13未釋出,14已釋出)。
  • 前面提到多執行緒生產者中在next方法中,還未釋出就更新了currentProducerSequence的值,使得對外暴露的最大可用生產者佇列變得不準確了。
    因此多執行緒生產者中currentProducerSequence(cursor)不再用於標識可用的最大生產者序列,而僅標識已釋出的最大生產者序列
如何相容之前單執行緒生產者場景下,SequenceBarrier/WaitStrategy中利用currentProducerSequence(cursor)進行消費進度約束的設計呢?
  • 之前SequenceBarrier中維護了currentProducerSequence最大可用生產者序列,通過這個來避免消費者消費越界,存取到還未完成生產的事件。
    但多執行緒生產者中小於currentProducerSequence的序列號可能還未釋出,其實際含義已經變了,disruptor在SequenceBarrier的waitFor方法中被迫打了個修補程式來做相容。(下文展開說明)
/**
 * 多執行緒生產者(仿disruptor.MultiProducerSequencer)
 */
public class MyMultiProducerSequencer implements MyProducerSequencer{

    private final int ringBufferSize;
    private final MySequence currentProducerSequence = new MySequence();
    private final List<MySequence> gatingConsumerSequenceList = new ArrayList<>();
    private final MyWaitStrategy myWaitStrategy;

    private final MySequence gatingSequenceCache = new MySequence();
    private final int[] availableBuffer;
    private final int indexMask;
    private final int indexShift;

    /**
     * 通過unsafe存取availableBuffer陣列,可以在讀寫時按需插入讀/寫記憶體屏障
     */
    private static final Unsafe UNSAFE = UnsafeUtil.getUnsafe();
    private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
    private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);

    public MyMultiProducerSequencer(int ringBufferSize, final MyWaitStrategy myWaitStrategy) {
        this.ringBufferSize = ringBufferSize;
        this.myWaitStrategy = myWaitStrategy;
        this.availableBuffer = new int[ringBufferSize];
        this.indexMask = this.ringBufferSize - 1;
        this.indexShift = log2(ringBufferSize);
        initialiseAvailableBuffer();
    }

    private void initialiseAvailableBuffer() {
        for (int i = availableBuffer.length - 1; i >= 0; i--) {
            this.availableBuffer[i] = -1;
        }
    }

    private static int log2(int i) {
        int r = 0;
        while ((i >>= 1) != 0) {
            ++r;
        }
        return r;
    }

    @Override
    public long next() {
        return next(1);
    }

    @Override
    public long next(int n) {
        do {
            // 儲存申請前的最大生產者序列
            long currentMaxProducerSequenceNum = currentProducerSequence.get();
            // 申請之後的生產者位點
            long nextProducerSequence = currentMaxProducerSequenceNum + n;

            // 新申請的位點下,生產者恰好超過消費者一圈的環繞臨界點序列
            long wrapPoint = nextProducerSequence - this.ringBufferSize;
            // 獲得當前已快取的消費者位點(使用Sequence物件維護位點,volatile的讀。因為多生產者環境下,多個執行緒會並行讀寫gatingSequenceCache)
            long cachedGatingSequence = this.gatingSequenceCache.get();

            // 消費者位點cachedValue並不是實時獲取的(因為在沒有超過環繞點一圈時,生產者是可以放心生產的)
            // 每次釋出都實時獲取反而會觸發對消費者sequence強一致的讀,迫使消費者執行緒所在的CPU重新整理快取(而這是不需要的)
            if(wrapPoint > cachedGatingSequence){
                long gatingSequence = SequenceUtil.getMinimumSequence(currentMaxProducerSequenceNum, this.gatingConsumerSequenceList);
                if(wrapPoint > gatingSequence){
                    // 如果確實超過了一圈,則生產者無法獲取佇列空間
                    LockSupport.parkNanos(1);
                    // park短暫阻塞後continue跳出重新進入迴圈
                    continue;

                    // 為什麼不能像單執行緒生產者一樣在這裡while迴圈park?
                    // 因為別的生產者執行緒也在爭搶currentMaxProducerSequence,如果在這裡直接阻塞,會導致當前拿到的序列號可能也被別的執行緒獲取到
                    // 但最終是否可用需要通過cas的結果來決定,所以每次迴圈必須重新獲取gatingSequenceCache最新的值
                }

                // 滿足條件了,則快取獲得最新的消費者序列
                // 因為不是實時獲取消費者序列,可能gatingSequence比上一次的值要大很多
                // 這種情況下,待到下一次next申請時就可以不用去強一致的通過getMinimumSequence讀consumerSequence了(走else分支)
                this.gatingSequenceCache.set(gatingSequence);
            }else {
                if (this.currentProducerSequence.compareAndSet(currentMaxProducerSequenceNum, nextProducerSequence)) {
                    // 由於是多生產者序列,可能存在多個生產者同時執行next方法申請序列,因此只有cas成功的執行緒才視為申請成功,可以跳出迴圈
                    return nextProducerSequence;
                }

                // cas更新失敗,重新迴圈獲取最新的消費位點
                // continue;
            }
        }while (true);
    }

    @Override
    public void publish(long publishIndex) {
        setAvailable(publishIndex);
        this.myWaitStrategy.signalWhenBlocking();
    }

    @Override
    public MySequenceBarrier newBarrier() {
        return new MySequenceBarrier(this,this.currentProducerSequence,this.myWaitStrategy,new ArrayList<>());
    }

    @Override
    public MySequenceBarrier newBarrier(MySequence... dependenceSequences) {
        return new MySequenceBarrier(this,this.currentProducerSequence,this.myWaitStrategy,new ArrayList<>(Arrays.asList(dependenceSequences)));

    }

    @Override
    public void addGatingConsumerSequenceList(MySequence newGatingConsumerSequence) {
        this.gatingConsumerSequenceList.add(newGatingConsumerSequence);
    }

    @Override
    public void addGatingConsumerSequenceList(MySequence... newGatingConsumerSequences) {
        this.gatingConsumerSequenceList.addAll(Arrays.asList(newGatingConsumerSequences));
    }

    @Override
    public MySequence getCurrentProducerSequence() {
        return this.currentProducerSequence;
    }

    @Override
    public int getRingBufferSize() {
        return this.ringBufferSize;
    }

    @Override
    public long getHighestPublishedSequence(long lowBound, long availableSequence) {
        // lowBound是消費者傳入的,保證是已經明確釋出了的最小生產者序列號
        // 因此,從lowBound開始,向後尋找,有兩種情況
        // 1 在lowBound到availableSequence中間存在未釋出的下標(isAvailable(sequence) == false),
        // 那麼,找到的這個未釋出下標的前一個序列號,就是當前最大的已經發布了的序列號(可以被消費者正常消費)
        // 2 在lowBound到availableSequence中間不存在未釋出的下標,那麼就和單生產者的情況一樣
        // 包括availableSequence以及之前的序列號都已經發布過了,availableSequence就是當前可用的最大的的序列號(已釋出的)
        for(long sequence = lowBound; sequence <= availableSequence; sequence++){
            if (!isAvailable(sequence)) {
                // 屬於上述的情況1,lowBound和availableSequence中間存在未釋出的序列號
                return sequence - 1;
            }
        }

        // 屬於上述的情況2,lowBound和availableSequence中間不存在未釋出的序列號
        return availableSequence;
    }

    private void setAvailable(long sequence){
        int index = calculateIndex(sequence);
        int flag = calculateAvailabilityFlag(sequence);

        // 計算index對應下標相對於availableBuffer參照起始位置的指標偏移量
        long bufferAddress = (index * SCALE) + BASE;

        // 功能上等價於this.availableBuffer[index] = flag,但新增了寫屏障
        // 和單執行緒生產者中的lazySet作用一樣,保證了對publish釋出的event事件物件的更新一定先於對availableBuffer對應下標值的更新
        // 避免消費者拿到新的釋出序列號時由於新event事件未對其可見,而錯誤的消費了之前老的event事件
        UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
    }

    private int calculateAvailabilityFlag(long sequence) {
        return (int) (sequence >>> indexShift);
    }

    private int calculateIndex(long sequence) {
        return ((int) sequence) & indexMask;
    }

    public boolean isAvailable(long sequence) {
        int index = calculateIndex(sequence);
        int flag = calculateAvailabilityFlag(sequence);

        // 計算index對應下標相對於availableBuffer參照起始位置的指標偏移量
        long bufferAddress = (index * SCALE) + BASE;

        // 功能上等價於this.availableBuffer[index] == flag
        // 但是新增了讀屏障保證了強一致的讀,可以讓消費者實時的獲取到生產者新的釋出
        return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
    }
}
/**
 * 序列號物件(仿Disruptor.Sequence)
 *
 * 由於需要被生產者、消費者執行緒同時存取,因此內部是一個volatile修飾的long值
 * */
public class MySequence {

    /**
     * 序列起始值預設為-1,保證下一序列恰好是0(即第一個合法的序列號)
     * */
    private volatile long value = -1;

    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static {
        try {
            // 由於提供給cas記憶體中欄位偏移量的unsafe類只能在被jdk信任的類中直接使用,這裡使用反射來繞過這一限制
            Field getUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            getUnsafe.setAccessible(true);
            UNSAFE = (Unsafe) getUnsafe.get(null);
            VALUE_OFFSET = UNSAFE.objectFieldOffset(MySequence.class.getDeclaredField("value"));
        }
        catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    public MySequence() {
    }

    public MySequence(long value) {
        this.value = value;
    }

    public long get() {
        return value;
    }

    public void set(long value) {
        this.value = value;
    }

    public void lazySet(long value) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }

    public boolean compareAndSet(long expect, long update){
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expect, update);
    }
}
public class UnsafeUtil {

    private static final Unsafe UNSAFE;

    static {
        try {
            // 由於提供給cas記憶體中欄位偏移量的unsafe類只能在被jdk信任的類中直接使用,這裡使用反射來繞過這一限制
            Field getUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            getUnsafe.setAccessible(true);
            UNSAFE = (Unsafe) getUnsafe.get(null);
        }
        catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static Unsafe getUnsafe(){
        return UNSAFE;
    }
}
SequenceBarrier中的修補程式
  • SequenceBarrier中也維護了生產者序列器物件,並且生產者序列器物件實現了getHighestPublishedSequence介面,供SequenceBarrier使用(MyDisruptor v4版本新增)。
    單執行緒生產者的getHighestPublishedSequence實現中,和之前邏輯一樣,availableSequence就是可用的最大生產者序列。
    多執行緒生產者的getHighestPublishedSequence實現中,則返回availableBuffer中的連續的最大序列號(具體的原理在下文詳細講解)。
/**
 * 序列柵欄(仿Disruptor.SequenceBarrier)
 * */
public class MySequenceBarrier {

    private final MyProducerSequencer myProducerSequencer;
    private final MySequence currentProducerSequence;
    private final MyWaitStrategy myWaitStrategy;
    private final List<MySequence> dependentSequencesList;

    public MySequenceBarrier(MyProducerSequencer myProducerSequencer, MySequence currentProducerSequence,
                             MyWaitStrategy myWaitStrategy, List<MySequence> dependentSequencesList) {
        this.myProducerSequencer = myProducerSequencer;
        this.currentProducerSequence = currentProducerSequence;
        this.myWaitStrategy = myWaitStrategy;

        if(!dependentSequencesList.isEmpty()) {
            this.dependentSequencesList = dependentSequencesList;
        }else{
            // 如果傳入的上游依賴序列為空,則生產者序列號作為兜底的依賴
            this.dependentSequencesList = Collections.singletonList(currentProducerSequence);
        }
    }

    /**
     * 獲得可用的消費者下標(disruptor中的waitFor)
     * */
    public long getAvailableConsumeSequence(long currentConsumeSequence) throws InterruptedException {
        long availableSequence = this.myWaitStrategy.waitFor(currentConsumeSequence,currentProducerSequence,dependentSequencesList);

        if (availableSequence < currentConsumeSequence) {
            return availableSequence;
        }

      // 多執行緒生產者中,需要進一步約束(於v4版本新增)
      return myProducerSequencer.getHighestPublishedSequence(currentConsumeSequence,availableSequence);
    }
}
availableBuffer標識釋出狀態工作原理解析
  • 建構函式初始化時通過initialiseAvailableBuffer方法將availableBuffer內部的值都設定為-1的初始值。
  • availableBuffer中的值標識的是ringBuffer中對應下標位置的事件第幾次被覆蓋。
    舉個例子:一個長度為8的ringBuffer,其內部陣列下標為2的位置,當序列號為2時其值會被設定為0(第一次被設定值,未被覆蓋),序列號為10時其值會被設定為1(被覆蓋一次),序列號為18時其值會被設定為2(被覆蓋兩次),以此類推。
    序列號對應的下標值通過calculateIndex求模運算獲得,而被覆蓋的次數通過calculateAvailabilityFlag方法對當前釋出的序列號做對數計算出來。
  • 在MultiProducerSequencer的publish方法中,通過setAvailable來標示當前序號為已釋出狀態的,原理如上所述。
  • 而在消費者序列屏障中被呼叫的getHighestPublishedSequence方法中,則通過isAvailable來判斷傳入的序列號是否已釋出。
    isAvailable方法相當於對setAvailable做了個逆運算,如果對應的序列號確實已經發布過了,那麼availableBuffer對應下標的值一定做了對數運算的值,否則就是還未釋出。
  • 由於在next方法中控制、約束了可申請到的生產者序列號不會超過最慢的消費者一輪(ringBuffer的長度),因此不用擔心位於不同輪次的序列號釋出會互相覆蓋。
    如果沒有next方法中的最大差異約束,之前舉例的場景中,ringBuffer長度為8,此時序列號10還未釋出,序列號18卻釋出了,則availableBuffer中下標為2的位置就被覆蓋了(無法真實記錄序列號10是否釋出)。
    也正是因為有了這個約束,在setAvailable中可以不需要做額外的校驗直接更新就行。
Unsafe+偏移量存取陣列原理解析

在lab1中提到單執行緒的生產者SingleProducerSequencer在publish方法中通過一個lazySet方法設定了一個寫記憶體屏障,使得對entry事件物件的更新操作一定先於對序列號的更新,且消費者也是使用讀屏障進行強一致的讀,避免指令重排序和快取記憶體同步延遲導致消費者執行緒消費到錯誤的事件。
那麼在多執行緒生產者中,由於引入了一個availableBuffer陣列,並且在消費者呼叫了isAvailable對其進行了存取。
那麼對於陣列的更新和讀取應該如何插入讀、寫屏障呢?

  • java的unsafe類提供了一些基礎的方法,可以在讀、寫陣列時按需設定讀寫記憶體屏障;其底層是通過計算對應下標資料在陣列中的指標偏移量實現的。
  • 獲取陣列中對應下標資料的偏移值主要取決於兩個屬性:陣列物件實際存放資料至陣列參照的偏移base和每一個資料所佔用的空間大小scale
    base:java中陣列也是一個物件,在記憶體分配時需要設定物件頭、類指標、陣列長度資訊,因此實際存放資料的位置相對於陣列參照是有一定偏移的,需要通過UNSAFE.arrayBaseOffset獲得具體的偏移
    scale:不同型別的陣列中所儲存的資料大小是不一樣的,比如int型別陣列中每一個資料佔4個位元組,而long型別陣列中每一個資料則佔8個位元組,需要動態獲取
  • 綜上可得,下標存放資料相對陣列參照的偏移量 = base + (下標值 * scale)
  • 和單執行緒生產者中一樣,釋出時setAvailable方法通過UNSAFE.putOrderedInt在更新前插入一個寫屏障。
  • 在getHighestPublishedSequence的isAvailable方法中,消費者執行緒通過UNSAFE.getIntVolatile強一致的讀取陣列中對應下標的值。

MyProducerSequencer介面統一兩種型別的生產者

disruptor需要支援單執行緒、多執行緒兩種型別的生產者。所以抽象了一個生產者序列器介面ProducerSequencer用於相容兩者的差異。

/**
 * 生產者序列器介面(仿disruptor.ProducerSequencer)
 * */
public interface MyProducerSequencer {

    /**
     * 獲得一個可用的生產者序列值
     * @return 可用的生產者序列值
     * */
    long next();

    /**
     * 獲得一個可用的生產者序列值區間
     * @param n 區間長度
     * @return 可用的生產者序列區間的最大值
     * */
    long next(int n);

    /**
     * 釋出一個生產者序列
     * @param publishIndex 需要釋出的生產者序列號
     * */
    void publish(long publishIndex);

    /**
     * 建立一個無上游消費者依賴的序列屏障
     * @return 新的序列屏障
     * */
    MySequenceBarrier newBarrier();

    /**
     * 建立一個有上游依賴的序列屏障
     * @param dependenceSequences 上游依賴的序列集合
     * @return 新的序列屏障
     * */
    MySequenceBarrier newBarrier(MySequence... dependenceSequences);

    /**
     * 向生產者註冊一個消費者序列
     * @param newGatingConsumerSequence 新的消費者序列
     * */
    void addGatingConsumerSequenceList(MySequence newGatingConsumerSequence);

    /**
     * 向生產者註冊一個消費者序列集合
     * @param newGatingConsumerSequences 新的消費者序列集合
     * */
    void addGatingConsumerSequenceList(MySequence... newGatingConsumerSequences);

    /**
     * 獲得當前的生產者序列(cursor)
     * @return 當前的生產者序列
     * */
    MySequence getCurrentProducerSequence();

    /**
     * 獲得ringBuffer的大小
     * @return ringBuffer大小
     * */
    int getRingBufferSize();

    /**
     * 獲得最大的已釋出的,可用的消費者序列值
     * @param nextSequence 已經明確釋出了的最小生產者序列號
     * @param availableSequence 需要申請的,可能的最大的序列號
     * @return 最大的已釋出的,可用的消費者序列值
     * */
    long getHighestPublishedSequence(long nextSequence, long availableSequence);
}

MyDisruptor v4版本demo解析

public class MyRingBufferV4Demo {

    public static void main(String[] args) {
        // 環形佇列容量
        int ringBufferSize = 16;

        // 建立環形佇列(多執行緒生產者,即多執行緒安全的生產者(可以並行的next、publish))
        MyRingBuffer<OrderEventModel> myRingBuffer = MyRingBuffer.createMultiProducer(
                new OrderEventProducer(), ringBufferSize, new MyBusySpinWaitStrategy());

        // 獲得ringBuffer的序列屏障(最上游的序列屏障內只維護生產者的序列)
        MySequenceBarrier mySequenceBarrier = myRingBuffer.newBarrier();

        // ================================== 基於生產者序列屏障,建立消費者A
        MyBatchEventProcessor<OrderEventModel> eventProcessorA =
                new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerA"), mySequenceBarrier);
        MySequence consumeSequenceA = eventProcessorA.getCurrentConsumeSequence();
        // RingBuffer監聽消費者A的序列
        myRingBuffer.addGatingConsumerSequenceList(consumeSequenceA);

        // ================================== 消費者組依賴上游的消費者A,通過消費者A的序列號建立序列屏障(構成消費的順序依賴)
        MySequenceBarrier workerSequenceBarrier = myRingBuffer.newBarrier(consumeSequenceA);
        // 基於序列屏障,建立多執行緒消費者B
        MyWorkerPool<OrderEventModel> workerPoolProcessorB =
                new MyWorkerPool<>(myRingBuffer, workerSequenceBarrier,
                        new OrderWorkHandlerDemo("workerHandler1"),
                        new OrderWorkHandlerDemo("workerHandler2"),
                        new OrderWorkHandlerDemo("workerHandler3"));
        MySequence[] workerSequences = workerPoolProcessorB.getCurrentWorkerSequences();
        // RingBuffer監聽消費者C的序列
        myRingBuffer.addGatingConsumerSequenceList(workerSequences);

        // ================================== 通過消費者A的序列號建立序列屏障(構成消費的順序依賴),建立消費者C
        MySequenceBarrier mySequenceBarrierC = myRingBuffer.newBarrier(consumeSequenceA);

        MyBatchEventProcessor<OrderEventModel> eventProcessorC =
                new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerC"), mySequenceBarrierC);
        MySequence consumeSequenceC = eventProcessorC.getCurrentConsumeSequence();
        // RingBuffer監聽消費者C的序列
        myRingBuffer.addGatingConsumerSequenceList(consumeSequenceC);

        // ================================== 基於多執行緒消費者B,單執行緒消費者C的序列屏障,建立消費者D
        MySequence[] bAndCSequenceArr = new MySequence[workerSequences.length+1];
        // 把多執行緒消費者B的序列複製到合併的序列陣列中
        System.arraycopy(workerSequences, 0, bAndCSequenceArr, 0, workerSequences.length);
        // 陣列的最後一位是消費者C的序列
        bAndCSequenceArr[bAndCSequenceArr.length-1] = consumeSequenceC;
        MySequenceBarrier mySequenceBarrierD = myRingBuffer.newBarrier(bAndCSequenceArr);

        MyBatchEventProcessor<OrderEventModel> eventProcessorD =
                new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerD"), mySequenceBarrierD);
        MySequence consumeSequenceD = eventProcessorD.getCurrentConsumeSequence();
        // RingBuffer監聽消費者D的序列
        myRingBuffer.addGatingConsumerSequenceList(consumeSequenceD);


        // 啟動消費者執行緒A
        new Thread(eventProcessorA).start();

        // 啟動workerPool多執行緒消費者B
        workerPoolProcessorB.start(Executors.newFixedThreadPool(10, new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"worker" + mCount.getAndIncrement());
            }
        }));

        // 啟動消費者執行緒C
        new Thread(eventProcessorC).start();
        // 啟動消費者執行緒D
        new Thread(eventProcessorD).start();

        // 啟動多執行緒生產者
        ExecutorService executorService = Executors.newFixedThreadPool(10, new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"workerProducer" + mCount.getAndIncrement());
            }
        });
        for(int i=1; i<4; i++) {
            int num = i;
            executorService.submit(() -> {
                // 每個生產者並行釋出100個事件
                for (int j = 0; j < 100; j++) {
                    long nextIndex = myRingBuffer.next();
                    OrderEventModel orderEvent = myRingBuffer.get(nextIndex);
                    orderEvent.setMessage("message-" + num + "-" + j);
                    orderEvent.setPrice(num * j * 10);
                    myRingBuffer.publish(nextIndex);
                }
            });
        }
    }
}
  • v4版本的demo和v3版本的邏輯幾乎一致,唯一的區別在於通過RingBuffer提供的createMultiProducer方法建立了一個支援多執行緒生產的RingBuffer。
  • 多執行緒生產者允許多個執行緒並行的呼叫next方法、publish方法進行事件的生產釋出。

總結

  • disruptor的多執行緒生產者實現中維護了一個與當前RingBuffer一樣大小的陣列availableBuffer,利用覆蓋機制巧妙的儲存了每個當前有效的序列號的釋出狀態。
    比起一般思路中引入一個最小的生產者序列號,令多個生產者執行緒並行更新的方案;disruptor的實現方案拆分了競爭的變數,避免了多寫多讀的場景。
    每個生產者執行緒獲得自己獨佔的序列號並且獨佔的更新,做到了一寫多讀,在多佔用一定空間的情況下,提高了佇列的整體吞吐量。
  • 使用基於Unsafe + 偏移量的機制讀寫陣列,除了可以引入記憶體屏障,還繞過了java正常存取陣列時的下標越界檢查。
    這樣的實現就和C語言中一樣,執行時不校驗下標是否越界,略微提高效能的同時也引入了野指標問題,使得存取時下標真的越界時會出現各種奇怪的問題,需要程式設計師更加仔細的編碼。

disruptor無論在整體設計還是最終程式碼實現上都有很多值得反覆琢磨和學習的細節,希望能幫助到對disruptor感興趣的小夥伴。

本篇部落格的完整程式碼在我的github上:https://github.com/1399852153/MyDisruptor 分支:feature/lab4