在v2版本的MyDisruptor實現多消費者、消費者組間依賴功能後。按照計劃,v3版本的MyDisruptor需要支援多執行緒消費者的功能。
由於該文屬於系列部落格的一部分,需要先對之前的部落格內容有所瞭解才能更好地理解本篇部落格
/**
* 多執行緒消費者(仿Disruptor.WorkerPool)
* */
public class MyWorkerPool<T> {
private final MySequence workSequence = new MySequence(-1);
private final MyRingBuffer<T> myRingBuffer;
private final List<MyWorkProcessor<T>> workEventProcessorList;
public MyWorkerPool(
MyRingBuffer<T> myRingBuffer,
MySequenceBarrier mySequenceBarrier,
MyWorkHandler<T>... myWorkHandlerList) {
this.myRingBuffer = myRingBuffer;
final int numWorkers = myWorkHandlerList.length;
this.workEventProcessorList = new ArrayList<>(numWorkers);
// 為每個自定義事件消費邏輯MyEventHandler,建立一個對應的MyWorkProcessor去處理
for (MyWorkHandler<T> myEventConsumer : myWorkHandlerList) {
workEventProcessorList.add(new MyWorkProcessor<>(
myRingBuffer,
myEventConsumer,
mySequenceBarrier,
this.workSequence));
}
}
/**
* 返回包括每個workerEventProcessor + workerPool自身的序列號集合
* */
public MySequence[] getCurrentWorkerSequences() {
final MySequence[] sequences = new MySequence[this.workEventProcessorList.size() + 1];
for (int i = 0, size = workEventProcessorList.size(); i < size; i++) {
sequences[i] = workEventProcessorList.get(i).getCurrentConsumeSequence();
}
sequences[sequences.length - 1] = workSequence;
return sequences;
}
public MyRingBuffer<T> start(final Executor executor) {
final long cursor = myRingBuffer.getCurrentProducerSequence().get();
workSequence.set(cursor);
for (MyWorkProcessor<?> processor : workEventProcessorList) {
processor.getCurrentConsumeSequence().set(cursor);
executor.execute(processor);
}
return this.myRingBuffer;
}
}
/**
* 多執行緒消費者-事件處理器介面
* */
public interface MyWorkHandler<T> {
/**
* 消費者消費事件
* @param event 事件物件本身
* */
void consume(T event);
}
接下來是本篇部落格的重點部分,MyWorkProcessor的實現。
/**
* 多執行緒消費者工作執行緒 (仿Disruptor.WorkProcessor)
* */
public class MyWorkProcessor<T> implements Runnable{
private final MySequence currentConsumeSequence = new MySequence(-1);
private final MyRingBuffer<T> myRingBuffer;
private final MyWorkHandler<T> myWorkHandler;
private final MySequenceBarrier sequenceBarrier;
private final MySequence workGroupSequence;
public MyWorkProcessor(MyRingBuffer<T> myRingBuffer,
MyWorkHandler<T> myWorkHandler,
MySequenceBarrier sequenceBarrier,
MySequence workGroupSequence) {
this.myRingBuffer = myRingBuffer;
this.myWorkHandler = myWorkHandler;
this.sequenceBarrier = sequenceBarrier;
this.workGroupSequence = workGroupSequence;
}
public MySequence getCurrentConsumeSequence() {
return currentConsumeSequence;
}
@Override
public void run() {
long nextConsumerIndex = this.currentConsumeSequence.get() + 1;
// 設定哨兵值,保證第一次迴圈時nextConsumerIndex <= cachedAvailableSequence一定為false,走else分支通過序列屏障獲得最大的可用序列號
long cachedAvailableSequence = Long.MIN_VALUE;
// 最近是否處理過了序列
boolean processedSequence = true;
while (true) {
try {
if(processedSequence) {
// 爭搶到了一個新的待消費序列,但還未實際進行消費(標記為false)
processedSequence = false;
// 如果已經處理過序列,則重新cas的爭搶一個新的待消費序列
do {
nextConsumerIndex = this.workGroupSequence.get() + 1L;
// 由於currentConsumeSequence會被註冊到生產者側,因此需要始終和workGroupSequence worker組的實際sequence保持協調
// 即當前worker的消費序列currentConsumeSequence = 當前消費者組的序列workGroupSequence
this.currentConsumeSequence.lazySet(nextConsumerIndex - 1L);
// 問題:只使用workGroupSequence,每個worker不維護currentConsumeSequence行不行?
// 回答:這是不行的。因為和單執行緒消費者的行為一樣,都是具體的消費者eventHandler/workHandler執行過之後才更新消費者的序列號,令其對外部可見(生產者、下游消費者)
// 因為消費依賴關係中約定,對於序列i事件只有在上游的消費者消費過後(eventHandler/workHandler執行過),下游才能消費序列i的事件
// workGroupSequence主要是用於通過cas協調同一workerPool內消費者執行緒序列爭搶的,對外的約束依然需要workProcessor原生的消費者序列currentConsumeSequence來控制
// cas更新,保證每個worker執行緒都會獲取到唯一的一個sequence
} while (!workGroupSequence.compareAndSet(nextConsumerIndex - 1L, nextConsumerIndex));
}else{
// processedSequence == false(手頭上存在一個還未消費的序列)
// 走到這裡說明之前拿到了一個新的消費序列,但是由於nextConsumerIndex > cachedAvailableSequence,沒有實際執行消費邏輯
// 而是被阻塞後返回獲得了最新的cachedAvailableSequence,重新執行一次迴圈走到了這裡
// 需要先把手頭上的這個序列給消費掉,才能繼續拿下一個消費序列
}
// cachedAvailableSequence只會存在兩種情況
// 1 第一次迴圈,初始化為Long.MIN_VALUE,則必定會走到下面的else分支中
// 2 非第一次迴圈,則cachedAvailableSequence為序列屏障所允許的最大可消費序列
if (nextConsumerIndex <= cachedAvailableSequence) {
// 爭搶到的消費序列是滿足要求的(小於序列屏障值,被序列屏障允許的),則呼叫消費者進行實際的消費
// 取出可以消費的下標對應的事件,交給eventConsumer消費
T event = myRingBuffer.get(nextConsumerIndex);
this.myWorkHandler.consume(event);
// 實際呼叫消費者進行消費了,標記為true.這樣一來就可以在下次迴圈中cas爭搶下一個新的消費序列了
processedSequence = true;
} else {
// 1 第一次迴圈會獲取當前序列屏障的最大可消費序列
// 2 非第一次迴圈,說明爭搶到的序列超過了屏障序列的最大值,等待生產者推進到爭搶到的sequence
cachedAvailableSequence = sequenceBarrier.getAvailableConsumeSequence(nextConsumerIndex);
}
} catch (final Throwable ex) {
// 消費者消費時發生了異常,也認為是成功消費了,避免阻塞消費序列
// 下次迴圈會cas爭搶一個新的消費序列
processedSequence = true;
}
}
}
}
v3版本支援了多執行緒消費者功能,下面通過一個demo來展示如何使用該功能。
public class MyRingBufferV3Demo {
/**
* -> 多執行緒消費者B(依賴A)
* 單執行緒消費者A -> 單執行緒消費者D(依賴B、C)
* -> 單執行緒消費者C(依賴A)
* */
public static void main(String[] args) throws InterruptedException {
// 環形佇列容量為16(2的4次方)
int ringBufferSize = 16;
// 建立環形佇列
MyRingBuffer<OrderEventModel> myRingBuffer = MyRingBuffer.createSingleProducer(
new OrderEventProducer(), ringBufferSize, new MyBlockingWaitStrategy());
// 獲得ringBuffer的序列屏障(最上游的序列屏障內只維護生產者的序列)
MySequenceBarrier mySequenceBarrier = myRingBuffer.newBarrier();
// ================================== 基於生產者序列屏障,建立消費者A
MyBatchEventProcessor<OrderEventModel> eventProcessorA =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerA"), mySequenceBarrier);
MySequence consumeSequenceA = eventProcessorA.getCurrentConsumeSequence();
// RingBuffer監聽消費者A的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceA);
// ================================== 消費者組依賴上游的消費者A,通過消費者A的序列號建立序列屏障(構成消費的順序依賴)
MySequenceBarrier workerSequenceBarrier = myRingBuffer.newBarrier(consumeSequenceA);
// 基於序列屏障,建立多執行緒消費者B
MyWorkerPool<OrderEventModel> workerPoolProcessorB =
new MyWorkerPool<>(myRingBuffer, workerSequenceBarrier,
new OrderWorkHandlerDemo("workerHandler1"),
new OrderWorkHandlerDemo("workerHandler2"),
new OrderWorkHandlerDemo("workerHandler3"));
MySequence[] workerSequences = workerPoolProcessorB.getCurrentWorkerSequences();
// RingBuffer監聽消費者C的序列
myRingBuffer.addGatingConsumerSequenceList(workerSequences);
// ================================== 通過消費者A的序列號建立序列屏障(構成消費的順序依賴),建立消費者C
MySequenceBarrier mySequenceBarrierC = myRingBuffer.newBarrier(consumeSequenceA);
MyBatchEventProcessor<OrderEventModel> eventProcessorC =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerC"), mySequenceBarrierC);
MySequence consumeSequenceC = eventProcessorC.getCurrentConsumeSequence();
// RingBuffer監聽消費者C的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceC);
// ================================== 基於多執行緒消費者B,單執行緒消費者C的序列屏障,建立消費者D
MySequence[] bAndCSequenceArr = new MySequence[workerSequences.length+1];
// 把多執行緒消費者B的序列複製到合併的序列陣列中
System.arraycopy(workerSequences, 0, bAndCSequenceArr, 0, workerSequences.length);
// 陣列的最後一位是消費者C的序列
bAndCSequenceArr[bAndCSequenceArr.length-1] = consumeSequenceC;
MySequenceBarrier mySequenceBarrierD = myRingBuffer.newBarrier(bAndCSequenceArr);
MyBatchEventProcessor<OrderEventModel> eventProcessorD =
new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerD"), mySequenceBarrierD);
MySequence consumeSequenceD = eventProcessorD.getCurrentConsumeSequence();
// RingBuffer監聽消費者D的序列
myRingBuffer.addGatingConsumerSequenceList(consumeSequenceD);
// 啟動消費者執行緒A
new Thread(eventProcessorA).start();
// 啟動workerPool多執行緒消費者B
workerPoolProcessorB.start(Executors.newFixedThreadPool(100, new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"worker" + mCount.getAndIncrement());
}
}));
// 啟動消費者執行緒C
new Thread(eventProcessorC).start();
// 啟動消費者執行緒D
new Thread(eventProcessorD).start();
// 生產者釋出100個事件
for(int i=0; i<100; i++) {
long nextIndex = myRingBuffer.next();
OrderEventModel orderEvent = myRingBuffer.get(nextIndex);
orderEvent.setMessage("message-"+i);
orderEvent.setPrice(i * 10);
System.out.println("生產者釋出事件:" + orderEvent);
myRingBuffer.publish(nextIndex);
}
}
}
disruptor無論在整體設計還是最終程式碼實現上都有很多值得反覆琢磨和學習的細節,希望能幫助到對disruptor感興趣的小夥伴。
本篇部落格的完整程式碼在我的github上:https://github.com/1399852153/MyDisruptor 分支:feature/lab3