在v3版本的MyDisruptor實現了多執行緒消費者之後,按照計劃,v4版本的MyDisruptor需要支援執行緒安全的多執行緒生產者功能。
由於該文屬於系列部落格的一部分,需要先對之前的部落格內容有所瞭解,才能更好地理解本篇部落格。
在開始介紹disruptor的實現方式之前,可以站在設計者的角度先大致思考一下如何設計一個執行緒安全的生產者序列器(其功能、使用方法最好和單執行緒生產者序列器保持一致)。
可以看到從設計者的角度出發,可以想到非常多的方案。其中有的可行,有的不可行;可行的方案中有的效能更好,有的更簡潔優雅,讀者可以嘗試著發散一下思維,這裡限於篇幅就不再展開了。
disruptor的設計者當初肯定也對各種方案進行了評估,下面我們就來看看disruptor開發團隊認為的最好的多執行緒生產者設計方案吧。
disruptor多執行緒生產者的next方法實現和單執行緒生產者原理差不多,但為了實現執行緒安全在幾處關鍵地方有所不同。
/**
 * Multi-threaded producer sequencer (modeled after disruptor.MultiProducerSequencer).
 *
 * <p>Any number of producer threads may call {@link #next(int)} and {@link #publish(long)}
 * concurrently. Sequences are claimed with a CAS on {@code currentProducerSequence}. Claimed
 * sequences may be published out of order, so each ring-buffer slot's publication is recorded
 * separately in {@code availableBuffer}. Consumers call
 * {@link #getHighestPublishedSequence(long, long)} to find the highest contiguously published
 * sequence.
 */
public class MyMultiProducerSequencer implements MyProducerSequencer {
    private final int ringBufferSize;
    /** Highest sequence claimed so far by any producer (claimed, not necessarily published). */
    private final MySequence currentProducerSequence = new MySequence();
    private final List<MySequence> gatingConsumerSequenceList = new ArrayList<>();
    private final MyWaitStrategy myWaitStrategy;
    /**
     * Last observed minimum consumer sequence. It is read and written by every producer thread,
     * so it is kept in a MySequence (volatile long) rather than a plain field.
     */
    private final MySequence gatingSequenceCache = new MySequence();
    /**
     * Per-slot publication marker. Each slot holds {@code sequence >>> indexShift} (the lap
     * number) of the last sequence published into it; -1 means the slot was never published.
     */
    private final int[] availableBuffer;
    private final int indexMask;
    private final int indexShift;
    /**
     * availableBuffer is accessed through Unsafe so that each read/write carries exactly the
     * memory barrier it needs.
     */
    private static final Unsafe UNSAFE = UnsafeUtil.getUnsafe();
    private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
    private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);

    /**
     * @param ringBufferSize ring capacity; must be a positive power of two because slot indexes
     *                       are computed with a bit mask
     * @param myWaitStrategy strategy that is signalled on every publish
     * @throws IllegalArgumentException if ringBufferSize is not a positive power of two
     */
    public MyMultiProducerSequencer(int ringBufferSize, final MyWaitStrategy myWaitStrategy) {
        if (ringBufferSize < 1) {
            throw new IllegalArgumentException("ringBufferSize must not be less than 1, got " + ringBufferSize);
        }
        if (Integer.bitCount(ringBufferSize) != 1) {
            throw new IllegalArgumentException("ringBufferSize must be a power of 2, got " + ringBufferSize);
        }
        this.ringBufferSize = ringBufferSize;
        this.myWaitStrategy = myWaitStrategy;
        this.availableBuffer = new int[ringBufferSize];
        this.indexMask = this.ringBufferSize - 1;
        this.indexShift = log2(ringBufferSize);
        initialiseAvailableBuffer();
    }

    /** Marks every slot as "never published" (-1 never equals a valid lap number). */
    private void initialiseAvailableBuffer() {
        Arrays.fill(this.availableBuffer, -1);
    }

    /** floor(log2(i)); only called with a validated positive power of two. */
    private static int log2(int i) {
        return 31 - Integer.numberOfLeadingZeros(i);
    }

    @Override
    public long next() {
        return next(1);
    }

    /**
     * Claims {@code n} consecutive sequences, waiting (by short parks) while the ring is full.
     *
     * @param n number of sequences to claim, in {@code [1, ringBufferSize]}
     * @return the highest claimed sequence; the claimed range is {@code [result - n + 1, result]}
     * @throws IllegalArgumentException if n is out of range. n < 1 would return a sequence
     *         already owned by another producer; n > ringBufferSize could never be satisfied
     *         and would spin forever.
     */
    @Override
    public long next(int n) {
        if (n < 1 || n > this.ringBufferSize) {
            throw new IllegalArgumentException("n must be > 0 and <= ringBufferSize(" + this.ringBufferSize + "), got " + n);
        }
        do {
            // Highest sequence claimed before this attempt.
            long currentMaxProducerSequenceNum = currentProducerSequence.get();
            // Producer position after a successful claim.
            long nextProducerSequence = currentMaxProducerSequenceNum + n;
            // The slowest consumer must be at least here, or the claim would lap it.
            long wrapPoint = nextProducerSequence - this.ringBufferSize;
            // Volatile read: several producers read and write the cache concurrently.
            long cachedGatingSequence = this.gatingSequenceCache.get();
            // The cache may be stale (too low). Only when it suggests a wrap do we pay for
            // reading every consumer sequence, to avoid needless cross-core cache traffic.
            if (wrapPoint > cachedGatingSequence) {
                long gatingSequence = SequenceUtil.getMinimumSequence(currentMaxProducerSequenceNum, this.gatingConsumerSequenceList);
                if (wrapPoint > gatingSequence) {
                    // Ring is really full: back off briefly, then retry from scratch.
                    // Unlike the single-producer version we must not spin here on a fixed
                    // snapshot: other producers keep moving the cursor, so the wrap point
                    // computed above goes stale and must be recomputed on every iteration.
                    LockSupport.parkNanos(1);
                    continue;
                }
                // Refresh the cache so later claims can skip the full consumer scan.
                // Note: no CAS in this branch; the loop re-reads the cursor and claims next time.
                this.gatingSequenceCache.set(gatingSequence);
            } else if (this.currentProducerSequence.compareAndSet(currentMaxProducerSequenceNum, nextProducerSequence)) {
                // Only the producer whose CAS succeeds owns the range.
                return nextProducerSequence;
            }
            // CAS lost to another producer: retry with fresh values.
        } while (true);
    }

    /**
     * Publishes a single sequence and wakes waiting consumers.
     *
     * <p>Only the given slot is marked available. When a range was claimed with
     * {@code next(n)} for n > 1, every sequence of the range must be published, e.g. via
     * {@link #publish(long, long)}.
     */
    @Override
    public void publish(long publishIndex) {
        setAvailable(publishIndex);
        this.myWaitStrategy.signalWhenBlocking();
    }

    /**
     * Publishes every sequence in {@code [lowBound, highBound]} (typically a batch claimed
     * with {@code next(n)}), then signals waiting consumers once.
     *
     * @param lowBound  first sequence to publish
     * @param highBound last sequence to publish (inclusive)
     */
    public void publish(long lowBound, long highBound) {
        for (long sequence = lowBound; sequence <= highBound; sequence++) {
            setAvailable(sequence);
        }
        this.myWaitStrategy.signalWhenBlocking();
    }

    @Override
    public MySequenceBarrier newBarrier() {
        return new MySequenceBarrier(this, this.currentProducerSequence, this.myWaitStrategy, new ArrayList<>());
    }

    @Override
    public MySequenceBarrier newBarrier(MySequence... dependenceSequences) {
        return new MySequenceBarrier(this, this.currentProducerSequence, this.myWaitStrategy, new ArrayList<>(Arrays.asList(dependenceSequences)));
    }

    @Override
    public void addGatingConsumerSequenceList(MySequence newGatingConsumerSequence) {
        this.gatingConsumerSequenceList.add(newGatingConsumerSequence);
    }

    @Override
    public void addGatingConsumerSequenceList(MySequence... newGatingConsumerSequences) {
        this.gatingConsumerSequenceList.addAll(Arrays.asList(newGatingConsumerSequences));
    }

    @Override
    public MySequence getCurrentProducerSequence() {
        return this.currentProducerSequence;
    }

    @Override
    public int getRingBufferSize() {
        return this.ringBufferSize;
    }

    /**
     * Returns the highest sequence in {@code [lowBound - 1, availableSequence]} such that every
     * sequence from lowBound up to it is published.
     *
     * <p>The cursor only says what was <em>claimed</em>. With several producers a lower claim
     * may still be unpublished while a higher one is done. The scan therefore stops just
     * before the first unpublished sequence, and returns lowBound - 1 if lowBound itself is
     * unpublished. If there is no gap, every sequence up to availableSequence is published and
     * it is returned unchanged, as in the single-producer case.
     *
     * @param lowBound          first sequence the consumer wants to consume
     * @param availableSequence upper bound of the scan (inclusive)
     */
    @Override
    public long getHighestPublishedSequence(long lowBound, long availableSequence) {
        for (long sequence = lowBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable(sequence)) {
                // Gap found: everything before it is consumable.
                return sequence - 1;
            }
        }
        // No gap: the whole range is published.
        return availableSequence;
    }

    /** Marks {@code sequence} as published in its slot. */
    private void setAvailable(long sequence) {
        int index = calculateIndex(sequence);
        int flag = calculateAvailabilityFlag(sequence);
        // Byte offset of availableBuffer[index] relative to the array object.
        long bufferAddress = (index * SCALE) + BASE;
        // Same effect as availableBuffer[index] = flag, but as an ordered (lazySet-style) write,
        // like lazySet in the single-producer sequencer. The producer's earlier writes to the
        // event object cannot be reordered after it. A consumer that observes the new flag thus
        // also observes the new event, never the stale one from the previous lap.
        UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
    }

    /** Lap number of a sequence; distinguishes successive uses of the same slot. */
    private int calculateAvailabilityFlag(long sequence) {
        return (int) (sequence >>> indexShift);
    }

    /** Slot index of a sequence (valid because ringBufferSize is a power of two). */
    private int calculateIndex(long sequence) {
        return ((int) sequence) & indexMask;
    }

    /**
     * @return whether {@code sequence} (in its current lap) has been published
     */
    public boolean isAvailable(long sequence) {
        int index = calculateIndex(sequence);
        int flag = calculateAvailabilityFlag(sequence);
        // Byte offset of availableBuffer[index] relative to the array object.
        long bufferAddress = (index * SCALE) + BASE;
        // Same as availableBuffer[index] == flag, but with a volatile read, so consumers see a
        // producer's publication promptly and pair with the ordered write in setAvailable.
        return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
    }
}
/**
 * Sequence number holder (modeled after Disruptor.Sequence).
 *
 * <p>Producer and consumer threads access it concurrently, so the value is a volatile long.
 * Compare-and-set and ordered (lazy) writes are performed through Unsafe on that field.
 */
public class MySequence {
    /** Starts at -1 so that the next sequence, value + 1, is 0 (the first valid sequence). */
    private volatile long value = -1;

    private static final Unsafe UNSAFE = lookupUnsafe();
    private static final long VALUE_OFFSET = lookupValueOffset();

    /**
     * Reads the Unsafe singleton reflectively. Unsafe's own accessor only serves classes
     * trusted by the JDK, so the private static field is read directly.
     */
    private static Unsafe lookupUnsafe() {
        try {
            Field singletonField = Unsafe.class.getDeclaredField("theUnsafe");
            singletonField.setAccessible(true);
            return (Unsafe) singletonField.get(null);
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Memory offset of {@code value}, required by the Unsafe CAS / ordered-put calls. */
    private static long lookupValueOffset() {
        try {
            return UNSAFE.objectFieldOffset(MySequence.class.getDeclaredField("value"));
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    public MySequence() {
    }

    public MySequence(long initialValue) {
        this.value = initialValue;
    }

    /** Volatile read of the current value. */
    public long get() {
        return value;
    }

    /** Volatile write of a new value. */
    public void set(long newValue) {
        this.value = newValue;
    }

    /** Ordered write: cheaper than set(), and earlier writes cannot be reordered after it. */
    public void lazySet(long newValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, newValue);
    }

    /** Atomically sets the value to {@code update} only if it currently equals {@code expect}. */
    public boolean compareAndSet(long expect, long update) {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expect, update);
    }
}
/**
 * Holder for the shared {@code sun.misc.Unsafe} instance.
 *
 * <p>Unsafe's own accessor may only be used by classes trusted by the JDK. The singleton is
 * therefore read reflectively from its private static field {@code theUnsafe}.
 */
public final class UnsafeUtil {
    private static final Unsafe UNSAFE;

    static {
        try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            UNSAFE = (Unsafe) theUnsafe.get(null);
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Static utility holder; not meant to be instantiated or subclassed. */
    private UnsafeUtil() {
    }

    /**
     * @return the process-wide Unsafe instance
     */
    public static Unsafe getUnsafe() {
        return UNSAFE;
    }
}
/**
 * Sequence barrier (modeled after Disruptor.SequenceBarrier).
 *
 * <p>Tells a consumer how far it may read. The limit comes from its upstream dependency
 * sequences, or from the producer cursor when it has none. It is then narrowed by the
 * producer sequencer's highest published sequence.
 */
public class MySequenceBarrier {
    private final MyProducerSequencer myProducerSequencer;
    private final MySequence currentProducerSequence;
    private final MyWaitStrategy myWaitStrategy;
    private final List<MySequence> dependentSequencesList;

    public MySequenceBarrier(MyProducerSequencer myProducerSequencer, MySequence currentProducerSequence,
                             MyWaitStrategy myWaitStrategy, List<MySequence> dependentSequencesList) {
        this.myProducerSequencer = myProducerSequencer;
        this.currentProducerSequence = currentProducerSequence;
        this.myWaitStrategy = myWaitStrategy;
        // Without upstream consumers the producer cursor is the only (fallback) dependency.
        this.dependentSequencesList = dependentSequencesList.isEmpty()
                ? Collections.singletonList(currentProducerSequence)
                : dependentSequencesList;
    }

    /**
     * Waits until sequence {@code currentConsumeSequence} may be available and returns the
     * highest sequence the consumer may process (disruptor's waitFor).
     *
     * @param currentConsumeSequence the next sequence the consumer wants
     * @return the highest consumable sequence. If the wait strategy returns something below
     *         {@code currentConsumeSequence}, that value is passed through unchanged.
     * @throws InterruptedException propagated from the wait strategy
     */
    public long getAvailableConsumeSequence(long currentConsumeSequence) throws InterruptedException {
        final long upperBound =
                myWaitStrategy.waitFor(currentConsumeSequence, currentProducerSequence, dependentSequencesList);
        if (upperBound >= currentConsumeSequence) {
            // Added in v4: with multiple producers a claimed sequence is not necessarily
            // published yet, so the sequencer trims the range to what is actually published.
            return myProducerSequencer.getHighestPublishedSequence(currentConsumeSequence, upperBound);
        }
        return upperBound;
    }
}
在lab1中提到單執行緒的生產者SingleProducerSequencer在publish方法中通過一個lazySet方法設定了一個寫記憶體屏障,使得對entry事件物件的更新操作一定先於對序列號的更新,且消費者也是使用讀屏障進行強一致的讀,避免指令重排序和快取記憶體同步延遲導致消費者執行緒消費到錯誤的事件。
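而在多執行緒生產者中,額外引入了一個availableBuffer陣列:生產者在publish時對其進行寫入,消費者則通過呼叫isAvailable方法對其進行讀取。
那麼對於陣列元素的更新和讀取,應該如何插入讀、寫屏障呢?答案是借助Unsafe按偏移量直接存取陣列元素:setAvailable中使用putOrderedInt寫入(作用與lazySet相同),isAvailable中使用getIntVolatile進行volatile讀。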
disruptor需要同時支援單執行緒、多執行緒兩種型別的生產者,因此抽象出了一個生產者序列器介面ProducerSequencer,用於相容兩者的差異。
/**
 * Producer sequencer abstraction (modeled after disruptor.ProducerSequencer).
 *
 * <p>Lets the single-threaded and the multi-threaded producer sequencer be used through the
 * same API.
 */
public interface MyProducerSequencer {

    /**
     * Claims the next producer sequence.
     *
     * @return the claimed sequence
     */
    long next();

    /**
     * Claims a contiguous range of producer sequences.
     *
     * @param n length of the range
     * @return the highest sequence of the claimed range
     */
    long next(int n);

    /**
     * Publishes a producer sequence so that consumers may read it.
     *
     * @param publishIndex the producer sequence to publish
     */
    void publish(long publishIndex);

    /**
     * Creates a sequence barrier that has no upstream consumer dependencies.
     *
     * @return the new barrier
     */
    MySequenceBarrier newBarrier();

    /**
     * Creates a sequence barrier that depends on the given upstream sequences.
     *
     * @param dependenceSequences upstream sequences the barrier must wait for
     * @return the new barrier
     */
    MySequenceBarrier newBarrier(MySequence... dependenceSequences);

    /**
     * Registers one consumer sequence that the producer must not overtake.
     *
     * @param newGatingConsumerSequence the consumer sequence to register
     */
    void addGatingConsumerSequenceList(MySequence newGatingConsumerSequence);

    /**
     * Registers several consumer sequences that the producer must not overtake.
     *
     * @param newGatingConsumerSequences the consumer sequences to register
     */
    void addGatingConsumerSequenceList(MySequence... newGatingConsumerSequences);

    /**
     * Returns the producer cursor.
     *
     * @return the current producer sequence
     */
    MySequence getCurrentProducerSequence();

    /**
     * Returns the ring buffer capacity.
     *
     * @return the ring buffer size
     */
    int getRingBufferSize();

    /**
     * Returns the highest published sequence that a consumer may process.
     *
     * @param nextSequence      first sequence the consumer wants to consume
     * @param availableSequence highest sequence that may be available (upper bound of the range)
     * @return the highest published, consumable sequence
     */
    long getHighestPublishedSequence(long nextSequence, long availableSequence);
}
/**
 * Demo of the v4 MyRingBuffer driven by several concurrent producer threads.
 *
 * <p>Consumer topology: A consumes first. The worker pool B (three handlers) and the single
 * consumer C both depend on A. D depends on both B and C. Three producer tasks each publish
 * 100 events concurrently.
 */
public class MyRingBufferV4Demo {
    public static void main(String[] args) {
        // Ring buffer capacity
        int ringBufferSize = 16;
        // Create a ring buffer with a multi-threaded (thread-safe) producer: next/publish may run concurrently
        MyRingBuffer<OrderEventModel> myRingBuffer = MyRingBuffer.createMultiProducer(
                new OrderEventProducer(), ringBufferSize, new MyBusySpinWaitStrategy());
        // The top-level barrier only tracks the producer sequence
        MySequenceBarrier mySequenceBarrier = myRingBuffer.newBarrier();

        // ================================== consumer A, based on the producer barrier
        MyBatchEventProcessor<OrderEventModel> eventProcessorA =
                new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerA"), mySequenceBarrier);
        MySequence consumeSequenceA = eventProcessorA.getCurrentConsumeSequence();
        // RingBuffer gates on consumer A's sequence
        myRingBuffer.addGatingConsumerSequenceList(consumeSequenceA);

        // ================================== worker pool B depends on A (ordered consumption)
        MySequenceBarrier workerSequenceBarrier = myRingBuffer.newBarrier(consumeSequenceA);
        // Multi-threaded consumer B, based on that barrier
        MyWorkerPool<OrderEventModel> workerPoolProcessorB =
                new MyWorkerPool<>(myRingBuffer, workerSequenceBarrier,
                        new OrderWorkHandlerDemo("workerHandler1"),
                        new OrderWorkHandlerDemo("workerHandler2"),
                        new OrderWorkHandlerDemo("workerHandler3"));
        MySequence[] workerSequences = workerPoolProcessorB.getCurrentWorkerSequences();
        // RingBuffer gates on worker pool B's sequences
        myRingBuffer.addGatingConsumerSequenceList(workerSequences);

        // ================================== consumer C, also depending on A
        MySequenceBarrier mySequenceBarrierC = myRingBuffer.newBarrier(consumeSequenceA);
        MyBatchEventProcessor<OrderEventModel> eventProcessorC =
                new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerC"), mySequenceBarrierC);
        MySequence consumeSequenceC = eventProcessorC.getCurrentConsumeSequence();
        // RingBuffer gates on consumer C's sequence
        myRingBuffer.addGatingConsumerSequenceList(consumeSequenceC);

        // ================================== consumer D, depending on worker pool B and consumer C
        MySequence[] bAndCSequenceArr = new MySequence[workerSequences.length + 1];
        // Copy worker pool B's sequences into the combined array
        System.arraycopy(workerSequences, 0, bAndCSequenceArr, 0, workerSequences.length);
        // The last element is consumer C's sequence
        bAndCSequenceArr[bAndCSequenceArr.length - 1] = consumeSequenceC;
        MySequenceBarrier mySequenceBarrierD = myRingBuffer.newBarrier(bAndCSequenceArr);
        MyBatchEventProcessor<OrderEventModel> eventProcessorD =
                new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerD"), mySequenceBarrierD);
        MySequence consumeSequenceD = eventProcessorD.getCurrentConsumeSequence();
        // RingBuffer gates on consumer D's sequence
        myRingBuffer.addGatingConsumerSequenceList(consumeSequenceD);

        // Start consumer thread A
        new Thread(eventProcessorA).start();
        // Start worker pool B
        workerPoolProcessorB.start(Executors.newFixedThreadPool(10, new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "worker" + mCount.getAndIncrement());
            }
        }));
        // Start consumer thread C
        new Thread(eventProcessorC).start();
        // Start consumer thread D
        new Thread(eventProcessorD).start();

        // Start the concurrent producers
        ExecutorService executorService = Executors.newFixedThreadPool(10, new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "workerProducer" + mCount.getAndIncrement());
            }
        });
        for (int i = 1; i < 4; i++) {
            int num = i;
            // execute() rather than submit(): with submit() any exception thrown by the task is
            // captured in a Future that nobody reads, so producer failures would be silent.
            executorService.execute(() -> {
                // Each producer publishes 100 events concurrently with the others
                for (int j = 0; j < 100; j++) {
                    long nextIndex = myRingBuffer.next();
                    OrderEventModel orderEvent = myRingBuffer.get(nextIndex);
                    orderEvent.setMessage("message-" + num + "-" + j);
                    orderEvent.setPrice(num * j * 10);
                    myRingBuffer.publish(nextIndex);
                }
            });
        }
        // No more producer tasks: let the pool's threads exit once the submitted ones finish.
        executorService.shutdown();
    }
}
disruptor無論在整體設計還是最終程式碼實現上都有很多值得反覆琢磨和學習的細節,希望能幫助到對disruptor感興趣的小夥伴。
本篇部落格的完整程式碼在我的github上:https://github.com/1399852153/MyDisruptor 分支:feature/lab4