RocketMQ訊息的消費以組為單位,有兩種消費模式:
廣播模式:同一個訊息佇列可以分配給組內的每個消費者,每條訊息可以被組內的消費者進行消費。
叢集模式:同一個消費組下,一個訊息佇列同一時間只能分配給組內的一個消費者,也就是一條訊息只能被組內的一個消費者進行消費。(一般情況下都使用的是叢集模式)
訊息的獲取也有兩種模式:
拉模式:消費者主動發起拉取訊息的請求,獲取訊息進行消費。
推模式:訊息到達Broker後推播給消費者。RocketMQ對拉模式進行了包裝去實現推模式,本質還是需要消費者去拉取,一個拉取任務完成後繼續下一次拉取。
首先來看一個RocketMQ原始碼中基於推模式DefaultMQPushConsumer
進行消費的例子,首先為消費者設定了消費者組名稱,然後註冊了訊息監聽器,並設定訂閱的主題,最後呼叫start方法啟動消費者,接下來就去看看DefaultMQPushConsumer
如何進行訊息消費的:
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest {
private String consumerGroup;
private String topic = "FooBar";
private String brokerName = "BrokerA";
private MQClientInstance mQClientFactory;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private static DefaultMQPushConsumer pushConsumer;
@Before
public void init() throws Exception {
// ...
// 消費者組
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
// 範例化DefaultMQPushConsumer
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
// 設定拉取間隔
pushConsumer.setPullInterval(60 * 1000);
// 註冊訊息監聽器
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return null;
}
});
// ...
// 設定訂閱的主題
pushConsumer.subscribe(topic, "*");
// 啟動消費者
pushConsumer.start();
}
}
DefaultMQPushConsumer
實現了MQPushConsumer
介面,它參照了預設的訊息推播實現類DefaultMQPushConsumerImpl
,在建構函式中可以看到對其進行了範例化,並在start方法中進行了啟動:
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
/**
* 預設的訊息推播實現類
*/
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
/**
* 建構函式
*/
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
// 範例化DefaultMQPushConsumerImpl
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
/**
* 啟動
*/
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// 啟動消費者
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
}
DefaultMQPushConsumerImpl的start方法中處理邏輯如下:
copySubscription
方法處理訊息訂閱,主要是將訂閱資訊包裝成SubscriptionData物件,加入到負載均衡物件rebalanceImpl
中mQClientFactory
,對應實現類為MQClientInstance
,拉取服務執行緒、負載均衡執行緒都是通過MQClientInstance
啟動的RebalanceImpl
設定消費組、消費模式、分配策略,RebalanceImpl
是一個抽象類,在範例化時可以看到使用的是RebalancePushImpl
型別的PullAPIWrapper
,用於向Broker傳送拉取訊息的請求offsetStore
LocalFileOffsetStore
。RemoteBrokerOffsetStore
。MQClientInstance
的registerConsumer
將消費者組的資訊註冊到MQClientInstance
的consumerTable
中mQClientFactory
的start方法啟動MQClientInstance
mQClientFactory
的rebalanceImmediately
方法進行負載均衡public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// MQClientInstance
private MQClientInstance mQClientFactory;
// 負載均衡物件,具體使用的是RebalancePushImpl進行範例化
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
// 訊息拉取API物件PullAPIWrapper
private PullAPIWrapper pullAPIWrapper;
// 消費進度儲存物件
private OffsetStore offsetStore;
public synchronized void start() throws MQClientException {
// 判斷狀態
switch (this.serviceState) {
case CREATE_JUST: // 如果是建立未啟動狀態
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
// 先置為失敗狀態
this.serviceState = ServiceState.START_FAILED;
// 檢查設定
this.checkConfig();
// 處理訊息訂閱
this.copySubscription();
// 如果是叢集模式
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 建立MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 設定消費者組
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 設定消費模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 設定分配策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 設定MQClientInstance
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 建立訊息拉取API物件
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 註冊訊息過濾勾點
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// 消費模式判斷
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING: // 廣播模式
// 消費進度儲存在消費者本地,從本地獲取
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING: // 叢集模式
// 消費進度需要從Broker獲取
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
// 設定消費進度
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 載入消費進度
this.offsetStore.load();
// 如果是順序消費
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
// 建立順序消費service:ConsumeMessageOrderlyService
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
// 非順序消費,使用ConsumeMessageConcurrentlyService
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// 啟動消費服務
this.consumeMessageService.start();
// 將消費者資訊註冊到mQClientFactory中,key為消費者組名稱,value為消費者也就是當前物件
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 啟動MQClientInstance
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
// 狀態更改為執行中
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 進行負載均衡
this.mQClientFactory.rebalanceImmediately();
}
}
public class MQClientInstance {
// 註冊消費者
public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) {
if (null == group || null == consumer) {
return false;
}
// 將消費者組資訊新增到consumerTable中
MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
if (prev != null) {
log.warn("the consumer group[" + group + "] exist already.");
return false;
}
return true;
}
}
在copySubscription
方法中,從defaultMQPushConsumer
獲取了設定的主題訂閱資訊,在前面的例子中可以看到向defaultMQPushConsumer中新增了訂閱的主題資訊,所以這裡獲取到了之前新增的主題資訊MAP集合,其中KEY為主題,VALUE為表示式,然後遍歷訂閱資訊集合,將訂閱資訊包裝成SubscriptionData物件,並加入到負載均衡物件rebalanceImpl
中:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// DefaultMQPushConsumer
private final DefaultMQPushConsumer defaultMQPushConsumer;
private void copySubscription() throws MQClientException {
try {
// 獲取訂閱資訊,KEY為主題,VALUE為表示式
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
// 獲取主題
final String topic = entry.getKey();
// 獲取表示式
final String subString = entry.getValue();
// 構建主題資訊物件
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
// 加入到負載均衡實現類中
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
// 獲取重試主題
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
// 訂閱重試主題
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
}
MQClientInstance
中有以下幾個主要的成員變數:
pullMessageService:對應實現類為PullMessageService
,是用來拉取訊息的服務
rebalanceService:對應的實現類為RebalanceService
,是用來進行負載均衡的服務
consumerTable:消費者組資訊,key為消費者組名稱,value為註冊的消費者,上面可知在start方法中呼叫了registerConsumer
方法進行了消費者註冊
RebalanceService
和PullMessageService
都繼承了ServiceThread,在MQClientInstance
的start方法中,分別呼叫了pullMessageService和rebalanceService的start方法啟動拉取服務執行緒和負載均衡執行緒:
public class MQClientInstance {
// 拉取訊息Service
private final PullMessageService pullMessageService;
// 負載均衡service
private final RebalanceService rebalanceService
// 消費者組資訊,key為消費者組名稱,value為註冊的消費者
private final ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
// ...
// 建立MQClientAPIImpl
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
// ...
this.mQAdminImpl = new MQAdminImpl(this);
// 建立拉取訊息service
this.pullMessageService = new PullMessageService(this);
// 建立負載均衡service,並在建構函式中傳入了當前物件
this.rebalanceService = new RebalanceService(this);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
// ...
}
// 啟動
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// ...
this.startScheduledTask();
// 啟動拉取訊息服務
this.pullMessageService.start();
// 啟動負載均衡服務
this.rebalanceService.start();
// ...
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
}
PullMessageService
繼承了ServiceThread
,並且使用了阻塞佇列pullRequestQueue
儲存訊息拉取請求,PullMessageService
被啟動後,在run方法中等待pullRequestQueue
中拉取請求的到來,然後呼叫pullMessage
方法拉取訊息, 在pullMessage
中又是呼叫DefaultMQPushConsumerImpl
的pullMessage
進行訊息拉取的:
public class PullMessageService extends ServiceThread {
// 拉取請求阻塞佇列
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 拉取訊息
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
// 拉取訊息
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
// 轉換為DefaultMQPushConsumerImpl
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
// 呼叫pullMessage拉取訊息
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
}
這裡可能會有一個疑問,既然PullMessageService
在等待拉取請求的到來,那麼什麼時候會往pullRequestQueue
中新增拉取訊息的請求?
可以看到在PullMessageService
的executePullRequestImmediately
方法中,將拉取請求新增到了阻塞佇列pullRequestQueue
中:
public class PullMessageService extends ServiceThread {
// 拉取請求阻塞佇列
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
// 向佇列中新增拉取訊息的請求資訊
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
}
那麼接下來只需看看哪裡呼叫了PullMessageService
的executePullRequestImmediately
方法就可以找到在何時向佇列中新增拉取請求的:
可以看到DefaultMQPushConsumerImpl
的executePullRequestImmediately
方法中呼叫了PullMessageService
的executePullRequestImmediately
方法:
public void executePullRequestImmediately(final PullRequest pullRequest) {
// 呼叫PullMessageService的executePullRequestImmediately方法
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
接下來再看看哪裡呼叫了DefaultMQPushConsumerImpl
的executePullRequestImmediately
:
發現有兩處進行了呼叫:
DefaultMQPushConsumerImpl
的pullMessage
方法RebalancePushImpl
的dispatchPullRequest
方法前面可知PullMessageService
處理拉取請求的時候就是呼叫的DefaultMQPushConsumerImpl
的pullMessage
方法進行處理的,所以如果是首次新增拉取請求,一定不是從這個入口新增的,那麼首次大概就是從RebalancePushImpl這個地方新增的,接下來就去看看RebalancePushImpl
如何新增拉取請求的。
MQClientInstance
的start方法中,啟動了負責均衡服務的執行緒,在RebalanceService
的run方法中,呼叫了waitForRunning
方法進行阻塞等待,如果負責均衡服務被喚醒,將會呼叫MQClientInstance
的doRebalance
進行負載均衡:
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory; // 參照了MQClientInstance
// 建構函式
public RebalanceService(MQClientInstance mqClientFactory) {
// 設定MQClientInstance
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
// 等待執行
this.waitForRunning(waitInterval);
// 進行負載均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
前面可知DefaultMQPushConsumerImpl
在啟動的時候呼叫了MQClientInstance
的rebalanceImmediately
方法,在rebalanceImmediately
方法中可以看到,呼叫了rebalanceService
的wakeup
方法喚醒負載均衡執行緒,(關於wakeup方法的實現前面在講解訊息傳送時已經分析過這裡不再贅述):
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void start() throws MQClientException {
// ...
// 喚醒負載均衡服務,也就是呼叫MQClientInstance的rebalanceImmediately方法
this.mQClientFactory.rebalanceImmediately();
}
}
public class MQClientInstance {
public void rebalanceImmediately() {
// 喚醒負載均衡服務
this.rebalanceService.wakeup();
}
}
負責均衡服務被喚醒後,會呼叫MQClientInstance
的doRebalance
進行負載均衡,處理邏輯如下:
doRebalance
方法對每一個消費者進行負載均衡,前面可知消費者是DefaultMQPushConsumerImpl
型別的public class MQClientInstance {
public void doRebalance() {
// 遍歷註冊的消費者
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
// 負載均衡,前面可知消費者是DefaultMQPushConsumerImpl型別的
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
}
接下來進入到DefaultMQPushConsumerImpl
的doRebalance
,可以看到它又呼叫了rebalanceImpl
的doRebalance
進行負載均衡:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public void doRebalance() {
if (!this.pause) {
// 這裡又呼叫了rebalanceImpl的doRebalance進行負載均衡
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
}
RebalanceImpl
RebalanceImpl
的doRebalance
處理邏輯如下:
rebalanceByTopic
對每一個主題進行負載均衡public abstract class RebalanceImpl {
public void doRebalance(final boolean isOrder) {
// 獲取訂閱的主題資訊
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍歷所有訂閱的主題
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 根據主題進行負載均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
}
rebalanceByTopic
方法中根據消費模式進行了判斷然後對主題進行負載均衡,這裡我們關注叢集模式下的負載均衡:
從topicSubscribeInfoTable
中根據主題獲取對應的訊息佇列集合
根據主題資訊和消費者組名稱,獲取所有訂閱了該主題的消費者ID集合
如果主題對應的訊息佇列集合和消費者ID都不為空,對訊息佇列集合和消費ID集合進行排序
獲取分配策略,根據分配策略,為當前的消費者分配對應的消費佇列,RocketMQ預設提供了以下幾種分配策略:
AllocateMessageQueueAveragely:平均分配策略,根據訊息佇列的數量和消費者的個數計算每個消費者分配的佇列個數。
AllocateMessageQueueAveragelyByCircle:平均輪詢分配策略,將訊息佇列逐個分發給每個消費者。
AllocateMessageQueueConsistentHash:根據一致性 hash進行分配。
llocateMessageQueueByConfig:根據設定,為每一個消費者設定固定的訊息佇列 。
AllocateMessageQueueByMachineRoom:分配指定機房下的訊息佇列給消費者。
AllocateMachineRoomNearby:優先分配給同機房的消費者。
根據最新分配的訊息佇列,呼叫updateProcessQueueTableInRebalance
更新當前消費者消費的佇列資訊
public abstract class RebalanceImpl {
// 根據主題進行負載均衡
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: { // 廣播模式
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// ...
break;
}
case CLUSTERING: { // 叢集模式
// 根據主題獲取訂閱的訊息佇列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 獲取所有訂閱了該主題的消費者id
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// ...
if (mqSet != null && cidAll != null) { // 如果都不為空
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 對訊息佇列排序
Collections.sort(mqAll);
// 對消費者排序
Collections.sort(cidAll);
// 獲取分配策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 根據分配策略,為當前的消費者分配消費佇列
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
// 分配給當前消費的消費佇列
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
// 將分配結果加入到結果集合中
allocateResultSet.addAll(allocateResult);
}
// 根據分配資訊更新處理佇列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
// ...
}
break;
}
default:
break;
}
}
}
updateProcessQueueTableInRebalance
方法同樣在RebalanceImpl
中,RebalanceImpl
中使用了一個ConcurrentMap
型別的處理佇列表儲存訊息佇列及對應的佇列處理資訊,updateProcessQueueTableInRebalance
方法的入參中topic表示當前要進行負載均衡的主題,mqSet中記錄了重新分配給當前消費者的訊息佇列,主要處理邏輯如下:
processQueueTable
進行遍歷,處理每一個訊息佇列,如果佇列表為空直接進入第2步:
processQueueTable
中,需要進行如下處理:
processQueueTable
PullRequest
,設定訊息的拉取資訊,並加入到拉取訊息請求集合pullRequestList
中可以看到,經過這一步,如果分配給當前消費者的消費佇列不在processQueueTable
中,就會構建拉取請求PullRequest
,然後呼叫dispatchPullRequest處理訊息拉取請求。
public abstract class RebalanceImpl {
// 處理佇列表,KEY為訊息佇列,VALUE為對應的處理資訊
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
// 負載均衡,topic表示當前要進行負載均衡的主題,mqSet中記錄了重新分配給當前消費者的訊息佇列
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// 處理佇列表
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
// 獲取訊息佇列
MessageQueue mq = next.getKey();
// 獲取處理佇列
ProcessQueue pq = next.getValue();
// 主題是否一致
if (mq.getTopic().equals(topic)) {
// 如果佇列集合中不包含當前的佇列
if (!mqSet.contains(mq)) {
// 設定為dropped
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) { // 是否過期
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true); // 設定為刪除
// ...
break;
default:
break;
}
}
}
}
// 建立拉取請求集合
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// 遍歷本次分配的訊息佇列集合
for (MessageQueue mq : mqSet) {
// 如果之前不在processQueueTable中
if (!this.processQueueTable.containsKey(mq)) {
// ...
// 建立ProcessQueue
ProcessQueue pq = new ProcessQueue();
long nextOffset = -1L;
try {
// 計算訊息拉取偏移量
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
// 如果偏移量大於等於0
if (nextOffset >= 0) {
// 放入處理佇列表中
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
// 如果之前已經存在,不需要進行處理
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
// 如果之前不存在,構建PullRequest,之後會加入到阻塞佇列中,進行訊息拉取
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);// 設定消費組
pullRequest.setNextOffset(nextOffset);// 設定拉取偏移量
pullRequest.setMessageQueue(mq);// 設定訊息佇列
pullRequest.setProcessQueue(pq);// 設定處理佇列
pullRequestList.add(pullRequest);// 加入到拉取訊息請求集合
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 新增訊息拉取請求
this.dispatchPullRequest(pullRequestList);
return changed;
}
}
在dispatchPullRequest方法中可以看到,對pullRequestList進行了遍歷,然後將每一個拉取請求呼叫defaultMQPushConsumerImpl
的executePullRequestImmediately
方法新增到了PullMessageService
的阻塞佇列中等待進行訊息拉取:
public class RebalancePushImpl extends RebalanceImpl {
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
// 加入到阻塞佇列中
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
}
上面可知,如果阻塞佇列中新增了拉取訊息的請求,接下來會呼叫DefaultMQPushConsumerImpl
的pullMessage
方法進行訊息拉取,處理邏輯如下:
processQueue
,判斷是否置為Dropped刪除狀態,如果處於刪除狀態不進行處理processQueue
中佇列最大偏移量和最小偏移量的間距是否超過ConsumeConcurrentlyMaxSpan
的值,如果超過需要進行流量控制,延遲50毫秒後重新加入佇列中進行處理PullAPIWrapper
的pullKernelImpl
方法向Broker傳送拉取訊息請求public class DefaultMQPushConsumerImpl implements MQConsumerInner {
/**
* 拉取延遲毫秒數
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
/**
* 出現異常後的延遲處理毫秒數
*/
private long pullTimeDelayMillsWhenException = 3000;
public void pullMessage(final PullRequest pullRequest) {
// 從請求中獲取處理佇列
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// 如果被置為Dropped,不進行處理
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
// 設定拉取時間
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
// ...
// 獲取訊息數量
long cachedMessageCount = processQueue.getMsgCount().get();
// 獲取訊息大小
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 判斷當前處理的訊息條數是否超過限制
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// ...
return;
}
// 判斷當前處理的訊息大小是否超過限制
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
// 延遲進行處理
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// ...
return;
}
if (!this.consumeOrderly) {// 非順序消費
// 佇列最大偏移量和最小偏移量的間距是否超過ConsumeConcurrentlyMaxSpan
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
// 延遲處理
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// ...
return;
}
} else {
// 順序消費
// ...
}
// 獲取主題訂閱資訊
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
// 延遲處理
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
// 建立訊息拉取成功後的回撥函數
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
// ...
}
// ...
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
// 獲取主題的訂閱資訊
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
// 構建訊息拉取系統標記
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
// 傳送請求拉取訊息
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
}
PullAPIWrapper中主要是獲取了Broker的地址,然後建立拉取請求頭PullMessageRequestHeader,設定拉取的相關資訊,然後呼叫MQClientAPIImpl
的pullMessage
拉取訊息:
public class PullAPIWrapper {
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 根據BrokerName獲取Broker資訊
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
// ...
if (findBrokerResult != null) {
// ...
// 建立拉取訊息的請求頭
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup); // 設定消費組
requestHeader.setTopic(mq.getTopic()); // 設定主題
requestHeader.setQueueId(mq.getQueueId()); // 設定佇列ID
requestHeader.setQueueOffset(offset); // 設定拉取偏移量
requestHeader.setMaxMsgNums(maxNums); // 設定拉取最大訊息個數
requestHeader.setSysFlag(sysFlagInner);// 設定系統標識
requestHeader.setCommitOffset(commitOffset); // 設定commit偏移量
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);// 設定訂閱主題表示式
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
// 獲取Broker地址
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
// 調MQClientAPIImpl的pullMessage拉取訊息
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
MQClientAPIImpl
的pullMessage
中首先構建了遠端請求RemotingCommand
,可以看到請求型別設定的是拉取訊息PULL_MESSAGE
:
pullMessageAsync
方法拉取訊息pullMessageSync
方法拉取訊息以非同步訊息拉取pullMessageAsync
為例,看一下請求的傳送:
invokeAsync
向Broker傳送拉取訊息的請求onSuccess
方法處理訊息public class MQClientAPIImpl {
/**
* 傳送請求拉取訊息
*/
public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
// 構建請求,這裡可以看到請求型別是拉取訊息PULL_MESSAGE
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC: // 非同步
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC: // 同步
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
// 非同步傳送請求拉取訊息
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
// 傳送請求
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
// 獲取響應
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
// 處理響應
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
assert pullResult != null;
// 呼叫回撥函數處理
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {
// ...
}
}
});
}
}
Broker在啟動的時候註冊了訊息拉取請求處理器PullMessageProcessor
:
public class BrokerController {
private final PullMessageProcessor pullMessageProcessor;
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
// ...
// 建立PullMessageProcessor
this.pullMessageProcessor = new PullMessageProcessor(this);
// ...
}
public void registerProcessor() {
// ...
// 註冊處理器
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
// ...
}
}
PullMessageProcessor
的processRequest
方法中用於處理消費者傳送的訊息拉取請求,處理邏輯如下:
MessageStore
的getMessage
方法查詢訊息public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
// 根據消費者組獲取訂閱設定
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
return response;
}
// ...
// 拉取訊息
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
if (getMessageResult != null) {
// 設定拉取結果
response.setRemark(getMessageResult.getStatus().name());
// 設定下一次的拉取偏移量
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
// 設定最小偏移量
responseHeader.setMinOffset(getMessageResult.getMinOffset());
// 設定最大偏移量
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
// ...
switch (response.getCode()) {
case ResponseCode.SUCCESS:
// ...
break;
case ResponseCode.PULL_NOT_FOUND: // 如果訊息未找到
// 如果允許掛起
if (brokerAllowSuspend && hasSuspendFlag) {
// 掛起的超時時間
long pollingTimeMills = suspendTimeoutMillisLong;
// 如果未開啟長輪詢
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
// 從設定中獲取
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
// 構建拉取請求
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
// 提交到PullRequestHoldService進行掛起
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
// ...
break;
default:
assert false;
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}
// ...
return response;
}
}
DefaultMessageStore
的getMessage
用於根據消費者的拉取請求查詢對應的訊息資料,它首先根據主題名稱和佇列ID查詢對應的消費佇列,並獲取訊息佇列對應的CommitLog檔案的最小偏移量minOffset
和最大偏移量maxOffset
,然後校驗本次拉取的偏移量是否在最小偏移量和最大偏移量之間,如果不在,會呼叫nextOffsetCorrection
進行糾正,所以先來看一下nextOffsetCorrection
方法:
// 糾正下一次拉取訊息的偏移量
private long nextOffsetCorrection(long oldOffset, long newOffset) {
long nextOffset = oldOffset;
// 如果當前Broker不是從節點或者是設定了OffsetCheckInSlave校驗
if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) {
nextOffset = newOffset; // 更新拉取進度
}
// 返回nextOffset
return nextOffset;
}
由nextOffsetCorrection
方法可知,如果當前Broker是主節點或者開啟了OffsetCheckInSlave校驗,才會糾正下次的拉取進度設定,否則依舊使用原來的拉取偏移量。
根據拉取偏移量與CommitLog最大最小偏移量的值的對比,處理結果有以下幾種情況:
NO_MESSAGE_IN_QUEUE:對應maxOffset最大偏移量為0的情況,說明當前訊息佇列中沒有訊息,呼叫nextOffsetCorrection設定下一次的拉取偏移量為0,從0開始拉取。
nextOffsetCorrection方法糾正拉取偏移量的條件為當前Broker是主節點或者開啟了OffsetCheckInSlave校驗,所以只有在這個條件下,才會更新為新的拉取偏移量,在當前這個情況下也就是會更新為0,下次從0開始拉取, 如果條件不成立,則不會進行更新,依舊使用原來的拉取偏移量。
OFFSET_TOO_SMALL:對應請待拉取偏移量offset小於CommitLog檔案的最小偏移量的情況,說明拉取進度值過小,呼叫nextOffsetCorrection設定下一次的拉取偏移量為CommitLog檔案的最小偏移量(需要滿足nextOffsetCorrection的更新條件)。
OFFSET_OVERFLOW_ONE:對應待拉取偏移量offset等於CommitLog檔案的最大偏移量的情況,此時雖然呼叫了nextOffsetCorrection進行糾正,但是設定的更新偏移量依舊為offset的值,也就是不進行更新。
OFFSET_OVERFLOW_BADLY:對應待拉取偏移量offset大於CommitLog檔案最大偏移量的情況,說明拉取偏移量越界,此時有以下兩種情況:
NO_MATCHED_LOGIC_QUEUE:如果根據主題未找到訊息佇列,返回沒有匹配的佇列
FOUND:待拉取訊息偏移量介於最大最小偏移量之間,此時根據拉取偏移量和大小從CommitLog中獲取訊息資料
public class DefaultMessageStore implements MessageStore {
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
// ...
long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = null;
// 獲取CommitLog檔案的最大偏移量
final long maxOffsetPy = this.commitLog.getMaxOffset();
// 根據主題和佇列ID查詢消費佇列
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) { // 如果消費佇列不為空
// 獲取訊息佇列最小偏移量
minOffset = consumeQueue.getMinOffsetInQueue();
// 獲取訊息佇列最大偏移量
maxOffset = consumeQueue.getMaxOffsetInQueue();
// 如果最大偏移量為0,說明當前訊息佇列中沒有訊息
if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
// 設定下一次的拉取偏移量為0
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) { // 如果待拉取偏移量小於最小偏移量
status = GetMessageStatus.OFFSET_TOO_SMALL;
// 設定下一次的拉取偏移量為minOffset
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) { // 如果待拉取偏移量等於最大偏移量
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
// 設定下一次的拉取偏移量為offset也就是不進行更新
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) { // 如果待拉取偏移量大於最大偏移量
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
// 如果最小偏移量為0
if (0 == minOffset) {
// 更新下次拉取偏移量為minOffset
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
// 更新下次拉取偏移量為maxOffset
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
} else {
// 根據偏移量獲取訊息佇列對應的ConsumeQueue
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
// 如果不為空
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
// ...
// 建立獲取訊息結果物件GetMessageResult
getResult = new GetMessageResult(maxMsgNums);
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 獲取偏移量
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
// 獲取大小
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
// ...
// 根據拉取偏移量和大小從CommitLog中獲取訊息資料
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
// ...
this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
// 設定訊息內容
getResult.addMessage(selectResult);
// 設定查詢狀態為FOUND
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
// ...
// 計算下次拉取偏移量
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
// 未查詢到
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+ maxOffset + ", but access logic queue failed.");
}
}
} else {
// 如果未查詢到訊息佇列
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
// ...
// 設定查詢結果
getResult.setStatus(status); // 查詢結果狀態
getResult.setNextBeginOffset(nextBeginOffset); // 下一次拉取的偏移量
getResult.setMaxOffset(maxOffset);// CommitLog最大偏移量
getResult.setMinOffset(minOffset);// CommitLog最小偏移量
return getResult;
}
}
前面知道,非同步拉取訊息的時候註冊了回撥函數PullCallback,當請求返回響應結果之後,會執行回撥函數,這次進入到DefaultMQPushConsumerImpl
的pullMessage
回撥函數中看一下都做了什麼。
在onSuccess
方法中,首先呼叫了PullAPIWrapper
的processPullResult
方法處理返回的響應資訊,然後根據拉取結果進行處理,拉取結果有以下幾種情況:
FOUND:對應GetMessageResult.FOUND的情況,此時判斷是否拉取到了訊息
如果未拉取到訊息,將拉取請求放入到阻塞佇列中再進行一次拉取
如果拉取到了訊息,將訊息提交到ConsumeMessageService中進行消費(非同步處理),然後判斷拉取間隔PullInterval是否大於0,如果大於0,表示需要等待一段時間後再進行拉取,此時呼叫executePullRequestLater
方法延遲下一次拉取,如果PullInterval小於0表示需要立刻進行下一次拉取,此時呼叫executePullRequestImmediately
將拉取請求加入佇列中進行下一次拉取。
NO_MATCHED_MSG:沒有匹配的訊息,此時更新下一次的拉取偏移量,呼叫executePullRequestImmediately
將拉取請求加入佇列中重新進行拉取。
OFFSET_ILLEGAL:拉取偏移量不合法,此時設定下一次拉取偏移量,並將拉取請求中存放的ProcessQueue置為dropped刪除狀態,然後通過DefaultMQPushConsumerImpl
提交非同步任務,在任務中重新更新拉取偏移量,並將ProcessQueue刪除
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 處理拉取結果
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
// 判斷拉取結果
switch (pullResult.getPullStatus()) {
case FOUND:
// 獲取上一次拉取的偏移量
long prevRequestOffset = pullRequest.getNextOffset();
// 更新下一次拉取的偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
// 如果未拉取到訊息
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
// 將拉取請求放入到阻塞佇列中再進行一次拉取
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// 將訊息加入到processQueue
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 將訊息提交到ConsumeMessageService中進行消費(非同步處理)
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 如果PullInterval大於0,等待PullInterval毫秒後將物件放入到阻塞佇列中
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
// 根據間隔時間稍後再進行拉取
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
// 立刻進行下一次拉取
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
// ...
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG: // 沒有匹配的訊息
// 更新下一次的拉取偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
// 再次進行拉取
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL: // 不合法
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
// 設定下一次拉取偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
// 設定為dropped,進行丟棄
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
// 更新拉取偏移量
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
// 持久化
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
// 移除處理佇列,等待下一次負責均衡
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
// 如果出現異常,稍後再進行拉取
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
總結
參考
丁威、周繼鋒《RocketMQ技術內幕》
RocketMQ版本:4.9.3