在v5版本的MyDisruptor實現DSL風格的API後。按照計劃,v6版本的MyDisruptor作為最後一個版本,需要對MyDisruptor進行最終的一些細節優化。
v6版本一共做了三處優化:
由於該文屬於系列部落格的一部分,需要先對之前的部落格內容有所瞭解才能更好地理解本篇部落格
在第一篇部落格中我們就已經介紹過偽共用問題了,這裡複製原部落格內容如下:
現代的CPU都是多核的,每個核心都擁有獨立的快取記憶體。快取記憶體由固定大小的快取行組成(通常為32個位元組或64個位元組)。CPU以快取行作為最小單位讀寫,且一個快取行通常會被多個變數佔據(例如32位元的參照指標占4位元組,64位元的參照指標占8個位元組)。
這樣的設計導致了一個問題:即使快取行上的變數是無關聯的(比如不屬於同一個物件),但只要快取行上的某一個共用變數發生了變化,則整個快取行都會進行快取一致性的同步。
而CPU間快取一致性的同步是有一定效能損耗的,能避免則儘量避免。這就是所謂的「偽共用」問題。
disruptor通過對佇列中一些關鍵變數進行了快取行的填充,避免其因為不相干的變數讀寫而無謂的重新整理快取,解決了偽共用的問題。
public class Point {
public volatile int x;
public volatile int y;
public Point(int x, int y) {
this.x = x;
this.y = y;
}
}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class FalseSharingDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
CountDownLatch countDownLatch = new CountDownLatch(2);
Point point = new Point(1,2);
long start = System.currentTimeMillis();
executor.execute(()->{
// 執行緒1 x自增1億次
for(int i=0; i<100000000; i++){
point.x++;
}
countDownLatch.countDown();
});
executor.execute(()->{
// 執行緒2 y自增1億次
for(int i=0; i<100000000; i++){
point.y++;
}
countDownLatch.countDown();
});
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("testNormal 耗時=" + (end-start));
executor.shutdown();
}
}
public class PointNoFalseSharing {
private long lp1, lp2, lp3, lp4, lp5, lp6, lp7;
public volatile long x;
private long rp1, rp2, rp3, rp4, rp5, rp6, rp7;
public volatile long y;
public PointNoFalseSharing(int x, int y) {
this.x = x;
this.y = y;
}
}
public class NoFalseSharingDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
CountDownLatch countDownLatch = new CountDownLatch(2);
PointNoFalseSharing point = new PointNoFalseSharing(1,2);
long start = System.currentTimeMillis();
executor.execute(()->{
// 執行緒1 x自增1億次
for(int i=0; i<100000000; i++){
point.x++;
}
countDownLatch.countDown();
});
executor.execute(()->{
// 執行緒2 y自增1億次
for(int i=0; i<100000000; i++){
point.y++;
}
countDownLatch.countDown();
});
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("testNoFalseSharing 耗時=" + (end-start));
executor.shutdown();
}
}
/**
* 序列號物件(仿Disruptor.Sequence)
*
* 由於需要被生產者、消費者執行緒同時存取,因此內部是一個volatile修飾的long值
* */
public class MySequence {
/**
* 解決偽共用 左半部分填充
* */
private long lp1, lp2, lp3, lp4, lp5, lp6, lp7;
/**
* 序列起始值預設為-1,保證下一個序列恰好是0(即第一個合法的序列號)
* */
private volatile long value = -1;
/**
* 解決偽共用 右半部分填充
* */
private long rp1, rp2, rp3, rp4, rp5, rp6, rp7;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static {
try {
UNSAFE = UnsafeUtil.getUnsafe();
VALUE_OFFSET = UNSAFE.objectFieldOffset(MySequence.class.getDeclaredField("value"));
}
catch (final Exception e) {
throw new RuntimeException(e);
}
}
// 注意:省略了方法程式碼
}
/**
* 環形佇列(仿Disruptor.RingBuffer)
* */
public class MyRingBuffer<T> {
/**
* 解決偽共用 左半部分填充
* */
protected long lp1, lp2, lp3, lp4, lp5, lp6, lp7;
private final T[] elementList;
private final MyProducerSequencer myProducerSequencer;
private final int ringBufferSize;
private final int mask;
/**
* 解決偽共用 右半部分填充
* */
protected long rp1, rp2, rp3, rp4, rp5, rp6, rp7;
// 注意:省略了方法程式碼
}
/**
* 單執行緒生產者序列器(仿Disruptor.SingleProducerSequencer)
* 只支援單消費者的簡易版本(只有一個consumerSequence)
*
* 因為是單執行緒序列器,因此在設計上就是執行緒不安全的
* */
public class MySingleProducerSequencer implements MyProducerSequencer {
/**
* 生產者序列器所屬ringBuffer的大小
* */
private final int ringBufferSize;
/**
* 當前已釋出的生產者序列號
* (區別於nextValue)
* */
private final MySequence currentProducerSequence = new MySequence();
/**
* 生產者序列器所屬ringBuffer的消費者序列集合
* */
private volatile MySequence[] gatingConsumerSequences = new MySequence[0];
private final MyWaitStrategy myWaitStrategy;
/**
* 解決偽共用 左半部分填充
* */
private long lp1, lp2, lp3, lp4, lp5, lp6, lp7;
/**
* 當前已申請的序列(但是是否釋出了,要看currentProducerSequence)
*
* 單執行緒生產者內部使用,所以就是普通的long,不考慮並行
* */
private long nextValue = -1;
/**
* 當前已快取的消費者序列
*
* 單執行緒生產者內部使用,所以就是普通的long,不考慮並行
* */
private long cachedConsumerSequenceValue = -1;
/**
* 解決偽共用 右半部分填充
* */
private long rp1, rp2, rp3, rp4, rp5, rp6, rp7;
// 注意:省略了方法程式碼
}
截止MyDisruptor的v5版本,消費者執行緒都是通過一個永不停止的while迴圈進行工作的,除非強制殺死執行緒,否則無法令消費者執行緒關閉,而這無疑是不優雅的。
為此,disruptor實現了令消費者執行緒主動停止的機制。
/**
* 單執行緒消費者(仿Disruptor.BatchEventProcessor)
* */
public class MyBatchEventProcessor<T> implements MyEventProcessor{
private final MySequence currentConsumeSequence = new MySequence(-1);
private final MyRingBuffer<T> myRingBuffer;
private final MyEventHandler<T> myEventConsumer;
private final MySequenceBarrier mySequenceBarrier;
private final AtomicBoolean running = new AtomicBoolean();
public MyBatchEventProcessor(MyRingBuffer<T> myRingBuffer,
MyEventHandler<T> myEventConsumer,
MySequenceBarrier mySequenceBarrier) {
this.myRingBuffer = myRingBuffer;
this.myEventConsumer = myEventConsumer;
this.mySequenceBarrier = mySequenceBarrier;
}
@Override
public void run() {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("Thread is already running");
}
this.mySequenceBarrier.clearAlert();
// 下一個需要消費的下標
long nextConsumerIndex = currentConsumeSequence.get() + 1;
// 消費者執行緒主迴圈邏輯,不斷的嘗試獲取事件並進行消費(為了讓程式碼更簡單,暫不考慮優雅停止消費者執行緒的功能)
while(true) {
try {
long availableConsumeIndex = this.mySequenceBarrier.getAvailableConsumeSequence(nextConsumerIndex);
while (nextConsumerIndex <= availableConsumeIndex) {
// 取出可以消費的下標對應的事件,交給eventConsumer消費
T event = myRingBuffer.get(nextConsumerIndex);
this.myEventConsumer.consume(event, nextConsumerIndex, nextConsumerIndex == availableConsumeIndex);
// 批次處理,一次主迴圈消費N個事件(下標加1,獲取下一個)
nextConsumerIndex++;
}
// 更新當前消費者的消費的序列(lazySet,不需要生產者實時的強感知刷快取效能更好,因為生產者自己也不是實時的讀消費者序列的)
this.currentConsumeSequence.lazySet(availableConsumeIndex);
LogUtil.logWithThreadName("更新當前消費者的消費的序列:" + availableConsumeIndex);
} catch (final MyAlertException ex) {
LogUtil.logWithThreadName("消費者MyAlertException" + ex);
// 被外部alert打斷,檢查running標記
if (!running.get()) {
// running == false, break跳出主迴圈,執行結束
break;
}
} catch (final Throwable ex) {
// 發生異常,消費進度依然推進(跳過這一批拉取的資料)(lazySet 原理同上)
this.currentConsumeSequence.lazySet(nextConsumerIndex);
nextConsumerIndex++;
}
}
}
@Override
public MySequence getCurrentConsumeSequence() {
return this.currentConsumeSequence;
}
@Override
public void halt() {
// 當前消費者狀態設定為停止
running.set(false);
// 喚醒消費者執行緒(令其能立即檢查到狀態為停止)
this.mySequenceBarrier.alert();
}
@Override
public boolean isRunning() {
return this.running.get();
}
}
/**
* 多執行緒消費者工作執行緒 (仿Disruptor.WorkProcessor)
* */
public class MyWorkProcessor<T> implements MyEventProcessor{
private final MySequence currentConsumeSequence = new MySequence(-1);
private final MyRingBuffer<T> myRingBuffer;
private final MyWorkHandler<T> myWorkHandler;
private final MySequenceBarrier sequenceBarrier;
private final MySequence workGroupSequence;
private final AtomicBoolean running = new AtomicBoolean(false);
public MyWorkProcessor(MyRingBuffer<T> myRingBuffer,
MyWorkHandler<T> myWorkHandler,
MySequenceBarrier sequenceBarrier,
MySequence workGroupSequence) {
this.myRingBuffer = myRingBuffer;
this.myWorkHandler = myWorkHandler;
this.sequenceBarrier = sequenceBarrier;
this.workGroupSequence = workGroupSequence;
}
@Override
public MySequence getCurrentConsumeSequence() {
return currentConsumeSequence;
}
@Override
public void halt() {
// 當前消費者狀態設定為停止
running.set(false);
// 喚醒消費者執行緒(令其能立即檢查到狀態為停止)
this.sequenceBarrier.alert();
}
@Override
public boolean isRunning() {
return this.running.get();
}
@Override
public void run() {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("Thread is already running");
}
this.sequenceBarrier.clearAlert();
long nextConsumerIndex = this.currentConsumeSequence.get();
// 設定哨兵值,保證第一次迴圈時nextConsumerIndex <= cachedAvailableSequence一定為false,走else分支通過序列屏障獲得最大的可用序列號
long cachedAvailableSequence = Long.MIN_VALUE;
// 最近是否處理過了序列
boolean processedSequence = true;
while (true) {
try {
if(processedSequence) {
// 爭搶到了一個新的待消費序列,但還未實際進行消費(標記為false)
processedSequence = false;
// 如果已經處理過序列,則重新cas的爭搶一個新的待消費序列
do {
nextConsumerIndex = this.workGroupSequence.get() + 1L;
// 由於currentConsumeSequence會被註冊到生產者側,因此需要始終和workGroupSequence worker組的實際sequence保持協調
// 即當前worker的消費序列currentConsumeSequence = 當前消費者組的序列workGroupSequence
this.currentConsumeSequence.lazySet(nextConsumerIndex - 1L);
// 問題:只使用workGroupSequence,每個worker不維護currentConsumeSequence行不行?
// 回答:這是不行的。因為和單執行緒消費者的行為一樣,都是具體的消費者eventHandler/workHandler執行過之後才更新消費者的序列號,令其對外部可見(生產者、下游消費者)
// 因為消費依賴關係中約定,對於序列i事件只有在上游的消費者消費過後(eventHandler/workHandler執行過),下游才能消費序列i的事件
// workGroupSequence主要是用於通過cas協調同一workerPool內消費者執行緒序列爭搶的,對外的約束依然需要workProcessor原生的消費者序列currentConsumeSequence來控制
// cas更新,保證每個worker執行緒都會獲取到唯一的一個sequence
} while (!workGroupSequence.compareAndSet(nextConsumerIndex - 1L, nextConsumerIndex));
}else{
// processedSequence == false(手頭上存在一個還未消費的序列)
// 走到這裡說明之前拿到了一個新的消費序列,但是由於nextConsumerIndex > cachedAvailableSequence,沒有實際執行消費邏輯
// 而是被阻塞後返回獲得了最新的cachedAvailableSequence,重新執行一次迴圈走到了這裡
// 需要先把手頭上的這個序列給消費掉,才能繼續拿下一個消費序列
}
// cachedAvailableSequence只會存在兩種情況
// 1 第一次迴圈,初始化為Long.MIN_VALUE,則必定會走到下面的else分支中
// 2 非第一次迴圈,則cachedAvailableSequence為序列屏障所允許的最大可消費序列
if (cachedAvailableSequence >= nextConsumerIndex) {
// 爭搶到的消費序列是滿足要求的(小於序列屏障值,被序列屏障允許的),則呼叫消費者進行實際的消費
// 取出可以消費的下標對應的事件,交給eventConsumer消費
T event = myRingBuffer.get(nextConsumerIndex);
this.myWorkHandler.consume(event);
// 實際呼叫消費者進行消費了,標記為true.這樣一來就可以在下次迴圈中cas爭搶下一個新的消費序列了
processedSequence = true;
} else {
// 1 第一次迴圈會獲取當前序列屏障的最大可消費序列
// 2 非第一次迴圈,說明爭搶到的序列超過了屏障序列的最大值,等待生產者推進到爭搶到的sequence
cachedAvailableSequence = sequenceBarrier.getAvailableConsumeSequence(nextConsumerIndex);
}
} catch (final MyAlertException ex) {
// 被外部alert打斷,檢查running標記
if (!running.get()) {
// running == false, break跳出主迴圈,執行結束
break;
}
} catch (final Throwable ex) {
// 消費者消費時發生了異常,也認為是成功消費了,避免阻塞消費序列
// 下次迴圈會cas爭搶一個新的消費序列
processedSequence = true;
}
}
}
}
/**
* 多執行緒消費者(仿Disruptor.WorkerPool)
* */
public class MyWorkerPool<T> {
private final AtomicBoolean started = new AtomicBoolean(false);
private final MySequence workSequence = new MySequence(-1);
private final MyRingBuffer<T> myRingBuffer;
private final List<MyWorkProcessor<T>> workEventProcessorList;
public void halt() {
for (MyWorkProcessor<?> processor : this.workEventProcessorList) {
// 挨個停止所有工作執行緒
processor.halt();
}
started.set(false);
}
public boolean isRunning(){
return this.started.get();
}
// 注意:省略了無關程式碼
}
/**
* 序列柵欄(仿Disruptor.SequenceBarrier)
* */
public class MySequenceBarrier {
private final MyProducerSequencer myProducerSequencer;
private final MySequence currentProducerSequence;
private volatile boolean alerted = false;
private final MyWaitStrategy myWaitStrategy;
private final MySequence[] dependentSequencesList;
public MySequenceBarrier(MyProducerSequencer myProducerSequencer, MySequence currentProducerSequence,
MyWaitStrategy myWaitStrategy, MySequence[] dependentSequencesList) {
this.myProducerSequencer = myProducerSequencer;
this.currentProducerSequence = currentProducerSequence;
this.myWaitStrategy = myWaitStrategy;
if(dependentSequencesList.length != 0) {
this.dependentSequencesList = dependentSequencesList;
}else{
// 如果傳入的上游依賴序列為空,則生產者序列號作為兜底的依賴
this.dependentSequencesList = new MySequence[]{currentProducerSequence};
}
}
/**
* 獲得可用的消費者下標(disruptor中的waitFor)
* */
public long getAvailableConsumeSequence(long currentConsumeSequence) throws InterruptedException, MyAlertException {
// 每次都檢查下是否有被喚醒,被喚醒則會丟擲MyAlertException代表當前消費者要終止執行了
checkAlert();
long availableSequence = this.myWaitStrategy.waitFor(currentConsumeSequence,currentProducerSequence,dependentSequencesList,this);
if (availableSequence < currentConsumeSequence) {
return availableSequence;
}
// 多執行緒生產者中,需要進一步約束(於v4版本新增)
return myProducerSequencer.getHighestPublishedSequence(currentConsumeSequence,availableSequence);
}
/**
* 喚醒可能處於阻塞態的消費者
* */
public void alert() {
this.alerted = true;
this.myWaitStrategy.signalWhenBlocking();
}
/**
* 重新啟動時,清除標記
*/
public void clearAlert() {
this.alerted = false;
}
/**
* 檢查當前消費者的被喚醒狀態
* */
public void checkAlert() throws MyAlertException {
if (alerted) {
throw MyAlertException.INSTANCE;
}
}
}
/**
* disruptor dsl(仿Disruptor.Disruptor)
* */
public class MyDisruptor<T> {
private final MyRingBuffer<T> ringBuffer;
private final Executor executor;
private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
/**
* 啟動所有已註冊的消費者
* */
public void start(){
// cas設定啟動標識,避免重複啟動
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("Disruptor只能啟動一次");
}
// 遍歷所有的消費者,挨個start啟動
this.consumerRepository.getConsumerInfos().forEach(
item->item.start(this.executor)
);
}
/**
* 停止註冊的所有消費者
* */
public void halt() {
// 遍歷消費者資訊列表,挨個呼叫halt方法終止
for (final MyConsumerInfo consumerInfo : this.consumerRepository.getConsumerInfos()) {
consumerInfo.halt();
}
}
// 注意:省略了無關程式碼
}
/**
* disruptor dsl(仿Disruptor.Disruptor)
* */
public class MyDisruptor<T> {
private final MyRingBuffer<T> ringBuffer;
private final Executor executor;
private final MyConsumerRepository<T> consumerRepository = new MyConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
public MyDisruptor(
final MyEventFactory<T> eventProducer,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final MyWaitStrategy myWaitStrategy) {
this.ringBuffer = MyRingBuffer.create(producerType,eventProducer,ringBufferSize,myWaitStrategy);
this.executor = executor;
}
/**
* 註冊單執行緒消費者 (無上游依賴消費者,僅依賴生產者序列)
* */
@SafeVarargs
public final MyEventHandlerGroup<T> handleEventsWith(final MyEventHandler<T>... myEventHandlers){
return createEventProcessors(new MySequence[0], myEventHandlers);
}
/**
* 註冊單執行緒消費者 (有上游依賴消費者,僅依賴生產者序列)
* @param barrierSequences 依賴的序列屏障
* @param myEventHandlers 使用者自定義的事件消費者集合
* */
public MyEventHandlerGroup<T> createEventProcessors(
final MySequence[] barrierSequences,
final MyEventHandler<T>[] myEventHandlers) {
final MySequence[] processorSequences = new MySequence[myEventHandlers.length];
final MySequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
int i=0;
for(MyEventHandler<T> myEventConsumer : myEventHandlers){
final MyBatchEventProcessor<T> batchEventProcessor =
new MyBatchEventProcessor<>(ringBuffer, myEventConsumer, barrier);
processorSequences[i] = batchEventProcessor.getCurrentConsumeSequence();
i++;
// consumer物件都維護起來,便於後續start時啟動
consumerRepository.add(batchEventProcessor);
}
// 更新當前生產者註冊的消費者序列
updateGatingSequencesForNextInChain(barrierSequences,processorSequences);
return new MyEventHandlerGroup<>(this,this.consumerRepository,processorSequences);
}
/**
* 註冊多執行緒消費者 (無上游依賴消費者,僅依賴生產者序列)
* */
@SafeVarargs
public final MyEventHandlerGroup<T> handleEventsWithWorkerPool(final MyWorkHandler<T>... myWorkHandlers) {
return createWorkerPool(new MySequence[0], myWorkHandlers);
}
/**
* 註冊多執行緒消費者 (有上游依賴消費者,僅依賴生產者序列)
* @param barrierSequences 依賴的序列屏障
* @param myWorkHandlers 使用者自定義的事件消費者集合
* */
public MyEventHandlerGroup<T> createWorkerPool(
final MySequence[] barrierSequences, final MyWorkHandler<T>[] myWorkHandlers) {
final MySequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final MyWorkerPool<T> workerPool = new MyWorkerPool<>(ringBuffer, sequenceBarrier, myWorkHandlers);
// consumer都儲存起來,便於start統一的啟動或者halt、shutdown統一的停止
consumerRepository.add(workerPool);
final MySequence[] workerSequences = workerPool.getCurrentWorkerSequences();
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
return new MyEventHandlerGroup<>(this, consumerRepository,workerSequences);
}
private void updateGatingSequencesForNextInChain(final MySequence[] barrierSequences, final MySequence[] processorSequences) {
if (processorSequences.length != 0) {
// 這是一個優化操作:
// 由於新的消費者通過ringBuffer.newBarrier(barrierSequences),已經是依賴於之前ringBuffer中已有的消費者序列
// 消費者即EventProcessor內部已經設定好了老的barrierSequences為依賴,因此可以將ringBuffer中已有的消費者序列去掉
// 只需要儲存,依賴當前消費者鏈條最末端的序列即可(也就是最慢的序列),這樣生產者可以更快的遍歷註冊的消費者序列
for(MySequence sequence : barrierSequences){
ringBuffer.removeConsumerSequence(sequence);
}
for(MySequence sequence : processorSequences){
// 新設定的就是當前消費者鏈條最末端的序列
ringBuffer.addConsumerSequence(sequence);
}
// 將被剔除的序列的狀態標記為其不屬於消費者依賴鏈尾部(用於shutdown優雅停止)
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}
/**
* 啟動所有已註冊的消費者
* */
public void start(){
// cas設定啟動標識,避免重複啟動
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("Disruptor只能啟動一次");
}
// 遍歷所有的消費者,挨個start啟動
this.consumerRepository.getConsumerInfos().forEach(
item->item.start(this.executor)
);
}
/**
* 停止註冊的所有消費者
* */
public void halt() {
// 遍歷消費者資訊列表,挨個呼叫halt方法終止
for (final MyConsumerInfo consumerInfo : this.consumerRepository.getConsumerInfos()) {
consumerInfo.halt();
}
}
/**
* 等到所有的消費者把已生產的事件全部消費完成後,再halt停止所有消費者執行緒
* */
public void shutdown(long timeout, TimeUnit timeUnit){
final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);
// 無限迴圈,直到所有已生產的事件全部消費完成
while (hasBacklog()) {
if (timeout >= 0 && System.currentTimeMillis() > timeOutAt) {
throw new RuntimeException("disruptor shutdown操作,等待超時");
}
// 忙等待
}
// hasBacklog為false,跳出了迴圈
// 說明已生產的事件全部消費完成了,此時可以安全的優雅停止所有消費者執行緒了,
halt();
}
/**
* 判斷當前消費者是否還有未消費完的事件
*/
private boolean hasBacklog() {
final long cursor = ringBuffer.getCurrentProducerSequence().get();
// 獲得所有的處於最尾端的消費者序列(最尾端的是最慢的,所以是準確的)
for (final MySequence consumer : consumerRepository.getLastSequenceInChain()) {
if (cursor > consumer.get()) {
// 如果任意一個消費者序列號小於當前生產者序列,說明存在未消費完的事件,返回true
return true;
}
}
// 所有最尾端的消費者的序列號都和生產者的序列號相等
// 說明所有的消費者截止當前都已經消費完了全部的已生產的事件,返回false
return false;
}
/**
* 獲得當親Disruptor的ringBuffer
* */
public MyRingBuffer<T> getRingBuffer() {
return ringBuffer;
}
}
/**
* 維護當前disruptor的所有消費者物件資訊的倉庫(仿Disruptor.ConsumerRepository)
*/
public class MyConsumerRepository<T> {
private final ArrayList<MyConsumerInfo> consumerInfos = new ArrayList<>();
/**
* 不重寫Sequence的hashCode,equals,因為比對的就是原始物件是否相等
* */
private final Map<MySequence, MyConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<>();
public ArrayList<MyConsumerInfo> getConsumerInfos() {
return consumerInfos;
}
public void add(final MyEventProcessor processor) {
final MyEventProcessorInfo<T> consumerInfo = new MyEventProcessorInfo<>(processor);
eventProcessorInfoBySequence.put(processor.getCurrentConsumeSequence(),consumerInfo);
consumerInfos.add(consumerInfo);
}
public void add(final MyWorkerPool<T> workerPool) {
final MyWorkerPoolInfo<T> workerPoolInfo = new MyWorkerPoolInfo<>(workerPool);
for (MySequence sequence : workerPool.getCurrentWorkerSequences()) {
eventProcessorInfoBySequence.put(sequence, workerPoolInfo);
}
consumerInfos.add(workerPoolInfo);
}
/**
* 找到所有還在執行的、處於尾端的消費者
* */
public List<MySequence> getLastSequenceInChain() {
List<MySequence> lastSequenceList = new ArrayList<>();
for (MyConsumerInfo consumerInfo : consumerInfos) {
// 找到所有還在執行的、處於尾端的消費者
if (consumerInfo.isRunning() && consumerInfo.isEndOfChain()) {
final MySequence[] sequences = consumerInfo.getSequences();
// 將其消費者序列號全部放進lastSequenceList
Collections.addAll(lastSequenceList, sequences);
}
}
return lastSequenceList;
}
public void unMarkEventProcessorsAsEndOfChain(final MySequence... barrierEventProcessors) {
for (MySequence barrierEventProcessor : barrierEventProcessors) {
eventProcessorInfoBySequence.get(barrierEventProcessor).markAsUsedInBarrier();
}
}
}
/**
* 消費者資訊 (仿Disruptor.ConsumerInfo)
* */
public interface MyConsumerInfo {
/**
* 通過executor啟動當前消費者
* @param executor 啟動器
* */
void start(Executor executor);
/**
* 停止當前消費者
* */
void halt();
/**
* 是否是最尾端的消費者
* */
boolean isEndOfChain();
/**
* 將當前消費者標記為不是最尾端消費者
* */
void markAsUsedInBarrier();
/**
* 當前消費者是否還在執行
* */
boolean isRunning();
/**
* 獲得消費者的序列號(多執行緒消費者由多個序列號物件)
* */
MySequence[] getSequences();
}
/**
* 單執行緒事件處理器資訊(仿Disruptor.EventProcessorInfo)
* */
public class MyEventProcessorInfo<T> implements MyConsumerInfo {
private final MyEventProcessor myEventProcessor;
/**
* 預設是最尾端的消費者
* */
private boolean endOfChain = true;
public MyEventProcessorInfo(MyEventProcessor myEventProcessor) {
this.myEventProcessor = myEventProcessor;
}
@Override
public void start(Executor executor) {
executor.execute(myEventProcessor);
}
@Override
public void halt() {
this.myEventProcessor.halt();
}
@Override
public boolean isEndOfChain() {
return endOfChain;
}
@Override
public void markAsUsedInBarrier() {
this.endOfChain = false;
}
@Override
public boolean isRunning() {
return this.myEventProcessor.isRunning();
}
@Override
public MySequence[] getSequences() {
return new MySequence[]{this.myEventProcessor.getCurrentConsumeSequence()};
}
}
/**
* 多執行緒消費者資訊(仿Disruptor.WorkerPoolInfo)
* */
public class MyWorkerPoolInfo<T> implements MyConsumerInfo {
private final MyWorkerPool<T> workerPool;
/**
* 預設是最尾端的消費者
* */
private boolean endOfChain = true;
public MyWorkerPoolInfo(MyWorkerPool<T> workerPool) {
this.workerPool = workerPool;
}
@Override
public void start(Executor executor) {
workerPool.start(executor);
}
@Override
public void halt() {
this.workerPool.halt();
}
@Override
public boolean isEndOfChain() {
return endOfChain;
}
@Override
public void markAsUsedInBarrier() {
this.endOfChain = true;
}
@Override
public boolean isRunning() {
return this.workerPool.isRunning();
}
@Override
public MySequence[] getSequences() {
return this.workerPool.getCurrentWorkerSequences();
}
}
截止v5版本的MyDisruptor,是通過ArrayList線性表來儲存生產者序列器(ProducerSequencer)中所註冊的消費者序列集合的。而disruptor中卻是直接使用陣列來儲存的,這是為什麼呢?
/**
* 單執行緒生產者序列器(仿Disruptor.SingleProducerSequencer)
*
* 因為是單執行緒序列器,因此在設計上就是執行緒不安全的
* */
public class MySingleProducerSequencer implements MyProducerSequencer{
private static final AtomicReferenceFieldUpdater<MySingleProducerSequencer, MySequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(MySingleProducerSequencer.class, MySequence[].class, "gatingConsumerSequences");
@Override
public void addGatingConsumerSequence(MySequence newGatingConsumerSequence){
MySequenceGroups.addSequences(this,SEQUENCE_UPDATER,this.currentProducerSequence,newGatingConsumerSequence);
}
@Override
public void addGatingConsumerSequenceList(MySequence... newGatingConsumerSequences){
MySequenceGroups.addSequences(this,SEQUENCE_UPDATER,this.currentProducerSequence,newGatingConsumerSequences);
}
@Override
public void removeConsumerSequence(MySequence sequenceNeedRemove) {
MySequenceGroups.removeSequence(this,SEQUENCE_UPDATER,sequenceNeedRemove);
}
// 注意:省略了無關的程式碼
}
/**
* 更改Sequence陣列工具類(仿Disruptor.SequenceGroups)
* 注意:實現中cas的插入/刪除機制在MyDisruptor中是不必要的,因為MyDisruptor不支援在執行時動態的註冊新消費者(disruptor支援,但是有一些額外的複雜度)
* 只是為了和Disruptor的實現保持一致,可以更好的說明實現原理才這樣做的,本質上只需要支援sequence陣列擴容/縮容即可
* */
public class MySequenceGroups {
/**
* 將新的需要註冊的序列集合加入到holder物件的對應sequence陣列中(sequencesToAdd集合)
* */
public static <T> void addSequences(
final T holder,
final AtomicReferenceFieldUpdater<T, MySequence[]> updater,
final MySequence currentProducerSequence,
final MySequence... sequencesToAdd) {
long cursorSequence;
MySequence[] updatedSequences;
MySequence[] currentSequences;
do {
// 獲得資料持有者當前的陣列參照
currentSequences = updater.get(holder);
// 將原陣列中的資料複製到新的陣列中
updatedSequences = Arrays.copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
cursorSequence = currentProducerSequence.get();
int index = currentSequences.length;
// 每個新新增的sequence值都以當前生產者的序列為準
for (MySequence sequence : sequencesToAdd) {
sequence.set(cursorSequence);
// 新註冊sequence放入陣列中
updatedSequences[index++] = sequence;
}
// cas的將新陣列賦值給物件,允許disruptor在執行時並行的註冊新的消費者sequence集合
// 只有cas賦值成功才會返回,失敗的話會重新獲取最新的currentSequences,重新構建、合併新的updatedSequences陣列
} while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
// 新註冊的消費者序列,再以當前生產者序列為準做一次最終修正
cursorSequence = currentProducerSequence.get();
for (MySequence sequence : sequencesToAdd) {
sequence.set(cursorSequence);
}
}
/**
* 從holder的sequence陣列中刪除掉一個sequence
* */
public static <T> void removeSequence(
final T holder,
final AtomicReferenceFieldUpdater<T, MySequence[]> sequenceUpdater,
final MySequence sequenceNeedRemove) {
int numToRemove;
MySequence[] oldSequences;
MySequence[] newSequences;
do {
// 獲得資料持有者當前的陣列參照
oldSequences = sequenceUpdater.get(holder);
// 獲得需要從陣列中刪除的sequence個數
numToRemove = countMatching(oldSequences, sequenceNeedRemove);
if (0 == numToRemove) {
// 沒找到需要刪除的Sequence,直接返回
return;
}
final int oldSize = oldSequences.length;
// 構造新的sequence陣列
newSequences = new MySequence[oldSize - numToRemove];
for (int i = 0, pos = 0; i < oldSize; i++) {
// 將原陣列中的sequence複製到新陣列中
final MySequence testSequence = oldSequences[i];
if (sequenceNeedRemove != testSequence) {
// 只複製不需要刪除的資料
newSequences[pos++] = testSequence;
}
}
} while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences));
}
private static int countMatching(MySequence[] values, final MySequence toMatch) {
int numToRemove = 0;
for (MySequence value : values) {
if (value == toMatch) {
// 比對Sequence參照,如果和toMatch相同,則需要刪除
numToRemove++;
}
}
return numToRemove;
}
}
disruptor無論在整體設計還是最終程式碼實現上都有很多值得反覆琢磨和學習的細節,希望這個系列部落格能幫助到對disruptor感興趣的小夥伴。
本篇部落格的完整程式碼在我的github上:https://github.com/1399852153/MyDisruptor 分支:feature/lab6