MQ系列1:訊息中介軟體執行原理
MQ系列2:訊息中介軟體的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ訊息的傳送模式
在之前的文章中,我們學習了RocketMQ的原理;RocketMQ中 命名服務 ServiceName 的執行流程;以及訊息生產、傳送的原理和模式。這一篇,就讓我們從訊息消費的角度去進一步的學習。
訊息的消費主要是由如下幾個核心能力組成的:
RocketMQ訊息訂閱有方式:
RocketMQ 目前支援叢集模式和廣播消費模式,其中叢集模式使用範圍比較大,即對等,訊息消費了即完成。
RocketMQ使用者端消費資料之後,需要向Broker反饋訊息的消費進度,Broker獲取到消費進度記錄下來。這樣保證 佇列rebalance和使用者端消費者重啟動的時候,可以獲取到準確的消費進度。
訊息消費以及進度反饋的主步驟如下:
綜合上面的內容,需要注意的點如下:
可以在DefaultMQPushConsumer 物件中設定各種屬性來對消費流量進行控制:
PullInterval: 設定消費端拉取MQ訊息的間隔時間。間隔時間是按照上次消費完成之後(比如rocketMQ收到Ack回覆訊息之後)。
PullInterval=20s,比如上次rocketMq服務收到Ack訊息是12:15:15,則 12:15:35再去拉訊息。
PullBatchSize: 消費端每個佇列一次拉取多少個訊息,若該消費端分賠了N個監控佇列,每次拉取M個,那麼消費端每次去rocketMq拉取的訊息為N * M。
消費端每次pull到訊息總數=PullBatchSize * 監聽佇列數,如 PullBatchSize = 2, 監聽佇列=5,則 訊息總數量 = 2 * 5 = 10。
ThreadMin和ThreadMax: 消費端消費pull到的訊息需要的執行緒數量。
RocketMq 邏輯消費佇列數量的設定
rocketMq 可以設定消費佇列,如 queue Read1 ,queue Read2,設定數量決定每次pull到的訊息總數。Rocket MQ 提供了讀寫佇列數量的設定。
消費端節點部署數量
多節點消費端執行緒數量要比單節點消費執行緒數量多,理論上消費速度大於單節點,分治思維。
在過濾訊息的時候,標籤模式簡單而是用,可以篩選出你需要的資料。如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupTest");
consumer.subscribe("testTopic", MessageSelector.byTag("Tag1 || Tag2 || Tag3").bySql("sex = 'male' AND name = 'brand'));
這種情況下,訊息中帶有 Tag1 、Tag2、Tag3 標籤就會被過濾出來,但是單個消限制息只能有一個標籤,這就遠遠滿足不了各種複雜的並交集場景的需要了。
這時候Rocket MQ可以在訊息中設定一些屬性,再使用SQL表示式篩選屬性來過濾出需要的資料。 如下
------------
| message |
|----------| sex = male AND name = 'brand' , Gotten
| name = 'brand' |
| sex = 'male'|
| age = 21|
------------
------------
| message |
|----------| sex = male AND name = 'brand', Gotten , Missed
| name = 'Anny' |
| sex = 'female'|
| age = 20 |
------------
提高消費並行度
在同一個ConsumerGroup下(Clustering方式),可以通過增加Consumer範例的數量來提高並行度。
通過加機器,或者在已有機器中啟動多個Consumer程序都可以增加Consumer範例數。
注意:總的Consumer數量不要超過Topic下Read Queue數量,超過的Consumer範例接收不到訊息。
此外,通過提高單個Consumer範例中的並行處理的執行緒數,可以在同一個Consumer內增加並行度來提高吞吐量(設定方法是修改consumeThreadMin和consumeThreadMax)。
以批次方式進行消費
某些業務場景下,多條訊息同時處理的時間會大大小於逐個處理的時間總和,比如消費訊息中涉及update某個資料庫,一次update10條的時間會大大小於十次update1條資料的時間。
可以通過批次方式消費來提高消費的吞吐量。實現方法是設定Consumer的consumeMessageBatchMaxSize這個引數,預設是1,如果設定為N,在訊息多的時候每次收到的是個長度為N的訊息連結串列。
檢測延時情況,跳過非重要訊息
Consumer在消費的過程中,如果發現由於某種原因發生嚴重的訊息堆積,短時間無法消除堆積,這個時候可以選擇丟棄不重要的訊息,使Consumer儘快追上Producer的進度。
消費者的基本實現,連線 NameServer的地址,指定Topic和Tag,讀取到需要消費的資料,然後輪詢並處理。
public class SimpleConsumerApplication {
public static void main(String[] args) throws MQClientException {
// 1.建立消費者Consumer,並指定消費者組名為 testConsumGroup
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
// 2.指定NameServer的地址,以獲取Broker路由地址
consumer.setNamesrvAddr("192.168.139.1:9876");
// 3.指定Topic和Tag 資訊。* 代表所有
consumer.subscribe("testTopic", "*");
// 4.設定回撥函數,用來處理讀取到的訊息, MessageListenerOrderly 用單個執行緒處理處理佇列的資料
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
for (MessageExt msg : msgList) {
System.out.println("執行緒 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
// Todo,具體的業務邏輯
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 5.消費者開始執行消費任務
consumer.start();
}
}
相比與基本消費,多了一個 ConsumeFromWhere的設定。代表消費者從哪個位置開始消費,列舉如下:
public class SimpleOrderApplication {
public static void main(String[] args) throws MQClientException {
// 1.建立消費者Consumer,並指定消費者組名為 testConsumGroup
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
// 2.指定NameServer的地址,以獲取Broker路由地址
consumer.setNamesrvAddr("192.168.139.1:9876");
/**
* 設定Consumer第一次啟動是從佇列頭部、佇列尾部、還是指定時間戳節點開始消費
* 非第一次啟動接著上次消費的進度繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 3.指定Topic和Tag 資訊。* 代表所有
consumer.subscribe("testTopic", "*");
// 4.設定回撥函數,用來處理讀取到的訊息, MessageListenerOrderly 用單個執行緒處理處理佇列的資料
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
for (MessageExt msg : msgList) {
System.out.println("執行緒 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
// Todo,具體的業務邏輯
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 5.消費者開始執行消費任務
consumer.start();
}
}
可以使用MessageSelector.byTag來進行標籤篩選;或者使用MessageSelector.bySql 來進行訊息屬性篩選;或者混合使用。
參考下面程式碼,註釋說明的比較清楚。
public class FilterConsumerApplication {
public static void main(String[] args) throws MQClientException {
// 1.建立消費者Consumer,並指定消費者組名為 testConsumGroup
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
// 2.指定NameServer的地址,以獲取Broker路由地址
consumer.setNamesrvAddr("192.168.139.1:9876");
// 3.指定Topic和Tag 資訊。只有訂閱的訊息有 sex 和 name 屬性, 並且年齡為 18 歲以上的男性
// consumer.subscribe("testTopic", MessageSelector.byTag("userTag1 || userTag2"));
consumer.subscribe("testTopic", MessageSelector.bySql("sex = 'male' AND age > 18"));
// 4.設定回撥函數,用來處理讀取到的訊息, MessageListenerOrderly 用單個執行緒處理處理佇列的資料
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
for (MessageExt msg : msgList) {
System.out.println("執行緒 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
// Todo,具體的業務邏輯
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 5.消費者開始執行消費任務
consumer.start();
}
}