[RocketMQ] Load Balancing Source Code Analysis

2023-01-11 06:00:21

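In RocketMQ's clustering mode, within one consumer group a message queue can be assigned to only one consumer of the group at any given time, which means a message is consumed by only one consumer in the group. Load balancing exists to distribute the message queues sensibly among those consumers.

The rest of this article uses the push consumer in clustering mode, DefaultMQPushConsumerImpl, as the example to walk through the load-balancing process.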

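Consumer Load Balancing

When a consumer starts, it does the following:

  1. Refreshes, from the NameServer, the route information of the topics it subscribes to;
  2. Sends a heartbeat to the Brokers to register itself;
  3. Wakes up the rebalance service to trigger one round of load balancing.

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public synchronized void start() throws MQClientException {
        // ...
        // Refresh the route information of the topics this consumer subscribes to
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        // Send a heartbeat to the Brokers
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // Wake up the rebalance service
        this.mQClientFactory.rebalanceImmediately();
    }
}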

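Updating the Topic Route Information

To make sure the consumer works with up-to-date route information for its topics (how many message queues a topic has, how those queues are distributed, and so on), the route information must be refreshed before load balancing. In updateTopicSubscribeInfoWhenSubscriptionChanged, the consumer first obtains all of its topic subscriptions (one consumer can subscribe to multiple topics), then iterates over them and asks the NameServer for the latest route information of each topic:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
        // Get the topic subscriptions of the current consumer
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            // Iterate over the subscribed topics
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                // Refresh the topic's route information from the NameServer
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            }
        }
    }
}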

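Registering the Consumer

Sending the Heartbeat

Because the Broker needs to know when consumers join or leave, every consumer calls sendHeartbeatToAllBrokerWithLock at startup to send a heartbeat packet to the Brokers and register itself: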

public class MQClientInstance {
    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                // Call sendHeartbeatToAllBroker to send the heartbeat to the Brokers
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed. [{}]", this.clientId);
        }
    }
}

In sendHeartbeatToAllBroker, all Brokers are read from brokerAddrTable and iterated over (in a master-slave deployment the registration request is also sent to the slave nodes), and the sendHearbeat method of MQClientAPIImpl is called to send a heartbeat request to each Broker:

public class MQClientInstance {
    // Broker address table
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
    // Send the heartbeat
    private void sendHeartbeatToAllBroker() {
        final HeartbeatData heartbeatData = this.prepareHeartbeatData();
        // ...
        if (!this.brokerAddrTable.isEmpty()) {
            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
            // Iterate over all Brokers. The key is the Broker name, the value holds all Broker
            // instances under that name (master and slaves share the same Broker name)
            Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, HashMap<Long, String>> entry = it.next();
                String brokerName = entry.getKey(); // broker name
                // Get all Broker instances under the same Broker name
                HashMap<Long, String> oneTable = entry.getValue();
                if (oneTable != null) {
                    // Iterate over all instances
                    for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                        Long id = entry1.getKey();
                        String addr = entry1.getValue();
                        if (addr != null) { // The address is not null
                            // ...
                            try {
                                // Send the heartbeat
                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
                                // ...
                            } catch (Exception e) {
                                // ...
                            }
                        }
                    }
                }
            }
        }
    }
}

In MQClientAPIImpl's sendHearbeat method, a HEART_BEAT request is built and sent to the Broker:

public class MQClientAPIImpl {
    public int sendHearbeat(final String addr, final HeartbeatData heartbeatData, final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        // Create the HEART_BEAT request
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
        request.setLanguage(clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        // Send the request
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        // ...
    }
}

Handling the Heartbeat Request

At startup the Broker registers a processor for HEART_BEAT requests. As the code shows, that processor is ClientManageProcessor:

public class BrokerController {
    public void registerProcessor() {
        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        // Register ClientManageProcessor as the processor for HEART_BEAT requests
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    }
}

In ClientManageProcessor's processRequest method, a request of type HEART_BEAT is handled by the heartBeat method. The same switch also handles UNREGISTER_CLIENT requests which, as the name suggests, deal with unregistering (covered later):

public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT: // Handle the heartbeat request
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT: // Handle the unregister request
                return this.unregisterClient(ctx, request);
            case RequestCode.CHECK_CLIENT_CONFIG:
                return this.checkClientConfig(ctx, request);
            default:
                break;
        }
        return null;
    }
}

The heartBeat method calls ConsumerManager's registerConsumer to register the consumer:

public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        // ...
        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            // ...
            // Register the consumer
            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable);
            // ...
        }
        // ...
        return response;
    }
}

Registration

ConsumerManager's registerConsumer method works as follows:

  1. Look up the ConsumerGroupInfo object for the consumer group by group name. If none exists, create a ConsumerGroupInfo, which records the information about the consumer group;
  2. Determine whether the consumers have changed. If they have, and change notification is enabled, fire a CHANGE event (examined later);
  3. Fire a REGISTER event.

public class ConsumerManager {
    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        // Look up the consumer group information by group name
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) { // If absent, add a new ConsumerGroupInfo object
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }
        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        boolean r2 = consumerGroupInfo.updateSubscription(subList);
        // If anything changed
        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                // Notify the change
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }
        // Register the consumer
        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
        return r1 || r2;
    }
}

In DefaultConsumerIdsChangeListener's handle method, a REGISTER event is handled by calling ConsumerFilterManager's register method. The details of that registration are not covered here:

public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
    @Override
    public void handle(ConsumerGroupEvent event, String group, Object... args) {
        if (event == null) {
            return;
        }
        switch (event) {
            case CHANGE: // Consumer change event
                // ...
                break;
            case UNREGISTER: // Unregister event
                this.brokerController.getConsumerFilterManager().unRegister(group);
                break;
            case REGISTER: // Register event
                if (args == null || args.length < 1) {
                    return;
                }
                Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
                // Perform the registration
                this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
                break;
            default:
                throw new RuntimeException("Unknown event " + event);
        }
    }
}

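Load Balancing

After the steps above, rebalanceImmediately of MQClientInstance is called to wake up the rebalance service, which performs one round of load balancing and assigns message queues to the consumer. Note that load balancing is carried out on the consumer side: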

// MQClientInstance
public class MQClientInstance {
    private final RebalanceService rebalanceService;

    public void rebalanceImmediately() {
        // Wake up the rebalance service
        this.rebalanceService.wakeup();
    }
}

// RebalanceService
public class RebalanceService extends ServiceThread {
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            // Rebalance
            this.mqClientFactory.doRebalance();
        }
        log.info(this.getServiceName() + " service end");
    }
}

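The load-balancing process was already explained in the companion article [RocketMQ] Message Pulling, so only the key points are revisited here.

During load balancing, the consumer first obtains its topic subscriptions, then iterates over the subscribed topics and rebalances each one, reassigning its queues: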

public abstract class RebalanceImpl {
    public void doRebalance(final boolean isOrder) {
        // Get the topic subscriptions
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            // Iterate over all subscribed topics
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    // Rebalance this topic
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }
        this.truncateMessageQueueNotMyTopic();
    }
}

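Rebalancing by Topic

rebalanceByTopic checks the message model and rebalances the topic accordingly. The focus here is on clustering mode:

  1. Get the set of message queues for the topic from topicSubscribeInfoTable. This yields all message queues under the topic;

  2. Using the topic and the consumer group name, get the IDs of all consumers subscribed to the topic. This yields every consumer that subscribes to it;

  3. If both the message queue set and the consumer ID list are non-null, sort them both. Sorting prepares for the allocation in the next step, because every consumer then computes the allocation over the same ordering;

  4. Get the configured allocation strategy and use it to assign message queues to the current consumer. The averaging strategy, for example, works out how many queues each consumer gets from the number of message queues and the number of consumers;

  5. With the newly assigned message queues, call updateProcessQueueTableInRebalance to update the ProcessQueue entries of the current consumer.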

public abstract class RebalanceImpl {

    // Rebalance a single topic
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: { // Broadcasting mode
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // ...
                break;
            }
            case CLUSTERING: { // Clustering mode
                // Get the message queues of the topic
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // Get the IDs of all consumers subscribed to the topic
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                // ...
                if (mqSet != null && cidAll != null) { // Both are non-null
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    // Sort the message queues
                    Collections.sort(mqAll);
                    // Sort the consumers
                    Collections.sort(cidAll);
                    // Get the allocation strategy
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        // Assign message queues to the consumer according to the strategy
                        allocateResult = strategy.allocate(
                            this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
                    } catch (Throwable e) {
                        // ...
                    }
                    // The message queues assigned to the current consumer
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        // Add the allocation result to the result set
                        allocateResultSet.addAll(allocateResult);
                    }
                    // Update the process queues according to the allocation
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    // ...
                }
                break;
            }
            default:
                break;
        }
    }
}
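The averaging idea mentioned in step 4 can be sketched in isolation. The following is a minimal illustration, not a copy of RocketMQ's strategy class: the class name AverageAllocationSketch and the use of plain integers as queue IDs are assumptions made for brevity. It takes the sorted queue list and the sorted consumer ID list and returns the contiguous slice that belongs to one consumer, giving one extra queue to the first consumers when the division is not even.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: the class name and the integer queue IDs are hypothetical.
class AverageAllocationSketch {

    /** Returns the slice of the sorted queue list that belongs to currentCid. */
    static List<Integer> allocate(String currentCid, List<Integer> mqAll, List<String> cidAll) {
        List<Integer> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer, or nothing to assign
        }
        int mod = mqAll.size() % cidAll.size();
        int averageSize;
        if (mqAll.size() <= cidAll.size()) {
            averageSize = 1; // at most one queue per consumer
        } else if (mod > 0 && index < mod) {
            averageSize = mqAll.size() / cidAll.size() + 1; // early consumers absorb the remainder
        } else {
            averageSize = mqAll.size() / cidAll.size();
        }
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // range becomes non-positive when there are more consumers than queues
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get(startIndex + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = List.of(0, 1, 2, 3, 4, 5, 6, 7);
        List<String> consumers = List.of("c1", "c2", "c3");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(cid, queues, consumers));
        }
    }
}
```

With 8 queues and 3 consumers, c1 and c2 each receive three queues and c3 receives two. Because every consumer runs the same computation over the same sorted inputs, the slices do not overlap even though no consumer coordinates with the others.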

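Updating the Process Queues

After load balancing, the message queues a consumer is responsible for may have changed. Each MessageQueue corresponds to one ProcessQueue, and processQueueTable records the queues the consumer is responsible for, so it has to be updated at this point. The logic is as follows:

  1. Iterate over processQueueTable and examine each message queue. This step checks whether, after reassignment, some queues recorded in processQueueTable are no longer owned by the current consumer. If so, the queue is marked as dropped, meaning it is removed and the consumer no longer pulls messages from it;

  2. Check whether any message queue has been newly assigned to the current consumer. If a queue is in the latest assigned set mqSet but not in processQueueTable, it is handled as follows:

    • Compute the pull offset, that is, the position from which consumption starts. If the offset is greater than or equal to 0, put the new ProcessQueue into processQueueTable;
    • Build a PullRequest, set the pull information on it, and add it to the pull request list pullRequestList.

    After this step, a PullRequest has been built for every queue that is assigned to the current consumer but was not yet in processQueueTable. dispatchPullRequest is then called to handle these requests, and messages are pulled from those queues from then on. For details see [RocketMQ] Message Pulling.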

public abstract class RebalanceImpl {
    // Process queue table: the key is the message queue, the value is its processing state
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    // topic is the topic being rebalanced, mqSet holds the message queues newly assigned to the current consumer
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;
        // Iterate over the process queue table
        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            // Get the message queue
            MessageQueue mq = next.getKey();
            // Get the process queue
            ProcessQueue pq = next.getValue();
            // Only look at queues of the topic being rebalanced
            if (mq.getTopic().equals(topic)) {
                // The newly assigned set no longer contains this queue
                if (!mqSet.contains(mq)) {
                    // Mark it as dropped
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                    }
                } else if (pq.isPullExpired()) { // The pull has expired
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY:
                            break;
                        case CONSUME_PASSIVELY:
                            pq.setDropped(true); // Mark it as dropped
                            // ...
                            break;
                        default:
                            break;
                    }
                }
            }
        }
        // Create the pull request list
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        // Iterate over the message queues assigned in this round
        for (MessageQueue mq : mqSet) {
            // The queue was not in processQueueTable before
            if (!this.processQueueTable.containsKey(mq)) {
                // ...
                // Create the ProcessQueue
                ProcessQueue pq = new ProcessQueue();
                long nextOffset = -1L;
                try {
                    // Compute the pull offset
                    nextOffset = this.computePullFromWhereWithException(mq);
                } catch (Exception e) {
                    log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
                    continue;
                }
                // The offset is greater than or equal to 0
                if (nextOffset >= 0) {
                    // Put it into the process queue table
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    // Already present, nothing to do
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        // Not present before: build a PullRequest, which is later put into the blocking queue to pull messages
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup); // Set the consumer group
                        pullRequest.setNextOffset(nextOffset); // Set the pull offset
                        pullRequest.setMessageQueue(mq); // Set the message queue
                        pullRequest.setProcessQueue(pq); // Set the process queue
                        pullRequestList.add(pullRequest); // Add it to the pull request list
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
        // Dispatch the pull requests
        this.dispatchPullRequest(pullRequestList);
        return changed;
    }
}
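Stripped of offsets, expiry checks and the topic filter, the method above is a two-way diff between the old table and the new assignment. The sketch below illustrates only that diff, under simplifying assumptions: queues are plain strings, and ProcessQueueDiffSketch and Pq are hypothetical names, not RocketMQ types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: names are hypothetical and queues are plain strings.
class ProcessQueueDiffSketch {

    /** Stand-in for a process queue; it only carries the dropped flag. */
    static final class Pq {
        volatile boolean dropped;
    }

    final Map<String, Pq> table = new ConcurrentHashMap<>();

    /** Applies a new assignment and returns the queues that were newly added. */
    List<String> update(Set<String> assigned) {
        // Step 1: mark and remove queues that are no longer assigned to this consumer
        table.entrySet().removeIf(entry -> {
            if (!assigned.contains(entry.getKey())) {
                entry.getValue().dropped = true;
                return true;
            }
            return false;
        });
        // Step 2: add queues that are assigned but not yet tracked (sorted for stable output)
        List<String> added = new ArrayList<>();
        for (String mq : new TreeSet<>(assigned)) {
            if (table.putIfAbsent(mq, new Pq()) == null) {
                added.add(mq); // in the real client a pull request would be built here
            }
        }
        return added;
    }

    public static void main(String[] args) {
        ProcessQueueDiffSketch sketch = new ProcessQueueDiffSketch();
        System.out.println(sketch.update(Set.of("q0", "q1")));
        System.out.println(sketch.update(Set.of("q1", "q2")));
    }
}
```

In the second call q0 is dropped, q1 is left alone, and only q2 is reported as new, which corresponds to the single queue that would receive a fresh pull request.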

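What Triggers a Rebalance

Triggered When a Consumer Starts

As described at the beginning of the article, a consumer performs one round of load balancing at startup, so this is not repeated here.

Triggered When Consumers Change

As mentioned in the consumer registration section, a change event is fired when the Broker detects that the consumers have changed. Either of the following two cases counts as a consumer change and requires load balancing:

  • The Channel object of the registering consumer did not exist before;

  • The topic subscriptions of the registering consumer have changed, that is, a topic was added or removed.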

public class ConsumerManager {

    /**
     * Registers a consumer.
     * @param group name of the consumer group
     * @param clientChannelInfo Channel information of the registering consumer
     * @param consumeType consume type
     * @param messageModel message model
     * @param consumeFromWhere position to start consuming from
     * @param subList topic subscriptions of the consumer
     * @param isNotifyConsumerIdsChangedEnable whether to notify consumers of changes
     * @return whether the consumers of the group changed
     */
    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        // Look up the consumer group information by group name
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) { // If absent, add it
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }
        // Update the Channel
        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        // Update the subscriptions
        boolean r2 = consumerGroupInfo.updateSubscription(subList);
        // If anything changed
        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                // Notify the change; consumerGroupInfo holds the channels of all consumers in the group
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }
        // Register the consumer
        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
        return r1 || r2;
    }
}

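Channel Changes

updateChannel first initializes the change flag updated to false, then looks up the ClientChannelInfo object in channelInfoTable by the consumer's channel:

  • If no ClientChannelInfo is found, no channel information existed for this consumer. It is added to the table and the flag is set to true, indicating that the consumers have changed;

  • If one is found, the clientId is compared. If it differs, the entry is replaced with the latest channel information, but the updated flag is left unchanged.

In other words, if the registering consumer did not exist before, the change flag is set to true, indicating that the number of consumers has changed.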

 
public class ConsumerGroupInfo {
    // The key is the consumer's channel, the value is the channel information
    private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
        new ConcurrentHashMap<Channel, ClientChannelInfo>(16);

    public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
        MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
        boolean updated = false; // The change flag starts as false
        this.consumeType = consumeType;
        this.messageModel = messageModel;
        this.consumeFromWhere = consumeFromWhere;
        // Look up the channel information in channelInfoTable
        ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
        if (null == infoOld) { // Not found
            // Add it
            ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
            if (null == prev) { // It did not exist before
                log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
                    messageModel, infoNew.toString());
                // Set the change flag to true
                updated = true;
            }
            infoOld = infoNew;
        } else {
            // It existed before: compare the clientId and, if it differs, replace the entry with the latest channel
            if (!infoOld.getClientId().equals(infoNew.getClientId())) {
                log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", this.groupName, infoOld.toString(), infoNew.toString());
                this.channelInfoTable.put(infoNew.getChannel(), infoNew);
            }
        }
        this.lastUpdateTimestamp = System.currentTimeMillis();
        infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
        return updated;
    }
}
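The rule that only a previously unknown channel counts as a change can be isolated in a few lines. This is a minimal sketch under simplifying assumptions: channels and client IDs are plain strings, timestamps are left out, and ChannelTableSketch is a hypothetical name rather than a RocketMQ class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: channels and client IDs are modeled as plain strings.
class ChannelTableSketch {
    // key: channel identifier, value: client ID bound to that channel
    final Map<String, String> channelToClientId = new ConcurrentHashMap<>();

    /** Returns true only when the channel was not known before. */
    boolean updateChannel(String channelId, String clientId) {
        String old = channelToClientId.get(channelId);
        if (old == null) {
            // New channel: record it and report a change
            return channelToClientId.put(channelId, clientId) == null;
        }
        if (!old.equals(clientId)) {
            // Known channel with a different client ID: refresh the entry, but report no change
            channelToClientId.put(channelId, clientId);
        }
        return false;
    }

    public static void main(String[] args) {
        ChannelTableSketch table = new ChannelTableSketch();
        System.out.println(table.updateChannel("ch-1", "client-A")); // first registration
        System.out.println(table.updateChannel("ch-1", "client-A")); // repeated heartbeat
        System.out.println(table.updateChannel("ch-1", "client-B")); // same channel, new client ID
    }
}
```

Only the first call reports a change. Repeated heartbeats over the same channel therefore do not cause the Broker to notify the group again.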

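Topic Subscription Changes

updateSubscription checks whether the consumer's topic subscriptions have changed. subscriptionTable holds the subscriptions recorded so far:

  1. Check for newly added topic subscriptions by testing whether the topic already exists in subscriptionTable:
    • If it does not exist, the topic was not subscribed to before. It is added to subscriptionTable and the change flag is set to true, indicating that the subscriptions have changed;
    • If subscriptionTable already holds a subscription for the topic, it was subscribed to before. The entry is replaced with the new one when the new one carries a higher subVersion, but the change flag is left unchanged;
  2. Check for removed topics by comparing subscriptionTable with subList. If a topic has been removed, the change flag is set to true.

So when the topics a consumer subscribes to change, for example a topic is added or a subscription is cancelled, the subscriptions are considered changed.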

public class ConsumerGroupInfo {
    // Recorded topic subscriptions: the key is the topic, the value is the subscription data
    private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
        new ConcurrentHashMap<String, SubscriptionData>();

    public boolean updateSubscription(final Set<SubscriptionData> subList) {
        boolean updated = false;
        // Iterate over the latest topic subscriptions
        for (SubscriptionData sub : subList) {
            // Look up the recorded subscription by topic
            SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
            // Not found
            if (old == null) {
                // Add it to subscriptionTable
                SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
                if (null == prev) {
                    updated = true; // Set the change flag to true
                    log.info("subscription changed, add new topic, group: {} {}", this.groupName, sub.toString());
                }
            } else if (sub.getSubVersion() > old.getSubVersion()) { // The new subscription has a higher version
                if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                    log.info("subscription changed, group: {} OLD: {} NEW: {}", this.groupName, old.toString(), sub.toString());
                }
                // Replace it with the latest subscription
                this.subscriptionTable.put(sub.getTopic(), sub);
            }
        }
        Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
        // Iterate over the recorded subscriptions to find topics that were unsubscribed
        while (it.hasNext()) {
            Entry<String, SubscriptionData> next = it.next();
            String oldTopic = next.getKey();
            boolean exist = false;
            // Iterate over the latest subscriptions
            for (SubscriptionData sub : subList) {
                // The recorded topic is still subscribed: stop and move on to the next recorded topic
                if (sub.getTopic().equals(oldTopic)) {
                    exist = true;
                    break;
                }
            }
            // The recorded topic is missing from the latest subscriptions, so it was unsubscribed
            if (!exist) {
                log.warn("subscription changed, group: {} remove topic {} {}",this.groupName, oldTopic, next.getValue().toString());
                // Remove it
                it.remove();
                // Set the change flag to true
                updated = true;
            }
        }
        this.lastUpdateTimestamp = System.currentTimeMillis();
        return updated;
    }
}
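The same decision table can be sketched with a plain map from topic to version. This is an illustration under simplifying assumptions, not the Broker's code: SubscriptionTableSketch is a hypothetical name, and a subscription is reduced to its version number.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: a subscription is reduced to topic -> version.
class SubscriptionTableSketch {
    final Map<String, Long> topicToVersion = new ConcurrentHashMap<>();

    /** Returns true when a topic was added or removed; a version bump alone is not a change. */
    boolean update(Map<String, Long> latest) {
        boolean updated = false;
        for (Map.Entry<String, Long> sub : latest.entrySet()) {
            Long old = topicToVersion.get(sub.getKey());
            if (old == null) {
                // Newly subscribed topic
                if (topicToVersion.putIfAbsent(sub.getKey(), sub.getValue()) == null) {
                    updated = true;
                }
            } else if (sub.getValue() > old) {
                // Known topic with a newer version: refresh without flagging a change
                topicToVersion.put(sub.getKey(), sub.getValue());
            }
        }
        // Remove topics that are no longer subscribed; retainAll reports whether anything was removed
        if (topicToVersion.keySet().retainAll(latest.keySet())) {
            updated = true;
        }
        return updated;
    }

    public static void main(String[] args) {
        SubscriptionTableSketch table = new SubscriptionTableSketch();
        System.out.println(table.update(Map.of("TopicA", 1L)));               // topic added
        System.out.println(table.update(Map.of("TopicA", 2L)));               // version bump only
        System.out.println(table.update(Map.of("TopicA", 2L, "TopicB", 1L))); // topic added
        System.out.println(table.update(Map.of("TopicB", 1L)));               // topic removed
    }
}
```

The second call returns false even though the stored version is refreshed, which matches the behavior described above: only adding or removing a topic counts as a subscription change.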

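Sending the Change Request

The two cases above are what the Broker treats as a consumer change. Once a change is detected, the handle method of DefaultConsumerIdsChangeListener is called to fire the change event. The channel objects of all consumers in the consumer group are passed in, and a change request is sent to every consumer in the group so that each of them performs load balancing.

When DefaultConsumerIdsChangeListener handles the change event, it iterates over all consumers in the group and calls notifyConsumerIdsChanged to send a change request to each one: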

public class ConsumerManager {
   
    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        // ...
        // Update the Channel
        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        // Update the subscriptions
        boolean r2 = consumerGroupInfo.updateSubscription(subList);
        // If anything changed
        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                // Fire the change event; consumerGroupInfo holds the channels of all consumers in the group
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }
        // ...
    }
}

// DefaultConsumerIdsChangeListener
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
    @Override
    public void handle(ConsumerGroupEvent event, String group, Object... args) {
        if (event == null) {
            return;
        }
        switch (event) {
            case CHANGE: // Consumer change event
                if (args == null || args.length < 1) {
                    return;
                }
                // Get the channels of all consumers
                List<Channel> channels = (List<Channel>) args[0];
                if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
                    for (Channel chl : channels) {
                        // Send a change request to every consumer
                        this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
                    }
                }
                break;
            // ...
        }
    }
}

The request is sent in Broker2Client's notifyConsumerIdsChanged method, which creates a NOTIFY_CONSUMER_IDS_CHANGED request and sends it:

    
public class Broker2Client {
    public void notifyConsumerIdsChanged(
        final Channel channel,
        final String consumerGroup) {
        if (null == consumerGroup) {
            log.error("notifyConsumerIdsChanged consumerGroup is null");
            return;
        }
        NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
        requestHeader.setConsumerGroup(consumerGroup);
        // Create the change notification request
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
        try {
            // Send the request (one-way, no response expected)
            this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
        } catch (Exception e) {
            log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());
        }
    }
}

Handling the Change Notification

On the consumer side, the NOTIFY_CONSUMER_IDS_CHANGED request sent by the Broker is handled in ClientRemotingProcessor's processRequest method, which calls notifyConsumerIdsChanged. That method triggers one round of load balancing:

public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                return this.checkTransactionState(ctx, request);
            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: // Handle the NOTIFY_CONSUMER_IDS_CHANGED request
                // Handle the change request
                return this.notifyConsumerIdsChanged(ctx, request);
            // ...
            default:
                break;
        }
        return null;
    }
    
    public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        try {
            // ...
            // Trigger load balancing
            this.mqClientFactory.rebalanceImmediately();
        } catch (Exception e) {
            log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
        }
        return null;
    }
}

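Triggered When a Consumer Stops

When a consumer stops, the message queues it was responsible for must be reassigned to other consumers, so the shutdown method calls MQClientInstance's unregisterConsumer to unregister: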

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
   public synchronized void shutdown(long awaitTerminateMillis) {
        switch (this.serviceState) {
            case CREATE_JUST:
                break;
            case RUNNING:
                this.consumeMessageService.shutdown(awaitTerminateMillis);
                this.persistConsumerOffset();
                // Unregister
                this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
                this.mQClientFactory.shutdown();
                // ...
                break;
            case SHUTDOWN_ALREADY:
                break;
            default:
                break;
        }
    }
}

unregisterConsumer in turn calls unregisterClient. Much like registration, it sends an unregister request to all Brokers:

public class MQClientInstance {
    public synchronized void unregisterConsumer(final String group) {
        this.consumerTable.remove(group);
        // Unregister
        this.unregisterClient(null, group);
    }

    private void unregisterClient(final String producerGroup, final String consumerGroup) {
        // Get all Brokers
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        // Iterate over them
        while (it.hasNext()) {
            // ...
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    String addr = entry1.getValue();
                    if (addr != null) {
                        try {
                            // Send the unregister request
                            this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
                            // ...
                        } // ...
                    }
                }
            }
        }
    }
}

The unregister request is sent in MQClientAPIImpl's unregisterClient method, which builds an UNREGISTER_CLIENT request and sends it:

public class MQClientAPIImpl { 
    public void unregisterClient(final String addr, final String clientID, final String producerGroup,  final String consumerGroup, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        // ...
        requestHeader.setConsumerGroup(consumerGroup);
        // Build the UNREGISTER_CLIENT request
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
        // Send the request
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        // ...
    }
}

Like the registration request, the UNREGISTER_CLIENT request is handled on the Broker in ClientManageProcessor's processRequest. It is dispatched to unregisterClient, which in turn calls ConsumerManager's unregisterConsumer to unregister the consumer:

public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT: // Handle the heartbeat request
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT: // Handle the unregister request
                return this.unregisterClient(ctx, request);
            case RequestCode.CHECK_CLIENT_CONFIG:
                return this.checkClientConfig(ctx, request);
            default:
                break;
        }
        return null;
    }
    
    public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        // ...
        {
            final String group = requestHeader.getConsumerGroup();
            if (group != null) {
                // ...
                // Unregister the consumer
                this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
            }
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
}

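ConsumerManager's unregisterConsumer fires the unregister event once the last consumer of the group has left and the group itself is removed. Afterwards, if change notification is enabled, it fires the change event. As explained above, the change event notifies all consumers in the group to perform one round of load balancing: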


public class ConsumerManager {    
    public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        boolean isNotifyConsumerIdsChangedEnable) {
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null != consumerGroupInfo) {
            consumerGroupInfo.unregisterChannel(clientChannelInfo);
            if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
                ConsumerGroupInfo remove = this.consumerTable.remove(group);
                if (remove != null) {
                    // Fire the unregister event
                    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
                }
            }
            // Fire the consumer change event
            if (isNotifyConsumerIdsChangedEnable) {
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }
    }
}
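The bookkeeping in unregisterConsumer can be sketched as follows. This is a minimal illustration under simplifying assumptions: groups and channels are plain strings, events are replaced by a return value, and ConsumerRegistrySketch is a hypothetical name. It removes the channel from its group, removes the group once it is empty, and returns the channels that remain and would be notified to rebalance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: groups and channels are modeled as plain strings.
class ConsumerRegistrySketch {
    final Map<String, Set<String>> groups = new ConcurrentHashMap<>();

    void register(String group, String channel) {
        groups.computeIfAbsent(group, g -> ConcurrentHashMap.newKeySet()).add(channel);
    }

    /** Removes the channel and returns the remaining channels of the group, sorted. */
    List<String> unregister(String group, String channel) {
        Set<String> channels = groups.get(group);
        if (channels == null) {
            return List.of(); // unknown group, nobody to notify
        }
        channels.remove(channel);
        if (channels.isEmpty()) {
            groups.remove(group); // last consumer left, drop the whole group
        }
        List<String> remaining = new ArrayList<>(channels);
        Collections.sort(remaining);
        return remaining; // these consumers would be told to rebalance
    }

    public static void main(String[] args) {
        ConsumerRegistrySketch registry = new ConsumerRegistrySketch();
        registry.register("group-1", "ch-1");
        registry.register("group-1", "ch-2");
        System.out.println(registry.unregister("group-1", "ch-1"));
        System.out.println(registry.unregister("group-1", "ch-2"));
        System.out.println(registry.groups.containsKey("group-1"));
    }
}
```

After the first unregister call one channel remains and would be notified, so the queues of the departed consumer can be picked up by the survivor on its next rebalance.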

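Triggered Periodically by the Consumer

In RebalanceService's run method, a wait interval is set, 20 seconds by default, so the consumer also performs load balancing periodically on its own: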

public class RebalanceService extends ServiceThread {
    private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
  
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval); // Wait for the interval or an early wakeup
            // Rebalance
            this.mqClientFactory.doRebalance();
        }
        log.info(this.getServiceName() + " service end");
    }
}
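The loop above combines two triggers: the timeout of waitForRunning and an early wakeup such as rebalanceImmediately. A minimal sketch of such a wait, assuming a simple monitor-based design, is shown below. WakeableWaitSketch is a hypothetical name and this is not how RocketMQ's ServiceThread is implemented internally.

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: a timed wait that can be cut short by wakeup().
class WakeableWaitSketch {
    private final Object lock = new Object();
    private boolean notified; // guarded by lock

    /** Requests that the current or next wait returns immediately. */
    void wakeup() {
        synchronized (lock) {
            notified = true;
            lock.notifyAll();
        }
    }

    /** Waits up to intervalMillis; returns true if a wakeup ended the wait, false on timeout. */
    boolean waitForRunning(long intervalMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMillis);
        synchronized (lock) {
            try {
                while (!notified) {
                    long remaining = deadline - System.nanoTime();
                    if (remaining <= 0) {
                        break; // interval elapsed without a wakeup
                    }
                    TimeUnit.NANOSECONDS.timedWait(lock, remaining);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
            boolean woken = notified;
            notified = false; // consume the signal
            return woken;
        }
    }

    public static void main(String[] args) {
        WakeableWaitSketch service = new WakeableWaitSketch();
        service.wakeup();
        System.out.println(service.waitForRunning(20_000)); // a wakeup is pending, returns at once
        System.out.println(service.waitForRunning(50));     // no wakeup, returns after the timeout
    }
}
```

Either outcome leads to the same next step in the service loop, a call to doRebalance. The wakeup only decides how soon that call happens.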

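Summary

Load balancing is carried out on the consumer side and is triggered in four situations: when a consumer starts, when the Broker notifies the group that its consumers have changed, when a consumer stops and unregisters, and periodically (every 20 seconds by default) in RebalanceService.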

References

田守枝 (Tian Shouzhi), 深入理解RocketMQ Rebalance機制 (A Deep Dive into the RocketMQ Rebalance Mechanism)

RocketMQ version: 4.9.3