從零開始實現lmax-Disruptor佇列(六)Disruptor 解決偽共用、消費者優雅停止實現原理解析

2022-07-29 06:01:15

MyDisruptor V6版本介紹

在v5版本的MyDisruptor實現DSL風格的API後。按照計劃,v6版本的MyDisruptor作為最後一個版本,需要對MyDisruptor進行最終的一些細節優化。
v6版本一共做了三處優化:

  • 解決偽共用問題
  • 支援消費者執行緒優雅停止
  • 生產者序列器中維護消費者序列集合的資料結構由ArrayList優化為陣列Array型別(減少ArrayList在get操作時額外的rangeCheck檢查)

由於該文屬於系列部落格的一部分,需要先對之前的部落格內容有所瞭解才能更好地理解本篇部落格

偽共用問題(FalseSharing)原理詳解

在第一篇部落格中我們就已經介紹過偽共用問題了,這裡複製原部落格內容如下:
現代的CPU都是多核的,每個核心都擁有獨立的快取記憶體。快取記憶體由固定大小的快取行組成(通常為32個位元組或64個位元組)。CPU以快取行作為最小單位讀寫,且一個快取行通常會被多個變數佔據(例如32位元的參照指標占4位元組,64位元的參照指標占8個位元組)。
這樣的設計導致了一個問題:即使快取行上的變數是無關聯的(比如不屬於同一個物件),但只要快取行上的某一個共用變數發生了變化,則整個快取行都會進行快取一致性的同步。
而CPU間快取一致性的同步是有一定效能損耗的,能避免則儘量避免。這就是所謂的「偽共用」問題。
disruptor通過對佇列中一些關鍵變數進行了快取行的填充,避免其因為不相干的變數讀寫而無謂的重新整理快取,解決了偽共用的問題。

舉例展示偽共用問題對效能的影響

  • 假設存在一個Point物件,其中有兩個volatile修飾的long型別欄位,x和y。
    有兩個執行緒並行的存取一個Point物件,但其中一個執行緒1唯讀寫x欄位,而另一個執行緒2唯讀寫y欄位。
存在偽共用問題的demo
public class Point {
    public volatile int x;
    public volatile int y;

    public Point(int x, int y) {
        this.x = x;
        this.y = y;
    }
}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FalseSharingDemo {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Point point = new Point(1,2);
        long start = System.currentTimeMillis();
        executor.execute(()->{
            // 執行緒1 x自增1億次
            for(int i=0; i<100000000; i++){
                point.x++;
            }
            countDownLatch.countDown();
        });

        executor.execute(()->{
            // 執行緒2 y自增1億次
            for(int i=0; i<100000000; i++){
                point.y++;
            }
            countDownLatch.countDown();
        });

        countDownLatch.await();
        long end = System.currentTimeMillis();
        System.out.println("testNormal 耗時=" + (end-start));
        executor.shutdown();
    }
}
  • 兩個執行緒各自獨立存取兩個不同的資料,但x和y是一個物件的兩個相鄰屬性因此在記憶體中是連續分佈的,大概率讀寫時會被放到同一個快取記憶體行中,
    由於volatile變數修飾的原因,執行緒1對x執行緒的修改會對當前快取行進行觸發快取記憶體間同步進行強一致地寫,使得執行緒2中x、y欄位所在CPU的快取記憶體行失效,被迫重新讀取主記憶體中最新的資料。
    但實際上執行緒1讀寫x和執行緒2讀寫y是完全不相關的,執行緒1與執行緒2在實際業務中並不需要共用同一片記憶體空間,因此強一致的快取記憶體行同步完全是畫蛇添足,只會降低效能。
  • 需要注意的是,偽共用問題絕大多數情況下是出現在不同物件之間的,例如執行緒1會存取物件A中的volatile變數aaa,而執行緒2會存取另一個物件B中的volatile變數bbb。
    但恰好物件A的aaa屬性和物件B的bbb屬性被載入到同一個快取行中,這便是實際上最常見的偽共用場景。
    因此上述同一個Point物件中x、y兩個屬性互相干擾的例子其實並不是很恰當,只是為了方便演示效果才拿同一個物件裡的不同欄位的偽共用場景舉例。
  • 解決偽共用問題的方法是做快取行的填充,簡單來說就是通過在需要避免偽共用的volatile欄位集合前後填充無用的padding欄位,讓編譯器在編排變數地址時保證其不會被其它執行緒在存取不相關的變數時所影響。
    無論怎樣分配變數地記憶體地址,被填充欄位包裹的volatile變數都不會被其它無關的變數存取而被迫進行強一致地快取記憶體同步。
通過填充無用欄位解決偽共用問題demo
public class PointNoFalseSharing {

    private long lp1, lp2, lp3, lp4, lp5, lp6, lp7;
    public volatile long x;
    private long rp1, rp2, rp3, rp4, rp5, rp6, rp7;

    public volatile long y;

    public PointNoFalseSharing(int x, int y) {
        this.x = x;
        this.y = y;
    }
}
public class NoFalseSharingDemo {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch countDownLatch = new CountDownLatch(2);
        PointNoFalseSharing point = new PointNoFalseSharing(1,2);
        long start = System.currentTimeMillis();
        executor.execute(()->{
            // 執行緒1 x自增1億次
            for(int i=0; i<100000000; i++){
                point.x++;
            }
            countDownLatch.countDown();
        });

        executor.execute(()->{
            // 執行緒2 y自增1億次
            for(int i=0; i<100000000; i++){
                point.y++;
            }
            countDownLatch.countDown();
        });

        countDownLatch.await();
        long end = System.currentTimeMillis();
        System.out.println("testNoFalseSharing 耗時=" + (end-start));
        executor.shutdown();
    }
}
  • 感興趣的讀者可以把上述存在偽共用問題和解決了偽共用問題的demo分別執行下看看。
    在我的機器上,兩個執行緒在對x、y分別自增1億次的場景下,存在偽共用問題的範例程式碼FalseSharingDemo比解決了偽共用問題範例程式碼NoFalseSharingDemo要慢3到5倍。

disruptor中偽共用問題的解決方式

  • disruptor中對三個關鍵元件的全部或部分屬性進行了快取行的填充,分別是Sequence、RingBuffer和SingleProducerSequencer。
    這三個元件有兩大特徵:只會被單個執行緒寫、會被大量其它執行緒頻繁的讀,令它們避免出現偽共用問題在高並行場景下對效能有很大提升。
  • MySingleProducerSequencer中很多屬性,但只有nextValue和cachedConsumerSequenceValue被填充欄位包裹起來,其主要原因是隻有這兩個欄位會被生產者頻繁的讀寫。
MySequence解決偽共用實現
/**
 * 序列號物件(仿Disruptor.Sequence)
 *
 * 由於需要被生產者、消費者執行緒同時存取,因此內部是一個volatile修飾的long值
 * */
public class MySequence {

    /**
     * 解決偽共用 左半部分填充
     * */
    private long lp1, lp2, lp3, lp4, lp5, lp6, lp7;

    /**
     * 序列起始值預設為-1,保證下一個序列恰好是0(即第一個合法的序列號)
     * */
    private volatile long value = -1;

    /**
     * 解決偽共用 右半部分填充
     * */
    private long rp1, rp2, rp3, rp4, rp5, rp6, rp7;

    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static {
        try {
            UNSAFE = UnsafeUtil.getUnsafe();
            VALUE_OFFSET = UNSAFE.objectFieldOffset(MySequence.class.getDeclaredField("value"));
        }
        catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    // 注意:省略了方法程式碼
}
MyRingBuffer解決偽共用實現
/**
 * 環形佇列(仿Disruptor.RingBuffer)
 * */
public class MyRingBuffer<T> {

  /**
   * 解決偽共用 左半部分填充
   * */
  protected long lp1, lp2, lp3, lp4, lp5, lp6, lp7;

  private final T[] elementList;
  private final MyProducerSequencer myProducerSequencer;
  private final int ringBufferSize;
  private final int mask;

  /**
   * 解決偽共用 右半部分填充
   * */
  protected long rp1, rp2, rp3, rp4, rp5, rp6, rp7;

  // 注意:省略了方法程式碼
}
MySingleProducerSequencer解決偽共用實現
/**
 * 單執行緒生產者序列器(仿Disruptor.SingleProducerSequencer)
 * 只支援單消費者的簡易版本(只有一個consumerSequence)
 *
 * 因為是單執行緒序列器,因此在設計上就是執行緒不安全的
 * */
public class MySingleProducerSequencer implements MyProducerSequencer {

   /**
   * 生產者序列器所屬ringBuffer的大小
   * */
  private final int ringBufferSize;

  /**
   * 當前已釋出的生產者序列號
   * (區別於nextValue)
   * */
  private final MySequence currentProducerSequence = new MySequence();

  /**
   * 生產者序列器所屬ringBuffer的消費者序列集合
   * */
  private volatile MySequence[] gatingConsumerSequences = new MySequence[0];

  private final MyWaitStrategy myWaitStrategy;

  /**
   * 解決偽共用 左半部分填充
   * */
  private long lp1, lp2, lp3, lp4, lp5, lp6, lp7;

  /**
   * 當前已申請的序列(但是是否釋出了,要看currentProducerSequence)
   *
   * 單執行緒生產者內部使用,所以就是普通的long,不考慮並行
   * */
  private long nextValue = -1;

  /**
   * 當前已快取的消費者序列
   *
   * 單執行緒生產者內部使用,所以就是普通的long,不考慮並行
   * */
  private long cachedConsumerSequenceValue = -1;

  /**
   * 解決偽共用 右半部分填充
   * */
  private long rp1, rp2, rp3, rp4, rp5, rp6, rp7;

  // 注意:省略了方法程式碼
}
  • 物件填充多餘欄位避免偽共用問題,提高了效能的同時,也需要注意其可能大幅增加了物件所佔用的記憶體空間。
    在disruptor中因為Sequence,RingBuffer,SingleProducerSequencer這三個資料結構都是被執行緒頻繁存取的,但實際的數量卻十分有限(正比於生產者、消費者的總數),所以這個問題並不嚴重。
  • 填充快取行的方法既可以像disruptor一樣,手動的設定填充欄位,也可以使用jdk提供的Contended註解來告訴編譯器進行緩衝行的填充,限於篇幅就不再繼續展開了。

為什麼和SingleProducerSequencer類似的MultiProducerSequencer不需要解決偽共用問題?

  • 因為多執行緒生產者序列器中和nextValue、cachedConsumerSequenceValue等價的屬性就是需要在多個生產者執行緒間共用的,因此確實需要頻繁的在多個CPU核心的快取記憶體行間進行同步。
    這種場景是實實在在的共用場景,而不是偽共用場景,因此也就不存在偽共用問題了。

支援消費者執行緒優雅停止詳解

截止MyDisruptor的v5版本,消費者執行緒都是通過一個永不停止的while迴圈進行工作的,除非強制殺死執行緒,否則無法令消費者執行緒關閉,而這無疑是不優雅的。

實現外部通知消費者執行緒自行終止

為此,disruptor實現了令消費者執行緒主動停止的機制。

  • 具體思路是在消費者執行緒內部維護一個用於標識是否需要繼續執行的標識running,預設是執行中,但外部可以去修改標識的狀態(halt方法),將其標識為停止。
  • 消費者主迴圈時每次都檢查一下該狀態,如果標識是停止,則丟擲AlertException異常。主迴圈中捕獲該異常,然後通過一個break跳出主迴圈,主動地關閉。
實現了優雅停止功能的單執行緒消費者
/**
 * 單執行緒消費者(仿Disruptor.BatchEventProcessor)
 * */
public class MyBatchEventProcessor<T> implements MyEventProcessor{

    private final MySequence currentConsumeSequence = new MySequence(-1);
    private final MyRingBuffer<T> myRingBuffer;
    private final MyEventHandler<T> myEventConsumer;
    private final MySequenceBarrier mySequenceBarrier;
    private final AtomicBoolean running = new AtomicBoolean();

    public MyBatchEventProcessor(MyRingBuffer<T> myRingBuffer,
                                 MyEventHandler<T> myEventConsumer,
                                 MySequenceBarrier mySequenceBarrier) {
        this.myRingBuffer = myRingBuffer;
        this.myEventConsumer = myEventConsumer;
        this.mySequenceBarrier = mySequenceBarrier;
    }

    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        this.mySequenceBarrier.clearAlert();

        // 下一個需要消費的下標
        long nextConsumerIndex = currentConsumeSequence.get() + 1;

        // 消費者執行緒主迴圈邏輯,不斷的嘗試獲取事件並進行消費(為了讓程式碼更簡單,暫不考慮優雅停止消費者執行緒的功能)
        while(true) {
            try {
                long availableConsumeIndex = this.mySequenceBarrier.getAvailableConsumeSequence(nextConsumerIndex);

                while (nextConsumerIndex <= availableConsumeIndex) {
                    // 取出可以消費的下標對應的事件,交給eventConsumer消費
                    T event = myRingBuffer.get(nextConsumerIndex);
                    this.myEventConsumer.consume(event, nextConsumerIndex, nextConsumerIndex == availableConsumeIndex);
                    // 批次處理,一次主迴圈消費N個事件(下標加1,獲取下一個)
                    nextConsumerIndex++;
                }

                // 更新當前消費者的消費的序列(lazySet,不需要生產者實時的強感知刷快取效能更好,因為生產者自己也不是實時的讀消費者序列的)
                this.currentConsumeSequence.lazySet(availableConsumeIndex);
                LogUtil.logWithThreadName("更新當前消費者的消費的序列:" + availableConsumeIndex);
            } catch (final MyAlertException ex) {
                LogUtil.logWithThreadName("消費者MyAlertException" + ex);

                // 被外部alert打斷,檢查running標記
                if (!running.get()) {
                    // running == false, break跳出主迴圈,執行結束
                    break;
                }
            } catch (final Throwable ex) {
                // 發生異常,消費進度依然推進(跳過這一批拉取的資料)(lazySet 原理同上)
                this.currentConsumeSequence.lazySet(nextConsumerIndex);
                nextConsumerIndex++;
            }
        }
    }

    @Override
    public MySequence getCurrentConsumeSequence() {
        return this.currentConsumeSequence;
    }

    @Override
    public void halt() {
        // 當前消費者狀態設定為停止
        running.set(false);

        // 喚醒消費者執行緒(令其能立即檢查到狀態為停止)
        this.mySequenceBarrier.alert();
    }

    @Override
    public boolean isRunning() {
        return this.running.get();
    }
}
實現了優雅停止功能多執行緒消費者
/**
 * 多執行緒消費者工作執行緒 (仿Disruptor.WorkProcessor)
 * */
public class MyWorkProcessor<T> implements MyEventProcessor{

    private final MySequence currentConsumeSequence = new MySequence(-1);
    private final MyRingBuffer<T> myRingBuffer;
    private final MyWorkHandler<T> myWorkHandler;
    private final MySequenceBarrier sequenceBarrier;
    private final MySequence workGroupSequence;

    private final AtomicBoolean running = new AtomicBoolean(false);


    public MyWorkProcessor(MyRingBuffer<T> myRingBuffer,
                           MyWorkHandler<T> myWorkHandler,
                                MySequenceBarrier sequenceBarrier,
                                MySequence workGroupSequence) {
        this.myRingBuffer = myRingBuffer;
        this.myWorkHandler = myWorkHandler;
        this.sequenceBarrier = sequenceBarrier;
        this.workGroupSequence = workGroupSequence;
    }

    @Override
    public MySequence getCurrentConsumeSequence() {
        return currentConsumeSequence;
    }

    @Override
    public void halt() {
        // 當前消費者狀態設定為停止
        running.set(false);

        // 喚醒消費者執行緒(令其能立即檢查到狀態為停止)
        this.sequenceBarrier.alert();
    }

    @Override
    public boolean isRunning() {
        return this.running.get();
    }

    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        this.sequenceBarrier.clearAlert();

        long nextConsumerIndex = this.currentConsumeSequence.get();
        // 設定哨兵值,保證第一次迴圈時nextConsumerIndex <= cachedAvailableSequence一定為false,走else分支通過序列屏障獲得最大的可用序列號
        long cachedAvailableSequence = Long.MIN_VALUE;

        // 最近是否處理過了序列
        boolean processedSequence = true;

        while (true) {
            try {
                if(processedSequence) {
                    // 爭搶到了一個新的待消費序列,但還未實際進行消費(標記為false)
                    processedSequence = false;

                    // 如果已經處理過序列,則重新cas的爭搶一個新的待消費序列
                    do {
                        nextConsumerIndex = this.workGroupSequence.get() + 1L;
                        // 由於currentConsumeSequence會被註冊到生產者側,因此需要始終和workGroupSequence worker組的實際sequence保持協調
                        // 即當前worker的消費序列currentConsumeSequence = 當前消費者組的序列workGroupSequence
                        this.currentConsumeSequence.lazySet(nextConsumerIndex - 1L);
                        // 問題:只使用workGroupSequence,每個worker不維護currentConsumeSequence行不行?
                        // 回答:這是不行的。因為和單執行緒消費者的行為一樣,都是具體的消費者eventHandler/workHandler執行過之後才更新消費者的序列號,令其對外部可見(生產者、下游消費者)
                        // 因為消費依賴關係中約定,對於序列i事件只有在上游的消費者消費過後(eventHandler/workHandler執行過),下游才能消費序列i的事件
                        // workGroupSequence主要是用於通過cas協調同一workerPool內消費者執行緒序列爭搶的,對外的約束依然需要workProcessor原生的消費者序列currentConsumeSequence來控制

                        // cas更新,保證每個worker執行緒都會獲取到唯一的一個sequence
                    } while (!workGroupSequence.compareAndSet(nextConsumerIndex - 1L, nextConsumerIndex));
                }else{
                    // processedSequence == false(手頭上存在一個還未消費的序列)
                    // 走到這裡說明之前拿到了一個新的消費序列,但是由於nextConsumerIndex > cachedAvailableSequence,沒有實際執行消費邏輯
                    // 而是被阻塞後返回獲得了最新的cachedAvailableSequence,重新執行一次迴圈走到了這裡
                    // 需要先把手頭上的這個序列給消費掉,才能繼續拿下一個消費序列
                }

                // cachedAvailableSequence只會存在兩種情況
                // 1 第一次迴圈,初始化為Long.MIN_VALUE,則必定會走到下面的else分支中
                // 2 非第一次迴圈,則cachedAvailableSequence為序列屏障所允許的最大可消費序列

                if (cachedAvailableSequence >= nextConsumerIndex) {
                    // 爭搶到的消費序列是滿足要求的(小於序列屏障值,被序列屏障允許的),則呼叫消費者進行實際的消費

                    // 取出可以消費的下標對應的事件,交給eventConsumer消費
                    T event = myRingBuffer.get(nextConsumerIndex);
                    this.myWorkHandler.consume(event);

                    // 實際呼叫消費者進行消費了,標記為true.這樣一來就可以在下次迴圈中cas爭搶下一個新的消費序列了
                    processedSequence = true;
                } else {
                    // 1 第一次迴圈會獲取當前序列屏障的最大可消費序列
                    // 2 非第一次迴圈,說明爭搶到的序列超過了屏障序列的最大值,等待生產者推進到爭搶到的sequence
                    cachedAvailableSequence = sequenceBarrier.getAvailableConsumeSequence(nextConsumerIndex);
                }
            } catch (final MyAlertException ex) {
                // 被外部alert打斷,檢查running標記
                if (!running.get()) {
                    // running == false, break跳出主迴圈,執行結束
                    break;
                }
            } catch (final Throwable ex) {
                // 消費者消費時發生了異常,也認為是成功消費了,避免阻塞消費序列
                // 下次迴圈會cas爭搶一個新的消費序列
                processedSequence = true;
            }
        }
    }
}
/**
 * 多執行緒消費者(仿Disruptor.WorkerPool)
 * */
public class MyWorkerPool<T> {

  private final AtomicBoolean started = new AtomicBoolean(false);
  private final MySequence workSequence = new MySequence(-1);
  private final MyRingBuffer<T> myRingBuffer;
  private final List<MyWorkProcessor<T>> workEventProcessorList;
  
  public void halt() {
    for (MyWorkProcessor<?> processor : this.workEventProcessorList) {
      // 挨個停止所有工作執行緒  
      processor.halt();
    }

    started.set(false);
  }

  public boolean isRunning(){
    return this.started.get();
  }
  
  // 注意:省略了無關程式碼
}
實現了優雅停止功能的序列屏障
  • 在修改標識狀態為停止的halt方法中,消費者執行緒可能由於等待生產者繼續生產而處於阻塞狀態(例如BlockingWaitStrategy),
    所以還需要通過消費者維護的序列屏障SequenceBarrier的alert方法來嘗試著喚醒消費者。
/**
 * 序列柵欄(仿Disruptor.SequenceBarrier)
 * */
public class MySequenceBarrier {

    private final MyProducerSequencer myProducerSequencer;
    private final MySequence currentProducerSequence;
    private volatile boolean alerted = false;
    private final MyWaitStrategy myWaitStrategy;
    private final MySequence[] dependentSequencesList;

    public MySequenceBarrier(MyProducerSequencer myProducerSequencer, MySequence currentProducerSequence,
                             MyWaitStrategy myWaitStrategy, MySequence[] dependentSequencesList) {
        this.myProducerSequencer = myProducerSequencer;
        this.currentProducerSequence = currentProducerSequence;
        this.myWaitStrategy = myWaitStrategy;

        if(dependentSequencesList.length != 0) {
            this.dependentSequencesList = dependentSequencesList;
        }else{
            // 如果傳入的上游依賴序列為空,則生產者序列號作為兜底的依賴
            this.dependentSequencesList = new MySequence[]{currentProducerSequence};
        }
    }

    /**
     * 獲得可用的消費者下標(disruptor中的waitFor)
     * */
    public long getAvailableConsumeSequence(long currentConsumeSequence) throws InterruptedException, MyAlertException {
        // 每次都檢查下是否有被喚醒,被喚醒則會丟擲MyAlertException代表當前消費者要終止執行了
        checkAlert();

        long availableSequence = this.myWaitStrategy.waitFor(currentConsumeSequence,currentProducerSequence,dependentSequencesList,this);

        if (availableSequence < currentConsumeSequence) {
            return availableSequence;
        }

        // 多執行緒生產者中,需要進一步約束(於v4版本新增)
        return myProducerSequencer.getHighestPublishedSequence(currentConsumeSequence,availableSequence);
    }

    /**
     * 喚醒可能處於阻塞態的消費者
     * */
    public void alert() {
        this.alerted = true;
        this.myWaitStrategy.signalWhenBlocking();
    }

    /**
     * 重新啟動時,清除標記
     */
    public void clearAlert() {
        this.alerted = false;
    }

    /**
     * 檢查當前消費者的被喚醒狀態
     * */
    public void checkAlert() throws MyAlertException {
        if (alerted) {
            throw MyAlertException.INSTANCE;
        }
    }
}
由disruptor對外暴露的halt方法,停止當前所有消費者執行緒
  • disruptor類提供了一個halt方法,其基於元件提供的halt機制將所有註冊的消費者執行緒全部關閉。
  • consumerInfo抽象了單執行緒/多執行緒消費者,其子類的halt方法內部會呼叫對應消費者的halt方法將對應消費者終止。
/**
 * disruptor dsl(仿Disruptor.Disruptor)
 * */
public class MyDisruptor<T> {

  private final MyRingBuffer<T> ringBuffer;
  private final Executor executor;
  private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>();
  private final AtomicBoolean started = new AtomicBoolean(false);
  
  /**
   * 啟動所有已註冊的消費者
   * */
  public void start(){
    // cas設定啟動標識,避免重複啟動
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("Disruptor只能啟動一次");
    }

    // 遍歷所有的消費者,挨個start啟動
    this.consumerRepository.getConsumerInfos().forEach(
            item->item.start(this.executor)
    );
  }

  /**
   * 停止註冊的所有消費者
   * */
  public void halt() {
    // 遍歷消費者資訊列表,挨個呼叫halt方法終止
    for (final MyConsumerInfo consumerInfo : this.consumerRepository.getConsumerInfos()) {
      consumerInfo.halt();
    }
  }
  
  // 注意:省略了無關程式碼
}

優雅停止消費者執行緒

  • 目前為止,已經實現了disruptor的halt方法,可以從外部控制消費者執行緒的啟動和終止了。但還存在一個關鍵問題沒有解決:如何保證消費者執行緒halt停止時,不會存在還未消費完成的事件?
  • disruptor是一個記憶體佇列,關閉時如果消費者沒有把已經在ringBuffer中的事件消費掉,則相當於丟訊息了。這個問題在某些場景下是致命的,無法接受的。
  • disruptor為此提供了一個shutdown方法,用於真正優雅的停止所有消費者,shutdown方法可以檢查所有消費者的消費狀態,直到所有消費者都把生產的事件消費完後才呼叫halt方法終止消費者執行緒。
    可以令使用者在不丟事件的情況下,實現真正的優雅停止。
disruptor的shutdown方法實現
  • 在disruptor提供的dsl風格api中,通過updateGatingSequencesForNextInChain方法將不處於消費鏈尾部的消費者序列從生產者中剔除出去進行了優化。
    同時也對這些消費者(ConsumeInfo)進行了是否處於消費者隊尾的進行了標記(endOfChain)
  • shutdown方法內通過忙迴圈不斷的通過hasBacklog方法檢查是否有消費鏈尾部的(最慢的)消費者其進度慢於生產者。
/**
 * disruptor dsl(仿Disruptor.Disruptor)
 * */
public class MyDisruptor<T> {

    private final MyRingBuffer<T> ringBuffer;
    private final Executor executor;
    private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>();
    private final AtomicBoolean started = new AtomicBoolean(false);

    public MyDisruptor(
            final MyEventFactory<T> eventProducer,
            final int ringBufferSize,
            final Executor executor,
            final ProducerType producerType,
            final MyWaitStrategy myWaitStrategy) {

        this.ringBuffer = MyRingBuffer.create(producerType,eventProducer,ringBufferSize,myWaitStrategy);
        this.executor = executor;
    }

    /**
     * 註冊單執行緒消費者 (無上游依賴消費者,僅依賴生產者序列)
     * */
    @SafeVarargs
    public final MyEventHandlerGroup<T> handleEventsWith(final MyEventHandler<T>... myEventHandlers){
        return createEventProcessors(new MySequence[0], myEventHandlers);
    }

    /**
     * 註冊單執行緒消費者 (有上游依賴消費者,僅依賴生產者序列)
     * @param barrierSequences 依賴的序列屏障
     * @param myEventHandlers 使用者自定義的事件消費者集合
     * */
    public MyEventHandlerGroup<T> createEventProcessors(
            final MySequence[] barrierSequences,
            final MyEventHandler<T>[] myEventHandlers) {

        final MySequence[] processorSequences = new MySequence[myEventHandlers.length];
        final MySequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        int i=0;
        for(MyEventHandler<T> myEventConsumer : myEventHandlers){
            final MyBatchEventProcessor<T> batchEventProcessor =
                    new MyBatchEventProcessor<>(ringBuffer, myEventConsumer, barrier);

            processorSequences[i] = batchEventProcessor.getCurrentConsumeSequence();
            i++;

            // consumer物件都維護起來,便於後續start時啟動
            consumerRepository.add(batchEventProcessor);
        }

        // 更新當前生產者註冊的消費者序列
        updateGatingSequencesForNextInChain(barrierSequences,processorSequences);

        return new MyEventHandlerGroup<>(this,this.consumerRepository,processorSequences);
    }

    /**
     * 註冊多執行緒消費者 (無上游依賴消費者,僅依賴生產者序列)
     * */
    @SafeVarargs
    public final MyEventHandlerGroup<T> handleEventsWithWorkerPool(final MyWorkHandler<T>... myWorkHandlers) {
        return createWorkerPool(new MySequence[0], myWorkHandlers);
    }

    /**
     * 註冊多執行緒消費者 (有上游依賴消費者,僅依賴生產者序列)
     * @param barrierSequences 依賴的序列屏障
     * @param myWorkHandlers 使用者自定義的事件消費者集合
     * */
    public MyEventHandlerGroup<T> createWorkerPool(
            final MySequence[] barrierSequences, final MyWorkHandler<T>[] myWorkHandlers) {
        final MySequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        final MyWorkerPool<T> workerPool = new MyWorkerPool<>(ringBuffer, sequenceBarrier, myWorkHandlers);

        // consumer都儲存起來,便於start統一的啟動或者halt、shutdown統一的停止
        consumerRepository.add(workerPool);

        final MySequence[] workerSequences = workerPool.getCurrentWorkerSequences();

        updateGatingSequencesForNextInChain(barrierSequences, workerSequences);

        return new MyEventHandlerGroup<>(this, consumerRepository,workerSequences);
    }

    private void updateGatingSequencesForNextInChain(final MySequence[] barrierSequences, final MySequence[] processorSequences) {
        if (processorSequences.length != 0) {
            // 這是一個優化操作:
            // 由於新的消費者通過ringBuffer.newBarrier(barrierSequences),已經是依賴於之前ringBuffer中已有的消費者序列
            // 消費者即EventProcessor內部已經設定好了老的barrierSequences為依賴,因此可以將ringBuffer中已有的消費者序列去掉
            // 只需要儲存,依賴當前消費者鏈條最末端的序列即可(也就是最慢的序列),這樣生產者可以更快的遍歷註冊的消費者序列
            for(MySequence sequence : barrierSequences){
                ringBuffer.removeConsumerSequence(sequence);
            }
            for(MySequence sequence : processorSequences){
                // 新設定的就是當前消費者鏈條最末端的序列
                ringBuffer.addConsumerSequence(sequence);
            }

            // 將被剔除的序列的狀態標記為其不屬於消費者依賴鏈尾部(用於shutdown優雅停止)
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

    /**
     * 啟動所有已註冊的消費者
     * */
    public void start(){
        // cas設定啟動標識,避免重複啟動
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Disruptor只能啟動一次");
        }

        // 遍歷所有的消費者,挨個start啟動
        this.consumerRepository.getConsumerInfos().forEach(
                item->item.start(this.executor)
        );
    }

    /**
     * 停止註冊的所有消費者
     * */
    public void halt() {
        // 遍歷消費者資訊列表,挨個呼叫halt方法終止
        for (final MyConsumerInfo consumerInfo : this.consumerRepository.getConsumerInfos()) {
            consumerInfo.halt();
        }
    }

    /**
     * 等到所有的消費者把已生產的事件全部消費完成後,再halt停止所有消費者執行緒
     * */
    public void shutdown(long timeout, TimeUnit timeUnit){
        final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        // 無限迴圈,直到所有已生產的事件全部消費完成
        while (hasBacklog()) {
            if (timeout >= 0 && System.currentTimeMillis() > timeOutAt) {
                throw new RuntimeException("disruptor shutdown操作,等待超時");
            }
            // 忙等待
        }

        // hasBacklog為false,跳出了迴圈
        // 說明已生產的事件全部消費完成了,此時可以安全的優雅停止所有消費者執行緒了,
        halt();
    }

    /**
     * 判斷當前消費者是否還有未消費完的事件
     */
    private boolean hasBacklog() {
        final long cursor = ringBuffer.getCurrentProducerSequence().get();
        // 獲得所有的處於最尾端的消費者序列(最尾端的是最慢的,所以是準確的)
        for (final MySequence consumer : consumerRepository.getLastSequenceInChain()) {
            if (cursor > consumer.get()) {
                // 如果任意一個消費者序列號小於當前生產者序列,說明存在未消費完的事件,返回true
                return true;
            }
        }
        // 所有最尾端的消費者的序列號都和生產者的序列號相等
        // 說明所有的消費者截止當前都已經消費完了全部的已生產的事件,返回false
        return false;
    }


    /**
     * 獲得當親Disruptor的ringBuffer
     * */
    public MyRingBuffer<T> getRingBuffer() {
        return ringBuffer;
    }
}
/**
 * 維護當前disruptor的所有消費者物件資訊的倉庫(仿Disruptor.ConsumerRepository)
 */
public class MyConsumerRepository<T> {

    private final ArrayList<MyConsumerInfo> consumerInfos = new ArrayList<>();

    /**
     * 不重寫Sequence的hashCode,equals,因為比對的就是原始物件是否相等
     * */
    private final Map<MySequence, MyConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<>();

    public ArrayList<MyConsumerInfo> getConsumerInfos() {
        return consumerInfos;
    }

    public void add(final MyEventProcessor processor) {
        final MyEventProcessorInfo<T> consumerInfo = new MyEventProcessorInfo<>(processor);
        eventProcessorInfoBySequence.put(processor.getCurrentConsumeSequence(),consumerInfo);
        consumerInfos.add(consumerInfo);
    }

    public void add(final MyWorkerPool<T> workerPool) {
        final MyWorkerPoolInfo<T> workerPoolInfo = new MyWorkerPoolInfo<>(workerPool);
        for (MySequence sequence : workerPool.getCurrentWorkerSequences()) {
            eventProcessorInfoBySequence.put(sequence, workerPoolInfo);
        }
        consumerInfos.add(workerPoolInfo);
    }

    /**
     * 找到所有還在執行的、處於尾端的消費者
     * */
    public List<MySequence> getLastSequenceInChain() {
        List<MySequence> lastSequenceList = new ArrayList<>();
        for (MyConsumerInfo consumerInfo : consumerInfos) {
            // 找到所有還在執行的、處於尾端的消費者
            if (consumerInfo.isRunning() && consumerInfo.isEndOfChain()) {
                final MySequence[] sequences = consumerInfo.getSequences();
                // 將其消費者序列號全部放進lastSequenceList
                Collections.addAll(lastSequenceList, sequences);
            }
        }

        return lastSequenceList;
    }

    public void unMarkEventProcessorsAsEndOfChain(final MySequence... barrierEventProcessors) {
        for (MySequence barrierEventProcessor : barrierEventProcessors) {
            eventProcessorInfoBySequence.get(barrierEventProcessor).markAsUsedInBarrier();
        }
    }
}
ConsumerInfo及其子類實現
/**
 * 消費者資訊 (仿Disruptor.ConsumerInfo)
 * */
public interface MyConsumerInfo {

    /**
     * 通過executor啟動當前消費者
     * @param executor 啟動器
     * */
    void start(Executor executor);

    /**
     * 停止當前消費者
     * */
    void halt();

    /**
     * 是否是最尾端的消費者
     * */
    boolean isEndOfChain();

    /**
     * 將當前消費者標記為不是最尾端消費者
     * */
    void markAsUsedInBarrier();

    /**
     * 當前消費者是否還在執行
     * */
    boolean isRunning();

    /**
     * 獲得消費者的序列號(多執行緒消費者由多個序列號物件)
     * */
    MySequence[] getSequences();
}
/**
 * 單執行緒事件處理器資訊(仿Disruptor.EventProcessorInfo)
 * */
public class MyEventProcessorInfo<T> implements MyConsumerInfo {

    private final MyEventProcessor myEventProcessor;

    /**
     * 預設是最尾端的消費者
     * */
    private boolean endOfChain = true;

    public MyEventProcessorInfo(MyEventProcessor myEventProcessor) {
        this.myEventProcessor = myEventProcessor;
    }

    @Override
    public void start(Executor executor) {
        executor.execute(myEventProcessor);
    }

    @Override
    public void halt() {
        this.myEventProcessor.halt();
    }

    @Override
    public boolean isEndOfChain() {
        return endOfChain;
    }

    @Override
    public void markAsUsedInBarrier() {
        this.endOfChain = false;
    }

    @Override
    public boolean isRunning() {
        return this.myEventProcessor.isRunning();
    }

    @Override
    public MySequence[] getSequences() {
        return new MySequence[]{this.myEventProcessor.getCurrentConsumeSequence()};
    }
}
/**
 * 多執行緒消費者資訊(仿Disruptor.WorkerPoolInfo)
 * */
public class MyWorkerPoolInfo<T> implements MyConsumerInfo {

    private final MyWorkerPool<T> workerPool;

    /**
     * 預設是最尾端的消費者
     * */
    private boolean endOfChain = true;

    public MyWorkerPoolInfo(MyWorkerPool<T> workerPool) {
        this.workerPool = workerPool;
    }

    @Override
    public void start(Executor executor) {
        workerPool.start(executor);
    }

    @Override
    public void halt() {
        this.workerPool.halt();
    }

    @Override
    public boolean isEndOfChain() {
        return endOfChain;
    }

    @Override
    public void markAsUsedInBarrier() {
        this.endOfChain = true;
    }

    @Override
    public boolean isRunning() {
        return this.workerPool.isRunning();
    }

    @Override
    public MySequence[] getSequences() {
        return this.workerPool.getCurrentWorkerSequences();
    }
}
  • 至此,v6版本的MyDisruptor就完整的實現了消費者的優雅停止功能。生產者執行緒不再生產後便可以通過Disruptor提供的shutdown方法安全的、優雅的關閉所有的消費者。
  • 對比上個版本,可以看到disruptor為了實現優雅停止這一功能新增了很多的方法和邏輯,使得整體程式碼變得複雜起來而不易理解,所以MyDisruptor才將這一功能推遲到最後才實現。

生產者中的消費者序列集合由ArrayList優化為陣列

截止v5版本的MyDisruptor,是通過ArrayList線性表來儲存生產者序列器(ProducerSequencer)中所註冊的消費者序列集合的。而disruptor中卻是直接使用陣列來儲存的,這是為什麼呢?

  • disruptor中生產者序列器維護的消費者序列集合是會動態新增和刪除的,早期版本的MyDisruptor直接使用ArrayList,目的是避免編寫額外的程式碼對陣列進行擴容,令程式碼更加的簡單易懂。
  • 雖然ArrayList是線性表結構,基於陣列做了一個簡單的封裝,但是在存取陣列中元素時依然不如"array[index]"直接存取的方式效率高。
    原因在於ArrayList的get方法中,多了一個rangeCheck判斷;而ArrayList的迭代器中則更是包括了對並行版本號驗證等額外邏輯。
    存在額外邏輯的ArrayList存取內部元素的效能肯定是不如裸陣列的。
  • 在絕大多數的場景下,裸陣列和ArrayList這一點微小的效能差異是完全可以忽略的。但disruptor中的生產者會不斷的通過getMinimumSequence方法遍歷維護的消費者序列。因此略微捨棄一些可讀性,換來效能上的小提升是值得的。
生產者由ArrayList改為陣列實現(多執行緒生產者中實現原理也是一樣的)
/**
 * 單執行緒生產者序列器(仿Disruptor.SingleProducerSequencer)
 *
 * 因為是單執行緒序列器,因此在設計上就是執行緒不安全的
 * */
public class MySingleProducerSequencer implements MyProducerSequencer{

    private static final AtomicReferenceFieldUpdater<MySingleProducerSequencer, MySequence[]> SEQUENCE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(MySingleProducerSequencer.class, MySequence[].class, "gatingConsumerSequences");
    
    @Override
    public void addGatingConsumerSequence(MySequence newGatingConsumerSequence){
        MySequenceGroups.addSequences(this,SEQUENCE_UPDATER,this.currentProducerSequence,newGatingConsumerSequence);
    }

    @Override
    public void addGatingConsumerSequenceList(MySequence... newGatingConsumerSequences){
        MySequenceGroups.addSequences(this,SEQUENCE_UPDATER,this.currentProducerSequence,newGatingConsumerSequences);
    }

    @Override
    public void removeConsumerSequence(MySequence sequenceNeedRemove) {
        MySequenceGroups.removeSequence(this,SEQUENCE_UPDATER,sequenceNeedRemove);
    }

    // 注意:省略了無關的程式碼
}
/**
 * 更改Sequence陣列工具類(仿Disruptor.SequenceGroups)
 * 注意:實現中cas的插入/刪除機制在MyDisruptor中是不必要的,因為MyDisruptor不支援在執行時動態的註冊新消費者(disruptor支援,但是有一些額外的複雜度)
 *     只是為了和Disruptor的實現保持一致,可以更好的說明實現原理才這樣做的,本質上只需要支援sequence陣列擴容/縮容即可
 * */
public class MySequenceGroups {

    /**
     * 將新的需要註冊的序列集合加入到holder物件的對應sequence陣列中(sequencesToAdd集合)
     * */
    public static <T> void addSequences(
            final T holder,
            final AtomicReferenceFieldUpdater<T, MySequence[]> updater,
            final MySequence currentProducerSequence,
            final MySequence... sequencesToAdd) {
        long cursorSequence;
        MySequence[] updatedSequences;
        MySequence[] currentSequences;

        do {
            // 獲得資料持有者當前的陣列參照
            currentSequences = updater.get(holder);
            // 將原陣列中的資料複製到新的陣列中
            updatedSequences = Arrays.copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
            cursorSequence = currentProducerSequence.get();

            int index = currentSequences.length;
            // 每個新新增的sequence值都以當前生產者的序列為準
            for (MySequence sequence : sequencesToAdd) {
                sequence.set(cursorSequence);
                // 新註冊sequence放入陣列中
                updatedSequences[index++] = sequence;
            }
            // cas的將新陣列賦值給物件,允許disruptor在執行時並行的註冊新的消費者sequence集合
            // 只有cas賦值成功才會返回,失敗的話會重新獲取最新的currentSequences,重新構建、合併新的updatedSequences陣列
        } while (!updater.compareAndSet(holder, currentSequences, updatedSequences));

        // 新註冊的消費者序列,再以當前生產者序列為準做一次最終修正
        cursorSequence = currentProducerSequence.get();
        for (MySequence sequence : sequencesToAdd) {
            sequence.set(cursorSequence);
        }
    }

    /**
     * 從holder的sequence陣列中刪除掉一個sequence
     * */
    public static <T> void removeSequence(
            final T holder,
            final AtomicReferenceFieldUpdater<T, MySequence[]> sequenceUpdater,
            final MySequence sequenceNeedRemove) {
        int numToRemove;
        MySequence[] oldSequences;
        MySequence[] newSequences;

        do {
            // 獲得資料持有者當前的陣列參照
            oldSequences = sequenceUpdater.get(holder);
            // 獲得需要從陣列中刪除的sequence個數
            numToRemove = countMatching(oldSequences, sequenceNeedRemove);

            if (0 == numToRemove) {
                // 沒找到需要刪除的Sequence,直接返回
                return;
            }

            final int oldSize = oldSequences.length;
            // 構造新的sequence陣列
            newSequences = new MySequence[oldSize - numToRemove];

            for (int i = 0, pos = 0; i < oldSize; i++) {
                // 將原陣列中的sequence複製到新陣列中
                final MySequence testSequence = oldSequences[i];
                if (sequenceNeedRemove != testSequence) {
                    // 只複製不需要刪除的資料
                    newSequences[pos++] = testSequence;
                }
            }
        } while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences));
    }

    private static int countMatching(MySequence[] values, final MySequence toMatch) {
        int numToRemove = 0;
        for (MySequence value : values) {
            if (value == toMatch) {
                // 比對Sequence參照,如果和toMatch相同,則需要刪除
                numToRemove++;
            }
        }
        return numToRemove;
    }
}

總結

  • 作為disruptor學習系列的最後一篇部落格,v6版本對MyDisruptor存在的一些關鍵的效能問題做了最後的優化。最終的v6版本MyDisruptor除了少部分不常用的功能沒實現外,整體已經和Disruptor相差無幾了。
  • 縱觀v1到v6版本迭代的全過程,MyDisruptor從最初簡單的只支援單執行緒/單消費者開始,不斷的豐富功能、優化效能,程式碼也逐漸膨脹,變得越來越複雜。
    但只要按照每個版本都是為了實現一至多個完整功能模組的角度出發,有機的切分這些程式碼,也不會覺得難以理解。
  • 站在設計者的角度去實現MyDisruptor的過程中,我學到了很多東西,也逐漸地理解了disruptor在一些地方為什麼那樣實現的原因。
    這種臨摹、自己動手實現的方式,可以大幅降低對disruptor這樣一個實現巧妙、細節頗多的專案的學習曲線,幫助我們更好的理解disruptor的工作原理以及背後的設計思想。

disruptor無論在整體設計還是最終程式碼實現上都有很多值得反覆琢磨和學習的細節,希望這個系列部落格能幫助到對disruptor感興趣的小夥伴。

本篇部落格的完整程式碼在我的github上:https://github.com/1399852153/MyDisruptor 分支:feature/lab6