我攤牌了!真正的灰度佇列實現方案!全網你都搜不到!

2022-10-25 15:01:49

背景

目前,公司方面 RPC 呼叫如 Dubbo、Feign 已經能支援基於灰度的呼叫,但是 MQ 還沒有支援灰度的能力,因此導致在測試和生產環境業務驗證、訊息隔離方面體驗比較差,因此我們基於 RabbitMQ 和 Kafka 實現了訊息灰度的能力。

灰度場景

大部分場景下 MQ 的灰度並不會像 RPC 那樣那麼嚴格,但是我們需要確認消費場景,即當灰度消費者不存在的情況下,訊息是否應該由正常消費者去消費。

1. 灰度訊息只由灰度節點消費

事實的情況是可能大家都想要這種嚴格意義上的訊息灰度隔離策略,由此才證明是真正的訊息灰度方案,但是這個方案需要考慮一些具體場景問題。

比如,有時候作為灰度節點的傳送方,它的功能改動點並不是在 MQ 這塊,但是它傳送的訊息卻是灰度訊息,而訊息的消費方可能也未發生過功能變動,也不會有與之對應的灰度消費標識,這種情況下如果我們將灰度的訊息進行丟棄的話,那麼會造成最終的資料不完整。

2. 灰度訊息可以由正常節點消費

因此,我們再考慮第二種方案,如果當灰度消費節點不存在時,訊息會由正常節點消費,當存在灰度節點時,則由灰度節點消費,正常節點消費灰度訊息只為了當灰度節點不存在時的兜底。

那麼,這種場景仍然可能存在問題,比如當消費節點的消費邏輯發生改變時,由正常節點消費就可能造成業務上的錯誤。對於此問題我們可以預設認為如果消費方發生邏輯改變,則灰度節點大概率一定是存在的,如果一些異常情況導致的異常或者宕機的場景,仍然能通過人工或者告警判斷出來,總之,這個問題認為不算是問題。

灰度方案

我們分別從 MQ 的自身特性和一些通用的處理方式出發,分別探討 RabbitMQ 和 Kafka 的灰度實現方式。

常規方案:影子Queue/Topic

這個是現在實現 MQ 灰度最為常見的方案,為每一個Queue/Topic都建立一個與之對應的灰度Queue/Topic。

生產者層將要傳送的訊息進行Queue/Topic/RoutingKey的動態修改,讓他傳送到灰度或正常的Queue/Topic中。

而消費者層面只需要在應用啟動時根據自身的灰度標記動態的切換到灰度Queue/Topic進行監聽即可。

但是對於我們目前的系統現狀而言,這個方案存在三個問題:

首先,由於我們目前系統測試環境的灰度標籤是可以客製化的,可能每一個功能上線都會有一個對應的灰度標識,這樣帶來的問題就是Queue/Topic的數量會隨著灰度標識的增加而倍數性的增加。

而不管哪種MQ,過多的Queue/Topic都會對 MQ 本身造成一定處理能力下降。

另外,我們的灰度標籤是可以根據啟動的範例隨意修改的,也就意味著對應的整套Queue/Topic也得跟著灰度的標識隨意的建立。這樣一來,人工手動跟著建立顯然就不太現實,而生產環境中我們的Queue/Topic建立是需要走流程申請的,這又和我們的現狀違背。

再者,即便我們能夠根據生產者的灰度標識動態的建立Queue/Topic的話,那麼至少也需要考慮在灰度生產者範例正常下線時將它建立的Queue/Topic進行銷燬,如果異常的下線還需要人工的接入定期的進行Queue/Topic的清理工作。

最後,如果是針對 Kafka 或 RocketMQ,這種方案實行起來還比較簡單,如果是對於RabbitMQ,這裡又多了一層 Exchange 和 Queue 的繫結關係,不同的生產模式也需要去做各自的適配。

所以,為了在 RabbitMQ 和 Kafka 之間的一致性,我們決定不採用該方案來實現。

RabbitMQ

對於 RabbitMq,我們使用重新入隊這個特性來實現灰度佇列。

通過重新入隊的這個特性,我們可以在生產者傳送訊息時將灰度的標識標記到訊息頭,傳送時一併行出。

當消費者消費訊息時,根據消費者自身標記決定要不要對訊息進行消費,如果消費者本身不滿足灰度消費規則,則把這條灰度訊息進行Requeue處理。

這條訊息經過輪詢,最終會流轉到灰度標識的消費者進行消費。

實現思路

  1. 生產者在傳送訊息之前獲取到當前範例的灰度標記,對訊息 Header 新增灰度標記
  2. 對消費者新增監聽器,灰度節點消費根據灰度標記判斷對灰度訊息的消費,正常節點根據開關決定是否消費或者進行 Requeue

生產流程

生產者在啟動時,我們通過自動裝配,註冊 RabbitTemplate 時setBeforePublishPostProcessors新增前置處理器,在傳送訊息前對訊息的 Header 新增灰度標記。

消費流程

首先,在消費時通過監聽SimpleMessageListenerContainer重寫executeListener方法進行訊息處理。

  1. 當灰度開關未開啟,執行正常消費邏輯。
  2. 當灰度機器直接匹配到灰度訊息時,那麼直接消費即可。
  3. 通過監聽 Eureka 本地快取重新整理的事件不停地重新整理灰度範例的快取,當正常節點消費灰度訊息時,如果灰度範例不存在就可以直接消費。
  4. 如果存在灰度範例且正常節點消費到灰度訊息,考慮兩種可能,第一是正常的輪詢到正常節點,第二是灰度節點prefetch_count達到閾值,阻塞佇列已滿,灰度訊息在正常節點之間不停地輪詢。為了解決第二個場景,新增了一層布隆過濾器,當再次匹配到同樣的訊息時,當前節點將休眠一段短暫的時間。
  5. 上述場景都未匹配到,那麼執行 Requeue 操作。

Kafka

在 Kafka 的消費理念中有一層消費者組的概念,每個消費者都有一個對應的消費組。

當訊息釋出到主題後,只會被投遞給訂閱它的每個消費組中的一個消費者,兩個消費組之間互不影響。

藉助這個消費特性,可以將同一個消費組中的灰度消費者單獨拎出來,做成一個特殊的消費組,這樣每個消費組都會接收到同樣的訊息。

在正常的消費組中,遇到帶有灰度標識的訊息,我們只做空消費,不實際執行業務邏輯,在灰度消費組中的消費者,只處理匹配到灰度規則的訊息,其它的訊息做空消費。

實現思路

  1. 生產者生產灰度訊息的時候在訊息 Header 裡面新增灰度標記

  2. 灰度消費者和正常消費者設定不同的GroupId

  3. 灰度消費者和正常消費者在拿到訊息後判斷有沒有灰度標記,判斷設定中心是否開啟了訊息灰度,如果開啟了則進行灰度節點的消費,如果沒開啟則不消費

生產流程

生產者在啟動的時候會去動態裝配所有的攔截器,裝配的方式為在 BeanPostProcessor 的後置處理器中獲取到 KafkaTemplate 物件,把我們的攔截器的類的全限定名 set 進去 config 即可,這裡可以支援不管使用者自己建立的 Factory物件還是 KafkaTemplate 物件都能進行攔截器的裝配。

消費流程

消費的時候也是一樣,如果當前節點是灰度節點,那麼就修改當前group.id為灰度,最後通過攔截器執行消費邏輯。