【RocketMQ】【原始碼】延遲訊息實現原理

2023-09-15 12:02:58

RocketMQ設定了延遲級別可以讓訊息延遲消費,延遲訊息會使用SCHEDULE_TOPIC_XXXX這個主題,每個延遲等級對應一個訊息佇列,並且與普通訊息一樣,會儲存每個訊息佇列的消費進度(delayOffset.json中的offsetTable):

public class MessageStoreConfig {
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}

延遲級別與延遲時間對應關係:
延遲級別0 ---> 對應延遲時間1s,也就是延遲1秒後消費者重新從Broker拉取進行消費
延遲級別1 ---> 延遲時間5s
延遲級別2 ---> 延遲時間10s
...
以此類推,最大的延遲時間為2h。

延遲訊息

使用延遲訊息時,只需設定延遲級別即可,Broker在儲存的時候會判斷是否設定了延遲級別,如果設定了延遲級別就按延遲訊息來處理,由【訊息的儲存】文章可知,訊息儲存之前會進入到asyncPutMessage方法中,延遲訊息的處理就是在這裡做的,處理邏輯如下:

  1. 判斷訊息的延遲級別是否超過了最大延遲級別,如果超過了就使用最大延遲級別;

  2. 獲取RMQ_SYS_SCHEDULE_TOPIC,它是在TopicValidator中定義的常數,值為SCHEDULE_TOPIC_XXXX:

    public class TopicValidator {
        // ...
        public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    }
    
  3. 根據延遲級別選取對應的佇列,一般會把相同延遲級別的訊息放在同一個佇列中;

  4. 將訊息原本的TOPIC和佇列ID設定到訊息屬性中;

  5. 更改訊息佇列的主題為RMQ_SYS_SCHEDULE_TOPIC,所以延遲訊息的主題最終被設定為RMQ_SYS_SCHEDULE_TOPIC,會將訊息投遞到延遲佇列中;

public class CommitLog {
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // ...
        // 獲取事務型別
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        // 如果未使用事務或者提交事務
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 判斷延遲級別
            if (msg.getDelayTimeLevel() > 0) {
                // 如果超過了最大延遲級別
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // 獲取RMQ_SYS_SCHEDULE_TOPIC
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 根據延遲級別選取對應的佇列
                int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // 將訊息原本的TOPIC和佇列ID設定到訊息屬性中
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 設定SCHEDULE_TOPIC
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        // ...
    }
}

延遲訊息被投遞到延遲佇列中之後,會由定時任務去處理佇列中的訊息,接下來就去看下定時任務的處理過程。

註冊定時任務

Broker啟動的時候會呼叫ScheduleMessageServicestart方法,start方法中為不同的延遲級別建立了對應的定時任務來處理延遲訊息,然後從offsetTable中獲取當前延遲等級對應那個訊息佇列的消費進度,如果未獲取到,則使用0,從佇列的第一條訊息開始處理,然後建立定時任務DeliverDelayedMessageTimerTask,可以看到首次是延遲1000ms執行:

public class ScheduleMessageService extends ConfigManager {
    // 首次執行延遲的時間
    private static final long FIRST_DELAY_TIME = 1000L;
    public void start() {
        if (started.compareAndSet(false, true)) {
            super.load();
            this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
            if (this.enableAsyncDeliver) {
                this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
            }
            // 遍歷所有的延遲級別
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) { // 如果獲取的消費進度為空
                    offset = 0L; // 預設為0,從第一條訊息開始處理
                }
                if (timeDelay != null) {
                    if (this.enableAsyncDeliver) {
                        this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                    }
                    // 為每個延遲級別建立對應的定時任務
                    this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                }
            }
            // ...
        }
    }
}

執行定時任務

DeliverDelayedMessageTimerTaskScheduleMessageService的內部類,它實現了Runnable介面,在run方法中呼叫了executeOnTimeup方法來處理延遲訊息:

public class ScheduleMessageService extends ConfigManager {
    class DeliverDelayedMessageTimerTask implements Runnable {
        @Override
        public void run() {
            try {
                if (isStarted()) {
                    // 執行任務
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
            }
        }
    }
}

executeOnTimeup方法的處理邏輯如下:

  1. 根據主題名稱以及延遲等級獲取ConsumeQueue,如果獲取為空,會重新建立一個任務提交到執行緒池中,延遲時間為DELAY_FOR_A_WHILE,延遲一段時間後重新執行;
  2. 根據當前延遲訊息佇列的消費進度,從ConsumeQueue獲取資料,如果獲取為空,處理同上,重新建立一個任務延遲一段時間之後重新執行;
  3. 因為佇列中的訊息是按寫入順序進行儲存的,所以根據偏移量獲取到的第一條訊息開始,向後處理:
    (1)獲取訊息儲存時間戳
    (2)根據延遲等級和訊息的儲存時間戳計算訊息的到期時間
    (3)獲取當前時間,使用當前時間減去訊息的到期時間
    • 如果值大於0,表示還未到達指定的延遲時間,需要繼續等待,重新建立一個任務延遲一段時間之後重新執行;
    • 如果值小於等於0,表示已經到達了指定的延遲時間,會呼叫messageTimeup對訊息處理,恢復訊息原本的Topic;
  4. 根據是否開啟了非同步來決定同步投遞訊息還是非同步投遞訊息,這一步會將訊息投遞到原本Topic中的訊息佇列,之後與普通訊息的儲存流程一致;
public class ScheduleMessageService extends ConfigManager {
    class DeliverDelayedMessageTimerTask implements Runnable {
        public void executeOnTimeup() {
            // 根據主題名稱以及延遲等級獲取ConsumeQueue
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
            // 如果ConsumeQueue為空,新建定時任務等待下次執行
            if (cq == null) {
                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
                return;
            }
            // 根據偏移量從ConsumeQueue獲取資料
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ == null) {
                // ...

                // 如果獲取為空,新建定時任務等待下次執行
                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
                return;
            }
            long nextOffset = this.offset;
            try {
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                // 開始處理延遲訊息
                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 獲取訊息在CommitLog中的偏移量
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    // 訊息大小
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // tag雜湊值
                    long tagsCode = bufferCQ.getByteBuffer().getLong();
                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            // 獲取訊息儲存時間戳
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            // 根據延遲等級和訊息的儲存時間計算訊息的到期時間
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }
                    // 獲取當前時間
                    long now = System.currentTimeMillis();
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    // 計算訊息的到期時間
                    long countdown = deliverTimestamp - now;
                    // 如果大於0,表示還未到達指定的延遲時間,需要繼續等待
                    if (countdown > 0) {
                        // 新建定時任務等待下次執行
                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                        return;
                    }
                    // 走到這裡,表示已經到了訊息的延遲時間,從CommitLog取出訊息
                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                    if (msgExt == null) {
                        continue;
                    }
                    // 處理訊息,這裡會恢復訊息原本的Topic
                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                            msgInner.getTopic(), msgInner);
                        continue;
                    }

                    boolean deliverSuc;
                    // 投遞訊息到原本的主題中
                    if (ScheduleMessageService.this.enableAsyncDeliver) {
                        // 非同步投遞
                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                    } else {
                        // 同步投遞
                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                    }
                    if (!deliverSuc) {
                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                        return;
                    }
                }
                // 計算下一條訊息的偏移量
                nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
            } catch (Exception e) {
                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
            } finally {
                bufferCQ.release();
            }

            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
        }

    }

    private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setBody(msgExt.getBody()); // 設定訊息體
        msgInner.setFlag(msgExt.getFlag()); // 設定falg
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        // ...
        msgInner.setWaitStoreMsgOK(false);
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        // 恢復原本的Topic
        msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

        String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
        int queueId = Integer.parseInt(queueIdStr);
        msgInner.setQueueId(queueId);

        return msgInner;
    }
}