【RocketMQ】訊息拉模式分析

2023-01-20 12:00:58

RocketMQ有兩種獲取訊息的方式,分別為推模式和拉模式。

推模式
推模式在【RocketMQ】訊息的拉取一文中已經講過,雖然從名字上看起來是訊息到達Broker後推播給消費者,實際上還是需要消費向Broker傳送拉取請求獲取訊息內容,推模式對應的訊息消費實現類為DefaultMQPushConsumerImpl,回顧一下推模式下的訊息消費過程:

  1. 消費者在啟動的時候做一些初始化工作,它會建立MQClientInstance並進行啟動;
  2. MQClientInstance中參照了訊息拉取服務PullMessageService和負載均衡服務RebalanceService,它們都繼承了ServiceThread,MQClientInstance在啟動後也會對它們進行啟動,所以訊息拉取執行緒和負載均衡執行緒也就啟動了;
  3. 負載均衡服務啟動後,會對該消費者訂閱的主題進行負載均衡,為消費者分配訊息佇列,並建立PullRequest拉取請求,用於拉取訊息;
  4. PullMessageService中等待阻塞佇列中PullRequest拉取請求的到來,接著會呼叫DefaultMQPushConsumerImplpullMessage方法進行訊息拉取;
  5. 消費者向Broker傳送拉取訊息的請求,從Broker拉取訊息;
  6. 消費者對Broker返回的響應資料進行處理,解析訊息進行消費;

推模式下進行訊息消費的例子:

@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest {
    private String consumerGroup;
    private String topic = "FooBar";
    private String brokerName = "BrokerA";
    private MQClientInstance mQClientFactory;

    @Mock
    private MQClientAPIImpl mQClientAPIImpl;
    private static DefaultMQPushConsumer pushConsumer;

    @Before
    public void init() throws Exception {
        // ...
        // 消費者組
        consumerGroup = "FooBarGroup" + System.currentTimeMillis();
        // 範例化DefaultMQPushConsumer
        pushConsumer = new DefaultMQPushConsumer(consumerGroup);
        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
        // 設定拉取間隔
        pushConsumer.setPullInterval(60 * 1000);
        // 註冊訊息監聽器
        pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                Optional.ofNullable(result).orElse(new ArrayList<MessageExt>()).stream().forEach(x-> {
                    // 處理訊息
                    System.out.println(new String(x.getBody()));
                });
                return null;
            }
        });
        // ...
        // 設定訂閱的主題
        pushConsumer.subscribe(topic, "*");
        // 啟動消費者
        pushConsumer.start();
    }
}

訊息推模式的詳細過程可參考【RocketMQ】訊息的拉取,接下來我們看一下拉模式。

拉模式
首先來看一下拉模式下進行訊息消費的例子,拉模式下需要消費者不斷呼叫poll方法獲取訊息,底層是一個阻塞佇列,如果佇列中沒有資料,會進入等待直到佇列中增加了資料:

 private void testPull() {
        // 建立DefaultLitePullConsumer
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumerGroup");;
        try {
            litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
            litePullConsumer.subscribe("LitePullConsumerTest", "*");
            litePullConsumer.start();
            litePullConsumer.setPollTimeoutMillis(20 * 1000);
            while(true) {
                // 獲取訊息
                List<MessageExt> result = litePullConsumer.poll();
                Optional.ofNullable(result).orElse(new ArrayList<MessageExt>()).stream().forEach(x-> {
                    // 處理訊息
                    System.out.println(new String(x.getBody()));
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            litePullConsumer.shutdown();
        }
    }

推模式與拉模式的區別
對比上面推模式進行消費的例子,從使用方式上來講,推模式不需要消費者主動去拉取訊息,只需要註冊訊息監聽器,當有訊息到達時,觸發consumeMessage方法進行訊息消費,從表面上看就像是Broker主動推播給消費者一樣,所以叫做推模式,儘管底層還是需要消費者發起拉取請求向Broker拉取訊息

拉模式在使用方式上,需要消費者主動呼叫poll方法獲取訊息,從表面上看消費者需要不斷主動進行訊息拉取,所以叫做拉模式。

拉模式實現原理

拉模式下對應的訊息拉取實現類為DefaultLitePullConsumerImpl,在DefaultLitePullConsumerDefaultMQPullConsumer被標註了@Deprecated,已不推薦使用)的建構函式中,可以看到對其進行了範例化,並在start方進行了啟動:

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
    // 拉模式下預設的訊息拉取實現類
    private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;

    public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        this.consumerGroup = consumerGroup;
        // 建立DefaultLitePullConsumerImpl
        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
    }

    @Override
    public void start() throws MQClientException {
        setTraceDispatcher();
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        // 啟動DefaultLitePullConsumerImpl
        this.defaultLitePullConsumerImpl.start();
        // ...
    }
}

與訊息推模式類似,DefaultLitePullConsumerImpl的start的方法主要做一些初始化的工作:

  1. 初始化使用者端範例物件mQClientFactory,對應實現類為MQClientInstance,拉取服務執行緒、負載均衡執行緒都是通過MQClientInstance啟動的;
  2. 初始化負載均衡類,拉模式對應的負載均衡類為RebalanceLitePullImpl
  3. 建立訊息拉取API物件PullAPIWrapper,用於向Broker傳送拉取訊息的請求;
  4. 初始化訊息拉取偏移量;
  5. 啟動一些定時任務;
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultLitePullConsumer.changeInstanceNameToPID();
                }
                // 初始化MQClientInstance
                initMQClientFactory();
                // 初始化負載均衡
                initRebalanceImpl();
                // 初始化訊息拉取API物件
                initPullAPIWrapper();
                // 初始化拉取偏移量
                initOffsetStore();
                // 啟動MQClientInstance
                mQClientFactory.start();
                // 啟動一些定時任務
                startScheduleTask();
                this.serviceState = ServiceState.RUNNING;
                log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
                operateAfterRunning();
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    }
}

負載均衡

拉取模式對應的負載均衡類為RebalanceLitePullImpl(推模式使用的是RebalanceService),在initRebalanceImpl方法中設定了消費者組、消費模式、分配策略等資訊:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
    // 範例化,拉模式使用的是RebalanceLitePullImpl
    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);

    private void initRebalanceImpl() {
        // 設定消費者組
        this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
        // 設定消費模式
        this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
        // 設定分配策略
        this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
        // 設定mQClientFactory
        this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    }
}

【RocketMQ】訊息的拉取一文中已經講到過,消費者啟動後會進行負載均衡,對每個主題進行負載均衡,拉模式下處理邏輯也是如此,所以這裡跳過中間的過程,進入到rebalanceByTopic方法,可以負載均衡之後如果消費者負載的ProcessQueue發生了變化,會呼叫messageQueueChanged方法觸發變更事件:

public abstract class RebalanceImpl {
     private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // ...
            }
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                // ...
                if (mqSet != null && cidAll != null) {
                    // ...
                    try {
                        // 分配訊息佇列
                        allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    // 更新處理佇列
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        // 觸發變更事件
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }
}

觸發訊息佇列變更事件

RebalanceLitePullImplmessageQueueChanged方法中又呼叫了MessageQueueListenermessageQueueChanged方法觸發訊息佇列改變事件:

public class RebalanceLitePullImpl extends RebalanceImpl {
    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
        if (messageQueueListener != null) {
            try {
                // 觸發改變事件
                messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
            } catch (Throwable e) {
                log.error("messageQueueChanged exception", e);
            }
        }
    }
}

MessageQueueListenerImplDefaultLitePullConsumerImpl的內部類,在messageQueueChanged方法中,不管是廣播模式還是叢集模式,都會呼叫updatePullTask更新拉取任務:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    class MessageQueueListenerImpl implements MessageQueueListener {
        @Override
        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
            MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
            switch (messageModel) {
                case BROADCASTING:
                    updateAssignedMessageQueue(topic, mqAll);
                    updatePullTask(topic, mqAll); // 更新拉取任務
                    break;
                case CLUSTERING:
                    updateAssignedMessageQueue(topic, mqDivided);
                    updatePullTask(topic, mqDivided); // 更新拉取任務
                    break;
                default:
                    break;
            }
        }
    }
}

更新拉取任務

在updatePullTask方法中,從拉取任務表taskTable中取出了所有的拉取任務進行遍歷,taskTable中記錄了之前分配的拉取任務,負載均衡之後可能發生變化,所以需要對其進行更新,這一步主要是處理原先分配給當前消費者的訊息佇列,在負載均衡之後不再由當前消費者負責,所以需要從taskTable中刪除,之後呼叫startPullTask啟動拉取任務:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();

    private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
        // 從拉取任務表中獲取之前分配的訊息佇列進行遍歷
        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
            // 如果與重新進行負載均衡的主題一致
            if (next.getKey().getTopic().equals(topic)) {
                // 如果重新分配的訊息佇列集合中不包含此訊息獨立
                if (!mqNewSet.contains(next.getKey())) {
                    next.getValue().setCancelled(true);
                    // 從任務表移除
                    it.remove();
                }
            }
        }
        // 啟動拉取任務
        startPullTask(mqNewSet);
    }
}

提交拉取任務

startPullTask方法入參中傳入的是負載均衡後重新分配的訊息佇列集合,在startPullTask中會對重新分配的集合進行遍歷,如果taskTable中不包含某個訊息佇列,就構建PullTaskImpl物件,加入taskTable,這一步主要是處理負載均衡後新增的訊息佇列,為其構建PullTaskImpl加入到taskTable,之後將拉取訊息的任務PullTaskImpl提交到執行緒池週期性的執行:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {

    private void startPullTask(Collection<MessageQueue> mqSet) {
        // 遍歷最新分配的訊息佇列集合
        for (MessageQueue messageQueue : mqSet) {
            // 如果任務表中不包含
            if (!this.taskTable.containsKey(messageQueue)) {
                // 建立拉取任務
                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
                // 加入到任務表
                this.taskTable.put(messageQueue, pullTask);
                // 將任務提交到執行緒池定時執行
                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
            }
        }
    }
}

拉取訊息

PullTaskImpl繼承了Runnable,在run方法中的處理邏輯如下:

  1. 獲取訊息佇列對應處理佇列ProcessQueue;
  2. 獲取訊息拉取偏移量,也就是從何處開始拉取訊息;
  3. 呼叫pull方法進行訊息拉取;
  4. 判斷拉取結果,如果拉取到了訊息,將拉取到的結果封裝為ConsumeRequest進行提交,也就是放到了阻塞佇列中,後續消費者從佇列中獲取資料進行消費;
   public class PullTaskImpl implements Runnable {
        private final MessageQueue messageQueue;
        private volatile boolean cancelled = false;
        private Thread currentThread;

        @Override
        public void run() {
            // 如果未取消
            if (!this.isCancelled()) {
                this.currentThread = Thread.currentThread();
                // ...
                // 獲取訊息佇列對應的ProcessQueue
                ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
                // ...  跳過一系列校驗
                long offset = 0L;
                try {
                    // 獲取拉取偏移量
                    offset = nextPullOffset(messageQueue);
                } catch (Exception e) {
                    log.error("Failed to get next pull offset", e);
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
                    return;
                }

                if (this.isCancelled() || processQueue.isDropped()) {
                    return;
                }
                long pullDelayTimeMills = 0;
                try {
                    SubscriptionData subscriptionData;
                    // 獲取主題
                    String topic = this.messageQueue.getTopic();
                    // 獲取主題對應的訂閱資訊SubscriptionData
                    if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                        subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
                    } else {
                        subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
                    }
                    // 拉取訊息
                    PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
                    if (this.isCancelled() || processQueue.isDropped()) {
                        return;
                    }
                    // 判斷拉取結果
                    switch (pullResult.getPullStatus()) {
                        case FOUND: // 如果獲取到了資料
                            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
                            synchronized (objLock) { // 加鎖
                                if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                                    processQueue.putMessage(pullResult.getMsgFoundList());
                                    // 將拉取結果封裝為ConsumeRequest,提交消費請求
                                    submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
                                }
                            }
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("The pull request offset illegal, {}", pullResult.toString());
                            break;
                        default:
                            break;
                    }
                    updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
                } catch (InterruptedException interruptedException) {
                    log.warn("Polling thread was interrupted.", interruptedException);
                } catch (Throwable e) {
                    pullDelayTimeMills = pullTimeDelayMillsWhenException;
                    log.error("An error occurred in pull message process.", e);
                }
                // ...
            }
        }
    }

submitConsumeRequest方法中可以看到將建立的ConsumeRequest物件放入了阻塞佇列consumeRequestCache中:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    // 阻塞佇列
    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();

    private void submitConsumeRequest(ConsumeRequest consumeRequest) {
        try {
            // 放入阻塞佇列consumeRequestCache中
            consumeRequestCache.put(consumeRequest);
        } catch (InterruptedException e) {
            log.error("Submit consumeRequest error", e);
        }
    }
}

訊息消費

在前面的例子中,可以看到消費者是呼叫poll方法獲取資料的,進入到poll方法中,可以看到是從consumeRequestCache中獲取消費請求的,然後從中解析出訊息內容返回:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
    public synchronized List<MessageExt> poll(long timeout) {
        try {
            // ...
            long endTime = System.currentTimeMillis() + timeout;
            // 從consumeRequestCache中獲取資料進行處理
            ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            // ...
            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                // 獲取訊息內容
                List<MessageExt> messages = consumeRequest.getMessageExts();
                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
                assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                this.resetTopic(messages);
                // 返回訊息內容
                return messages;
            }
        } catch (InterruptedException ignore) {
        }
        return Collections.emptyList();
    }
}

參考

RocketMQ原始碼分析之pull模式consumer

RocketMQ版本:4.9.3