RocketMQ在叢集模式下,同一個消費組內,一個訊息佇列同一時間只能分配給組內的某一個消費者,也就是一條訊息只能被組內的一個消費者進行消費,為了合理的對訊息佇列進行分配,於是就有了負載均衡。
接下來以叢集模式下的訊息推模式DefaultMQPushConsumerImpl
為例,看一下負載均衡的過程。
首先,消費者在啟動時會做如下操作:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void start() throws MQClientException {
// ...
// 更新當前消費者訂閱主題的路由資訊
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
// 向Broker傳送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 喚醒負載均衡服務
this.mQClientFactory.rebalanceImmediately();
}
}
為了保證消費者拿到的主題路由資訊是最新的(topic下有幾個訊息佇列、訊息佇列的分佈資訊等),在進行負載均衡之前首先要更新主題的路由資訊,在updateTopicSubscribeInfoWhenSubscriptionChanged
方法中可以看到,首先獲取了當前消費者訂閱的所有主題資訊(一個消費者可以訂閱多個主題),然後進行遍歷,向NameServer傳送請求,更新每一個主題的路由資訊,保證路由資訊是最新的:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
// 獲取當前消費者訂閱的主題資訊
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍歷訂閱的主題資訊
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
// 從NameServer更新主題的路由資訊
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
}
}
}
}
由於Broker需要感知消費者數量的增減,所以每個消費者在啟動的時候,會呼叫sendHeartbeatToAllBrokerWithLock
向Broker傳送心跳包,進行消費者註冊:
public class MQClientInstance {
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
// 呼叫sendHeartbeatToAllBroker向Broker傳送心跳
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
}
}
在sendHeartbeatToAllBroker
方法中,可以看到從brokerAddrTable
中獲取了所有的Broker進行遍歷(主從模式下也會向從節點傳送請求註冊),呼叫MQClientAPIImpl
的sendHearbeat
方法向每一個Broker傳送心跳請求進行註冊:
public class MQClientInstance {
// Broker路由表
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
// 傳送心跳
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
// ...
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
// 獲取所有的Broker進行遍歷, key為 Broker Name, value為同一個name下的所有Broker範例(主從模式下Broker的name一致)
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey(); // broker name
// 獲取同一個Broker Name下的所有Broker範例
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
// 遍歷所有的範例
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) { // 如果地址不為空
// ...
try {
// 傳送心跳
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
// ...
} catch (Exception e) {
// ...
}
}
}
}
}
}
}
}
在MQClientAPIImpl
的sendHearbeat
方法中,可以看到構建了HEART_BEAT
請求,然後向Broker傳送:
public class MQClientAPIImpl {
public int sendHearbeat(final String addr, final HeartbeatData heartbeatData, final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
// 建立HEART_BEAT請求
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setLanguage(clientConfig.getLanguage());
request.setBody(heartbeatData.encode());
// 傳送請求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
// ...
}
}
Broker在啟動時註冊了HEART_BEAT
請求的處理器,可以看到請求處理器是ClientManageProcessor
:
public class BrokerController {
public void registerProcessor() {
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
// 註冊HEART_BEAT請求的處理器ClientManageProcessor
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
}
}
進入到ClientManageProcessor
的processRequest
方法,如果請求是HEART_BEAT
型別會呼叫heartBeat
方法進行處理,這裡也能看還有UNREGISTER_CLIENT
型別的請求,從名字上可以看出是與取消註冊有關的(這個稍後再說):
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT: // 處理心跳請求
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT: // 取消註冊請求
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
}
進入到heartBeat
方法,可以看到,呼叫了ConsumerManager
的registerConsumer
註冊消費者:
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
// ...
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
// ...
// 註冊Consumer
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(),
data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable);
// ...
}
// ...
return response;
}
}
ConsumerManager
的registerConsumer
方法的理邏輯如下:
ConsumerGroupInfo
物件。如果獲取為空,會建立一個ConsumerGroupInfo,記錄了消費者組的相關資訊;CHANGE
變更事件(這個稍後再看);REGISTER
註冊事件;public class ConsumerManager {
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 根據組名稱獲取消費者組資訊
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) { // 如果為空新增ConsumerGroupInfo物件
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有變更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 通知變更
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// 註冊Consumer
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
}
進入到DefaultConsumerIdsChangeListener
的handle
方法中,可以看到如果是REGISTER
事件,會通過ConsumerFilterManager
的register
方法進行註冊,註冊的詳細過程這裡先不展開講解:
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:// 如果是消費者變更事件
// ...
break;
case UNREGISTER: // 如果是取消註冊事件
this.brokerController.getConsumerFilterManager().unRegister(group);
break;
case REGISTER: // 如果是註冊事件
if (args == null || args.length < 1) {
return;
}
Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
// 進行註冊
this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
break;
default:
throw new RuntimeException("Unknown event " + event);
}
}
}
經過以上步驟之後,會呼叫MQClientInstance的rebalanceImmediately
喚醒負載均衡服務進行一次負載均衡,為消費者分配訊息佇列,需要注意的是負載均衡是由消費者端執行:
// MQClientInstance
public class MQClientInstance {
private final RebalanceService rebalanceService;
public void rebalanceImmediately() {
// 喚醒負載均衡服務
this.rebalanceService.wakeup();
}
}
// RebalanceService
public class RebalanceService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
// 負載均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
負載均衡的過程在【RocketMQ】訊息的拉取一文中已經講解,這裡挑一些重點內容看一下。
在負載均衡的時候,首先會獲取當前消費者訂閱的主題資訊,對訂閱的主題進行遍歷,對每一個主題進行負載均衡,重新分配:
public abstract class RebalanceImpl {
public void doRebalance(final boolean isOrder) {
// 獲取訂閱的主題資訊
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍歷所有訂閱的主題
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 根據主題進行負載均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
}
rebalanceByTopic
方法中根據消費模式進行了判斷然後對主題進行負載均衡,這裡我們關注叢集模式下的負載均衡:
從topicSubscribeInfoTable
中根據主題獲取對應的訊息佇列集合,這一步可以得到主題下的所有訊息佇列資訊;
根據主題資訊和消費者組名稱,獲取所有訂閱了該主題的消費者ID集合,這一步得到了訂閱該主題的所有消費者;
如果主題對應的訊息佇列集合和消費者ID都不為空,對訊息佇列集合和消費ID集合進行排序,排序是為了接下來進行分配;
獲取設定的分配策略,根據分配策略,為消費者分配對應的消費佇列,以平均分配策略為例,它會根據訊息佇列的數量和消費者的個數計算每個消費者分配的佇列個數:
根據最新分配的訊息佇列資訊,呼叫updateProcessQueueTableInRebalance
更新當前消費者消費的處理佇列ProcessQueue
資訊
public abstract class RebalanceImpl {
// 根據主題進行負載均衡
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: { // 廣播模式
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// ...
break;
}
case CLUSTERING: { // 叢集模式
// 根據主題獲取訂閱的訊息佇列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 獲取所有訂閱了該主題的消費者id
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// ...
if (mqSet != null && cidAll != null) { // 如果都不為空
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 對訊息佇列排序
Collections.sort(mqAll);
// 對消費者排序
Collections.sort(cidAll);
// 獲取分配策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 根據分配策略,為消費者分配消費佇列
allocateResult = strategy.allocate(
this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
} catch (Throwable e) {
// ...
}
// 分配給當前消費的消費佇列
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
// 將分配結果加入到結果集合中
allocateResultSet.addAll(allocateResult);
}
// 根據分配資訊更新處理佇列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
// ...
}
break;
}
default:
break;
}
}
}
負載均衡之後,消費者負責的訊息佇列有可能發生變化,一個訊息佇列MessageQueue
對應一個處理佇列ProcessQueue
,processQueueTable
記錄了消費者負責的佇列資訊,此時需要對其進行更新,處理邏輯如下:
對processQueueTable
進行遍歷,處理每一個訊息佇列,這一步主要是判斷重新分配之後,processQueueTable
中記錄的某些訊息佇列是否已經不再由當前消費者負責,如果是需要將訊息佇列置為dropped,表示刪除,之後消費者不再從此消費佇列中拉取訊息;
判斷是否有新分配給當前消費者的訊息佇列,如果某個訊息佇列在最新分配給當前消費者的訊息佇列集合mqSet
中,但是不在processQueueTable
中,
中,進行以下處理:
processQueueTable
PullRequest
,設定訊息的拉取資訊,並加入到拉取訊息請求集合pullRequestList
中經過這一步,如果分配給當前消費者的消費佇列不在processQueueTable
中,就會構建拉取請求PullRequest
,然後呼叫dispatchPullRequest處理訊息拉取請求,之後會從該訊息佇列拉取訊息,詳細過程可參考【RocketMQ】訊息的拉取。
public abstract class RebalanceImpl {
// 處理佇列表,KEY為訊息佇列,VALUE為對應的處理資訊
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
// 負載均衡,topic表示當前要進行負載均衡的主題,mqSet中記錄了重新分配給當前消費者的訊息佇列
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// 處理佇列表
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
// 獲取訊息佇列
MessageQueue mq = next.getKey();
// 獲取處理佇列
ProcessQueue pq = next.getValue();
// 主題是否一致
if (mq.getTopic().equals(topic)) {
// 如果佇列集合中不包含當前的佇列
if (!mqSet.contains(mq)) {
// 設定為dropped
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) { // 是否過期
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true); // 設定為刪除
// ...
break;
default:
break;
}
}
}
}
// 建立拉取請求集合
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// 遍歷本次分配的訊息佇列集合
for (MessageQueue mq : mqSet) {
// 如果之前不在processQueueTable中
if (!this.processQueueTable.containsKey(mq)) {
// ...
// 建立ProcessQueue
ProcessQueue pq = new ProcessQueue();
long nextOffset = -1L;
try {
// 計算訊息拉取偏移量
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
// 如果偏移量大於等於0
if (nextOffset >= 0) {
// 放入處理佇列表中
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
// 如果之前已經存在,不需要進行處理
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
// 如果之前不存在,構建PullRequest,之後會加入到阻塞佇列中,進行訊息拉取
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);// 設定消費組
pullRequest.setNextOffset(nextOffset);// 設定拉取偏移量
pullRequest.setMessageQueue(mq);// 設定訊息佇列
pullRequest.setProcessQueue(pq);// 設定處理佇列
pullRequestList.add(pullRequest);// 加入到拉取訊息請求集合
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 新增訊息拉取請求
this.dispatchPullRequest(pullRequestList);
return changed;
}
}
在文章開頭已經講過,消費者在啟動時會進行一次負載均衡,這裡便不再贅述。
在消費者註冊時講到,如果發現消費者有變更會觸發變更事件,當處於以下兩種情況之一時會被判斷為消費者發生了變化,需要進行負載均衡:
當前註冊的消費者對應的Channel物件之前不存在;
當前註冊的消費者訂閱的主題資訊發生了變化,也就是消費者訂閱的主題有新增或者刪除;
public class ConsumerManager {
/**
* 註冊消費者
* @param group 消費者組名稱
* @param clientChannelInfo 註冊的消費者對應的Channel資訊
* @param consumeType 消費型別
* @param messageModel
* @param consumeFromWhere 消費訊息的位置
* @param subList 消費者訂閱的主題資訊
* @param isNotifyConsumerIdsChangedEnable 是否通知變更
* @return
*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 根據組名稱獲取消費者組資訊
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) { // 如果為空新增
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 更新Channel
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新訂閱資訊
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有變更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 通知變更,consumerGroupInfo中儲存了該消費者組下的所有消費者的channel
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// 註冊Consumer
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
}
在updateChannel
方法中,首先將變更狀態updated
初始化為false,然後根據消費者的channel從channelInfoTable
路由表中獲取對應的ClientChannelInfo
物件:
如果ClientChannelInfo物件獲取為空,表示之前不存在該消費者的channel資訊,將其加入到路由表中,變更狀態置為true,表示消費者有變化;
如果獲取不為空,判斷clientid是否一致,如果不一致更新為最新的channel資訊,但是變更狀態updated
不發生變化;
也就是說,如果註冊的消費者之前不存在,那麼將變更狀態置為true,表示消費者數量發生了變化。
// key為消費者對應的channle,value為chanel資訊
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
boolean updated = false; // 變更狀態初始化為false
this.consumeType = consumeType;
this.messageModel = messageModel;
this.consumeFromWhere = consumeFromWhere;
// 從channelInfoTable中獲取對應的Channel資訊,
ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
if (null == infoOld) { // 如果為空
// 新增
ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
if (null == prev) { // 如果之前不存在
log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
messageModel, infoNew.toString());
// 變更狀態置為true
updated = true;
}
infoOld = infoNew;
} else {
// 如果之前存在,判斷clientid是否一致,如果不一致更新為最新的channel
if (!infoOld.getClientId().equals(infoNew.getClientId())) {
log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", this.groupName, infoOld.toString(), infoNew.toString());
this.channelInfoTable.put(infoNew.getChannel(), infoNew);
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
return updated;
}
updateSubscription
方法中,主要判斷了消費的主題訂閱資訊是否發生了變化,subscriptionTable中記錄了之前記錄的訂閱資訊:
如果消費者訂閱的主題發生了變化,比如有新增加的主題或者刪除了某個主題的訂閱,會被判斷為主題訂閱資訊發生了變化。
public class ConsumerGroupInfo {
// 記錄了訂閱的主題資訊,key為topic,value為訂閱資訊
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
// 遍歷訂閱的主題資訊
for (SubscriptionData sub : subList) {
//根據主題獲取訂閱資訊
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
// 如果獲取為空
if (old == null) {
// 加入到subscriptionTable
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
if (null == prev) {
updated = true; // 變更狀態置為true
log.info("subscription changed, add new topic, group: {} {}", this.groupName, sub.toString());
}
} else if (sub.getSubVersion() > old.getSubVersion()) { // 如果版本發生了變化
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}", this.groupName, old.toString(), sub.toString());
}
// 更新為最新的訂閱資訊
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
// 進行遍歷,這一步主要是判斷有沒有取消訂閱的主題
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
String oldTopic = next.getKey();
boolean exist = false;
// 遍歷最新的訂閱資訊
for (SubscriptionData sub : subList) {
// 如果在舊的訂閱資訊中存在就終止,繼續判斷下一個主題
if (sub.getTopic().equals(oldTopic)) {
exist = true;
break;
}
}
// 走到這裡,表示有取消訂閱的主題
if (!exist) {
log.warn("subscription changed, group: {} remove topic {} {}",this.groupName, oldTopic, next.getValue().toString());
// 進行刪除
it.remove();
// 變更狀態置為true
updated = true;
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
return updated;
}
}
上面講解了兩種被判定為消費者發生變化的情況,被判定為變化之後,會觸呼叫DefaultConsumerIdsChangeListener
中的handle
方法觸發變更事件,在方法中傳入了消費者組下的所有消費者的channel物件,會傳送變更請求通知該消費者組下的所有消費者,進行負載均衡。
DefaultConsumerIdsChangeListener
中處理變更事件時,會對消費組下的所有消費者遍歷,呼叫notifyConsumerIdsChanged
方法向每一個消費者傳送變更請求:
public class ConsumerManager {
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// ...
// 更新Channel
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新訂閱資訊
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果有變更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 觸發變更事件,consumerGroupInfo中儲存了該消費者組下的所有消費者的channel
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// ...
}
}
// DefaultConsumerIdsChangeListener
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:// 如果是消費者變更事件
case CHANGE:
if (args == null || args.length < 1) {
return;
}
// 獲取所有的消費者對應的channel
List<Channel> channels = (List<Channel>) args[0];
if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
for (Channel chl : channels) {
// 向每一個消費者傳送變更請求
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
}
}
break;
// ...
}
}
}
請求的傳送是在Broker2Client
的notifyConsumerIdsChanged
方法中實現的,可以看到會建立NOTIFY_CONSUMER_IDS_CHANGED
請求並行送:
public class Broker2Client {
public void notifyConsumerIdsChanged(
final Channel channel,
final String consumerGroup) {
if (null == consumerGroup) {
log.error("notifyConsumerIdsChanged consumerGroup is null");
return;
}
NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
// 建立變更通知請求
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
try {
// 傳送請求
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());
}
}
}
消費者對Broker傳送的NOTIFY_CONSUMER_IDS_CHANGED
的請求處理在ClientRemotingProcessor
的processRequest
方法中,它會呼叫notifyConsumerIdsChanged
方法進行處理,在notifyConsumerIdsChanged
方法中可以看到觸發了一次負載均衡:
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: // NOTIFY_CONSUMER_IDS_CHANGED請求處理
// 處理變更請求
return this.notifyConsumerIdsChanged(ctx, request);
// ...
default:
break;
}
return null;
}
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
try {
// ...
// 觸發負載均衡
this.mqClientFactory.rebalanceImmediately();
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
}
return null;
}
}
消費者在停止時,需要將當前消費者負責的訊息佇列分配給其他消費者進行消費,所以在shutdown
方法中會呼叫MQClientInstance
的unregisterConsumer
方法取消註冊:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void shutdown(long awaitTerminateMillis) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.consumeMessageService.shutdown(awaitTerminateMillis);
this.persistConsumerOffset();
// 取消註冊
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
// ...
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
}
在unregisterConsumer
方法中,又呼叫了unregisterClient
方法取消註冊,與註冊消費者的邏輯相似,它會向所有的Broker傳送取消註冊的請求:
public class MQClientInstance {
public synchronized void unregisterConsumer(final String group) {
this.consumerTable.remove(group);
// 取消註冊
this.unregisterClient(null, group);
}
private void unregisterClient(final String producerGroup, final String consumerGroup) {
// 獲取所有的Broker
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
// 進行遍歷
while (it.hasNext()) {
// ...
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
String addr = entry1.getValue();
if (addr != null) {
try {
// 傳送取消註冊請求
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
// ...
} // ...
}
}
}
}
}
}
取消註冊請求的傳送是在MQClientAPIImpl
中unregisterClient
方法實現的,可以看到構建了UNREGISTER_CLIENT
請求並行送:
public class MQClientAPIImpl {
public void unregisterClient(final String addr, final String clientID, final String producerGroup, final String consumerGroup, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
// ...
requestHeader.setConsumerGroup(consumerGroup);
// 構建UNREGISTER_CLIENT請求
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
// 傳送請求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
// ...
}
}
與註冊消費者的請求處理一樣,Broker對UNREGISTER_CLIENT
的請求同樣是在ClientManageProcessor
的processRequest
中處理的,對於UNREGISTER_CLIENT
請求是呼叫unregisterClient方法處理的,裡面又呼叫了ConsumerManager
的unregisterConsumer
方法進行取消註冊:
public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT: // 處理心跳請求
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT: // 取消註冊請求
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
// ...
{
final String group = requestHeader.getConsumerGroup();
if (group != null) {
// ...
// 取消消費者的註冊
this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
}
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}
在ConsumerManager
的unregisterConsumer
方法中,可以看到觸發了取消註冊事件,之後如果開啟了允許通知變更,會觸發變更事件,變更事件在上面已經講解過,它會通知消費者組下的所有消費者進行一次負載均衡:
public class ConsumerManager {
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null != consumerGroupInfo) {
consumerGroupInfo.unregisterChannel(clientChannelInfo);
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
// 觸發取消註冊事件
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
}
}
// 觸發消費者變更事件
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
}
}
在RebalanceService
的run方法中,可以看到設定了等待時間,預設是20s,所以消費者本身也會定時執行負載均衡:
public class RebalanceService extends ServiceThread {
private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval); // 等待
// 負載均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
總結
參考
RocketMQ版本:4.9.3