RocketMQ是一個純Java、分散式、佇列模型的開源訊息中介軟體,前身是MetaQ,是阿里參考Kafka特點研發的一個佇列模型的訊息中介軟體(RocketMQ是阿里開源其自研的第三代分散式訊息中介軟體),後開源給apache基金會成為了apache的頂級開源專案,具有高效能、高可靠、高實時、分散式特點。
RocketMQ英文直譯:Rocket火箭、MQ message queue 訊息佇列
Apache基金會中的342個專案中,暫時還只有Kylin、CarbonData、Eagle 、Dubbo和 RocketMQ 共計五個中國技術人主導的專案,我們比較熟悉的是Dubbo和RocketMQ,都是阿里的,難怪國內這麼多公司舔阿里、
RocketMQ可以實用於電商領域,金融領域,巨量資料領域,這些領域正好是阿里的專長。
阿里巴巴內部圍繞著RocketMQ核心打造了三款產品,分別是MetaQ、Notify和Aliware MQ,這三者分別採用了不同的模型,
MetaQ主要使用了拉模型,解決了順序訊息和海量堆積問題;
Notify主要使用了推模型,解決了事務訊息;
而云產品Aliware MQ則是提供了商業化的版本。
說到高並行,就是雙11洗禮,為了在高並行下保護資料庫,RocketMQ團隊重點做了兩件事情,優化慢請求與統一儲存引擎。
(1)優化慢請求:這裡主要是解決在海量高並行場景下降低慢請求對整個叢集帶來的抖動,毛刺問題。這是一個極具挑戰的技術活,團隊同學經過長達1個多月的跟進調優,從雙十一的覆盤情況來看,99.996%的延遲落在了10ms以內,而99.6%的延遲在1ms以內。優化主要集中在RocketMQ儲存層演演算法優化、JVM與作業系統調優。更多的細節大家可以參考《萬億級資料洪峰下的分散式訊息引擎》。
(2)統一儲存引擎:主要解決的訊息引擎的高可用,成本問題。在多代訊息引擎共存的前提下,我們對Notify的儲存模組進行了全面移植與替換。
RocketMQ天生為金融網際網路領域而生,追求高可用、高並行、低延遲,是一個阿里巴巴由內而外成功孕育的典範,RocketMQ在阿里集團也被廣泛應用在訂單,交易,充值,流計算,訊息推播,紀錄檔流式處理,binglog分發等場景。
RocketMQ優點:
單機吞吐量:十萬級
可用性:非常高,分散式架構
訊息可靠性:經過引數優化設定,訊息可以做到0丟失
功能支援:MQ功能較為完善,還是分散式的,擴充套件性好
支援10億級別的訊息堆積,不會因為堆積導致效能下降
原始碼是java,我們可以自己閱讀原始碼,客製化自己公司的MQ,可以掌控
天生為金融網際網路領域而生,對於可靠性要求很高的場景,尤其是電商裡面的訂單扣款,以及業務削峰,在大量交易湧入時,後端可能無法及時處理的情況
RoketMQ在穩定性上可能更值得信賴,這些業務場景在阿里雙11已經經歷了多次考驗,如果你的業務有上述並行場景,建議可以選擇RocketMQ
RocketMQ缺點:
支援的使用者端語言不多,目前是java及c++,其中c++不成熟
社群活躍度不是特別活躍那種
沒有在 mq 核心中去實現JMS等介面,有些系統要遷移需要修改大量程式碼
RocketMQ的業務用途(重點):
釋出/訂閱訊息傳遞模型
財務級交易訊息
各種跨語言使用者端,例如Java,C / C ++,Python,Go
可插拔的傳輸協定,例如TCP,SSL,AIO
內建的訊息跟蹤功能,還支援開放式跟蹤
多功能的巨量資料和流生態系統整合
按時間或偏移量追溯訊息
可靠的FIFO和嚴格的有序訊息傳遞在同一佇列中
高效的推拉消費模型
單個佇列中的百萬級訊息累積容量
多種訊息傳遞協定,例如JMS和OpenMessaging
靈活的分散式橫向擴充套件部署架構
快如閃電的批次訊息交換系統
各種訊息過濾器機制,例如SQL和Tag
用於隔離測試和雲隔離群集的Docker映像
功能豐富的管理儀表板,用於設定,指標和監視
認證與授權
RocketMQ專案結構
GitHub地址:https://github.com/apache/rocketmq
RocketMQ核心模組(下載原始碼,idea開啟):
rocketmq-broker:接受生產者發來的訊息並儲存(通過呼叫rocketmq-store),消費者從這裡取得訊息。
rocketmq-client:提供傳送、接受訊息的使用者端API。
rocketmq-namesrv:NameServer,類似於Zookeeper,這裡儲存著訊息的TopicName,佇列等執行時的元資訊。
rocketmq-common:通用的一些類,方法,資料結構等。
rocketmq-remoting:基於Netty4的client/server + fastjson序列化 + 自定義二進位制協定。
rocketmq-store:訊息、索引儲存等。
rocketmq-filtersrv:訊息過濾器Server,需要注意的是,要實現這種過濾,需要上傳程式碼到MQ!(一般而言,我們利用Tag足以滿足大部分的過濾需求,如果更靈活更復雜的過濾需求,可以考慮filtersrv元件)。
rocketmq-tools:命令列工具。
他主要有四大核心組成部分:NameServer、Broker、Producer以及Consumer四部分。
RocketMQ的四個部分(NameServer、Broker、Producer以及Consumer)都是採用分散式叢集,這是他高並行(吞吐量大),高可用的原因之一,叢集的模式有多種,包括多master 模式、多master多slave非同步複製模式、多 master多slave同步雙寫模式。
這個模式很像Kafka,因為就是阿里參考Kafka特點研發的一個佇列模型的訊息中介軟體。
NameServer定義:主要負責對於源資料的管理,包括了對於Topic和路由資訊的管理。
NameServer的地位:NameServer在RocketMQ中是註冊中心,Zookeeper在Dubbo也是註冊中心
NameServer是一個功能齊全的伺服器,從功能上看,類似Dubbo中的Zookeeper,但NameServer與Zookeeper相比更輕量。主要是因為每個NameServer節點互相之間是獨立的,沒有任何資訊互動。
NameServer心跳
NameServer壓力不會太大,平時主要開銷是在維持心跳和提供Topic-Broker的關係資料。但是,每個Broker向NameServer發心跳時, 會帶上當前自己負責的所有Topic資訊,如果一個Broker中的Topic個數太多(萬級別),會導致一次心跳中,就Topic的資料就幾十M,網路情況差的話, 網路傳輸失敗,心跳失敗,導致NameServer誤認為Broker心跳失敗。
NameServer 是無狀態的,可以橫向擴充套件,成為一個註冊中心叢集
NameServer 被設計成幾乎無狀態的,可以橫向擴充套件,節點之間相互之間無通訊,通過部署多臺機器來標記自己是一個偽叢集。
每個 Broker 在啟動的時候會到 NameServer 註冊:
每個 Broker 在啟動的時候會到 NameServer 註冊,然後,各個Broker將資訊註冊到NameServer,然後Producer和Consumer就可以來NameServer取Broker的資訊了,即每個 Producer 在傳送訊息前會根據 Topic 到 NameServer 獲取到 Broker 的路由資訊,每個Consumer 也會定時獲取 Topic 的路由資訊。
NameServer作為RocketMQ的註冊中心,在互動邏輯上,和Dubbo中註冊中心的角色,幾乎一模一樣。
Producer定義:訊息生產者,負責產生訊息,一般由業務系統負責產生訊息。
Producer分散式部署
Producer由使用者進行分散式部署,訊息由Producer通過多種負載均衡模式傳送到Broker叢集,傳送低延時,支援快速失敗。
Producer擁有了三種方式傳送訊息:同步傳送、非同步傳送和單向傳送
同步雙向傳送定義:同步傳送指訊息傳送方Producer發出資料後,一定要收到接收方發回響應之後才發下一個封包。
同步雙向傳送特點:可靠性最好,耗時最長,注重可靠性高,適合發重要資訊。
同步雙向傳送用途:用於重要通知訊息,例如重要通知郵件、行銷簡訊。
非同步雙向傳送定義:非同步傳送指訊息傳送方Producer發出資料後,不等接收方發回響應,接著傳送下個封包。
非同步雙向傳送特點:可靠性稍差,耗時較長。
非同步雙向傳送用途:用於可能鏈路耗時較長而對響應時間敏感的業務場景,例如使用者視訊上傳後,訊息佇列傳送訊息,通知啟動轉碼服務。
單向傳送定義:單向傳送是指只負責傳送訊息而不等待伺服器迴應且沒有回撥函數觸發,單向,傳送方自己發自己的,不管接收方。
單向傳送特點:可靠性最差,但是耗時最短,注重耗時短。
單向傳送用途:適用於某些耗時非常短但對可靠性要求並不高的場景,例如紀錄檔收集。
Broker定義:訊息中轉角色,負責儲存訊息,轉發訊息。
Broker是具體提供業務的伺服器,單個Broker節點與所有的NameServer節點保持長連線及心跳,並會定時將Topic資訊註冊到NameServer,順帶一提底層的通訊和連線都是基於Netty實現的((1)netty基於NIO,網路通訊高效率,(2)阿里的兩種開源中介軟體,Dubbo和RocketMQ的結構相同,都需要網路通訊,網路通訊都是netty)。
Broker負責訊息儲存,以Topic為緯度支援輕量級的佇列,單機可以支撐上萬佇列規模,支援訊息推拉模型。官網上有資料顯示:具有上億級訊息堆積能力,同時可嚴格保證訊息的有序性。
Consumer定義:訊息消費者,負責消費訊息,一般是後臺系統負責非同步消費。
Consumer也由使用者部署,支援PUSH和PULL兩種型別的消費模式,支援叢集消費和廣播訊息,提供實時的訊息訂閱機制。
Pull:拉取型消費者(Pull Consumer)主動從訊息伺服器拉取資訊,只要批次拉取到訊息,使用者應用就會啟動消費過程,所以 Pull 稱為主動消費型。
Push:推播型消費者(Push Consumer)封裝了訊息的拉取、消費進度和其他的內部維護工作,將訊息到達時執行的回撥介面留給使用者應用程式來實現。所以 Push 稱為被動消費型別,但從實現上看還是從訊息伺服器中拉取訊息,不同於 Pull 的是 Push 首先要註冊消費監聽器,當監聽器處觸發後才開始消費訊息。
第一,Message 訊息
Message本質:就是訊息,就是要傳輸的資訊。
Message與Topic關係:一條訊息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。
Message與Tag的關係:一條訊息也可以擁有一個可選的標籤(Tag)和額處的鍵值對,它們可以用於設定一個業務 Key 並在 Broker 上查詢此訊息以便在開發期間查詢問題。
第二,Topic 主題
Topic定義:Topic 可以看做訊息的分類,它是訊息的第一級型別。比如一個電商系統可以分為:交易訊息、物流訊息等,
Topic與Message的關係:一條訊息必須有一個 Topic 。
Topic與Producer的關係:一個 Topic 可以有0個、1個、多個生產者向其傳送訊息,一個生產者也可以同時向不同的 Topic 傳送訊息。
Topic與Consumer的關係:一個 Topic 也可以被 0個、1個、多個消費者訂閱。
第三,Group 分組
Group定義:分為ProducerGroup,ConsumerGroup,代表某一類的生產者和消費者,一般來說同一個服務可以作為Group,同一個Group一般來說傳送和消費的訊息都是一樣的
Group與Topic的關係:一個組可以訂閱多個Topic。
第四,Queue 佇列 和 Message Queue 訊息佇列
4.1 Queue 佇列
Queue定義:在Kafka中叫Partition,每個Queue內部是有序的,在RocketMQ中分為讀和寫兩種佇列,一般來說讀寫佇列數量一致,如果不一致就會出現很多問題。
4.2 Message Queue 訊息佇列
Message Queue(訊息佇列),主題被劃分為一個或多個子主題,即子主題就是訊息佇列。
訊息佇列和Topic的關係:一個 Topic 下可以設定多個訊息佇列,傳送訊息時執行該訊息的 Topic ,RocketMQ 會輪詢該 Topic 下的所有佇列將訊息發出去。
訊息佇列是訊息的物理管理單位:一個Topic下可以有多個Queue,Queue的引入使得訊息的儲存可以分散式叢集化,具有了水平擴充套件能力。
第五,Offset 偏移量
在RocketMQ 中,所有訊息佇列都是持久化,長度無限的資料結構,所謂長度無限是指佇列中的每個儲存單元都是定長,存取其中的儲存單元使用Offset 來存取,Offset 為 java long 型別,64 位,理論上在 100年內不會溢位,所以認為是長度無限。
也可以認為 Message Queue 是一個長度無限的陣列,Offset 就是下標。
第六,Tag 標籤
Tag定義:Tag 可以看作子分類/子主題,它是訊息的第二級型別,用於為使用者提供額外的靈活性。使用標籤,同一業務模組不同目的的訊息就可以用相同 Topic 而不同的 Tag 來標識。比如交易訊息又可以分為:交易建立訊息、交易完成訊息等,
Tag意義:標籤有助於保持您的程式碼乾淨和連貫,並且還可以為 RocketMQ 提供的查詢系統提供幫助。
Tag與Message的關係:一條訊息可以沒有 Tag 。
第七,訊息消費模式
訊息消費模式有兩種:Clustering(叢集消費)和Broadcasting(廣播消費)。
第一,叢集消費模式:預設情況下就是叢集消費,該模式下一個消費者叢集共同消費一個主題的多個佇列,一個佇列只會被一個消費者消費,如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續消費。
第二,廣播消費模式:廣播消費訊息會發給消費者組中的每一個消費者進行消費。
第八,Message Order
Message Order(訊息順序)有兩種:Orderly(順序消費)和Concurrently(並行消費)。
第一,順序消費:順序消費表示訊息消費的順序同生產者為每個訊息佇列傳送的順序一致,所以如果正在處理全域性順序是強制性的場景,需要確保使用的主題只有一個訊息佇列。
第二,並行消費:並行消費不再保證訊息順序,消費的最大並行數量受每個消費者使用者端指定的執行緒池限制。
面試問題:如何保證 訊息的可用性 ?刷盤 + 主從同步
訊息可用性1-記憶體中的訊息持久化到磁碟(同步刷盤+非同步刷盤):當我們選擇好了叢集模式之後,那麼我們需要關心的就是怎麼去儲存和複製這個資料,只有講broker記憶體中的訊息持久化到磁碟,才能保證broker宕機,訊息不丟失,RocketMQ對訊息的持久化到磁碟,提供了同步和非同步的策略來滿足我們的,
情況1:選擇同步刷盤,如果刷盤超時會給返回 刷盤超時 FLUSH_DISK_TIMEOUT,
情況2:選擇非同步刷盤,如果是非同步刷盤不會返回刷盤成功與否的任何資訊,
所以,選擇同步刷盤可以盡最大程度保證刷盤的時候訊息不會丟失。
訊息可用性2-主從partition複製(同步複製+非同步複製):RocketMQ的主從同步提供了同步複製和非同步複製兩種模式,當然選擇同步複製可以提升可用性,但是訊息的傳送RT時間會下降10%左右。
RockteMQ刷盤
RocketMQ刷盤的最終實現都是使用NIO中的 MappedByteBuffer.force() 將對映區的資料寫入到磁碟,
如果是同步刷盤,在Broker把訊息寫到CommitLog對映區後,就會等待寫入完成。
如果是非同步刷盤,只是喚醒對應的執行緒,不保證執行的時機。
RocketMQ混合型儲存結構 + Kafka獨立型儲存結構
RocketMQ採用的是混合型的儲存結構,定義:為Broker單個範例下所有的佇列共用一個紀錄檔資料檔案(即為CommitLog)來儲存。缺點:會存在較多的隨機讀操作,因此讀的效率偏低,同時消費訊息需要依賴ConsumeQueue,構建該邏輯消費佇列需要一定開銷。
而Kafka採用的是獨立型的儲存結構,定義:每個佇列一個檔案。
RocketMQ的生產者如何保證訊息順序傳送?
生產者消費者一般需要保證順序訊息的話,可能就是一個業務場景下的,比如訂單的建立、支付、發貨、收貨。
那這些東西是不是一個訂單號呢?一個訂單的肯定是一個訂單號的說,那簡單了呀。
一個topic下有多個佇列,為了保證傳送有序,RocketMQ提供了MessageQueueSelector佇列選擇機制,他有三種實現:
SelectMessageQueueByHash
SelectMessageQueueByMachineRoom
SelectMessageQueueByRandom
SelectMessageQueueByHash:使用Hash取模法,讓同一個訂單傳送到同一個佇列中,再使用同步傳送,只有同個訂單的建立訊息傳送成功,再傳送支付訊息。這樣,我們保證了傳送有序。
RocketMQ的topic內的佇列機制,可以保證儲存滿足FIFO(First Input First Output 簡單說就是指先進先出),剩下的只需要消費者順序消費即可。
RocketMQ僅保證順序傳送,順序消費由消費者業務保證!!!(解釋:一個訂單你傳送的時候放到一個佇列裡面去,你同一個的訂單號Hash一下是不是還是一樣的結果,那肯定是一個消費者消費,那順序是不是就保證了?)
真正的順序消費不同的中介軟體都有自己的不同實現我這裡就舉個例子,大家思路理解下。
考點:RocketMQ支援的分散式事務
步驟1:Producer傳送半訊息給Broker(對應下圖的中的 1 2 3 4)
是指暫不能被Consumer消費的訊息。Producer 已經把訊息成功傳送到了 Broker 端,但此訊息被標記為暫不能投遞狀態,處於該種狀態下的訊息稱為半訊息。需要 Producer對訊息的二次確認後,Consumer才能去消費它。
在《分散式事務》單獨專欄中講。
步驟2:訊息回查,Broker詢問Producer(對應下圖的中 5 6 7 8)
由於網路閃段,生產者應用重新啟動等原因。導致 Producer 端一直沒有對 Half Message(半訊息) 進行 二次確認。這是Brock伺服器會定時掃描長期處於半訊息的訊息,會主動詢問 Producer端 該訊息的最終狀態(Commit或者Rollback),該訊息即為 訊息回查。
未使用訊息回查的流程 1 2 3 4 8
已使用訊息回查的流程 1 2 3 4 5 6 7 8
對於上圖解釋 1-8
1、A服務先傳送個Half Message給Brock端,訊息中攜帶 B服務 即將要+100元的資訊。
2、伺服器端傳送響應,半訊息接收成功,
3、執行本地事務(會有三種情況1、執行成功。2、執行失敗。3、網路等原因導致沒有響應)
4、服務A根據本地事務,傳送 commit/rollback 給broker,轉到8.
5、如果因為網路等原因遲遲沒有返回失敗還是成功,那麼會執行RocketMQ的回撥介面,來進行事務的回查。
6、服務A檢查本地事務;
7、服務A根據本地事務,傳送 commit/rollback 給broker,轉到8.
8、如果本地事務成功,那麼Product像Brock伺服器傳送Commit,這樣B服務就可以消費該message;如果本地事務失敗,那麼Product像Brock伺服器傳送Rollback,那麼就會直接刪除上面這條半訊息。
考點:訊息過濾
第一,Broker端訊息過濾
定義:在Broker中,按照Consumer的要求做過濾;
優點:減輕網路傳輸負擔,減少了對於Consumer無用訊息的網路傳輸;
缺點:增加Broker的負擔,實現相對複雜。
第二,Consumer端訊息過濾
定義:Consumer端,程式設計師完全自定義實現過濾規則來過濾;
優點:減輕Broker的負擔。
缺點:增加網路傳輸負擔,很多無用的訊息要傳輸到Consumer端。
考點:Broker的Buffer問題(RocketMQ,讀寫磁碟代替記憶體buffer)
定義:Broker的Buffer通常指的是Broker中一個佇列的記憶體緩衝Buffer大小,這個Buffer通常大小有限。
RocketMQ,讀寫磁碟代替記憶體buffer:注意,RocketMQ同其他MQ有非常顯著的區別,RocketMQ沒有記憶體Buffer概念,RocketMQ的佇列都是持久化磁碟。從這方面說,RocketMQ的記憶體Buffer抽象成一個無限長度的佇列(理由:磁碟可以不斷存入),不管有多少資料進來都能裝得下,這個無限是有前提的,Broker會定期刪除過期的資料。例如Broker只儲存3天的訊息,那麼這個Buffer雖然長度無限,但是3天前的資料會被從隊尾刪除。為什麼RocketMQ沒有記憶體Buffer?
回答:不需要,RocketMQ的佇列都是持久化磁碟,不需要的時候讀盤,為了防止磁碟爆滿,資料定期清除。
考點:你知道訊息佇列的 「回溯消費/重複消費」 嗎?
定義:回溯消費是指Consumer已經消費成功的訊息,由於業務上的需求需要重新消費,要支援此功能,Broker在向Consumer投遞成功訊息後,訊息仍然需要保留。並且重新消費一般是按照時間維度,隔一段時間後回溯消費/重複消費。RocketMQ支援按照時間回溯消費,時間維度精確到毫秒,可以向前回溯,也可以向後回溯。
實踐:
例如由於Consumer系統故障,恢復後需要重新消費1小時前的資料,那麼Broker要提供一種機制,可以按照時間維度來回退消費進度。
考點:訊息堆積
訊息中介軟體三個功能:非同步、解耦、流量控制,訊息堆積主要是涉及流量控制功能,流量控制就是擋住前端的資料洪峰,保證後端系統的穩定性,這就要求訊息中介軟體具有一定的訊息堆積能力,訊息堆積分以下兩種情況:
1、訊息堆積在broker的記憶體緩衝Buffer,一旦超過記憶體緩衝Buffer,可以根據一定的丟棄策略來丟棄訊息,如CORBA Notification規範中描述。適合能容忍丟棄訊息的業務,這種情況訊息的堆積能力主要在於記憶體Buffer大小,而且訊息堆積後,效能下降不會太大,因為記憶體中資料多少對於對外提供的存取能力影響有限。
2、訊息堆積到持久化儲存系統中,例如DB,KV儲存,檔案記錄形式。當訊息不能在記憶體Cache命中時,要不可避免的存取磁碟,會產生大量讀IO,讀IO的吞吐量直接決定了訊息堆積後的存取能力。
所以,評估訊息堆積能力主要有以下四點:
(1)記憶體中,訊息能堆積多少條,多少位元組?即訊息的堆積容量。
(2)訊息堆積後,發訊息的吞吐量大小,是否會受堆積影響?訊息堆積在記憶體中吞吐量不受影響,訊息堆積在磁碟持久化(db kv儲存 檔案記錄儲存),吞吐量受影響。
(3)訊息堆積後,正常消費的Consumer是否會受影響?
(4)訊息堆積後,存取堆積在磁碟的訊息時,吞吐量有多大?
考點:定時訊息
定義:定時訊息是指訊息發到Broker後,不能立刻被Consumer消費,要到特定的時間點或者等待特定的時間後才能被消費。
RocketMQ支援定時訊息,但是不支援任意時間精度,支援特定的level,例如定時5s,10s,1m等。問題:為什麼不支援任意時間精度?
回答:如果要支援任意的時間精度,在Broker層面,必須要做訊息排序,如果再涉及到持久化,那麼訊息排序要不可避免的產生巨大效能開銷。
問題:對於下單,多執行緒場景下,第一個執行緒還沒走完,第二個現在進來,也判斷沒處理過那不就兩個都繼續加了麼?
回答:資料庫層面,訂單號+業務場景,組成一個唯一主鍵,你插入資料庫只能成功第一個,後續的都會報錯的,報違反唯一主鍵的錯誤。
為什麼不直接就不判斷就等他插入的時候報錯,丟掉後續的就好了?
理由:報錯有很多種,你哪裡知道不是資料庫掛了的錯?或者別的執行時異常?
不過你如果可以做到拋特定的異常也可以,反正我們要減少資料庫的報錯,如果並行大,像我現在負責的系統都是10W+QPS,那紀錄檔會打滿瘋狂報警的。
RocketMQ概要,完成了。
天天打碼,天天進步!!!