RocketMQ

2023-02-28 09:00:22

使用者端是通過Rebalance服務做到高可靠的。當發生Broker掉線、消費者範例掉線、Topic 擴容等各種突發情況時,消費者組中的消費者範例是怎麼重平衡,以支援全部佇列的正常消費的呢?

RebalancePullImpl 和 RebalancePushImpl 兩個重平衡實現類,分別被 DefaultMQPullConsumer 和DefaultMQPushConsumer 使用。下面講一下 Rebalancelmpl 的核心屬性和方法

核心屬性

public abstract class RebalanceImpl {
    protected static final InternalLogger log = ClientLogger.getLog();
    //記錄MessageQueue和ProcessQueue的關係。MessageQueue可以簡單地理解為ConsumeQueue的使用者端實現;ProcessQueue是儲存Pull訊息的本地容器
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    //Topic 路由資訊 。儲存 Topic 和 MessageQueue的關係。
    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();
    //真正的訂閱關係,儲存當前消費者組訂閱了哪些Topic的哪些Tag
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
    protected String consumerGroup;
    protected MessageModel messageModel;
    //消費分配策略的實現
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    //client範例物件
    protected MQClientInstance mQClientFactory;
}

核心方法

public abstract class RebalanceImpl {
    //為MessageQueue加鎖
    public boolean lock(final MessageQueue mq) {}
    //執行Rebalance操作
    public void doRebalance(final boolean isOrder) {}
    //通知Message發生變化,這個方法在Push和Pull兩個類中被重寫
    public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
                                             final Set<MessageQueue> mqDivided);
    //去掉不再需要的 MessageQueue
    public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
    //執行訊息拉取請求
    public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
    //在Rebalance中更新processQueue
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                       final boolean isOrder)

}

Rebalancelmpl 、 RebalancePushImpl 、 RebalancePullImpl 是Rebalance的核心實現,主要邏輯都在Rebalancelmpl中,因為Pull消費者和Push消費者對Rebalance的需求不同,在各自的實現中重寫了部分方法,以滿足自身需求

如果有一個消費者範例下線了,Broker和其他消費者是怎麼做Rebalance的呢

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

目前佇列分配策略有以下5種實現方法

  • AllocateMessageQueueAveragely:平均分配,也是預設使用的策略(強烈推薦)。
  • AllocateMessageQueueAveragelyByCircle:環形分配策略。
  • AllocateMessageQueueByConfig:手動設定。
  • AllocateMessageQueueConsistentHash:一致性Hash分配。
  • AllocateMessageQueueByMachineRoom:機房分配策略