Disruptor-簡單使用

2023-04-10 18:00:47

前言

Disruptor是一個高效能的無鎖並行框架,其主要應用場景是在高並行、低延遲的系統中,如金融領域的交易系統,遊戲伺服器等。其優點就是非常快,號稱能支撐每秒600萬訂單。需要注意的是,Disruptor是單機框架,對標JDK中的Queue,而非可用於分散式系統的MQ

本文基於Disruptor v3.4.*版本

Demo

既然是簡單使用,這階段只需要關注:

  • 生產者
  • 消費者:EventHandler
  • 訊息的傳遞:訊息的載體Event

簡單例子

首先,我們定義訊息的載體Event,生產者向消費者傳遞的訊息通過Event承載

class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }
    @Override
    public String toString() {
        return "LongEvent{" + "value=" + value + '}';
    }
}

然後定義Event生產工廠,這用於初始化Event

EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
};

接下來就可以構建Disruptor了,以下是完整程式碼

// 訊息載體(event)
static class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }
    @Override
    public String toString() {
        return "LongEvent{" + "value=" + value + '}';
    }
}

// 釋出訊息的轉換器
public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
    event.set(buffer.getLong(0));
}

public static void main(String[] args) throws Exception {

    // event生產工廠,初始化RingBuffer的時候使用
    EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
        @Override
        public LongEvent newInstance() {
            return new LongEvent();
        }
    };

    // 指定RingBuffer的大小(必須是2的n次方)
    int bufferSize = 1024;

    // 構造Disruptor(預設使用多生產者模式、BlockingWaitStrategy阻塞策略)
    Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
    //  Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new BlockingWaitStrategy());
    // 設定消費者
    EventHandler<LongEvent> handler = (event, sequence, endOfBatch) -> {
        System.out.println("Event: " + event);
    };
    disruptor.handleEventsWith(handler);

    // 啟動disruptor,啟動所有需要執行的執行緒
    disruptor.start();

    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    ByteBuffer bb = ByteBuffer.allocate(8);
    for (long i = 0; i < 100; i++) {
        bb.putLong(i);
        // 釋出事件
        ringBuffer.publishEvent(LongEventMain::translate, bb);
    }
}

消費者組合(多使用場景)

Disruptor不僅可以當高效能的佇列使用,還支援消費者的序列、並行消費等

以下只展示關鍵程式碼(設定消費者),其餘部分參考上一節的簡單demo

  1. 單鏈序列

    disruptor.handleEventsWith(handlerA).then(handlerB);
    
  2. 並行

    disruptor.handleEventsWith(handlerA, handlerB);
    
  3. 鏈內序列,多鏈並行

    disruptor.handleEventsWith(handlerA).then(handlerC);
    disruptor.handleEventsWith(handlerB).then(handlerD);
    
  4. 菱形(C、D都執行完才到E)

    disruptor.handleEventsWith(handlerA).then(handlerC);
    disruptor.handleEventsWith(handlerB).then(handlerD);
    disruptor.after(handlerC, handlerD).then(handlerE);
    
    
  5. 分組(AB都執行完才到CD)

    disruptor.handleEventsWith(handlerA, handlerB).then(handlerC, handlerD);
    
  6. 分組不重複消費

    組內競爭,組外序列:每個訊息在每個分組中只有一個消費者能消費成功,如果就是分組A中只有HandlerA2能得到資料,分組B中只有HandlerB1獲得

    // 注意:此處的handler實現的是WorkHandler介面
    disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3)
                    .then(handlerB1, handlerB2, handlerB3);
    
  7. 分組不重複消費(菱形)

    // handlerA、handlerB實現WorkHandler介面
    // handlerC 實現EventHandler或WorkHandler介面均可
    disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3)
                    .then(handlerB1, handlerB2, handlerB3)
                    .then(handlerC);
    

    等待策略

    消費者速度比生產者快時,需要等待。因此就有了不同的等待策略以適應不同場景

    • BlockingWaitStrategy

      預設策略。使用鎖和 Condition 的等待、喚醒機制。速度慢,但節省CPU資源並且在不同部署環境中能提供更加一致的效能表現。

    • YieldingWaitStrategy

      二段式,一階段自旋100次,二階段執行Thread.yield,需要低延遲的場景可使用此策略

    • SleepingWaitStrategy

      三段式,一階段自旋,二階段執行Thread.yield,三階段睡眠

    • BusySpinWaitStrategy

      效能最高的策略,與 YieldingWaitStrategy 一樣在低延遲場景使用,但是此策略要求消費者數量低於 CPU 邏輯核心總數

    其他小技巧

    1. 清除訊息載體 Event 中的資料

      如果 Event 中存在大物件,應該在消費者鏈的末尾,新增一個清除資料的消費者,以幫助jvm垃圾回收。demo中的 LongEvent 是 private long value; 所以沒必要新增。

總結

本文介紹了 Disruptor 的簡單使用,以及複雜場景下消費者的設定。下篇開坑 Disruptor 原始碼解析。


參考資料

Disruptor官方檔案