ActiveMQ、RabbitMQ、RocketMQ、Kafka四種訊息中介軟體分析介紹

2022-07-24 06:01:08

ActiveMQ、RabbitMQ、RocketMQ、Kafka四種訊息中介軟體分析介紹

我們從四種訊息中介軟體的介紹到基本使用,以及高可用,訊息重複性,訊息丟失,訊息順序效能方面進行分析介紹!

一、訊息中介軟體的使用場景

訊息中介軟體的使用場景總結就是六個字:解耦、非同步、削峰

1.解耦

如果我方系統A要與三方B系統進行資料對接,推播系統人員資訊,通常我們會使用介面開發來進行。但是如果運維期間B系統進行了調整,或者推播過程中B系統網路進行了調整,又或者後續過程中我們需要推播資訊到三方C系統中,這樣的話就需要我們進行頻繁的介面開發調整,還需要考慮介面推播訊息失敗的場景。

如果我們使用訊息中介軟體進行訊息推播,我們只需要按照一種約定的資料結構進行資料推播,其他三方系統從訊息中介軟體取值消費就可以,即便是三方系統出現宕機或者其他調整,我們都可以正常進行資料推播。

總結:通過一個 MQ,Pub/Sub 釋出訂閱訊息這麼一個模型,A 系統就跟其它系統徹底解耦了。

2.非同步

繼續我們上述的訊息推播業務,如果我們現在需要同時推播訊息到BCD三個系統中,而BCD系統接收到訊息後需要進行復雜的邏輯處理,以及讀庫寫庫處理。如果一個三方系統進行訊息處理需要1s,那我們遍歷推播完一次訊息,就需要三秒。

而如果我們使用訊息中介軟體,我們只需要推播到訊息中介軟體,然後進行介面返回,可能只需要20ms,大大提升了使用者體驗。訊息推播後BCD系統各自進行訊息消費即可,不需要我們等待。

3.削峰

還是上述我們的應用場景,假設某一時間段內,每秒都有一條訊息推播,如果我們使用介面進行推播,BCD三個系統處理完就需要三秒。這樣會導致使用者前端體驗較差,而且系統後臺一直處於阻塞狀態,後續的訊息推播介面一直在等待。

如果我們使用訊息中介軟體,我們只需要將訊息推播至訊息中介軟體中,BCD系統對積壓的訊息進行相應的處理。

在上述高頻發的訊息時間段內,會在訊息中間中產生訊息積壓,BCD系統在上述時間段外對積壓訊息進行相應的處理即可。

二、訊息中介軟體的優缺點

訊息中介軟體的優點其實就是他的使用場景。

訊息中介軟體的缺點與優點也是相輔相成的,主要有以下三個

1.系統可用性降低

系統關聯的中介軟體越多,越容易引發宕機問題。

如上述案例中的問題,原本進行訊息推播我們只需要開發介面進行推播即可,引入訊息中介軟體後就需要考慮訊息中介軟體的高可用問題,如果訊息中介軟體出現宕機問題,我們所有的訊息推播都會失敗。

2.系統複雜度提高

上述案例中,如果我們使用介面進行訊息推播,我們只需要考慮介面超時以及介面推播訊息失敗的問題。但我們引入訊息中介軟體後,就需要考慮訊息中介軟體的維護,以及訊息重複消費,訊息丟失的問題。

3.一致性問題

上述案例中,如果我們使用介面進行訊息推播,推播訊息我們可以放在事務中處理,如果推播過程中出現異常,我們可以進行資料回滾,但我們引入訊息中介軟體後,就需要考慮訊息推播後,消費失敗的問題,以及如果我們同時推播訊息到BCD系統中,如何保證他們的事務一致性。

三、四種訊息中介軟體的基本介紹

特性 ActiveMQ RabbitMQ RocketMQ Kafka
單機吞吐量 萬級,比 RocketMQ、Kafka 低一個數量級 同 ActiveMQ 10 萬級,支撐高吞吐 10 萬級,高吞吐,一般配合巨量資料類的系統來進行實時資料計算、紀錄檔採集等場景
topic 數量對吞吐量的影響 topic 可以達到幾百/幾千的級別,吞吐量會有較小幅度的下降,這是 RocketMQ 的一大優勢,在同等機器下,可以支撐大量的 topic topic 從幾十到幾百個時候,吞吐量會大幅度下降,在同等機器下,Kafka 儘量保證 topic 數量不要過多,如果要支撐大規模的 topic,需要增加更多的機器資源
時效性 ms 級 微秒級,這是 RabbitMQ 的一大特點,延遲最低 ms 級 延遲在 ms 級以內
可用性 高,基於主從架構實現高可用 同 ActiveMQ 非常高,分散式架構 非常高,分散式,一個資料多個副本,少數機器宕機,不會丟失資料,不會導致不可用
訊息可靠性 有較低的概率丟失資料 基本不丟 經過引數優化設定,可以做到 0 丟失 同 RocketMQ
功能支援 MQ 領域的功能極其完備 基於 erlang 開發,並行能力很強,效能極好,延時很低 MQ 功能較為完善,還是分散式的,擴充套件性好 功能較為簡單,主要支援簡單的 MQ 功能,在巨量資料領域的實時計算以及紀錄檔採集被大規模使用
其他 Apache軟體基金會開發、起步較早,但沒有經過大量吞吐場景驗證,目前社群不是很活躍 開源,穩定,社群活躍度高 阿里出品,目前已交給Apache,但社群活躍度較低 Apache軟體基金會開發、開源、高通吐量,社群活躍度高

1.ActiveMQ

1.1:Activemq 是什麼

Activemq 是一種開源的,實現了JMS1.1規範的,訊息導向(MOM)的中介軟體,為應用程式提供高效的、可延伸的、穩定的和安全的企業級訊息通訊。

1.2:Activemq 的作用及原理

Activemq 的作用就是系統之間進行通訊,原理就是生產者生產訊息, 把訊息傳送給activemq, Activemq 接收到訊息, 然後檢視有多少個消費者,

然後把訊息轉發給消費者, 此過程中生產者無需參與。 消費者接收到訊息後做相應的處理和生產者沒有任何關係。

1.3:Activemq 的通訊方式

publish(釋出)-subscribe(訂閱)(釋出-訂閱方式)

釋出/訂閱方式用於多接收使用者端的方式,作為釋出訂閱的方式,可能存在多個接收使用者端,並且接收端使用者端與傳送使用者端存在時間上的依賴。一個接收端只能接收他建立以後傳送使用者端傳送的資訊。

p2p(point-to-point)(對等)

p2p的過程則理解起來比較簡單。它好比是兩個人打電話,這兩個人是獨享這一條通訊鏈路的。一方傳送訊息,另外一方接收,就這麼簡單。在實際應用中因為有多個使用者對使用p2p的鏈路,相互通訊的雙方是通過一個類似於佇列的方式來進行交流。和前面pub-sub的區別在於一個topic有一個傳送者和多個接收者,而在p2p裡一個queue只有一個傳送者和一個接收者。

1.4:Activemq 的訊息持久化機制

JDBC: 持久化到資料庫
AMQ :紀錄檔檔案(已基本不用)
KahaDB : AMQ基礎上改進,預設選擇
LevelDB :谷歌K/V資料庫
在activemq.xml中檢視預設的broker持久化機制。

1.5:Activemq 的訊息確認機制

(1)AUTO_ACKNOWLEDGE = 1 自動確認

(2)CLIENT_ACKNOWLEDGE = 2 使用者端手動確認

(3)DUPS_OK_ACKNOWLEDGE = 3 自動批次確認

(4)SESSION_TRANSACTED = 0 事務提交併確認

(5)INDIVIDUAL_ACKNOWLEDGE = 4 單條訊息確認

前四種是JMS API中提供的使用者端ACK_MODE。第五種是InforSuiteMQ自定義補充的一種ACK_MODE。

2.RabbitMQ

2.1:RabbitMQ是什麼

RabbitMQ是一個由erlang語言編寫的、開源的、在AMQP基礎上完整的、可複用的企業訊息系統。

2.2:RabbitMQ的作用及原理

基本概念

關鍵名稱 說明
Channel(通道) 訊息推播使用的通道
Producer(訊息的生產者) 向訊息佇列釋出訊息的使用者端應用程式
Consumer(訊息的消費者) 從訊息佇列取得訊息的使用者端應用程式
Message(訊息) 訊息由訊息頭和訊息體組成
Routing Key(路由鍵) 訊息頭的一個屬性,用於標記訊息的路由規則,決定了交換機的轉發路徑
Queue(訊息佇列) 用於儲存生產者的訊息
Exchange(交換器路由器) 提供Producer到Queue之間的匹配
Binding(繫結) 用於建立Exchange和Queue之間的關聯
Binding Key(繫結鍵) Exchange與Queue的繫結關係,用於匹配Routing Key
Broker(服務主體) RabbitMQ Server,伺服器實體

2.3:RabbitMQ的通訊方式

2.3.1:簡單佇列

最簡單的工作佇列,其中一個訊息生產者,一個訊息消費者,一個佇列。也稱為對等模式

2.3.2:工作佇列模式

一個訊息生產者,一個交換器,一個訊息佇列,多個消費者。同樣也稱為對等模式

2.3.3:釋出訂閱模式

Pulish/Subscribe,無選擇接收訊息,一個訊息生產者,一個交換機(交換機型別為fanout),多個訊息佇列,多個消費者

生產者只需把訊息傳送到交換機,繫結這個交換機的佇列都會獲得一份一樣的資料。

2.3.4:路由模式

在釋出/訂閱模式的基礎上,有選擇的接收訊息,也就是通過 routing 路由進行匹配條件是否滿足接收訊息。

2.3.5:主體模式

topics(主題)模式跟routing路由模式類似,只不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊匹配路由鍵 routingKey,類似於SQL中 = 和 like 的關係。

2.3.6:RPC模式

與上面其他5種所不同之處,該模式是擁有請求/回覆的。也就是有響應的,上面5種都沒有。

RPC是指遠端過程呼叫,也就是說兩臺伺服器A,B,一個應用部署在A伺服器上,想要呼叫B伺服器上應用提供的處理業務,處理完後然後在A伺服器繼續執行下去,把非同步的訊息以同步的方式執行。

2.4:RabbitMQ的訊息持久化機制

Queue(訊息佇列)的持久化是通過durable=true來實現的。

Message(訊息)的持久化 ,通過設定訊息是持久化的標識。

Exchange(交換機)的持久化 。

2.5:RabbitMQ的訊息確認機制

confirm機制:確認訊息是否成功傳送到Exchange

ack機制:確認訊息是否被消費者成功消費

  • AcknowledgeMode.NONE:自動確認
  • AcknowledgeMode.AUTO:根據情況確認
  • AcknowledgeMode.MANUAL:手動確認

3.RocketMQ

3.1:RocketMQ是什麼

RocketMQ是阿里開發的一款純java、分散式、佇列模型的開源訊息中介軟體,支援事務訊息、順序訊息、批次訊息、定時訊息、訊息回溯等。

3.2:RocketMQ的作用及原理

基本概念

關鍵名稱 說明
Producer 訊息生產者
Producer Group 生產者組
Consumer 訊息消費者
Consumer Group 消費者組
Topic Topic用於將訊息按主題做劃分,Producer將訊息發往指定的Topic,Consumer訂閱該Topic就可以收到這條訊息
Message 代表一條訊息
Tag 標籤可以被認為是對 Topic 進一步細化
Broker 負責接收並儲存訊息
Queue Topic和Queue是1對多的關係,一個Topic下可以包含多個Queue,主要用於負載均衡
Offset RocketMQ在儲存訊息時會為每個Topic下的每個Queue生成一個訊息的索引檔案,每個Queue都對應一個Offset記錄當前Queue中訊息條數。
NameServer NameServer可以看作是RocketMQ的註冊中心

3.3:RocketMQ的通訊方式

RocketMQ訊息訂閱有兩種模式

一種是Push模式(MQPushConsumer),即MQServer主動向消費端推播

另外一種是Pull模式(MQPullConsumer),即消費端在需要時,主動到MQ Server拉取

但在具體實現時,Push和Pull模式本質都是採用消費端主動拉取的方式,即consumer輪詢從broker拉取訊息

叢集模式和廣播模式

叢集模式:預設情況下我們都是使用的叢集模式,也就是說消費者組收到訊息後,只有其中的一臺機器會接收到訊息。

廣播模式:消費者組內的每臺機器都會收到這條訊息。

3.4:RocketMQ的訊息持久化機制

exchange持久化、queue持久化、message持久化

CommitLog:紀錄檔資料檔案,儲存訊息內容,所有 queue 共用,不區分 topic ,順序讀寫 ,1G 一個檔案

ConsumeQueue:邏輯 Queue,基於 topic 的 CommitLog 的索引檔案,訊息先到達 commitLog,然後非同步轉發到 consumeQueue,包含 queue 在 commitLog 中的物理偏移量 offset,訊息實體內容大小和 Message Tag 的 hash 值,大於 600W 個位元組,寫滿之後重新生成,順序寫

IndexFile:基於 Key 或 時間區間的 CommitLog 的索引檔案,檔名以建立的時間戳命名,固定的單個 indexFile 大小為 400M,可以儲存 2000W 個索引

3.5:RocketMQ的訊息確認機制

confirm機制:確認訊息是否成功傳送到Exchange

ack機制:確認訊息是否被消費者成功消費

4.Kafka

4.1:Kafka是什麼

Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一個分散式、分割區的、多副本的、多訂閱者,基於zookeeper協調的分散式紀錄檔系統,可作為訊息中介軟體

4.2:Kafka的作用及原理

基本概念

關鍵名稱 說明
producer 生產者
consumer 消費者
consumer group 消費者組
broker 一臺kafka伺服器就是一個broker,一個叢集由多個broker組成,一個broker可以容納多個topic
topic 一個訊息佇列,生產者和消費者都是面對一個Topic
partition 每個partition時一個有序佇列,partition是topic中儲存資料和消費資料所使用的佇列所在
replica 副本,為了保證當前某個節點發生故障時,當前節點上的資料不會發生丟失
leader 每個分割區多個副本的「主」,生產者生產資料的物件,以及消費組消費者消費的物件
follower 每個分割區多個副本的「從」,實時從leader資料的同步

4.3:Kafka的通訊方式

生產者傳送模式

1.發後即忘(fire-and-forget):只管往Kafka中傳送訊息而並不關心訊息是否正確到達

2.同步(sync):一般是在send()方法裡指定一個Callback的回撥函數,Kafka在返回響應時呼叫該函數來實現非同步的傳送確認。

3.非同步(async):send()方法會返回Futrue物件,通過呼叫Futrue物件的get()方法,等待直到結果返回

消費者消費模式

1.At-most-once(最多一次):在每一條訊息commit成功之後,再進行消費處理;設定自動提交為false,接收到訊息之後,首先commit,然後再進行消費。

2.At-least-once(最少一次):在每一條訊息處理成功之後,再進行commit;設定自動提交為false;訊息處理成功之後,手動進行commit。

3.Exactly-once(正好一次):將offset作為唯一id與訊息同時處理,並且保證處理的原子性;設定自動提交為false;訊息處理成功之後再提交。

4.4:Kafka的訊息持久化機制

Kafka直接將資料寫入到紀錄檔檔案中,以追加的形式寫入

4.5:Kafka的訊息確認機制

confirm機制:確認訊息是否成功傳送

ack機制:確認訊息是否被消費者成功消費

四、訊息佇列高可用

引言:系統應用MQ作為訊息中介軟體後,會導致系統可用性降低。所以只要你用了 MQ,高可用肯定是要考慮到的

1.ActiveMQ高可用

ActiveMQ的部署方式有三種,分別為:單節點部署(不支援高可用),Master-Slave部署方式(主從模式),Broker-Cluster部署方式(負載均衡)

1.1.單節點部署(不支援高可用)

單節點部署方式因為不支援高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。

1.2.Master-Slave部署方式(支援高可用)

1.2.1.shared filesystem Master-Slave部署方式

主要是通過共用儲存目錄來實現master和slave的熱備,所有的ActiveMQ應用都在不斷地獲取共用目錄的控制權,哪個應用搶到了控制權,它就成為master。

多個共用儲存目錄的應用,誰先啟動,誰就可以最早取得共用目錄的控制權成為master,其他的應用就只能作為slave。

1.2.2.shared database Master-Slave方式

與shared filesystem方式類似,只是共用的儲存媒介由檔案系統改成了資料庫而已。

1.2.3.Replicated LevelDB Store方式

這種主備方式是ActiveMQ5.9以後才新增的特性,使用ZooKeeper協調選擇一個node作為master。被選擇的master broker node開啟並接受使用者端連線。

其他node轉入slave模式,連線master並同步他們的儲存狀態。slave不接受使用者端連線。所有的儲存操作都將被複制到連線至Master的slaves。

如果master死了,得到了最新更新的slave被允許成為master。fialed node能夠重新加入到網路中並連線master進入slave mode。所有需要同步的disk的訊息操作都將等待儲存狀態被複制到其他法定節點的操作完成才能完成。

當一個新的master被選中,你需要至少保障一個法定node線上以能夠找到擁有最新狀態的node。這個node將會成為新的master。因此,推薦執行至少3個replica nodes,以防止一個node失敗了,服務中斷。

1.3.Broker-Cluster部署方式(不支援高可用)

前面的Master-Slave的方式雖然能解決多服務熱備的高可用問題,但無法解決負載均衡和分散式的問題。Broker-Cluster的部署方式就可以解決負載均衡的問題。

​ Broker-Cluster部署方式中,各個broker通過網路互相連線,並共用queue。當broker-A上面指定的queue-A中接收到一個message處於pending狀態,而此時沒有consumer連線broker-A時。如果cluster中的broker-B上面由一個consumer在消費queue-A的訊息,那麼broker-B會先通過內部網路獲取到broker-A上面的message,並通知自己的consumer來消費。

1.3.1.static Broker-Cluster部署

在activemq.xml檔案中靜態指定Broker需要建立橋連線的其他Broker

1.3.2.Dynamic Broker-Cluster部署

在activemq.xml檔案中不直接指定Broker需要建立橋連線的其他Broker,由activemq在啟動後動態查詢

1.4.Master-Slave與Broker-Cluster相結合的部署方式

可以看到Master-Slave的部署方式雖然解決了高可用的問題,但不支援負載均衡,

Broker-Cluster解決了負載均衡,但當其中一個Broker突然宕掉的話,那麼存在於該Broker上處於Pending狀態的message將會丟失,無法達到高可用的目的。

Master-Slave與Broker-Cluster相結合的部署方式是目前ActiveMQ比較推薦的部署方案。

2.RabbitMQ高可用

RabbitMQ的部署方式有三種,分別為:單機模式(不支援高可用),普通叢集模式(不支援高可用),映象叢集模式(支援高可用)

2.1單機模式(不支援高可用)

單節點部署方式因為不支援高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。

2.2普通叢集模式(不支援高可用)

普通叢集模式,意思就是在多臺機器上啟動多個 RabbitMQ 範例,每個機器啟動一個。你建立的 queue,只會放在一個 RabbitMQ 範例上,但是每個範例都同步 queue 的後設資料(後設資料可以認為是 queue 的一些設定資訊,通過後設資料,可以找到 queue 所在範例)。你消費的時候,實際上如果連線到了另外一個範例,那麼那個範例會從 queue 所在範例上拉取資料過來。

這種方式確實很麻煩,也不怎麼好,沒做到所謂的分散式,就是個普通叢集。因為這導致你要麼消費者每次隨機連線一個範例然後拉取資料,要麼固定連線那個 queue 所在範例消費資料,前者有資料拉取的開銷,後者導致單範例效能瓶頸。

而且如果那個放 queue 的範例宕機了,會導致接下來其他範例就無法從那個範例拉取,如果你開啟了訊息持久化,讓 RabbitMQ 落地儲存訊息的話,訊息不一定會丟,得等這個範例恢復了,然後才可以繼續從這個 queue 拉取資料。

2.3映象叢集模式(支援高可用)

這種模式,才是所謂的 RabbitMQ 的高可用模式。跟普通叢集模式不一樣的是,在映象叢集模式下,你建立的 queue,無論後設資料還是 queue 裡的訊息都會存在於多個範例上,就是說,每個 RabbitMQ 節點都有這個 queue 的一個完整映象,包含 queue 的全部資料的意思。然後每次你寫訊息到 queue 的時候,都會自動把訊息同步到多個範例的 queue 上。

3.RocketMQ高可用

RocketMQ的部署方式有兩種,分別為:單節點模式(不支援高可用),多節點模式

3.1.單節點模式(不支援高可用)

單節點部署方式因為不支援高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。

3.2.多節點模式

3.2.1.多Master模式(不支援高可用)

一個叢集無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master

設定簡單,單個Master 宕機或重啟維護對應用無影響。

單臺機器宕機期間,這臺機器上未被消費的訊息在機器恢復之前不可訂閱,訊息實時性會受到受到影響。

3.2.2.多Master多Slave模式(非同步複製)(支援高可用)

每個 Master 設定一個 Slave,有多對Master-Slave, HA,採用非同步複製方式,主備有短暫訊息延遲,毫秒級。

即使磁碟損壞,訊息丟失的非常少,且訊息實時性不會受影響,因為Master 宕機後,消費者仍然可以從 Slave消費,此過程對應用透明。不需要人工干預。效能同多 Master 模式幾乎一樣。

Master 宕機,磁碟損壞情況,會丟失少量訊息。

3.2.3.多Master多Slave模式(同步雙寫)(支援高可用)

每個 Master 設定一個 Slave,有多對Master-Slave, HA採用同步雙寫方式,主備都寫成功,嚮應用返回成功。

資料與服務都無單點, Master宕機情況下,訊息無延遲,服務可用性與資料可用性都非常高

效能比非同步複製模式略低,大約低 10%左右,傳送單個訊息的 RT會略高。

4.Kafka高可用

Kafka的部署方式有三種,分別為:單broke節點(不支援高可用),單機多broker模式(支援高可用),多機多broker模式(支援高可用)

4.1.單broke節點(不支援高可用)

單節點部署方式因為不支援高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。

4.2.單機多broker模式(支援高可用)

這種部署方式其實是一種偽叢集模式,單機部署多節點如果出現伺服器宕機,那麼所有節點都不能正常提供服務。

4.3.多機多broker模式(支援高可用)

Kafka 0.8 以前,是沒有 HA 機制的,就是任何一個 broker 宕機了,那個 broker 上的 partition 就廢了,沒法寫也沒法讀,沒有什麼高可用性可言。

Kafka 0.8 以後,提供了 HA 機制,就是 replica(複製品) 副本機制。每個 partition 的資料都會同步到其它機器上,形成自己的多個 replica 副本。所有 replica 會選舉一個 leader 出來,那麼生產和消費都跟這個 leader 打交道,然後其他 replica 就是 follower。寫的時候,leader 會負責把資料同步到所有 follower 上去,讀的時候就直接讀 leader 上的資料即可。只能讀寫 leader?很簡單,要是你可以隨意讀寫每個 follower,那麼就要 care 資料一致性的問題,系統複雜度太高,很容易出問題。Kafka 會均勻地將一個 partition 的所有 replica 分佈在不同的機器上,這樣才可以提高容錯性。

五、訊息重複消費問題

引言:為什麼要考慮重複消費的問題?比如我們消費後通過消費中介軟體來呼叫,扣費10元,但是消費者消費訊息後還沒來得及進行確認,訊息中介軟體進行了重啟,那麼訊息者就會進行再次扣費處理,這樣就會出問題!

ActiveMQ、RabbitMQ、RocketMQ、Kafka,都有可能會出現訊息重複消費的問題,正常。因為這問題通常不是 MQ 自己保證的,是由我們開發來保證的。

我們以Kafka為例說明一下重複消費的問題:

Kafka 實際上有個 offset 的概念,就是每個訊息寫進去,都有一個 offset,代表訊息的序號,然後 consumer 消費了資料之後,每隔一段時間(定時定期),會把自己消費過的訊息的 offset 提交一下,表示「我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的 offset 來繼續消費吧」。

但是,如果在這期間重啟系統或者直接 kill 程序了,再重啟。這會導致 consumer 有些訊息處理了,但是沒來得及提交 offset。重啟之後,少數訊息會再次消費一次。

如果消費者乾的事兒是拿一條資料就往資料庫裡寫一條,會導致說,你可能就把資料在資料庫裡插入了 2 次,那麼資料就錯啦。

重複消費問題引發後,我們就需要考慮怎麼保證冪等性。

冪等性,通俗點說,就一個資料,或者一個請求,給你重複來多次,你得確保對應的資料是不會改變的,不能出錯。

保證冪等性的具體實現方式需要結合對應的業務去實現,這裡提供幾個思路:

  • 如果是資料插入操作,插入前我們根據唯一鍵先進行查詢,如果已有資料那我們只進行更新就行。
  • 如果是寫 Redis,則我們無需考慮冪等性,反正每次都是 set,天然冪等性。
  • 如果是基於資料庫的唯一鍵來保證重複資料不會重複插入多條。因為有唯一鍵約束了,重複資料插入只會報錯,不會導致資料庫中出現髒資料。
  • 如果不是以上集中通用的場景,那需要我們傳送訊息的時候攜帶唯一ID,消費者在消費前進行相應的查重處理,處理後在進行相應的業務操作。

六、訊息丟失問題

引言:MQ 有個基本原則,就是資料不能多一條,也不能少一條。

不能多,就是上面說的重複消費和冪等性問題。

不能少,就是說這資料別搞丟了。那這個問題你必須得考慮一下。

訊息丟失的問題需要從生產者、MQ、消費者三個方面來進行考慮,相應的解決方案也需要從這三方面出發(生產者確認機制,MQ訊息持久化、消費者確認機制)。

1.ActiveMQ

1.生產者丟失訊息

生產者丟失訊息的問題可以通過訊息重投、重試機制來解決

2.ActiveMQ丟失訊息

ActiveMQ丟失訊息的問題需要通過ActiveMQ訊息持久化機制+高可用(見ActiveMQ章節)來解決,ActiveMQ的訊息持久化機制有以下幾種

JDBC: 持久化到資料庫
AMQ :紀錄檔檔案(已基本不用)
KahaDB : AMQ基礎上改進,預設選擇
LevelDB :谷歌K/V資料庫
在activemq.xml中檢視預設的broker持久化機制。

3.訊息者丟失訊息

消費者丟失訊息通過ack機制來解決,訊息者進行業務處理後,再進行ack確認,避免訊息丟失。

2.RabbitMQ

1.生產者丟失訊息

生產者訊息丟失,通過confirm機制來確認訊息傳送,然後進行相應的訊息重投、重試機制

2.RabbitMQ丟失訊息

RabbitMQ丟失訊息的問題需要通過RabbitMQ訊息持久化機制+高可用(見RabbitMQ章節)來解決,

RabbitMQ持久化包含:

Queue(訊息佇列)的持久化是通過durable=true來實現的。

Message(訊息)的持久化 ,通過設定訊息是持久化的標識。

Exchange(交換機)的持久化 。

3.訊息者丟失訊息

消費者丟失訊息通過ack機制來解決,訊息者進行業務處理後,再進行ack確認,避免訊息丟失。

3.RocketMQ

1.生產者丟失訊息

生產者訊息丟失,通過confirm機制來確認訊息傳送,然後進行相應的訊息重投、重試機制

2.RocketMQ丟失訊息

RocketMQ丟失訊息的問題需要通過RocketMQ訊息持久化機制+高可用(見RocketMQ章節)來解決,

RocketMQ持久化包含:exchange持久化、queue持久化、message持久化

3.訊息者丟失訊息

消費者丟失訊息通過ack機制來解決,訊息者進行業務處理後,再進行ack確認,避免訊息丟失。

4.Kafka

1.生產者丟失訊息

生產者訊息丟失,通過confirm機制來確認訊息傳送,然後進行相應的訊息重投、重試機制

2.Kafka丟失訊息

Kafka直接將資料寫入到紀錄檔檔案中,以追加的形式寫入

3.訊息者丟失訊息

消費者丟失訊息通過ack機制來解決,訊息者進行業務處理後,再進行ack確認,避免訊息丟失。

總結:其實MQ訊息丟失,無非就是生產者傳送時丟失,MQ傳遞時丟失,消費者消費時丟失幾種問題,我們相應的從以上三方面解決就可以,但是上述三種方式使用後,其實也不能保證100%訊息不丟失,所以往往在業務場景還會使用資料庫輔助記錄的方式,來保證訊息不丟失。但資料庫輔助記錄方式對相關效能以及使用用較大的影響,所以一般資料只需要進行上面三種方式處理,就能保證訊息基本不丟失。發生訊息丟失時我們配合紀錄檔進行相應的訊息恢復就可以。

資料庫輔助記錄:生產者傳送訊息時同步傳送一條訊息到資料庫中,消費者拿到訊息並完成業務處理後,從資料庫刪除對應的記錄。

七、訊息順序性問題

引言:為什麼要保證訊息的順序性?

比如現在我們有個賬號餘額為5,我們充值50元,購買一件20元的商品,但因訊息不能保證順序,導致先進行扣費處理,這樣就會導致我們購買失敗。

訊息順序性消費情況,尤其在高可用(叢集方式)下一定要考慮。

1.ActiveMQ

ActiveMQ因為預設是單queue 佇列,所以它模式就是保證訊息順序性消費的。

2.RabbitMQ

  • 將RabbitMQ拆分多個 queue,每個 queue 一個 consumer,保證訊息的順序性。
  • 一個 queue 但是對應一個 consumer,然後這個 consumer 內部用記憶體佇列做排隊,然後分發給底層不同的 worker 來處理。

3.RocketMQ

RocketMQ保證訊息順序性方法與Kafka大致相同。

  • 一個 topic,一個 queue,一個 consumer,內部單執行緒消費,單執行緒吞吐量太低,一般不會用這個。
  • 寫 N 個記憶體 queue,具有相同 key 的資料都到同一個記憶體 queue;然後對於 N 個執行緒,每個執行緒分別消費一個記憶體 queue 即可,這樣就能保證順序性。

4.Kafka

  • 一個 topic,一個 partition,一個 consumer,內部單執行緒消費,單執行緒吞吐量太低,一般不會用這個。
  • 寫 N 個記憶體 queue,具有相同 key 的資料都到同一個記憶體 queue;然後對於 N 個執行緒,每個執行緒分別消費一個記憶體 queue 即可,這樣就能保證順序性。

八、訊息積壓問題

引言:如何解決訊息佇列的延時以及過期失效問題?訊息佇列滿了以後該怎麼處理?有幾百萬訊息持續積壓幾小時,怎麼處理?

其實訊息積壓的問題,一般都是由消費端出了問題導致的,在實際業務場景中一般不會出現,但是出現問題一般都是大問題。

模擬場景:

一個消費者一秒是 1000 條,一秒 3 個消費者是 3000 條,一分鐘就是 18 萬條。由於消費者宕機導致現在MQ中積壓幾百萬資料

解決思路:

  • 先修復 consumer 的問題,確保其恢復消費速度,然後將現有 consumer 都停掉(避免重複消費)。
  • 新建一個 topic,partition 是原來的 10 倍,臨時建立好原先 10 倍的 queue 數量。
  • 然後寫一個臨時的分發資料的 consumer 程式,這個程式部署上去消費積壓的資料,消費之後不做耗時的處理,直接均勻輪詢寫入臨時建立好的 10 倍數量的 queue。
  • 接著臨時徵用 10 倍的機器來部署 consumer,每一批 consumer 消費一個臨時 queue 的資料。這種做法相當於是臨時將 queue 資源和 consumer 資源擴大 10 倍,以正常的 10 倍速度來消費資料。
  • 等快速消費完積壓資料之後,得恢復原先部署的架構,重新用原先的 consumer 機器來消費訊息。

mq 中的訊息過期失效了

假設你用的是 RabbitMQ,RabbtiMQ 是可以設定過期時間的,也就是 TTL。如果訊息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉,這個資料就沒了。

假設 1 萬個訂單積壓在 mq 裡面,沒有處理,其中 1000 個訂單都丟了,你只能手動寫程式把那 1000 個訂單給查出來,手動發到 mq 裡去再補一次。

mq 都快寫滿了

如果訊息積壓在 mq 裡,長時間都沒有處理掉,此時導致 mq 都快寫滿了,咋辦?

這種情況下只能是通過增加臨時Consumer將資料進行快速消費,等MQ恢復正常後再補充資料。

RocketMQ方案

對於 RocketMQ,官方針對訊息積壓問題,提供瞭解決方案。

  1. 提高消費並行度
    絕大部分訊息消費行為都屬於 IO 密集型,即可能是運算元據庫,或者呼叫 RPC,這類消費行為的消費速度在於後端資料庫或者外系統的吞吐量,通過增加消費並行度,可以提高總的消費吞吐量,但是並行度增加到一定程度,反而會下降。所以,應用必須要設定合理的並行度。 如下有幾種修改消費並行度的方法:

​ 同一個 ConsumerGroup 下,通過增加 Consumer 範例數量來提高並行度(需要注意的是超過訂閱佇列數的 Consumer 範例無效)。可以通過加機器,或者 在已有機器啟動多個程序的方式。 提高單個 Consumer 的消費並行執行緒,通過修改引數 consumeThreadMin、consumeThreadMax 實現。

  1. 批次方式消費
    某些業務流程如果支援批次方式消費,則可以很大程度上提高消費吞吐量,例如訂單扣款類應用,一次處理一個訂單耗時 1 s,一次處理 10 個訂單可能也只耗時 2 s,這樣即可大幅度提高消費的吞吐量,通過設定 consumer 的 consumeMessageBatchMaxSize 返個引數,預設是 1,即一次只消費一條訊息,例如設定為 N,那麼每次消費的訊息數小於等於 N。

  2. 跳過非重要訊息
    發生訊息堆積時,如果消費速度一直追不上傳送速度,如果業務對資料要求不高的話,可以選擇丟棄不重要的訊息。例如,當某個佇列的訊息數堆積到 100000 條以上,則嘗試丟棄部分或全部訊息,這樣就可以快速追上傳送訊息的速度。範例程式碼如下:

  3. 優化每條訊息消費過程
    舉例如下,某條訊息的消費過程如下:

九、自我實現訊息佇列思路

引言:如果讓你寫一個訊息佇列,該如何進行架構設計?

比如說訊息佇列系統,我們從以下幾個角度來考慮一下:

  • 可延伸性:就是需要的時候快速擴容,就可以增加吞吐量和容量。可以參考afka 的設計理念,broker -> topic -> partition,每個 partition 放一個機器,就存一部分資料。
  • 持久化:為了保證MQ的訊息不丟失,設計時一定要考慮訊息的持久化機制,且持久化要順序寫,這樣就沒有磁碟隨機讀寫的定址開銷,磁碟順序讀寫的效能是很高的,這就是 kafka 的思路。
  • 高可用:保證MQ的可靠性,可以參考kafka 。多副本 -> leader & follower -> broker 掛了重新選舉 leader 即可對外服務。
  • 能不能支援資料 0 丟失啊?可以的,參考我們之前說的那個 kafka 資料零丟失方案。

十、MQ總結

其實MQ的使用,無非就是從原理,高可用,重複訊息,順序讀寫,資料丟失幾個方面開展。

上述的介紹是偏重思路方面來進行展開的,至於具體的MQ使用細節,我想你有了對應的思路去查會有一大堆。這也是我學習技術的一個思路,先掌握一個大的方向,然後沿著一個大的方向再進行相應的詳細學習。

最後,上述MQ介紹中,大部分都是有我平時開發積累所得,也有一部分是藉助網路現場學習。

如有不足或錯誤,歡迎大家指出,我們共同學習進步!