聊聊訊息佇列(MQ)那些事

2022-11-13 21:03:56

每年的雙十一期間,各大電商平臺流量暴增,同時,電商平臺系統的負載壓力也會很大。譬如訂單支付的場景,每個訂單支付成功後,伺服器可能要完成扣減積分、扣減優惠券、扣減商品庫存、傳簡訊等一系列操作。單個使用者請求,伺服器處理起來並沒有什麼問題,但是,瞬時並行的多個請求到了伺服器,資料庫壓力上來了,導致請求響應慢,甚至宕機。

為了解決這個問題,我們可能會想到,讓資料庫處理完一個請求後再處理下一個請求不就好了麼。就這樣,訊息佇列出來了。訊息佇列,又稱為MQ(Message Queue),它實現了讓多個請求以訊息的形式排好隊,讓訊息處理程式一個一個的處理,有效防止了高並行給伺服器帶來的壓力。

訊息佇列應用場景

MQ典型的應用場景有非同步、削峰、解耦三種。

非同步
譬如說一個系統A,它有一個操作處理完自己的邏輯以後需要呼叫其他系統的介面,如下圖:

這時候,程式碼是這樣:

public class SystemA {

	@Resource
	SystemBapi systemBapi;

	@Resource
	SystemCapi systemCapi;

	@Resource
	SystemDapi systemDapi;

	public void doSomething() {

		//產生一個id
		long id = doSomethingAction();

		//呼叫其他系統介面  
		systemBapi.doSomething(id);
		systemCapi.doSomething(id);
		systemDapi.doSomething(id);
	}
}

上面的程式碼,系統A產生id的邏輯需要50ms,呼叫系統B的介面需要300ms,呼叫系統C的介面需要300ms,呼叫系統D的介面需要300ms。一個這樣的操作就需要50+300+300+300=950ms。如果後面還要對接其他系統,這個操作的時間會更長。

如果呼叫其他系統介面實時性要求不高,那麼,為了提高使用者體驗和吞吐量,呼叫其他系統介面的操作就可以交給MQ實現非同步操作。如下圖:

系統A執行完了以後,將id給到訊息佇列中,然後就直接返回了。

削峰
譬如有3臺伺服器組成叢集,每臺伺服器的處理能力是1000個QPS/S,合起來就是3000個QPS/S。遇上了流量高峰,達到了5000個QPS/S,並行請求數量已經超過所有伺服器總的處理能力,這時候就可以考慮利用MQ來控制並行數,以免伺服器崩潰。

具體做法是所有請求先進入到MQ,然後每個伺服器根據自己的能夠處理的請求數去消費訊息,也就是無論每秒多少個QPS,系統只處理能力範圍內的請求數,剩下的請求等有資源再去處理,這就是「削峰填谷」,如下圖:

解耦
解耦就是降低了訊息生產者與消費者的耦合度。耦合度高,程式維護起來就會很麻煩。譬如,系統A產生了一個id後,需要把id交給系統B、系統C、系統D去處理。如果由系統A直接去呼叫其他系統介面,系統A的程式程式碼需要寫上呼叫系統B、系統C、系統D介面的程式碼。如果某一天系統C說不需要處理系統A的id了,讓系統A不要呼叫系統C的介面,那系統A要改程式碼。又某一天系統E說我要處理系統A的id,讓系統A呼叫系統E的介面,系統A又得改程式碼。系統A的程式設計師這樣子搞煩不煩?

系統A程式設計師有一天開竅了,把程式裡所有呼叫外部系統的程式碼都遮蔽,弄了個MQ中介軟體,讓系統A產生id以後就給到MQ。然後發個公告告訴所有其他系統的程式設計師,你們誰想要我這邊產生的id你們自己去MQ拿,別來煩我。

這樣一來,系統A跟其他系統就解耦了,程式碼也不用改來改去。

訊息佇列要注意的問題

問題一:可用性
MQ作為整個分散式架構的重要部件,如果MQ服務不可用,那整個系統都掛了。因此,MQ必須要支援叢集。當下主流的MQ中介軟體都能夠不同程度的支援叢集,實現了MQ服務的高可用。

問題二:訊息丟失
訊息丟失有可能發生在生產者丟失訊息、MQ本身丟失訊息、消費者丟失訊息3個方面。

  • 生產者丟失訊息
    生產者丟失訊息一般是在傳送訊息的時候出現異常(譬如網路異常),導致MQ無法接收到訊息。這個問題可以採用本地訊息表+回撥通知+定時任務的方式解決。

    就以系統A傳送訊息,系統B消費訊息為例,具體解決方案如下:
    1、系統A執行本地事務業務邏輯,並且往本地訊息表插入一條資料(代表準備要傳送的訊息),訊息狀態為「未傳送」。本地事務成功,提交儲存本地資料,失敗則回滾。
    2、本地事務成功後,傳送訊息給MQ。
    3、MQ接收到訊息後,回撥通知系統A,系統A把本地訊息表對應的訊息記錄狀態變為「已傳送」。
    4、定時任務輪詢本地訊息表,超過一定時間狀態為「未傳送」的訊息重新傳送給MQ。
    5、定時任務處理超過一定次數一直傳送不成功的訊息告警,人工介入。

  • MQ丟失訊息
    訊息成功傳送到MQ,是先放到記憶體裡的,如果還沒來得及給消費者消費訊息,MQ服務就掛了,就會丟失訊息。MQ服務叢集可以一定程度上解決這個問題,但叢集中各節點的資料同步也需要一定時間,如果在同步資料之前MQ服務就掛了,訊息也會丟失。還有一個方法就是MQ接收到訊息的同時,把訊息資料持久化到磁碟,這樣,MQ服務恢復的時候就可以從磁碟獲取資料重新給消費者消費。可能有人會問,那訊息還沒來得及持久化到磁碟MQ服務就掛了咋辦?如果是這樣,就可以用到前面說到的本地訊息表,把本地訊息表裡的資料重新發一遍。

  • 消費者丟失訊息
    消費者從MQ拉取訊息,還沒來得及處理訊息,消費者伺服器掛了。此時,可能造成消費者丟失訊息。這種情況,可以讓消費者處理完訊息時給MQ一個確認訊息來解決。如果MQ沒有收到確認訊息,就會有重試的機制,最終確保訊息給到消費者消費。當然了,如果重試超過一定次數,就應該告警,人工介入。

問題三:重複消費
因為在網路延遲的情況下,訊息重複傳送的問題不可避免的發生。譬如,生產者傳送訊息的時候使用了重試機制,傳送訊息後由於網路原因沒有收到MQ的確認資訊,然後又去重新傳送了一次訊息。但其實MQ已經接到了訊息,並返回了響應,只是因為網路原因導致生產者沒有收到MQ的確認資訊。這種情況下,生產者的訊息重試機制就會繼續就這個訊息重新傳送,從而導致同一條訊息多次傳送,這樣消費者也會重複消費這條訊息。當然,這只是列舉了一種情況,實際上還有其他情況會導致訊息被重複消費。

解決重複消費的關鍵就是在消費者端引入冪等性機制。什麼是冪等性機制呢?我們可以把它理解成,假如一個介面被重複呼叫,依然可以保證資料的準確性。舉個例子,比如每條訊息都會有一個唯一的id,消費者處理完這個訊息會儲存這個id,如果處理訊息之前能找到這個id,就說明這條訊息已經處理過了,就不做處理並且返回給MQ一個確認資訊。

訊息佇列中介軟體

為什麼要用訊息佇列中介軟體?自己寫不行嗎?我們之所以要用中介軟體,是因為這些中介軟體已經解決了很多訊息佇列常見的問題(高可用、訊息丟失、重複消費......),而且各種中介軟體都有各自的特性,已經做得非常成熟了,你確定你寫的有這些中介軟體好用嗎?

目前在市面上比較主流的MQ中介軟體主要有,ActiveMQ、RabbitMQ、Kafka、RocketMQ 等這幾種。網上找來這幾個中介軟體的對比,如下表:

特性 ActiveMQ RabbitMQ Kafka RocketMQ
所屬社群/公司 Apache Mozilla Public License Apache Apache/Ali
單機呑吐量 萬級(最差) 萬級 十萬級 十萬級(最高)
時效性 毫秒級 微秒級 毫秒級 毫秒級
可用性 高(主從) 高(主從) 非常高(分散式) 非常高(分散式)
功能特性 MQ領域功能極其完備 基於erlang開發,所以並行能力很強,效能極其好,延時很低 功能較為簡單,主要支援簡單的MQ功能,在巨量資料領域的實時計算以及紀錄檔採集被大規模使用 MQ功能比較完備,擴充套件性佳
訊息可靠性 有較低的概率丟失資料 基本不丟 經過引數優化設定,可以做到 0 丟失 同 Kafka
事務 支援 不支援 支援 支援
broker端訊息過濾 支援 不支援 不支援 可以支援Tag標籤過濾和SQL表示式過濾
訊息查詢 支援 根據訊息id查詢 不支援 支援Message id或Key查詢
訊息回溯 支援 不支援 理論上可以支援時間或offset回溯,但是得修改程式碼。 支援按時間來回溯訊息,精度毫秒,例如從一天之前的某時某分某秒開始重新消費訊息。
路由邏輯 基於交換機,可設定複雜路由邏輯 根據topic 根據topic,可以設定過濾消費
持久化 記憶體、檔案、資料庫 佇列基於記憶體,只能少量堆積 磁碟,大量堆積 磁碟,大量堆積
順序訊息 支援 不支援 支援 支援
社群活躍度
適用場景 主要場景就是解耦和非同步呼叫,較少在大規模吞吐的場景中使用 資料量沒有那麼大,小公司 一般配合巨量資料類的系統來進行實時資料計算、紀錄檔採集等場景。 目前在阿里被廣泛應用在訂單、交易、充值、流計算、訊息推播、紀錄檔流式處理、binglog分發訊息等場景。

根據上表,我個人認為對效能要求比較高的,推薦選擇RocketMQ,畢竟經歷了多年阿里雙十一極端並行的場景。如果是巨量資料領域的,可以選擇Kafka。