使用者端是通過Rebalance服務做到高可靠的。當發生Broker掉線、消費者範例掉線、Topic 擴容等各種突發情況時,消費者組中的消費者範例是怎麼重平衡,以支援全部佇列的正常消費的呢?
RebalancePullImpl 和 RebalancePushImpl 兩個重平衡實現類,分別被 DefaultMQPullConsumer 和DefaultMQPushConsumer 使用。下面講一下 Rebalancelmpl 的核心屬性和方法
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
//記錄MessageQueue和ProcessQueue的關係。MessageQueue可以簡單地理解為ConsumeQueue的使用者端實現;ProcessQueue是儲存Pull訊息的本地容器
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
//Topic 路由資訊 。儲存 Topic 和 MessageQueue的關係。
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();
//真正的訂閱關係,儲存當前消費者組訂閱了哪些Topic的哪些Tag
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
protected String consumerGroup;
protected MessageModel messageModel;
//消費分配策略的實現
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
//client範例物件
protected MQClientInstance mQClientFactory;
}
public abstract class RebalanceImpl {
//為MessageQueue加鎖
public boolean lock(final MessageQueue mq) {}
//執行Rebalance操作
public void doRebalance(final boolean isOrder) {}
//通知Message發生變化,這個方法在Push和Pull兩個類中被重寫
public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
final Set<MessageQueue> mqDivided);
//去掉不再需要的 MessageQueue
public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
//執行訊息拉取請求
public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
//在Rebalance中更新processQueue
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder)
}
Rebalancelmpl 、 RebalancePushImpl 、 RebalancePullImpl 是Rebalance的核心實現,主要邏輯都在Rebalancelmpl中,因為Pull消費者和Push消費者對Rebalance的需求不同,在各自的實現中重寫了部分方法,以滿足自身需求
如果有一個消費者範例下線了,Broker和其他消費者是怎麼做Rebalance的呢
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
目前佇列分配策略有以下5種實現方法