【定時功能】訊息的定時傳送-基於RocketMQ

2022-07-31 06:01:38

一、功能介紹

  要實現一個訊息的定時傳送功能,也就是讓訊息可以在某一天某一個時間具體節點進行傳送。而我們公司的業務場景是類似簡訊的業務,而且數量不小,使用者會進行號碼、訊息內容、定時傳送時間等資訊的提交。等到了設定的定時時間,則進行訊息的傳送工作。

 

二、思考實現邏輯

  前提準備:

    MySQL

    RocketMQ,最好broker開啟佇列自動建立的設定

 

  剛開始我想的是基於MySQL去實現定時傳送,後來覺得這種掃描方式單執行緒的時候並行能力不夠,多執行緒也得需要做並行控制,還得做一些任務的排程,比如執行緒執行一半卡死或者異常的時候,還得做任務的補償工作。所以,後面我選擇了通過元件的方式實現,這是我想到了訊息佇列的延遲傳送功能,剛好我司系統用到了RocketMQ,我就想通過該元件實現該訊息的定時傳送功能。RocketMQ的檔案預設只儲存72小時,這個要注意一下,有需要調整時間的需要調整一下。我也不建議調整的太大,這樣容易引起系統的資源浪費。

  具體實現邏輯如下:

  1)使用者建立訊息,記錄到資料庫(號碼,內容,定時傳送時間)

  2)如果是是可以立即傳送的訊息,傳送訊息到RocketMQ的立即傳送主題(TOPIC_NOW_SEND)中,後續有執行緒消費該訊息,直接進行資料傳送。

  3)如果是需要定時傳送的訊息,傳送訊息到RocketMQ的延遲傳送主題(TOPIC_DELAY_SEND)中。

  這裡有一些細節設定:

    tags:可以用於同一主題下區分不同型別的訊息。此處可以設定為當前日期(我是用的是YYYYMMDD,例如:20220730),因為設定為需要傳送的當天日期後,程式就可以通過該欄位分辨是否需要處理。後續消費者就可以通過當天日期篩選需要當天傳送的訊息,過濾掉不是當天需要傳送的訊息。

    keys:索引,可以用於查詢訊息。我們設定的格式可以是【訂單號+訊息定時傳送的時間戳(可以是毫秒級)】,例如:FS20220730123456+16591914490001

  4)啟動兩個(或者多個,這個和檔案儲存的時間有關係)消費者進行輪換,這裡我稱之為masterConsumer(啟動時負責消費前一天的資料)和slaveConsumer(啟動時負責消費當天的資料)。這裡我設計的初衷是每天有自己的消費者,例如20220730消費tags=20220730的資料,20220731消費tags=20220731的資料。這裡需要消費前一天的資料原因是,需要防止出現晚上23:59分提交訊息的時候,當天沒處理完。所以,需要在第二天進行額外的補償操作。

  程式啟動第1天(例如當天是20220730):

    masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND  消費tags=20220729
    slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND  消費tags=20220730

  程式啟動第2天(例如當天是20220731):
    slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND  消費tags=20220730
    masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND  消費tags=20220731

  程式啟動第3天(例如當天是20220801):
    masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND  消費tags=20220731
    slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND  消費tags=20220801

  程式啟動第4天(例如當天是20220802):
    slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND  消費tags=20220801
    masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND  消費tags=20220802

  ......

  程式的masterConsumer和slaveConsumer就這樣持續輪換消費資料。

  5)消費者的消費資料邏輯:

  消費者需要設定consumeFromWhere引數=CONSUME_FROM_FIRST_OFFSET(預設是ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,從最新的位置開始消費),這樣資料才可以第一次啟動從佇列的最前位置開始消費。

  消費到資料之後,獲取keys的內容,根據之前傳送的設定獲取訊息定時傳送的時間戳,然後和當前的時間進行比對。這裡要比對的主要原因是當前 RocketMQ 不支援任意時間的延遲。 生產者傳送延遲訊息前需要設定幾個固定的延遲級別,分別對應1s到2h的1到18個延遲級,訊息消費失敗會進入延遲訊息佇列,訊息傳送時間與設定的延遲級別和重試次數有關。

  當前支援的訊息延遲級別有:

    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

  如果比對時間<=1秒,則直接傳送到RocketMQ的立即傳送主題(TOPIC_NOW_SEND)中,後續有執行緒進行消費傳送;否則,傳送時間根據RocketMQhi吃的延遲級別進行選擇,按照最大可支援的時間為準,等待後續的訊息重新消費,直到最終訊息符合條件,可以投遞到RocketMQ的立即傳送主題(TOPIC_NOW_SEND)為止。例如:如果比對時間>2h,則重新投遞到延遲傳送主題(TOPIC_DELAY_SEND)中,延遲傳送的等級為18(也就是2h)。如果比對時間>2m,比對時間<3m,則重新投遞到延遲傳送主題(TOPIC_DELAY_SEND)中,延遲傳送的等級為6(也就是2m)。

  6)在第4步驟中的masterConsumer和slaveConsumer的輪換邏輯支援,即在過了0點之後,前一天的consumer需要進行重置。

  

  以上就是我的實現邏輯。

 

三、敲程式碼

  1、定義延遲級別與時間的對應關係

package cn.lxw.mq.constant;

import java.util.Arrays;

public enum MessageDelayLevel {
    SECOND_1(1, 1 * 1000L),
    SECOND_5(2, 5 * 1000L),
    SECOND_10(3, 10 * 1000L),
    SECOND_30(4, 30 * 1000L),
    MINUTE_1(5, 1 * 60 * 1000L),
    MINUTE_2(6, 2 * 60 * 1000L),
    MINUTE_3(7, 3 * 60 * 1000L),
    MINUTE_4(8, 4 * 60 * 1000L),
    MINUTE_5(9, 5 * 60 * 1000L),
    MINUTE_6(10, 6 * 60 * 1000L),
    MINUTE_7(11, 7 * 60 * 1000L),
    MINUTE_8(12, 8 * 60 * 1000L),
    MINUTE_9(13, 9 * 60 * 1000L),
    MINUTE_10(14, 10 * 60 * 1000L),
    MINUTE_20(15, 20 * 60 * 1000L),
    MINUTE_30(16, 30 * 60 * 1000L),
    HOUR_1(17, 1 * 60 * 60 * 1000L),
    HOUR_2(18, 2 * 60 * 60 * 1000L),
    ;
    //    1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    private Integer level;
    private Long mills;

    MessageDelayLevel(Integer level, Long mills) {
        this.level = level;
        this.mills = mills;
    }

    public Integer getLevel() {
        return level;
    }

    public void setLevel(Integer level) {
        this.level = level;
    }

    public Long getMills() {
        return mills;
    }

    public void setMills(Long mills) {
        this.mills = mills;
    }

    /**
     * 功能描述: <br>
     * 〈計算需等待的時間,最長2小時〉
     * @Param: [timeMills]
     * @Return: {@link MessageDelayLevel}
     * @Author: luoxw
     * @Date: 2022/7/26 16:33
     */
    public static MessageDelayLevel getMaxLevel(long timeMills){
        long millsBwt = timeMills - System.currentTimeMillis();
        if(millsBwt < 1000L){
            millsBwt = 1000L;
        }
        final long paramMills = millsBwt;
        return Arrays.asList(MessageDelayLevel.values()).stream().filter(p -> p.mills.compareTo(paramMills) <= 0).max((c1,c2) -> c1.getLevel().compareTo(c2.getLevel())).orElse(HOUR_2);
    }

    public static void main(String[] args) {
        getMaxLevel(2 * 60 * 60 * 1000L);
        getMaxLevel(2 * 60 * 60 * 1000L + 1);
        getMaxLevel(2 * 60 * 60 * 1000L - 1);
        getMaxLevel( 6 * 1000L);
        getMaxLevel( 5 * 1000L);
        getMaxLevel( System.currentTimeMillis());
    }
}

  2、延遲訊息消費邏輯

package cn.lxw.task;

import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.MessageExt;
import cn.lxw.Consumer;
import cn.lxw.DateUtil;
import cn.lxw.constant.LogConst;
import cn.lxw.context.AppEnvContext;
import cn.lxw.enums.EnumMQTopic;
import cn.lxw.mq.constant.MessageDelayLevel;
import cn.lxw.util.ProducerUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class Send_DelayMessageTask implements Runnable {
    private DefaultMQProducer defaultSendProducer;
    private DefaultMQProducer defaultDelayProducer;


    private static volatile String currentDate = DateUtil.operateDate(new Date(), 0);
    private DefaultMQPushConsumer masterDelayConsumer;
    private static volatile String lastDate = DateUtil.operateDate(new Date(), -1);
    private DefaultMQPushConsumer slaveDelayConsumer;
    // 2個消費者的時候,奇數更新master消費者,偶數更新slave消費者
    private static volatile AtomicInteger closeCnt = new AtomicInteger();

  
    public Send_DelayMessageTask() {
        try {
            String rocketMQAddr = AppEnvContext.getPropValue("rocketmq.address");
            defaultSendProducer = ProducerUtil.init(rocketMQAddr, "PRODUCER_TOPIC_NOW_SEND");
            defaultDelayProducer = ProducerUtil.init(rocketMQAddr, "PRODUCER_TOPIC_DELAY_SEND");
            // 主從消費者交替進行資料消費。
            // 初始化的時候,主消費者消費昨天(1月1日)的資料,從消費者消費今天(1月2日)的資料。
            int consumeThreadMin = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMin", 10);
            int consumeThreadMax = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMax", 10);
            int consumeMessageBatchMaxSize = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeMessageBatchMaxSize", 10);
            masterDelayConsumer = Consumer.getInstance(
                    "TOPIC_DELAY_SEND",
                    "MASTER_CONSUMER_TOPIC_DELAY_SEND",
                    rocketMQAddr,
                    lastDate,
                    consumeThreadMin,
                    consumeThreadMax,
                    consumeMessageBatchMaxSize
            );
            slaveDelayConsumer = Consumer.getInstance(
                    "TOPIC_DELAY_SEND",
                    "SLAVE_CONSUMER_TOPIC_DELAY_SEND",
                    rocketMQAddr,
                    currentDate,
                    consumeThreadMin,
                    consumeThreadMax,
                    consumeMessageBatchMaxSize
            );
        } catch (Exception e) {
            log.error(LogConst.PREFIX + "初始化異常", e);
        }
    }

    // 消費者重啟
    private void restartConsumer(DefaultMQPushConsumer consumer, String consumerName) throws Exception {
        String rocketMQAddr = AppEnvContext.getPropValue("rocketmq.address");
        consumer.shutdown();
        int consumeThreadMin = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMin", 10);
        int consumeThreadMax = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMax", 10);
        int consumeMessageBatchMaxSize = AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeMessageBatchMaxSize", 10);
        consumer = Consumer.getInstance(
                "TOPIC_DELAY_SEND",
                consumerName,
                rocketMQAddr,
                currentDate,
                consumeThreadMin,
                consumeThreadMax,
                consumeMessageBatchMaxSize
        );
        MessageListenerConcurrently listener = getDelayListener();
        consumer.registerMessageListener(listener);
        consumer.start();
        closeCnt.incrementAndGet();
    }

    // 重新初始化消費者
    private synchronized void reInitComsumer(){
        try {
            String thisDate = DateUtil.operateDate(new Date(), 0);
            int lastCloseCnt = closeCnt.get();
            // 初始化的時候不執行 and 時間沒跨度的時候不執行,只有當第二天的時候才執行資料
            if(lastCloseCnt > 0 && !thisDate.equals(currentDate)){
                // 獲取今天的時間
                currentDate = thisDate;
                // 如果時間對2取餘的值為1,則更新主消費者,之前的昨天(1月1日)更新為第二天(1月3日)的資料
                if(closeCnt.get() % 2 == 1) {
                    restartConsumer(masterDelayConsumer, "MASTER_CONSUMER_TOPIC_DELAY_SEND");
                }
                // 如果時間對2取餘的值為0,則更新從消費者,之前的今天(1月2日)更新為第三天(1月4日)的資料
                if(closeCnt.get() % 2 == 0){
                    restartConsumer(slaveDelayConsumer, "SLAVE_CONSUMER_TOPIC_DELAY_SEND");
                }
            }
        } catch (Exception e) {
            log.error(LogConst.PREFIX + "消費者重新初始化異常", e);
        }
    }
    
    // 消費邏輯
    private MessageListenerConcurrently getDelayListener(){
        MessageListenerConcurrently listener = (List<MessageExt> list, ConsumeConcurrentlyContext context) -> {
            for (MessageExt msg : list) {
                String msgBody = null;
                try {
                    msgBody = new String(msg.getBody());
                    String msgKeys = msg.getKeys();
                    // 訂單號+傳送時間戳
                    String[] split = msgKeys.split("\\+");
                    if(split.length == 2) {
                        String delaySendTimeStr = split[1];
                        long time = DateUtil.parseStr2TimeMills(delaySendTimeStr);
                        // 時間比對,獲取最大支援的延遲級別
                        MessageDelayLevel maxLevel = MessageDelayLevel.getMaxLevel(time);
                        Integer level = maxLevel.getLevel();
                        // 如果延遲級別=1,也就是小於等於1秒的時候,直接傳送到立即傳送主題(TOPIC_NOW_SEND)
                        if (MessageDelayLevel.SECOND_1.getLevel().equals(level)) {
                            SendResult sendResult = ProducerUtil.syncSend(
                                    defaultSendProducer,
                                    "TOPIC_NOW_SEND",
                                    msg.getTags(),
                                    msg.getKeys(),
                                    msgBody
                            );
                            log.info(LogConst.PREFIX + "定時訊息[{}]傳送到[立即傳送主題]結果:{}", msgBody, sendResult);
                        } else {
                            SendResult sendResult = ProducerUtil.syncSendDelay(
                                    defaultDelayProducer,
                                    "TOPIC_DELAY_SEND",
                                    msg.getTags(),
                                    msg.getKeys(),
                                    level,
                                    msgBody
                            );
                            log.info(LogConst.PREFIX + "定時訊息[{}]傳送到[延遲傳送主題]結果:{}", msgBody, sendResult);
                        }
                    }
                } catch (Exception ex) {
                    log.error(LogConst.PREFIX + "定時訊息[{}]傳送異常", msgBody, ex);
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        return listener;
    }

    @Override
    public void run() {
        try {
            MessageListenerConcurrently masterListener = getDelayListener();
            masterDelayConsumer.registerMessageListener(masterListener);
            MessageListenerConcurrently slaveListener = getDelayListener();
            slaveDelayConsumer.registerMessageListener(slaveListener);
            // 啟動消費者
            masterDelayConsumer.start();
            slaveDelayConsumer.start();
        }catch (Exception ex){
            log.error(LogConst.PREFIX + "定時訊息消費程式啟動異常", ex);
        }

        while (true){
            // 一分鐘檢查一次是否需要輪換消費者
            ThreadUtil.sleep(60 * 1000);
            reInitComsumer();
        }
    }
}

  3、其餘的邏輯就不展示了,可以根據自己實際業務情況進行處理。

 

四、總結

  這是我實現的一個方式,其實是不太完美的,因為量大的時候效能也是有問題的,因為根據RocketMQ檔案儲存的時間(預設72小時)因素決定。如果儲存的時間太長,訊息太多,而我們的程式第一次啟動從佇列的最前位置開始消費,此時的訊息資料是巨大的。不管是對RocketMQ元件,還是對我們程式都是一個壓力,而且目前的訊息過濾功能只能在使用者端進行處理。所以,這裡我建議的是一天一個主題,以定時傳送的當天進行傳送。例如:20220730需要傳送的資料用TOPIC_DELAY_SEND_20220730主題;20220731需要傳送的資料用TOPIC_DELAY_SEND_20220731主題。這樣的話,資料消費的時候也不需要通過tags進行過濾了,只需要通過keys的時間戳資料進行判斷,消費邏輯大致和文中寫法一樣,只需要調整延遲傳送的主題名為【TOPIC_DELAY_SEND_YYYYMMDD】,傳送和輪換的延遲傳送主題也需要根據傳送時間進行調整。

  以上思路僅供各位參考使用,有更好的實現方式可以在下方進行留言。謝謝大家的觀看!衷心感謝!