MQ系列15:MQ實現批次訊息處理

2023-10-17 15:02:33

MQ系列1:訊息中介軟體執行原理
MQ系列2:訊息中介軟體的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ訊息的傳送模式
MQ系列6:訊息的消費
MQ系列7:訊息通訊,追求極致效能
MQ系列8:資料儲存,訊息佇列的高可用保障
MQ系列9:高可用架構分析
MQ系列10:如何保證訊息冪等性消費
MQ系列11:如何保證訊息可靠性傳輸
MQ系列12:如何保證訊息順序性
MQ系列13:訊息大量堆積如何為解決
MQ系列14:MQ如何做到訊息延時處理

1 背景

在網際網路業務的實際應用場景中,訊息的批次處理是非常必要的,因為我們時刻面臨著大量資料的並行執行。
例如,我們在一個業務互動的時候會有大量的分支行為需要非同步去處理,但是這些動作又是在不同的業務粒度上的,所以我們需要多次呼叫MQ寫入訊息,可能有多次的連線和訊息傳送。
這個寫MySQL資料庫是一樣的,多次建連和寫入,跟一次建連和批次資料庫,效能是完全不能比的。
所以我們需要有MQ有批次訊息的能力來對我們的業務資料進行快速處理。

2 批次訊息實現過程

Rocket MQ的批次訊息,可以提高訊息的吞吐能力和處理效率,降低下游系統的API呼叫頻率,同時對訊息服務的穩定性也有幫助。

2.1 批次訊息的特點

  • 批次訊息具有相同的topic。
  • 批次訊息具有相同的waitStoreMsgOK屬性。
  • 批次訊息不支援延遲訊息。
  • 批次訊息的大小不超過4M(4.4版本之後要求不超過1M)。

2.2 批次訊息的使用場景

  • 訊息的吞吐能力和處理效率:通過將多條訊息打包成一批進行傳送,可以減少網路傳輸開銷和訊息處理的時間,從而提高整體的訊息處理效率。
  • 下游系統的API呼叫頻率:通過將多條訊息合併成一條批次訊息進行傳送,可以減少下游系統接收和處理訊息的次數,從而降低API呼叫頻率,減輕下游系統的負載壓力。

2.3 批次訊息的傳送範例

Rocket MQ提供了批次傳送訊息的功能,可以通過呼叫DefaultMQProducer的send()方法,將多條訊息以列表的形式傳送給指定的topic。
以下是一個簡單的範例程式碼:

DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName_1");
String topic = "BatchSendTest_1";
producer.start();  
List<Message> msgs = new ArrayList<>();  
msgs.add(new Message(topic, "Tag1", "OrderID-063105013", "Hello world".getBytes()));  
msgs.add(new Message(topic, "Tag1", "OrderID-063105014", "Brand".getBytes()));  
msgs.add(new Message(topic, "Tag1", "OrderID-063105015", "handsome boy ".getBytes()));  
try {
   producer.send(msgs);
} catch (Exception e) {
   e.printStackTrace();
   // 處理異常
}
finally { 
  // 如果不再傳送訊息,關閉生產者Producer
  producer.shutdown();
}

在以上範例程式碼中,建立了一個DefaultMQProducer範例,並呼叫其start()方法啟動生產者。
然後構造了一個包含三條訊息的列表,通過呼叫producer的send()方法將列表中的訊息傳送給指定的topic。
如果訊息的總長度可能大於1MB時,這時候最好把訊息進行分割,參考下面的程式碼:

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
            this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                   //if the next sublist has no element, add this one and then break, otherwise just break
                   nextIndex++;  
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
    
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
   try {
       List<Message>  listItem = splitter.next();
       producer.send(listItem);
   } catch (Exception e) {
       e.printStackTrace();
       // handle the error
   }
}

可以看出來,Rocket MQ的批次訊息可以提高訊息的吞吐能力和處理效率,降低下游系統的API呼叫頻率,是一種優化訊息傳輸和處理的有效手段。

3 總結

  • 對於同型別、同特徵的訊息,可以聚合進行批次傳送,減少MQ的連線傳送次數,能夠顯著提升效能。
  • 批次傳送訊息須有相同的topic,相同的waitStoreMsgOK,且不能是延時訊息。