大家好,我是三友~~
在眾多關於MQ的面試八股文中有這麼一道題,「如何保證MQ訊息消費的冪等性」。
為什麼需要保證冪等性呢?是因為訊息會重複消費。
為什麼訊息會重複消費?
明明已經消費了,為什麼訊息會被再次被消費呢?
不同的MQ產生的原因可能不一樣
本文就以RocketMQ為例,來扒一扒RocketMQ中會導致訊息重複訊息的原因,最終你會發現,其實訊息重複消費算是RocketMQ無奈的「bug」。
微信公眾號:三友的java日記
如果有對RocketMQ不熟悉的小夥伴,可以看看我之前寫的 RocketMQ保姆級教學 和 RocketMQ訊息短暫而又精彩的一生 這兩篇文章。
首先,我們來瞅瞅RocketMQ傳送訊息和消費訊息的基本原理。
如圖,簡單說一下上圖中的概念:
所以,整個訊息傳送和消費過程大致如下:
在正常情況下,生產者的確是按照這個方式來傳送訊息的
但是當出現了異常時,這種異常包括訊息傳送超時、響應超時等等,RocketMQ為了保證訊息成功傳送,會進行訊息傳送的重試操作,預設情況下會最多會重試兩次
重試操作比較簡單,就是選擇另一臺機器的Queue來傳送。
雖然重試操作可以很大程度保證訊息能夠傳送成功,但是同時也會帶來訊息重複傳送的問題。
舉個例子,假設生產者向A機器傳送訊息,發生了異常,響應超時了,但是就一定代表訊息沒發成功麼?
不一定,有可能會出現伺服器端的確接受到並處理了訊息,但是由於網路波動等等,導致生產者接收不到伺服器端響應的情況,此時訊息處理成功了,但是生成者還是以為發生了異常
此時如果發生重試操作,那麼勢必會導致訊息被傳送了兩次甚至更多次,導致伺服器端存了多條相同的訊息,那麼就一定會導致消費者重複消費訊息。
在RocketMQ的並行消費訊息的模式下,需要使用者實現MessageListenerConcurrently
介面來處理訊息
當消費者獲取到訊息之後會呼叫MessageListenerConcurrently
的實現,傳入需要消費的訊息集合msgs
,這裡提到的msgs
很重要
如上程式碼,當訊息消費出現異常的時候,status
就會為null,後面就會將status
設定成為RECONSUME_LATER
。
RECONSUME_LATER
翻譯成功中文就是稍後重新消費的意思
所以從這可以看出,一旦丟擲異常,那麼訊息之後就可以被重複訊息。
到這其實可能有小夥伴覺得訊息消費失敗重新消費很正常,保證訊息儘可能消費成功。
對,這句話不錯,的確可以在一定程度上保證消費異常的訊息可以消費成功。
但是坑不在這,而是前面提到的消費時傳入的整個集合中的訊息都需要被重新消費。
具體的原因我們接著往下看
當訊息處理之後,不論是成功還是異常,都需要對結果進行處理,程式碼如下
當處理結果為RECONSUME_LATER
的時候(異常會設定為RECONSUME_LATER
),此時ackIndex
會設定成-1
,後面迴圈遍歷的時候就會遍歷到所有這次消費的訊息,然後呼叫sendMessageBack
方法,sendMessageBack
方式是用來實現訊息重新消費的邏輯,這裡就不展開說了。
所以,一旦被消費的一批訊息中出現一個消費異常的情況,那麼就會導致整批訊息被重新消費,從而會導致在出現異常之前的成功處理的訊息都會被重複消費,非常坑。
不過好在消費時傳入的訊息集合中的訊息數量是可以設定的,並且預設就是1
也就說預設情況下那個集合中就一條訊息,所以預設情況下不會出現消費成功的訊息被重複消費的情況。
所以這個引數不要輕易設定,一旦設定大了,就可能導致訊息被重新消費。
除了並行消費訊息的模式以外,RocketMQ還支援順序消費訊息的模式,也會造成重複消費,邏輯其實差不多,但是在實現訊息重新消費的邏輯不一樣。
首先來講一講什麼是offset。
前面說過,訊息在傳送的時候需要指定傳送到,訊息最後會被放到Queue中,其實真正的訊息不是在Queue中,Queue存的是每個訊息的位置,但是你可以理解為Queue存的是訊息。
而訊息在Queue中是有序號的,這個序號就被稱為offset,從0開始,單調遞增1。
比如說,如上圖,訊息1的offset就是0,訊息2的offset就是1,依次類推。
這個offset的一個作用就是用來管理消費者的消費進度。
當消費者在成功消費訊息之後,需要將所消費的訊息的offset提交給RocketMQ伺服器端,告訴RocketMQ,這個Queue的訊息我已經消費到了這個位置了。
提交offset的程式碼就在上述第二節提到的處理結果的後面
這樣有一個好處,那麼一旦消費者重啟了或者其它啥的要從這個Queue拉取訊息的時候,此時他只需要問問RocketMQ伺服器端上次這個Queue訊息消費到哪個位置了,之後消費者只需要從這個位置開始消費訊息就行了,這樣就解決了接著消費的問題。
但是RocketMQ在設計的時候,當消費完訊息的時候並不是同步告訴RocketMQ伺服器端offset,而是定時傳送。
如圖,當消費者消費完訊息的時候,會將offset儲存到記憶體中的一個Map資料結構中,所以上面截圖的那段程式碼其實是更新記憶體中的offset
而在消費者啟動的時候會開啟一個定時任務,預設是5s一次,會通過網路請求將記憶體中的每個Queue的消費進度offset傳送給RocketMQ伺服器端。
由於是定時任務,所以就可能出現伺服器一旦宕機,導致最新消費的offset沒有成功告訴RocketMQ伺服器端的情況
此時,消費進度offset就丟了,那麼消費者重啟的時候只能從RocketMQ中獲取到上一次提交的offset,從這裡開始消費,而不是最新的offset,出現明明消費到了第8個訊息,RocketMQ卻告訴他只消費到了第5個訊息的情況,此時必然會導致訊息又出現重複消費的情況。
上一節說到,消費者會有一個每隔5s鐘的定時任務將每個佇列的消費進度offset提交到RocketMQ伺服器端
當RocketMQ伺服器端接收到提交請求之後,會將這個消費進度offset儲存到記憶體中
同時為了保證RocketMQ伺服器端重啟消費進度不會丟失,也會開啟一個定時任務,預設也是5s一次,將記憶體中的消費進度持久化到磁碟檔案中
所以,整個消費進度offset的資料流轉過程如下
當RocketMQ伺服器端重啟之後,會從磁碟中讀取檔案的資料載入到記憶體中。
跟消費者產生的問題一樣,一旦RocketMQ發生宕機,那麼offset就有可能丟失5s鐘的資料,RocketMQ伺服器端一旦重啟,消費者從RocketMQ伺服器端獲取到的訊息消費進度就比實際消費的進度低,同樣也會導致訊息重複消費。
在RocketMQ的高可用模式中,有一種名叫主從同步的模式,當主節點掛了之後,從節點可以手動升級為主節點對外提供存取,保證高可用。
在主從同步模式下,從節點預設每隔10s會向主節點傳送請求,同步一些後設資料,這些後設資料就包括消費進度
當從節點獲取到主節點的消費進度之後,會將主節點的消費進度設定到自己的記憶體中,同時也會持久化到磁碟。
所以整個消費進度offset的資料的流轉過程就會變成如下
同樣,由於也是定時任務,那麼一旦主節點掛了,從節點就會丟10s鐘的消費進度,此時如果從節點升級為主節點對外提供存取,就會出現跟上面提到的一樣的情況,消費者從這個新的主節點中拿到的消費進度比實際的低,自然而然就會重複消費訊息。
所以,總的來說,在消費進度資料流轉的過程中,只要某個環節出現了問題,都有很有可能會導致訊息重複消費。
先來講一講什麼是重平衡,其實重平衡很好理解,我說一下你就明白了。
前面說到,消費者是從佇列中獲取訊息的
在RocketMQ中,有個消費者組的概念,一個消費者組中可以有多個消費者,不同消費者組之間消費訊息是互不干擾的,所以前面提到的消費者其實都在消費組下
在同一個消費者組中,訊息消費有兩種模式:
由於RocketMQ預設是叢集消費模式,並且絕大多數業務場景都是使用叢集消費模式,所以這裡就不討論廣播消費模式了,感興趣的同學可以看看RocketMQ訊息短暫而又精彩的一生 這篇文章。
叢集消費模式是指同一條訊息只能被這個消費者組消費一次,這就叫叢集消費。
並且前面提到提交消費進度給RocketMQ伺服器端的情況只會叢集消費模式下才會有,在廣播消費模式不會提給到RocketMQ伺服器端,僅僅持久化到本地磁碟
同時前面說的消費者提交消費進度真正提交的是消費者組對於這個Queue的消費進度,而不是指具體的某個消費者對於Queue消費進度。
雖然說這裡將前面提到的一些含義更深一步,但是並不妨礙前面的理解。
叢集消費的實現就是將佇列按照一定的演演算法分配給消費者,預設是按照平均分配的。
如圖所示,假設某個topic有4個Queue,有個消費者組訂閱了這個topic,這個消費者組有兩個消費者1和消費者2,此時每個消費者就可以被分配兩個佇列,這樣就能保證訊息正常情況下只會被消費一次。如果只有一個消費者,那麼這個消費者就會消費所有佇列,很好理解。
接著後面又啟動了一個消費者3,此時為了保證剛上線的消費者3能夠消費訊息,就要進行重平衡操作,重新分配每個消費者消費的佇列。
在重平衡之後就可能會出現下面這種情況
如上圖,原本被消費者2消費的Queue4被分配給消費者3,此時消費者3就能消費到訊息了,這就是重平衡。
除了新增消費者會導致重平衡之外,消費者數量減少,佇列的數量增加或者減少都會觸發重平衡。
在瞭解了重平衡概念之後,接下來分析一下為什麼重平衡會導致訊息的重複消費。
假設在進行重平衡時,還未重平衡完之前,消費者2此時還是會按照上面第二節提到的消費訊息的邏輯來消費Queue4的訊息
當消費者2已經重平衡完成了,發現Queue4自己已經不能消費了,那麼此時就會把這個Queue4設定為dropped,就是丟棄的意思
但是由於重平衡進行時消費者2仍然在消費Queue4的訊息,但是當消費完之後,發現佇列被設定成dropped,那麼此時被消費者2消費訊息的offset就不會被提交,原因如下程式碼
這段程式碼前面已經出現過,一旦dropped被設定成true,這個if條件就通不過,消費進度就不會被提交。
成功消費訊息了,但是卻不提交消費進度,這就非常坑了。。
於是當消費者3開始消費Queue4的訊息的時候,他就會問問RocketMQ伺服器端,我消費者3所在的消費者組對於Queue4這個佇列消費到哪了,我接著消費就行了。
此時由於沒有提交消費進度,RocketMQ伺服器端告訴消費者3的消費進度就會比實際的低,這就造成了訊息重複消費的情況。
在RocketMQ中有這麼一個機制,會定時清理長時間正在消費的訊息。
如圖,假設有5條訊息現在正在被消費者處理,這5條訊息會被存在一個集合中,並且是按照offset的大小排序,訊息1的offset最小,訊息5的offset最大。
RocketMQ消費者啟動時會開啟一個預設15分鐘執行一次的定時任務
這個定時任務會去檢查正在處理的訊息的第一條訊息,也就是圖中的訊息1,一旦發現訊息1已經處理了超過15分鐘了,那麼此時就會將訊息1從集合中移除,之後會隔一定時間再次消費訊息1。
這也會有坑,雖然訊息1從集合中被移除了,但是訊息1並沒有消失,仍然被消費者繼續處理,但是訊息1隔一定時間就會再次被消費,就會出現訊息1被重複消費的情況。
這就是清理長時間消費的訊息導致重複消費的原因。
但此時又會引出一個新的疑問,為什麼要移除這個處理超過15分鐘的訊息呢?
這就又跟前面提到的消費進度提交有關!
前面說過訊息被消費完成之後會提交消費進度,提交的消費進度實際會有兩種情況:
第一種就是某個執行緒消費了所有的訊息,當把所有的訊息都消費完成之後,就會把訊息從集合中全部移除,此時提交的消費進度offset就是圖中訊息5的offset+1
加1的操作是為了保證如果發生重啟,那麼消費者下次消費的起始位置就是訊息5後面的訊息,保證訊息5不被重複消費
第二種情況就不太一樣了
假設現在有兩個執行緒來處理這5條訊息,執行緒1處理前2條,執行緒2處理後3條,如圖
現線上程1出現了長時間處理訊息的情況。
此時執行緒2處理完訊息之後,移除後面三條訊息,準備提交offset的時候發現集合中還有元素,就是執行緒1正在處理的前兩條訊息,此時執行緒2提交的offset並不是訊息5對應的offset,而是訊息1的offset,程式碼如下
這麼做的主要原因就是保證訊息1和訊息2至少被消費一次。
因為一旦提交了訊息5對應的offset,如果消費者重啟了,下次消費就會接著從訊息5的後面開始消費,而對於訊息1和訊息2來說,並不知道有沒有被消費成功,就有可能出現訊息丟失的情況。
所以,一旦集合中最前面的訊息長時間處理,那麼就會導致後面被消費的訊息進度無法提交,那麼重啟之後就會導致大量訊息被重複消費。
為了解決這個問題,RocketMQ引入了定時清理的機制,定時清理長時間消費的訊息,這樣消費進度就可以提交了。
總得來說,RocketMQ中還是存在很多種導致訊息重讀消費的情況,並且官方也說了,只是在大多數情況下訊息不會重複
所以如果你的業務場景中需要保證訊息不能重複消費,那麼就需要根據業務場景合理的設計冪等技術方案。
掃碼或者搜尋關注公眾號 三友的java日記 ,及時乾貨不錯過,公眾號致力於通過畫圖加上通俗易懂的語言講解技術,讓技術更加容易學習,回覆 面試 即可獲得一套面試真題。