欄目今天詳細介紹有關RocketMQ知識。
又是好久沒有寫部落格了,雖然可以找出無數個沒有寫的部落格的理由,但是說到底,還是一個字「懶」。今天我終於吃了一顆治療懶癌的藥丸,決定寫一篇部落格。介紹什麼好呢,思來想去,還是介紹下RocketMQ吧,畢竟寫了30多篇部落格,還沒有好好寫過關於MQ的部落格呢。本篇部落格比較基礎,不涉及到原始碼分析,只是掃盲。
我覺得從某種角度來說,微服務促進了MQ的蓬勃發展,本來一個系統有N多個模組,所有模組都強耦合在一起,現在微服務了,一個模組就是一個系統,系統之間肯定需要互動,互動有三種常見的方法,一種是RPC,一種是HTTP,一種就是MQ了。
原本一個業務分為N步,要一步一步處理,才能把最終的結果返回給使用者,現在有了MQ,先把最關鍵的部分處理完畢,然後傳送訊息到MQ,直接返回給使用者OK,至於後面的步驟在後臺慢慢處理吧,真乃提升使用者體驗的神器。
某個介面的請求量突然飆升,勢必會對應用伺服器、資料庫伺服器造成很大的壓力,現在有了MQ,來多少請求都不在怕的,後臺慢慢處理唄。
RocketMQ是用Java編寫的,是阿里開源的訊息中介軟體,吸收了Kafka很多優點。Kafka也是比較熱門的訊息中介軟體,不過Kafka是用Scala編寫的,不利於Java程式設計師去閱讀原始碼,也不利於Java程式設計師做一些客製化化的開發。接觸過Kafka的小夥伴都知道,要用好Kafka實屬不易,相對來說,RocketMQ簡單多了,而且RocketMQ有阿里加持,經歷了N次雙11的考驗,比較適合國內網際網路公司,所以國內使用RocketMQ的公司很多。
圖片來自gitee.com/mirrors/roc…
可以看到RocketMQ主要有四個元件:
生產者,每隔一定時間向NameServer發起Topic的路由資訊查詢。
消費者,每隔一定時間向NameServer發起Topic的路由資訊查詢。
其實,在低版本的RocketMQ中,確實是選用Zookeeper作為註冊中心的,但是後面改成了現在的NameServer,猜想主要原因是:
分為ProducerGroup,ConsumerGroup,我們更多的是關注ConsumerGroup,ConsumerGroup包含多個Consumer。
在叢集消費模式下,一個ConsumerGroup下的Consumer共同消費一個Topic,且每個Consumer會被分配到N個佇列,但是一個佇列只會被一個Consumer消費,不同的ConsumerGroup可以消費同一個Topic,一條訊息會被訂閱此Topic的所有ConsumerGroup消費。
消費模式有兩種:Clustering(叢集消費)和Broadcasting(廣播消費)。
和其他MQ不同,其他MQ是在傳送訊息的時候,指定是叢集消費還是廣播消費,RocketMQ是在消費者端設定是叢集消費還是廣播消費。
預設情況下是叢集消費模式,該模式下,ConsumerGroup所有的Consumer共同消費一個Topic的訊息,每個Consumer負責消費N個佇列的訊息(N也可能為1,甚至是0,沒有分配到佇列),但是一個佇列只會被一個Consumer消費。如果某個Consumer掛掉,ConsumerGroup下的其他Consumer會接替掛掉的Consumer繼續消費。
叢集消費模式下,消費進度維護在Borker端,儲存路徑為${ROCKET_HOME}/store/config/ consumerOffset.json
,如下圖所示:使用topicName@consumerGroupName
為Key,消費進度為Value,Value的形式是queueId:offset
,說明如果有多個ConsumerGroup,每個ConsumerGroup的消費進度是不同的,需要分開來儲存。
廣播消費訊息會發給ConsumerGroup中所有的Consumer。
廣播消費模式下,消費進度維護在Consumer端。
我們知道了在叢集消費模式下,ConsumerGroup下所有的Consumer共同消費一個Topic的訊息,每個Consumer負責消費N個佇列的訊息,那麼具體是如何分配的呢?這就涉及到消費佇列負載演演算法了。
RocketMQ提供了眾多的消費佇列負載演演算法,其中最常用的是兩種演演算法,即AllocateMessageQueueAveragely、AllocateMessageQueueAveragelyByCircle。下面我們來看下這兩個演演算法的區別。
假設,現在一個Topic有16個佇列,用q0~q15表示,有3個Consumer,用c0-c2表示。
用AllocateMessageQueueAveragely消費佇列負載演演算法的結果如下:
用AllocateMessageQueueAveragelyByCircle消費佇列負載演演算法的結果如下:
ConsumerGroup下所有的Consumer共同消費一個Topic的訊息,每個Consumer負責消費N個佇列的訊息,但是一個佇列不能同時被N個Consumer消費,這意味著什麼?
聰明的你一定可以想到,如果一個Topic只有4個佇列,而有5個Consumer,那麼有一個Consumer將不能分配到任何佇列,所以在RocketMQ中,Topic下佇列的個數直接決定了Consumer的最大個數,也就說明,不能光靠增加Consumer來提高消費速度。
雖然建議在建立Topic的時候,就應該充分考慮佇列的個數,但是實際情況往往是不盡人意的,哪怕佇列數沒有發生改變,Consumer的數量也一定會發生改變,比如Consumer的上下線,比如某個Consumer掛了,比如新增了Consumer。佇列的擴容、縮容,Consumer的擴容、縮容都會導致重平衡,也就是為Consumer重新分配消費的佇列。
在RocketMQ中,Consumer會定時查詢Topic的佇列的個數,Consumer的個數,如果發生了改變,就會觸發重平衡。
重平衡是RocketMQ內部實現的,程式設計師無需關心。
一般來說,MQ有兩種方法獲取訊息:
不管是Pull,還是Push,Consumer總會與Broker產生互動,互動的方式一般有短連線、長連線、輪詢三種方式。
看起來,RocketMQ支援既支援Pull,也支援Push,但是實際上Push也是用Pull實現的,那麼Consumer是怎麼與Broker產生互動的呢?
這就是RocketMQ設計的巧妙的地方了,既不是短連線,也不是長連線,也不是輪詢,而是採用的長輪詢。
Consumer發起拉取訊息的請求,分為兩種情況:
RocketMQ支援事務訊息,Producer把事務訊息傳送給Broker後,Broker會把訊息儲存在系統Topic:RMQ_SYS_TRANS_HALF_TOPIC
,這樣Consumer就無法消費到這條訊息了。
Broker會有一個定時任務,消費RMQ_SYS_TRANS_HALF_TOPIC
的訊息,向Producer發起回查,回查的狀態有三種:提交、回滾、未知。
延遲訊息是指息發到Broker後,不能立刻被Consumer消費,需要等待一定的時間才可以被消費到,RocketMQ只支援特定的延遲時間:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
。
RocketMQ支援兩種消費形式:並行消費、順序消費。 如果是順序消費,需要保證排序的訊息在同一個佇列。如何選擇佇列傳送呢,RocketMQ傳送訊息的方法有好幾個過載,其中有一個過載方法支援佇列的選擇。
Producer把訊息傳送到Borker中,Borker是需要把訊息持久化的,RocketMQ支援兩種持久化策略:
為了MQ的可靠性、可用性,在生產環境,一般會部署Follower節點,Follower節點會複製Master的資料,RocketMQ支援兩種持複製策略:
"寫入"是寫入PageCache,還是寫入硬碟,要看Follower Broker的設定。
RocketMQ提供了三種傳送訊息的方法:
在實際開發中,一般選用同步方法,如果要提高RocketMQ的效能,一般都是修改Borker端的引數,特別是刷盤策略和複製策略。
訊息傳送時,如果使用了MessageQueueSelector,那訊息傳送的重試機制將會失效。
傳送訊息響應可能為以下四種:
public enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE, }複製程式碼
除了第一種,其他情況都是有問題的,為了保證訊息不丟失,需要設定Producer引數:RetryAnotherBrokerWhenNotStoreOK
為true。
如果訊息傳送失敗了,重試的時候,還是傳送給這個Borker,那麼大概率傳送還是失敗的,RockteMQ設計精巧之處在於,重試的時候,會自動避開這個Borker,而選擇其他Borker,但是目前為止,非同步傳送沒有那麼智慧,只會在一個Borker上重試,所以強烈建議選擇同步傳送方式。
RocketMQ提供了兩種故障規避機制。用引數SendLatencyFaultEnable
來控制。
延遲退避機制看起來很好用,但是一般來說Borker端繁忙,導致Borker不可用或者網路不可用只是一瞬間的事情,馬上就可以恢復,如果開啟了延遲退避機制,本來可用的Borker在一段時間內卻被規避了,其他Borker更加繁忙,那可能情況更糟糕。
Consumer有兩個引數,可以消費的並行度,即ConsumeThreadMin
、ConsumeThreadMax
,看起來給人的感覺是,如果Consumer端堆積訊息比較少,消費執行緒數為ConsumeThreadMin
;如果Consumer端堆積訊息比較多,就自動開啟新的執行緒來消費,直到消費執行緒數為ConsumeThreadMax
。但是並不是這樣,Consumer內部持有一個執行緒池,選用的是無界佇列,也就是ConsumeThreadMax
引數是無效的,所以在實際開發中,ConsumeThreadMin
、ConsumeThreadMax
往往設定成一樣。
如果查詢不到消費進度的時候,Consumer從哪裡開始消費,RocketMQ支援從最新訊息、最早訊息、指定時間戳這三種方式進行消費。
RocketMQ會為每個ConsumerGroup都設定一個Topic名稱為%RETRY%+consumerGroup
的重試佇列,用來儲存需要給ConsumerGroup重試的訊息,但是重試需要一定的延時時間,RocketMQ對於重試訊息的處理是先儲存至Topic名稱為SCHEDULE_TOPIC_XXXX
的延遲佇列中,後臺定時任務按照對應的時間進行Delay後重新儲存至%RETRY%+consumerGroup
的重試佇列中。
本來以為寫掃盲文,應該會寫的很順,但是還是想多了,因為是掃盲文,面向的是沒有怎麼接觸過RocketMQ的小夥伴,但是RocketMQ有沒有那麼簡單,不可能用一篇部落格,就讓沒有怎麼接觸過RocketMQ的小夥伴順利入門,所以在寫部落格的時候,一直在想,這個東西重要嗎,需要仔細描述嗎;這個東西可以忽視,可以不介紹嗎 等等,大家可以看到本文基本都是在介紹各種概念,幾乎沒有涉及到API的層面,因為一旦涉及到API,那麼估計寫兩個星期也寫不完。
相關免費學習推薦:
以上就是終於來了...RocketMQ掃盲篇的詳細內容,更多請關注TW511.COM其它相關文章!