MQ系列16:MQ實現訊息過濾處理

2023-10-19 15:01:11

MQ系列1:訊息中介軟體執行原理
MQ系列2:訊息中介軟體的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ訊息的傳送模式
MQ系列6:訊息的消費
MQ系列7:訊息通訊,追求極致效能
MQ系列8:資料儲存,訊息佇列的高可用保障
MQ系列9:高可用架構分析
MQ系列10:如何保證訊息冪等性消費
MQ系列11:如何保證訊息可靠性傳輸
MQ系列12:如何保證訊息順序性
MQ系列13:訊息大量堆積如何為解決
MQ系列14:MQ如何做到訊息延時處理
MQ系列15:MQ實現批次訊息處理

1 背景

訊息佇列作為釋出訂閱模型的訊息中介軟體廣泛應用於上下游業務整合場景。在實際業務場景中,同一個主題下的訊息往往會被多個不同的下游業務方處理,各下游的處理邏輯不同,只需要關注自身邏輯需要的訊息子集。所以在 訊息中心和消費者之間,需要有一種訊息過濾功能,可以幫助消費者更高效地過濾自己需要的訊息集合,避免大量無效訊息投遞給消費者,降低下游系統處理壓力。
Apache RocketMQ 很好的支援了這一能力,它解決了單個業務域即同一個主題內不同訊息子集的過濾問題。

2 關於訊息過濾

2.1 概念

在消費者訂閱了某個主題後,Apache RocketMQ 會將該主題中的所有訊息投遞給消費者。若消費者只需要關注部分訊息,可通過設定過濾條件在 Apache RocketMQ 伺服器端進行過濾,只獲取到需要關注的訊息子集,避免接收到大量無效的訊息。所以,過濾的本質就是將符合條件的訊息投遞給消費者,而不是將匹配到的訊息過濾掉。
Apache RocketMQ 的訊息過濾功能通過生產者和消費者對訊息的屬性、標籤進行定義,並在 Apache RocketMQ 伺服器端根據過濾條件進行篩選匹配,將符合條件的訊息投遞給消費者進行消費。

2.2 訊息過濾說明

2.2.1 原理介紹

備註:圖片直接使用官網提供的

訊息過濾主要通過以下幾個關鍵流程實現:

  • 生產者:生產者在初始化訊息時預先為訊息設定一些屬性和標籤,用於後續消費時指定過濾目標。
  • 消費者:消費者在初始化及後續消費流程中通過呼叫訂閱關係註冊介面,向伺服器端上報需要訂閱指定主題的哪些訊息,即過濾條件。
  • 伺服器端:消費者獲取訊息時會觸發伺服器端的動態過濾計算,Apache RocketMQ 伺服器端根據消費者上報的過濾條件的表示式進行匹配,並將符合條件的訊息投遞給消費者。

2.2.2 訊息過濾分類

Apache RocketMQ 支援Tag標籤過濾和SQL屬性過濾,這兩種過濾方式對比如下:

對比項 Tag標籤過濾 SQL屬性過濾
過濾目標 訊息的Tag標籤。 訊息的屬性,包括使用者自定義屬性以及系統屬性(Tag是一種系統屬性)。
過濾能力 精準匹配。 SQL語法匹配。
適用場景 簡單過濾場景、計算邏輯簡單輕量。 複雜過濾場景、計算邏輯較複雜。

2.3 Tag標籤過濾

Tag標籤過濾方式是 Apache RocketMQ 提供的基礎訊息過濾能力,基於生產者為訊息設定的Tag標籤進行匹配。生產者在傳送訊息時,設定訊息的Tag標籤,消費者需指定已有的Tag標籤來進行匹配訂閱。
Tag標籤設定規則:

  • Tag由生產者傳送訊息時設定,每條訊息允許設定一個Tag標籤。
  • Tag使用可見字元,建議長度不超過128字元。

生產訊息:傳送的時候,需要設定Tag標籤

Message message = messageBuilder.setTopic("topicTest")
//設定訊息索引鍵,可根據關鍵字精確查詢某條訊息
.setKeys("msgKey")
//設定訊息Tag,這樣消費端可以根據Tag過濾訊息
//該語句表示訊息的Tag設定為"TagTest1"
.setTag("TagTest1")
// 設定訊息體
.setBody("hello world!".getBytes())
.build();

訂閱訊息:匹配單個或者多個Tag標籤。

String topic = "topicTest";

//1、第一種情況,只訂閱訊息標籤為"TagTest1"的訊息。
FilterExpression filterExpression = new FilterExpression("TagTest1", FilterExpressionType.TAG);
//2、第二種情況,訂閱訊息標籤為"TagTest1"、"TagTest2"或"TagTest3"的訊息。
FilterExpression filterExpression = new FilterExpression("TagTest1||TagTest2||TagTest3", FilterExpressionType.TAG);

pushConsumer.subscribe(topic, filterExpression);

如上,消費者可以限制接收包含 TagTest1 或 TagTest2 或 TagTest3 的訊息,但是限制是一個訊息只能有一個標籤,這無法應對網際網路複雜的應用場景。在這種情況下,可以在訊息中設定一些屬性,再使用SQL表示式通過篩選屬性來篩選訊息。下面我們看看怎麼實現。

2.4 SQL屬性過濾

SQL屬性過濾是 RocketMQ 提供的高階訊息過濾方式,通過生產者為訊息設定的屬性(Key)及屬性值(Value)進行匹配。生產者在傳送訊息時可設定多個屬性,消費者訂閱時可設定SQL語法的過濾表示式過濾多個屬性。

2.4.1 基本語法

RocketMQ只定義了一些基本語法來支援這個特性。你也可以很容易地擴充套件它。

  • 數值比較,比如:>,>=,<,<=,BETWEEN,=;
  • 字元比較,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 邏輯符號 AND,OR,NOT;

常數支援型別為:

  • 數值,比如:123,3.1415;
  • 字元,比如:‘abc’,必須用單引號包裹起來;
  • NULL,特殊的常數
  • 布林值,TRUE 或 FALSE

只有使用push模式的消費者才能用使用SQL92標準的sql語句,介面如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

2.4.2 訊息生產者

傳送訊息時,設定了訊息Tag標籤並定義了屬性,用於做訊息過濾

Message message = messageBuilder.setTopic("topicTest")
//設定訊息索引鍵,可根據關鍵字精確查詢某條訊息
.setKeys("msgKey")
//設定訊息Tag,這樣消費端可以根據Tag過濾訊息
//該語句表示訊息的Tag設定為"TagTest1"
.setTag("TagTest1")
//訊息也可以設定自定義的分類屬性,比如下面這句話表示為訊息自定義一個屬性,該屬性為性別,屬性值為1(男)或者0(女)。
.addProperty("Sex", 1)
// 設定訊息體
.setBody("hello world!".getBytes())
.build();

2.4.3 訊息消費者

使用過濾表示式進行訊息篩選,如下:

String topic = "topic";

//只訂閱性別為1(男)的訊息。
FilterExpression filterExpression = new FilterExpression("Sex IS NOT NULL AND Sex=1", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);

//只訂閱性別為1(男)且年齡大於18的訊息。
FilterExpression filterExpression = new FilterExpression("Sex IS NOT NULL AND Age IS NOT NULL AND Sex = 1 AND Age > 18", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);

//訂閱所有訊息
FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);

3 總結

Rocket MQ的訊息過濾功能是在伺服器端進行的,可以根據訊息的標籤、屬性等進行過濾。具體來說,Rocket MQ主要支援以下兩種過濾方式:
TAG過濾:TAG是訊息的業務標識,可以通過設定Tag表示式,判斷訊息是否包含指定的Tag,從而進行過濾。這種過濾方式簡單直觀,適用於基於Tag進行訊息分類的場景。
SQL92過濾:可以使用SQL92表示式來靈活地過濾訊息的Tag和屬性。這種方式提供了更強大的過濾能力,可以根據複雜的條件進行訊息篩選。
需要注意的是,Rocket MQ的訊息過濾功能雖然強大,但是也會增加伺服器端的處理負擔。因此,在使用時需要根據實際情況進行權衡,避免過度依賴訊息過濾功能導致系統效能下降。