ActiveMQ、RabbitMQ、RocketMQ、Kafka四種訊息中介軟體分析介紹
我們從四種訊息中介軟體的介紹到基本使用,以及高可用,訊息重複性,訊息丟失,訊息順序效能方面進行分析介紹!
訊息中介軟體的使用場景總結就是六個字:解耦、非同步、削峰
如果我方系統A要與三方B系統進行資料對接,推播系統人員資訊,通常我們會使用介面開發來進行。但是如果運維期間B系統進行了調整,或者推播過程中B系統網路進行了調整,又或者後續過程中我們需要推播資訊到三方C系統中,這樣的話就需要我們進行頻繁的介面開發調整,還需要考慮介面推播訊息失敗的場景。
如果我們使用訊息中介軟體進行訊息推播,我們只需要按照一種約定的資料結構進行資料推播,其他三方系統從訊息中介軟體取值消費就可以,即便是三方系統出現宕機或者其他調整,我們都可以正常進行資料推播。
總結:通過一個 MQ,Pub/Sub 釋出訂閱訊息這麼一個模型,A 系統就跟其它系統徹底解耦了。
繼續我們上述的訊息推播業務,如果我們現在需要同時推播訊息到BCD三個系統中,而BCD系統接收到訊息後需要進行復雜的邏輯處理,以及讀庫寫庫處理。如果一個三方系統進行訊息處理需要1s,那我們遍歷推播完一次訊息,就需要三秒。
而如果我們使用訊息中介軟體,我們只需要推播到訊息中介軟體,然後進行介面返回,可能只需要20ms,大大提升了使用者體驗。訊息推播後BCD系統各自進行訊息消費即可,不需要我們等待。
還是上述我們的應用場景,假設某一時間段內,每秒都有一條訊息推播,如果我們使用介面進行推播,BCD三個系統處理完就需要三秒。這樣會導致使用者前端體驗較差,而且系統後臺一直處於阻塞狀態,後續的訊息推播介面一直在等待。
如果我們使用訊息中介軟體,我們只需要將訊息推播至訊息中介軟體中,BCD系統對積壓的訊息進行相應的處理。
在上述高頻發的訊息時間段內,會在訊息中間中產生訊息積壓,BCD系統在上述時間段外對積壓訊息進行相應的處理即可。
訊息中介軟體的優點其實就是他的使用場景。
訊息中介軟體的缺點與優點也是相輔相成的,主要有以下三個
系統關聯的中介軟體越多,越容易引發宕機問題。
如上述案例中的問題,原本進行訊息推播我們只需要開發介面進行推播即可,引入訊息中介軟體後就需要考慮訊息中介軟體的高可用問題,如果訊息中介軟體出現宕機問題,我們所有的訊息推播都會失敗。
上述案例中,如果我們使用介面進行訊息推播,我們只需要考慮介面超時以及介面推播訊息失敗的問題。但我們引入訊息中介軟體後,就需要考慮訊息中介軟體的維護,以及訊息重複消費,訊息丟失的問題。
上述案例中,如果我們使用介面進行訊息推播,推播訊息我們可以放在事務中處理,如果推播過程中出現異常,我們可以進行資料回滾,但我們引入訊息中介軟體後,就需要考慮訊息推播後,消費失敗的問題,以及如果我們同時推播訊息到BCD系統中,如何保證他們的事務一致性。
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
單機吞吐量 | 萬級,比 RocketMQ、Kafka 低一個數量級 | 同 ActiveMQ | 10 萬級,支撐高吞吐 | 10 萬級,高吞吐,一般配合巨量資料類的系統來進行實時資料計算、紀錄檔採集等場景 |
topic 數量對吞吐量的影響 | topic 可以達到幾百/幾千的級別,吞吐量會有較小幅度的下降,這是 RocketMQ 的一大優勢,在同等機器下,可以支撐大量的 topic | topic 從幾十到幾百個時候,吞吐量會大幅度下降,在同等機器下,Kafka 儘量保證 topic 數量不要過多,如果要支撐大規模的 topic,需要增加更多的機器資源 | ||
時效性 | ms 級 | 微秒級,這是 RabbitMQ 的一大特點,延遲最低 | ms 級 | 延遲在 ms 級以內 |
可用性 | 高,基於主從架構實現高可用 | 同 ActiveMQ | 非常高,分散式架構 | 非常高,分散式,一個資料多個副本,少數機器宕機,不會丟失資料,不會導致不可用 |
訊息可靠性 | 有較低的概率丟失資料 | 基本不丟 | 經過引數優化設定,可以做到 0 丟失 | 同 RocketMQ |
功能支援 | MQ 領域的功能極其完備 | 基於 erlang 開發,並行能力很強,效能極好,延時很低 | MQ 功能較為完善,還是分散式的,擴充套件性好 | 功能較為簡單,主要支援簡單的 MQ 功能,在巨量資料領域的實時計算以及紀錄檔採集被大規模使用 |
其他 | Apache軟體基金會開發、起步較早,但沒有經過大量吞吐場景驗證,目前社群不是很活躍 | 開源,穩定,社群活躍度高 | 阿里出品,目前已交給Apache,但社群活躍度較低 | Apache軟體基金會開發、開源、高通吐量,社群活躍度高 |
Activemq 是一種開源的,實現了JMS1.1規範的,訊息導向(MOM)的中介軟體,為應用程式提供高效的、可延伸的、穩定的和安全的企業級訊息通訊。
Activemq 的作用就是系統之間進行通訊,原理就是生產者生產訊息, 把訊息傳送給activemq, Activemq 接收到訊息, 然後檢視有多少個消費者,
然後把訊息轉發給消費者, 此過程中生產者無需參與。 消費者接收到訊息後做相應的處理和生產者沒有任何關係。
釋出/訂閱方式用於多接收使用者端的方式,作為釋出訂閱的方式,可能存在多個接收使用者端,並且接收端使用者端與傳送使用者端存在時間上的依賴。一個接收端只能接收他建立以後傳送使用者端傳送的資訊。
p2p的過程則理解起來比較簡單。它好比是兩個人打電話,這兩個人是獨享這一條通訊鏈路的。一方傳送訊息,另外一方接收,就這麼簡單。在實際應用中因為有多個使用者對使用p2p的鏈路,相互通訊的雙方是通過一個類似於佇列的方式來進行交流。和前面pub-sub的區別在於一個topic有一個傳送者和多個接收者,而在p2p裡一個queue只有一個傳送者和一個接收者。
JDBC: 持久化到資料庫
AMQ :紀錄檔檔案(已基本不用)
KahaDB : AMQ基礎上改進,預設選擇
LevelDB :谷歌K/V資料庫
在activemq.xml中檢視預設的broker持久化機制。
(1)AUTO_ACKNOWLEDGE = 1 自動確認
(2)CLIENT_ACKNOWLEDGE = 2 使用者端手動確認
(3)DUPS_OK_ACKNOWLEDGE = 3 自動批次確認
(4)SESSION_TRANSACTED = 0 事務提交併確認
(5)INDIVIDUAL_ACKNOWLEDGE = 4 單條訊息確認
前四種是JMS API中提供的使用者端ACK_MODE。第五種是InforSuiteMQ自定義補充的一種ACK_MODE。
RabbitMQ是一個由erlang語言編寫的、開源的、在AMQP基礎上完整的、可複用的企業訊息系統。
基本概念
關鍵名稱 | 說明 |
---|---|
Channel(通道) | 訊息推播使用的通道 |
Producer(訊息的生產者) | 向訊息佇列釋出訊息的使用者端應用程式 |
Consumer(訊息的消費者) | 從訊息佇列取得訊息的使用者端應用程式 |
Message(訊息) | 訊息由訊息頭和訊息體組成 |
Routing Key(路由鍵) | 訊息頭的一個屬性,用於標記訊息的路由規則,決定了交換機的轉發路徑 |
Queue(訊息佇列) | 用於儲存生產者的訊息 |
Exchange(交換器路由器) | 提供Producer到Queue之間的匹配 |
Binding(繫結) | 用於建立Exchange和Queue之間的關聯 |
Binding Key(繫結鍵) | Exchange與Queue的繫結關係,用於匹配Routing Key |
Broker(服務主體) | RabbitMQ Server,伺服器實體 |
最簡單的工作佇列,其中一個訊息生產者,一個訊息消費者,一個佇列。也稱為對等模式
一個訊息生產者,一個交換器,一個訊息佇列,多個消費者。同樣也稱為對等模式
Pulish/Subscribe,無選擇接收訊息,一個訊息生產者,一個交換機(交換機型別為fanout),多個訊息佇列,多個消費者
生產者只需把訊息傳送到交換機,繫結這個交換機的佇列都會獲得一份一樣的資料。
在釋出/訂閱模式的基礎上,有選擇的接收訊息,也就是通過 routing 路由進行匹配條件是否滿足接收訊息。
topics(主題)模式跟routing路由模式類似,只不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊匹配路由鍵 routingKey,類似於SQL中 = 和 like 的關係。
與上面其他5種所不同之處,該模式是擁有請求/回覆的。也就是有響應的,上面5種都沒有。
RPC是指遠端過程呼叫,也就是說兩臺伺服器A,B,一個應用部署在A伺服器上,想要呼叫B伺服器上應用提供的處理業務,處理完後然後在A伺服器繼續執行下去,把非同步的訊息以同步的方式執行。
Queue(訊息佇列)的持久化是通過durable=true來實現的。
Message(訊息)的持久化 ,通過設定訊息是持久化的標識。
Exchange(交換機)的持久化 。
confirm機制:確認訊息是否成功傳送到Exchange
ack機制:確認訊息是否被消費者成功消費
RocketMQ是阿里開發的一款純java、分散式、佇列模型的開源訊息中介軟體,支援事務訊息、順序訊息、批次訊息、定時訊息、訊息回溯等。
基本概念
關鍵名稱 | 說明 |
---|---|
Producer | 訊息生產者 |
Producer Group | 生產者組 |
Consumer | 訊息消費者 |
Consumer Group | 消費者組 |
Topic | Topic用於將訊息按主題做劃分,Producer將訊息發往指定的Topic,Consumer訂閱該Topic就可以收到這條訊息 |
Message | 代表一條訊息 |
Tag | 標籤可以被認為是對 Topic 進一步細化 |
Broker | 負責接收並儲存訊息 |
Queue | Topic和Queue是1對多的關係,一個Topic下可以包含多個Queue,主要用於負載均衡 |
Offset | RocketMQ在儲存訊息時會為每個Topic下的每個Queue生成一個訊息的索引檔案,每個Queue都對應一個Offset記錄當前Queue中訊息條數。 |
NameServer | NameServer可以看作是RocketMQ的註冊中心 |
RocketMQ訊息訂閱有兩種模式
一種是Push模式(MQPushConsumer),即MQServer主動向消費端推播
另外一種是Pull模式(MQPullConsumer),即消費端在需要時,主動到MQ Server拉取
但在具體實現時,Push和Pull模式本質都是採用消費端主動拉取的方式,即consumer輪詢從broker拉取訊息
叢集模式和廣播模式
叢集模式:預設情況下我們都是使用的叢集模式,也就是說消費者組收到訊息後,只有其中的一臺機器會接收到訊息。
廣播模式:消費者組內的每臺機器都會收到這條訊息。
exchange持久化、queue持久化、message持久化
CommitLog:紀錄檔資料檔案,儲存訊息內容,所有 queue 共用,不區分 topic ,順序讀寫 ,1G 一個檔案
ConsumeQueue:邏輯 Queue,基於 topic 的 CommitLog 的索引檔案,訊息先到達 commitLog,然後非同步轉發到 consumeQueue,包含 queue 在 commitLog 中的物理偏移量 offset,訊息實體內容大小和 Message Tag 的 hash 值,大於 600W 個位元組,寫滿之後重新生成,順序寫
IndexFile:基於 Key 或 時間區間的 CommitLog 的索引檔案,檔名以建立的時間戳命名,固定的單個 indexFile 大小為 400M,可以儲存 2000W 個索引
confirm機制:確認訊息是否成功傳送到Exchange
ack機制:確認訊息是否被消費者成功消費
Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一個分散式、分割區的、多副本的、多訂閱者,基於zookeeper協調的分散式紀錄檔系統,可作為訊息中介軟體
基本概念
關鍵名稱 | 說明 |
---|---|
producer | 生產者 |
consumer | 消費者 |
consumer group | 消費者組 |
broker | 一臺kafka伺服器就是一個broker,一個叢集由多個broker組成,一個broker可以容納多個topic |
topic | 一個訊息佇列,生產者和消費者都是面對一個Topic |
partition | 每個partition時一個有序佇列,partition是topic中儲存資料和消費資料所使用的佇列所在 |
replica | 副本,為了保證當前某個節點發生故障時,當前節點上的資料不會發生丟失 |
leader | 每個分割區多個副本的「主」,生產者生產資料的物件,以及消費組消費者消費的物件 |
follower | 每個分割區多個副本的「從」,實時從leader資料的同步 |
生產者傳送模式
1.發後即忘(fire-and-forget):只管往Kafka中傳送訊息而並不關心訊息是否正確到達
2.同步(sync):一般是在send()方法裡指定一個Callback的回撥函數,Kafka在返回響應時呼叫該函數來實現非同步的傳送確認。
3.非同步(async):send()方法會返回Futrue物件,通過呼叫Futrue物件的get()方法,等待直到結果返回
消費者消費模式
1.At-most-once(最多一次):在每一條訊息commit成功之後,再進行消費處理;設定自動提交為false,接收到訊息之後,首先commit,然後再進行消費。
2.At-least-once(最少一次):在每一條訊息處理成功之後,再進行commit;設定自動提交為false;訊息處理成功之後,手動進行commit。
3.Exactly-once(正好一次):將offset作為唯一id與訊息同時處理,並且保證處理的原子性;設定自動提交為false;訊息處理成功之後再提交。
Kafka直接將資料寫入到紀錄檔檔案中,以追加的形式寫入
confirm機制:確認訊息是否成功傳送
ack機制:確認訊息是否被消費者成功消費
引言:系統應用MQ作為訊息中介軟體後,會導致系統可用性降低。所以只要你用了 MQ,高可用肯定是要考慮到的
ActiveMQ的部署方式有三種,分別為:單節點部署(不支援高可用),Master-Slave部署方式(主從模式),Broker-Cluster部署方式(負載均衡)
單節點部署方式因為不支援高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。
主要是通過共用儲存目錄來實現master和slave的熱備,所有的ActiveMQ應用都在不斷地獲取共用目錄的控制權,哪個應用搶到了控制權,它就成為master。
多個共用儲存目錄的應用,誰先啟動,誰就可以最早取得共用目錄的控制權成為master,其他的應用就只能作為slave。
與shared filesystem方式類似,只是共用的儲存媒介由檔案系統改成了資料庫而已。
這種主備方式是ActiveMQ5.9以後才新增的特性,使用ZooKeeper協調選擇一個node作為master。被選擇的master broker node開啟並接受使用者端連線。
其他node轉入slave模式,連線master並同步他們的儲存狀態。slave不接受使用者端連線。所有的儲存操作都將被複制到連線至Master的slaves。
如果master死了,得到了最新更新的slave被允許成為master。fialed node能夠重新加入到網路中並連線master進入slave mode。所有需要同步的disk的訊息操作都將等待儲存狀態被複制到其他法定節點的操作完成才能完成。
當一個新的master被選中,你需要至少保障一個法定node線上以能夠找到擁有最新狀態的node。這個node將會成為新的master。因此,推薦執行至少3個replica nodes,以防止一個node失敗了,服務中斷。
前面的Master-Slave的方式雖然能解決多服務熱備的高可用問題,但無法解決負載均衡和分散式的問題。Broker-Cluster的部署方式就可以解決負載均衡的問題。
Broker-Cluster部署方式中,各個broker通過網路互相連線,並共用queue。當broker-A上面指定的queue-A中接收到一個message處於pending狀態,而此時沒有consumer連線broker-A時。如果cluster中的broker-B上面由一個consumer在消費queue-A的訊息,那麼broker-B會先通過內部網路獲取到broker-A上面的message,並通知自己的consumer來消費。
在activemq.xml檔案中靜態指定Broker需要建立橋連線的其他Broker
在activemq.xml檔案中不直接指定Broker需要建立橋連線的其他Broker,由activemq在啟動後動態查詢
可以看到Master-Slave的部署方式雖然解決了高可用的問題,但不支援負載均衡,
Broker-Cluster解決了負載均衡,但當其中一個Broker突然宕掉的話,那麼存在於該Broker上處於Pending狀態的message將會丟失,無法達到高可用的目的。
Master-Slave與Broker-Cluster相結合的部署方式是目前ActiveMQ比較推薦的部署方案。
RabbitMQ的部署方式有三種,分別為:單機模式(不支援高可用),普通叢集模式(不支援高可用),映象叢集模式(支援高可用)
單節點部署方式因為不支援高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。
普通叢集模式,意思就是在多臺機器上啟動多個 RabbitMQ 範例,每個機器啟動一個。你建立的 queue,只會放在一個 RabbitMQ 範例上,但是每個範例都同步 queue 的後設資料(後設資料可以認為是 queue 的一些設定資訊,通過後設資料,可以找到 queue 所在範例)。你消費的時候,實際上如果連線到了另外一個範例,那麼那個範例會從 queue 所在範例上拉取資料過來。
這種方式確實很麻煩,也不怎麼好,沒做到所謂的分散式,就是個普通叢集。因為這導致你要麼消費者每次隨機連線一個範例然後拉取資料,要麼固定連線那個 queue 所在範例消費資料,前者有資料拉取的開銷,後者導致單範例效能瓶頸。
而且如果那個放 queue 的範例宕機了,會導致接下來其他範例就無法從那個範例拉取,如果你開啟了訊息持久化,讓 RabbitMQ 落地儲存訊息的話,訊息不一定會丟,得等這個範例恢復了,然後才可以繼續從這個 queue 拉取資料。
這種模式,才是所謂的 RabbitMQ 的高可用模式。跟普通叢集模式不一樣的是,在映象叢集模式下,你建立的 queue,無論後設資料還是 queue 裡的訊息都會存在於多個範例上,就是說,每個 RabbitMQ 節點都有這個 queue 的一個完整映象,包含 queue 的全部資料的意思。然後每次你寫訊息到 queue 的時候,都會自動把訊息同步到多個範例的 queue 上。
RocketMQ的部署方式有兩種,分別為:單節點模式(不支援高可用),多節點模式
單節點部署方式因為不支援高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。
一個叢集無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master
設定簡單,單個Master 宕機或重啟維護對應用無影響。
單臺機器宕機期間,這臺機器上未被消費的訊息在機器恢復之前不可訂閱,訊息實時性會受到受到影響。
每個 Master 設定一個 Slave,有多對Master-Slave, HA,採用非同步複製方式,主備有短暫訊息延遲,毫秒級。
即使磁碟損壞,訊息丟失的非常少,且訊息實時性不會受影響,因為Master 宕機後,消費者仍然可以從 Slave消費,此過程對應用透明。不需要人工干預。效能同多 Master 模式幾乎一樣。
Master 宕機,磁碟損壞情況,會丟失少量訊息。
每個 Master 設定一個 Slave,有多對Master-Slave, HA採用同步雙寫方式,主備都寫成功,嚮應用返回成功。
資料與服務都無單點, Master宕機情況下,訊息無延遲,服務可用性與資料可用性都非常高
效能比非同步複製模式略低,大約低 10%左右,傳送單個訊息的 RT會略高。
Kafka的部署方式有三種,分別為:單broke節點(不支援高可用),單機多broker模式(支援高可用),多機多broker模式(支援高可用)
單節點部署方式因為不支援高可用,只會在開發或者測試環境下用到,且單節點部署方式較簡單,不進行詳述。
這種部署方式其實是一種偽叢集模式,單機部署多節點如果出現伺服器宕機,那麼所有節點都不能正常提供服務。
Kafka 0.8 以前,是沒有 HA 機制的,就是任何一個 broker 宕機了,那個 broker 上的 partition 就廢了,沒法寫也沒法讀,沒有什麼高可用性可言。
Kafka 0.8 以後,提供了 HA 機制,就是 replica(複製品) 副本機制。每個 partition 的資料都會同步到其它機器上,形成自己的多個 replica 副本。所有 replica 會選舉一個 leader 出來,那麼生產和消費都跟這個 leader 打交道,然後其他 replica 就是 follower。寫的時候,leader 會負責把資料同步到所有 follower 上去,讀的時候就直接讀 leader 上的資料即可。只能讀寫 leader?很簡單,要是你可以隨意讀寫每個 follower,那麼就要 care 資料一致性的問題,系統複雜度太高,很容易出問題。Kafka 會均勻地將一個 partition 的所有 replica 分佈在不同的機器上,這樣才可以提高容錯性。
引言:為什麼要考慮重複消費的問題?比如我們消費後通過消費中介軟體來呼叫,扣費10元,但是消費者消費訊息後還沒來得及進行確認,訊息中介軟體進行了重啟,那麼訊息者就會進行再次扣費處理,這樣就會出問題!
ActiveMQ、RabbitMQ、RocketMQ、Kafka,都有可能會出現訊息重複消費的問題,正常。因為這問題通常不是 MQ 自己保證的,是由我們開發來保證的。
我們以Kafka為例說明一下重複消費的問題:
Kafka 實際上有個 offset 的概念,就是每個訊息寫進去,都有一個 offset,代表訊息的序號,然後 consumer 消費了資料之後,每隔一段時間(定時定期),會把自己消費過的訊息的 offset 提交一下,表示「我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的 offset 來繼續消費吧」。
但是,如果在這期間重啟系統或者直接 kill 程序了,再重啟。這會導致 consumer 有些訊息處理了,但是沒來得及提交 offset。重啟之後,少數訊息會再次消費一次。
如果消費者乾的事兒是拿一條資料就往資料庫裡寫一條,會導致說,你可能就把資料在資料庫裡插入了 2 次,那麼資料就錯啦。
重複消費問題引發後,我們就需要考慮怎麼保證冪等性。
冪等性,通俗點說,就一個資料,或者一個請求,給你重複來多次,你得確保對應的資料是不會改變的,不能出錯。
保證冪等性的具體實現方式需要結合對應的業務去實現,這裡提供幾個思路:
引言:MQ 有個基本原則,就是資料不能多一條,也不能少一條。
不能多,就是上面說的重複消費和冪等性問題。
不能少,就是說這資料別搞丟了。那這個問題你必須得考慮一下。
訊息丟失的問題需要從生產者、MQ、消費者三個方面來進行考慮,相應的解決方案也需要從這三方面出發(生產者確認機制,MQ訊息持久化、消費者確認機制)。
生產者丟失訊息的問題可以通過訊息重投、重試機制來解決
ActiveMQ丟失訊息的問題需要通過ActiveMQ訊息持久化機制+高可用(見ActiveMQ章節)來解決,ActiveMQ的訊息持久化機制有以下幾種
JDBC: 持久化到資料庫
AMQ :紀錄檔檔案(已基本不用)
KahaDB : AMQ基礎上改進,預設選擇
LevelDB :谷歌K/V資料庫
在activemq.xml中檢視預設的broker持久化機制。
消費者丟失訊息通過ack機制來解決,訊息者進行業務處理後,再進行ack確認,避免訊息丟失。
生產者訊息丟失,通過confirm機制來確認訊息傳送,然後進行相應的訊息重投、重試機制
RabbitMQ丟失訊息的問題需要通過RabbitMQ訊息持久化機制+高可用(見RabbitMQ章節)來解決,
RabbitMQ持久化包含:
Queue(訊息佇列)的持久化是通過durable=true來實現的。
Message(訊息)的持久化 ,通過設定訊息是持久化的標識。
Exchange(交換機)的持久化 。
消費者丟失訊息通過ack機制來解決,訊息者進行業務處理後,再進行ack確認,避免訊息丟失。
生產者訊息丟失,通過confirm機制來確認訊息傳送,然後進行相應的訊息重投、重試機制
RocketMQ丟失訊息的問題需要通過RocketMQ訊息持久化機制+高可用(見RocketMQ章節)來解決,
RocketMQ持久化包含:exchange持久化、queue持久化、message持久化
消費者丟失訊息通過ack機制來解決,訊息者進行業務處理後,再進行ack確認,避免訊息丟失。
生產者訊息丟失,通過confirm機制來確認訊息傳送,然後進行相應的訊息重投、重試機制
Kafka直接將資料寫入到紀錄檔檔案中,以追加的形式寫入
消費者丟失訊息通過ack機制來解決,訊息者進行業務處理後,再進行ack確認,避免訊息丟失。
總結:其實MQ訊息丟失,無非就是生產者傳送時丟失,MQ傳遞時丟失,消費者消費時丟失幾種問題,我們相應的從以上三方面解決就可以,但是上述三種方式使用後,其實也不能保證100%訊息不丟失,所以往往在業務場景還會使用資料庫輔助記錄的方式,來保證訊息不丟失。但資料庫輔助記錄方式對相關效能以及使用用較大的影響,所以一般資料只需要進行上面三種方式處理,就能保證訊息基本不丟失。發生訊息丟失時我們配合紀錄檔進行相應的訊息恢復就可以。
資料庫輔助記錄:生產者傳送訊息時同步傳送一條訊息到資料庫中,消費者拿到訊息並完成業務處理後,從資料庫刪除對應的記錄。
引言:為什麼要保證訊息的順序性?
比如現在我們有個賬號餘額為5,我們充值50元,購買一件20元的商品,但因訊息不能保證順序,導致先進行扣費處理,這樣就會導致我們購買失敗。
訊息順序性消費情況,尤其在高可用(叢集方式)下一定要考慮。
ActiveMQ因為預設是單queue 佇列,所以它模式就是保證訊息順序性消費的。
RocketMQ保證訊息順序性方法與Kafka大致相同。
引言:如何解決訊息佇列的延時以及過期失效問題?訊息佇列滿了以後該怎麼處理?有幾百萬訊息持續積壓幾小時,怎麼處理?
其實訊息積壓的問題,一般都是由消費端出了問題導致的,在實際業務場景中一般不會出現,但是出現問題一般都是大問題。
模擬場景:
一個消費者一秒是 1000 條,一秒 3 個消費者是 3000 條,一分鐘就是 18 萬條。由於消費者宕機導致現在MQ中積壓幾百萬資料
解決思路:
mq 中的訊息過期失效了
假設你用的是 RabbitMQ,RabbtiMQ 是可以設定過期時間的,也就是 TTL。如果訊息在 queue 中積壓超過一定的時間就會被 RabbitMQ 給清理掉,這個資料就沒了。
假設 1 萬個訂單積壓在 mq 裡面,沒有處理,其中 1000 個訂單都丟了,你只能手動寫程式把那 1000 個訂單給查出來,手動發到 mq 裡去再補一次。
mq 都快寫滿了
如果訊息積壓在 mq 裡,長時間都沒有處理掉,此時導致 mq 都快寫滿了,咋辦?
這種情況下只能是通過增加臨時Consumer將資料進行快速消費,等MQ恢復正常後再補充資料。
RocketMQ方案
對於 RocketMQ,官方針對訊息積壓問題,提供瞭解決方案。
同一個 ConsumerGroup 下,通過增加 Consumer 範例數量來提高並行度(需要注意的是超過訂閱佇列數的 Consumer 範例無效)。可以通過加機器,或者 在已有機器啟動多個程序的方式。 提高單個 Consumer 的消費並行執行緒,通過修改引數 consumeThreadMin、consumeThreadMax 實現。
批次方式消費
某些業務流程如果支援批次方式消費,則可以很大程度上提高消費吞吐量,例如訂單扣款類應用,一次處理一個訂單耗時 1 s,一次處理 10 個訂單可能也只耗時 2 s,這樣即可大幅度提高消費的吞吐量,通過設定 consumer 的 consumeMessageBatchMaxSize 返個引數,預設是 1,即一次只消費一條訊息,例如設定為 N,那麼每次消費的訊息數小於等於 N。
跳過非重要訊息
發生訊息堆積時,如果消費速度一直追不上傳送速度,如果業務對資料要求不高的話,可以選擇丟棄不重要的訊息。例如,當某個佇列的訊息數堆積到 100000 條以上,則嘗試丟棄部分或全部訊息,這樣就可以快速追上傳送訊息的速度。範例程式碼如下:
優化每條訊息消費過程
舉例如下,某條訊息的消費過程如下:
引言:如果讓你寫一個訊息佇列,該如何進行架構設計?
比如說訊息佇列系統,我們從以下幾個角度來考慮一下:
其實MQ的使用,無非就是從原理,高可用,重複訊息,順序讀寫,資料丟失幾個方面開展。
上述的介紹是偏重思路方面來進行展開的,至於具體的MQ使用細節,我想你有了對應的思路去查會有一大堆。這也是我學習技術的一個思路,先掌握一個大的方向,然後沿著一個大的方向再進行相應的詳細學習。
最後,上述MQ介紹中,大部分都是有我平時開發積累所得,也有一部分是藉助網路現場學習。
如有不足或錯誤,歡迎大家指出,我們共同學習進步!