Disruptor是一個高效能的無鎖並行框架,其主要應用場景是在高並行、低延遲的系統中,如金融領域的交易系統,遊戲伺服器等。其優點就是非常快,號稱能支撐每秒600萬訂單。需要注意的是,Disruptor是單機框架,對標JDK中的Queue,而非可用於分散式系統的MQ
本文基於Disruptor v3.4.*版本
既然是簡單使用,這階段只需要關注:
首先,我們定義訊息的載體Event,生產者向消費者傳遞的訊息通過Event承載
class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" + "value=" + value + '}';
}
}
然後定義Event生產工廠,這用於初始化Event
EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
};
接下來就可以構建Disruptor了,以下是完整程式碼
// 訊息載體(event)
static class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" + "value=" + value + '}';
}
}
// 釋出訊息的轉換器
public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
event.set(buffer.getLong(0));
}
public static void main(String[] args) throws Exception {
// event生產工廠,初始化RingBuffer的時候使用
EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
};
// 指定RingBuffer的大小(必須是2的n次方)
int bufferSize = 1024;
// 構造Disruptor(預設使用多生產者模式、BlockingWaitStrategy阻塞策略)
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new BlockingWaitStrategy());
// 設定消費者
EventHandler<LongEvent> handler = (event, sequence, endOfBatch) -> {
System.out.println("Event: " + event);
};
disruptor.handleEventsWith(handler);
// 啟動disruptor,啟動所有需要執行的執行緒
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long i = 0; i < 100; i++) {
bb.putLong(i);
// 釋出事件
ringBuffer.publishEvent(LongEventMain::translate, bb);
}
}
Disruptor不僅可以當高效能的佇列使用,還支援消費者的序列、並行消費等
以下只展示關鍵程式碼(設定消費者),其餘部分參考上一節的簡單demo
單鏈序列
disruptor.handleEventsWith(handlerA).then(handlerB);
並行
disruptor.handleEventsWith(handlerA, handlerB);
鏈內序列,多鏈並行
disruptor.handleEventsWith(handlerA).then(handlerC);
disruptor.handleEventsWith(handlerB).then(handlerD);
菱形(C、D都執行完才到E)
disruptor.handleEventsWith(handlerA).then(handlerC);
disruptor.handleEventsWith(handlerB).then(handlerD);
disruptor.after(handlerC, handlerD).then(handlerE);
分組(AB都執行完才到CD)
disruptor.handleEventsWith(handlerA, handlerB).then(handlerC, handlerD);
分組不重複消費
組內競爭,組外序列:每個訊息在每個分組中只有一個消費者能消費成功,如果就是分組A中只有HandlerA2能得到資料,分組B中只有HandlerB1獲得
// 注意:此處的handler實現的是WorkHandler介面
disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3)
.then(handlerB1, handlerB2, handlerB3);
分組不重複消費(菱形)
// handlerA、handlerB實現WorkHandler介面
// handlerC 實現EventHandler或WorkHandler介面均可
disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3)
.then(handlerB1, handlerB2, handlerB3)
.then(handlerC);
消費者速度比生產者快時,需要等待。因此就有了不同的等待策略以適應不同場景
BlockingWaitStrategy
預設策略。使用鎖和 Condition 的等待、喚醒機制。速度慢,但節省CPU資源並且在不同部署環境中能提供更加一致的效能表現。
YieldingWaitStrategy
二段式,一階段自旋100次,二階段執行Thread.yield,需要低延遲的場景可使用此策略
SleepingWaitStrategy
三段式,一階段自旋,二階段執行Thread.yield,三階段睡眠
BusySpinWaitStrategy
效能最高的策略,與 YieldingWaitStrategy 一樣在低延遲場景使用,但是此策略要求消費者數量低於 CPU 邏輯核心總數
清除訊息載體 Event 中的資料
如果 Event 中存在大物件,應該在消費者鏈的末尾,新增一個清除資料的消費者,以幫助jvm垃圾回收。demo中的 LongEvent 是 private long value;
所以沒必要新增。
本文介紹了 Disruptor 的簡單使用,以及複雜場景下消費者的設定。下篇開坑 Disruptor 原始碼解析。
參考資料