MQ系列1:訊息中介軟體執行原理
MQ系列2:訊息中介軟體的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ訊息的傳送模式
MQ系列6:訊息的消費
MQ系列7:訊息通訊,追求極致效能
MQ系列8:資料儲存,訊息佇列的高可用保障
MQ系列9:高可用架構分析
MQ系列10:如何保證訊息冪等性消費
MQ系列11:如何保證訊息可靠性傳輸
MQ系列12:如何保證訊息順序性
MQ系列13:訊息大量堆積如何為解決
MQ系列14:MQ如何做到訊息延時處理
在網際網路業務的實際應用場景中,訊息的批次處理是非常必要的,因為我們時刻面臨著大量資料的並行執行。
例如,我們在一個業務互動的時候會有大量的分支行為需要非同步去處理,但是這些動作又是在不同的業務粒度上的,所以我們需要多次呼叫MQ寫入訊息,可能有多次的連線和訊息傳送。
這個寫MySQL資料庫是一樣的,多次建連和寫入,跟一次建連和批次資料庫,效能是完全不能比的。
所以我們需要有MQ有批次訊息的能力來對我們的業務資料進行快速處理。
Rocket MQ的批次訊息,可以提高訊息的吞吐能力和處理效率,降低下游系統的API呼叫頻率,同時對訊息服務的穩定性也有幫助。
Rocket MQ提供了批次傳送訊息的功能,可以通過呼叫DefaultMQProducer的send()方法,將多條訊息以列表的形式傳送給指定的topic。
以下是一個簡單的範例程式碼:
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName_1");
String topic = "BatchSendTest_1";
producer.start();
List<Message> msgs = new ArrayList<>();
msgs.add(new Message(topic, "Tag1", "OrderID-063105013", "Hello world".getBytes()));
msgs.add(new Message(topic, "Tag1", "OrderID-063105014", "Brand".getBytes()));
msgs.add(new Message(topic, "Tag1", "OrderID-063105015", "handsome boy ".getBytes()));
try {
producer.send(msgs);
} catch (Exception e) {
e.printStackTrace();
// 處理異常
}
finally {
// 如果不再傳送訊息,關閉生產者Producer
producer.shutdown();
}
在以上範例程式碼中,建立了一個DefaultMQProducer範例,並呼叫其start()方法啟動生產者。
然後構造了一個包含三條訊息的列表,通過呼叫producer的send()方法將列表中的訊息傳送給指定的topic。
如果訊息的總長度可能大於1MB時,這時候最好把訊息進行分割,參考下面的程式碼:
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > SIZE_LIMIT) {
//it is unexpected that single message exceeds the SIZE_LIMIT
//here just let it go, otherwise it will block the splitting process
if (nextIndex - currIndex == 0) {
//if the next sublist has no element, add this one and then break, otherwise just break
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
// handle the error
}
}
可以看出來,Rocket MQ的批次訊息可以提高訊息的吞吐能力和處理效率,降低下游系統的API呼叫頻率,是一種優化訊息傳輸和處理的有效手段。