訊息佇列的一些場景及原始碼分析,RocketMQ使用相關問題及效能優化 https://www.cnblogs.com/yizhiamumu/p/16694126.html
訊息佇列的對比測試與RocketMQ使用擴充套件 https://www.cnblogs.com/yizhiamumu/p/16677881.html
訊息佇列為什麼選用redis?聊聊如何做技術方案選型?https://www.cnblogs.com/yizhiamumu/p/16573472.html
分散式事務原理及解決方案案例 https://www.cnblogs.com/yizhiamumu/p/16662412.html
分散式事務實戰方案彙總 https://www.cnblogs.com/yizhiamumu/p/16625677.html
訊息佇列初見:一起聊聊引入系統mq 之後的問題 https://www.cnblogs.com/yizhiamumu/p/16573472.html
參考:訊息佇列為什麼選用redis?聊聊如何做技術方案選型?https://www.cnblogs.com/yizhiamumu/p/16573472.html
近幾年,確實出現了很多訊息佇列解決方案,但其實去分析每種訊息佇列,會發現他們誕生的背景和要針對性解決的問題是不一樣的。
回到 RocketMQ,大家能從近兩年 RocketMQ 在社群的一系列動作中發現,RocketMQ 同時在訊息、事件、流三個領域都有發力,逐漸演進至一個超融合處理平臺。作為一個融合的資料處理平臺,RocketMQ 當前在開源的佈局看起來是與業界多個 MQ 趨同,在 RocketMQ 開源的背後其實是商業上真實的需求驅動。
一般講效能,其實就是吞吐量和延遲兩個指標。
對於吞吐量來講,RocketMQ 在 2017 年就能優化到單機 50W 的 TPS。如果是在批次的場景,實際上從生產環境的穩定性,以及業務訊息的重要性來講,各個訊息佇列都能輕易地打滿網路頻寬或者磁碟資源。
也就是說,效能一般情況下差異都不大,是很難作為一個產品的核心競爭力的,除非是架構層面有限制。
延遲就是一個非常重要的指標了,線上業務對於是 2ms 延時和 5ms 延時基本上都能接受,但非常難以接受的是經常性有秒級的毛刺(在延遲這個指標後面長尾延遲)。
除了上述兩點,彈性和可延伸能力也是非常重要的。
訊息我們可以直接在記憶體中使用陣列或者佇列來儲存資料即可。效能非常高。
但是有幾方面的缺點
既然要儲存資料,就需要解決資料存哪裡?從儲存方式來看,主要有幾個方面:
效能,吞吐量,本質上就是資料結構的設計決定的。我們看看上面資料儲存方式對應的資料結構
儲存 | 資料結構 | 寫放大 | 讀放大 |
mysql | B+ tree | 寫一條資料需要兩次寫入1、資料寫入是按頁為單位進行寫的,假設頁的大小為B 位元組,那麼寫放大為Θ(B)(最壞的結果)2、為了避免在寫頁的過程中出現故障,需要寫入redo log(WAL) | 既支援隨機讀取又支援範圍查詢的系統。讀放大為O(logBN/B),資料量大的適合效能會急劇下降,常規是b+ tree 超過4層,大約2000萬記錄是臨界點 |
rocketdb | LSM tree | Memtable/SSTable實現,寫的話也變成順序寫了(這一點是極大的優化點),但是後臺會出現多路歸併演演算法來合併,這個過程佔用磁碟IO 會到當前訊息的讀寫有擾動,寫放大Θ(klogkN/B) | 讀的順序是MemTable->分層的sst ,效能會比B+ tree 略差,讀放大Θ((log2N/B)/logk) |
檔案系統 | append only log | 直接在檔案末尾追加,所有的的寫都是順序的,因此效能極高 | 不支援根據內容進行檢索,只能根據檔案偏移量執行查詢 |
mysql 在巨量資料量的情況,效能會急劇下降,並且擴充套件性非常不友好。
分散式KV 儲存 天然的分散式系統,對巨量資料量和未來的擴充套件都問題不大,LSM tree 對寫效能和吞吐都比mysql 要好。查詢其實是可以通過快取等手段去優化,可以考慮。
但是,滿足效能和吞吐量最優的毫無疑問是使用檔案系統,因為訊息不需要修改,讀和寫都是順序讀寫,效能極高。
但是現實中的需求我們可能需要使用多個佇列來完成不同的業務。比如一個佇列來處理訂單相關的業務,一個佇列來處理商品相關的業務等等。那麼我們該如何調整呢?
我們都知道檔案 append only log 的方式是不支援根據訊息的內容來搜尋的,如果所有的佇列的資料存在一個檔案中,是沒辦法滿足需求的。
換個思路,一個佇列一個檔案我們就可以繞開根據內容檢索的需求,kafka 就是這麼玩的。
這個時候,每個佇列一個檔案,讀寫還是順序的嗎?
我們現在面臨的問題是,作為一款面向業務的高效能訊息中介軟體,隨著業務的複雜度變高,佇列數量是急劇變大的。
如果要保證寫入的吞吐量和效能,還需要得所有的佇列都寫在同一個檔案。
但是,按照佇列消費的場景就意味著要根據訊息內容(佇列名字)來進行消費,append only log 是不支援檢索的,如何解決這個問題。
我們會增加一個索引來處理慢sql 。我們是否也可以建立一個佇列的索引,每一個佇列就是一個索引檔案。
讀取資料的時候,先從索引佇列找到訊息在檔案的偏移量後,在到資料檔案去讀取。
那麼,索引的檔案的數量變大的之後,那麼對索引檔案的讀寫不就是又變成隨機讀寫了嗎?效能又會急劇下降?
一個一個來解決:
(rocketmq 中資料檔案稱為:commitlog, topic索引檔案稱為 consumeQueue)
方案 | 優點 | 缺點 |
每一個queue 都單獨一個檔案 | 消費的時候不需要獨立建立一個索引,系統複雜度降低,並且效能高 | 當queue 很多的時候,並且每個queue 的資料量都不是很大情況,就會存在很多小檔案,寫和讀都講變成隨機讀,效能會受到影響 |
所有queue 共用一個檔案 | 所有的寫都是順序寫的,效能比較高,可以支撐大量queue 效能也不至於下降的厲害 | 1、需要建立獨立的索引檔案,查詢資料的鏈路變長,需要先從索引查到資料再到資料檔案查詢2、索引佇列本身也是小檔案,好在因為資料量少,基本可以常駐記憶體3、讀變成隨時讀,不過整體還是順序讀 |
我們得出結論:選擇檔案系統,append only log.根據訊息佇列即時消費和順序讀寫的特點,剛寫入的內容還在page cache ,就被讀走了,甚至都不需要回到磁碟,效能會非常高。
如果所有的資料都存在一個commitlog 檔案的話,隨著資料量變大,檔案必然會非常大。
解決方案是,我們大檔案切換成小檔案,每個檔案固定大小1G,寫滿了就切換到一個新的檔案
訊息佇列的第一個特點就是資料量大,一臺機器容易面臨瓶頸,因此我們需要把資料均衡的分發到各個機器上。
解決方案是,一段很長的佇列平均切成N份,把這N份分別放到不同的機器上
雖然訊息已經分成切分成為多份放到不同的機器了,但是每一份都是都只有一個副本,也就意味著,任何一臺機器的硬碟壞掉的話,該機器上的訊息就會丟失掉了,這是不可接受的。
行業通常的做法一份資料存多個副本,並且確保所有的副本不能全都在同一臺機器。
問題來了,那麼這多份資料是同步雙寫還是非同步雙寫呢?
方案 | 優點 | 缺點 |
同步雙寫 | 資料不會丟失 | 效能會降低,單個RT變長 |
非同步雙寫 | 單個RT 更加小,效能更高,吞吐量更大 | 資料可能會丟失 |
其實每個業務場景需求是不一樣的,RocketMq 是支援可設定的
RocketMQ主要的儲存檔案包括CommitLog檔案、ConsumeQueue檔案、Indexfile檔案
RocketMQ
採用的是混合型的儲存結構,即為Broker
單個範例下所有的佇列共用一個紀錄檔資料檔案(即為CommitLog
)來儲存。
RocketMQ
的混合型儲存結構(多個Topic
的訊息實體內容都儲存於一個CommitLog
中)針對Producer
和Consumer
分別採用了資料
和索引
部分相分離的儲存結構,Producer
傳送訊息至Broker
端,然後Broker
端使用同步或者非同步的方式對訊息刷盤持久化,儲存至CommitLog
中。
只要訊息被刷盤持久化至磁碟檔案CommitLog
中,那麼Producer
傳送的訊息就不會丟失。正因為如此,Consumer
也就肯定有機會去消費這條訊息。當無法拉取到訊息後,可以等下一次訊息拉取,同時伺服器端也支援長輪詢模式,如果一個訊息拉取請求未拉取到訊息,Broker
允許等待30s
的時間,只要這段時間內有新訊息到達,將直接返回給消費端。
這裡,RocketMQ
的具體做法是,使用Broker
端的後臺服務執行緒—ReputMessageService
不停地分發請求並非同步構建ConsumeQueue
(邏輯消費佇列)和IndexFile
(索引檔案)資料
所以,Broker是怎麼儲存資料的呢?Broker在收到訊息之後,會把訊息儲存到commitlog的檔案當中,而同時在分散式的儲存當中,每個broker都會儲存一部分topic的資料,同時,每個topic對應的messagequeue下都會生成consumequeue檔案用於儲存commitlog的物理位置偏移量offset,indexfile中會儲存key和offset的對應關係。
CommitLog檔案儲存於${Rocket_Home}/store/commitlog目錄中,從圖中我們可以明顯看出來檔名的偏移量,每個檔案預設1G,寫滿後自動生成一個新的檔案。
由於同一個topic的訊息並不是連續的儲存在commitlog中,消費者如果直接從commitlog獲取訊息效率非常低,所以通過consumequeue儲存commitlog中訊息的偏移量的實體地址,這樣消費者在消費的時候先從consumequeue中根據偏移量定位到具體的commitlog物理檔案,然後根據一定的規則(offset和檔案大小取模)在commitlog中快速定位。
RocketMQ
對檔案的讀寫巧妙地利用了作業系統的一些高效檔案讀寫方式——PageCache
、順序讀寫
、零拷貝
在RocketMQ
中,ConsumeQueue
邏輯消費佇列儲存的資料較少,並且是順序讀取,在page cache
機制的預讀取作用下,Consume Queue
檔案的讀效能幾乎接近讀記憶體,即使在有訊息堆積情況下也不會影響效能。而對於CommitLog
訊息儲存的紀錄檔資料檔案來說,讀取訊息內容時候會產生較多的隨機存取讀取,嚴重影響效能。如果選擇合適的系統IO
排程演演算法,比如設定排程演演算法為Deadline
(此時塊儲存採用SSD的話),隨機讀的效能也會有所提升。
頁快取(PageCache
)是OS
對檔案的快取,用於加速對檔案的讀寫。一般來說,程式對檔案進行順序讀寫的速度幾乎接近於記憶體的讀寫速度,主要原因就是由於OS
使用PageCache
機制對讀寫存取操作進行了效能優化,將一部分的記憶體用作PageCache
。對於資料的寫入,OS
會先寫入至Cache
內,隨後通過非同步的方式由pdflush
核心執行緒將Cache
內的資料刷盤至物理磁碟上。對於資料的讀取,如果一次讀取檔案時出現未命中PageCache
的情況,OS
從物理磁碟上存取讀取檔案的同時,會順序對其他相鄰塊的資料檔案進行預讀取
RocketMQ
主要通過MappedByteBuffer
對檔案進行讀寫操作。其中,利用了NIO
中的FileChannel
模型將磁碟上的物理檔案直接對映到使用者態的記憶體地址中(這種Mmap
的方式減少了傳統IO
,將磁碟檔案資料在作業系統核心地址空間的緩衝區,和使用者應用程式地址空間的緩衝區之間來回進行拷貝的效能開銷),將對檔案的操作轉化為直接對記憶體地址進行操作,從而極大地提高了檔案的讀寫效率(正因為需要使用記憶體對映機制,故RocketMQ
的檔案儲存都使用定長結構來儲存,方便一次將整個檔案對映至記憶體)。
什麼是零拷貝
在作業系統中,使用傳統的方式,資料需要經歷幾次拷貝,還要經歷使用者態/核心態
切換
傳統檔案傳輸示意圖
所以,可以通過零拷貝的方式,減少使用者態與核心態的上下文切換和記憶體拷貝的次數,用來提升I/O
的效能。零拷貝比較常見的實現方式是mmap
,這種機制在Java
中是通過MappedByteBuffer
實現的。
mmap示意圖
RocketMQ
提供了兩種刷盤策略:同步刷盤
和非同步刷盤
同步刷盤
:在訊息達到Broker
的記憶體之後,必須刷到commitLog
紀錄檔檔案中才算成功,然後返回Producer
資料已經傳送成功。非同步刷盤
:非同步刷盤是指訊息達到Broker
記憶體後就返回Producer
資料已經傳送成功,會喚醒一個執行緒去將資料持久化到CommitLog
紀錄檔檔案中Broker
在訊息的存取時直接操作的是記憶體(記憶體對映檔案),這可以提供系統的吞吐量,但是無法避免機器掉電時資料丟失,所以需要持久化到磁碟中
刷盤的最終實現都是使用NIO
中的 MappedByteBuffer.force()
將對映區的資料寫入到磁碟,如果是同步刷盤的話,在Broker
把訊息寫到CommitLog
對映區後,就會等待寫入完成
非同步而言,只是喚醒對應的執行緒,不保證執行的時機,
RocketMQ
中的負載均衡都在Client
端完成,具體來說的話,主要可以分為Producer
端傳送訊息時候的負載均衡和Consumer
端訂閱訊息的負載均衡。
Producer
端在傳送訊息的時候,會先根據Topic
找到指定的TopicPublishInfo
,在獲取了TopicPublishInfo
路由資訊後,RocketMQ
的使用者端在預設方式下selectOneMessageQueue()
方法會從TopicPublishInfo
中的messageQueueList
中選擇一個佇列(MessageQueue
)進行傳送訊息。具這裡有一個sendLatencyFaultEnable
開關變數,如果開啟,在隨機遞增取模的基礎上,再過濾掉not available
的Broker
代理。
Producer負載均衡:索引遞增隨機取模 public MessageQueue selectOneMessageQueue(){ //索引遞增 int index = this.sendWhichQueue.incrementAndGet(); //利用索引取亂數,取餘數 int pos = Math.abs(index) % this.messageQueueList.size(); if(pos<0){ pos=0; } return this.messageQueueList.get(pos); }
所謂的latencyFaultTolerance
,是指對之前失敗的,按一定的時間做退避。例如,如果上次請求的latency
超過550Lms
,就退避3000Lms
;超過1000L
,就退避60000L
;如果關閉,採用隨機遞增取模的方式選擇一個佇列(MessageQueue
)來傳送訊息,latencyFaultTolerance
機制是實現訊息傳送高可用的核心關鍵所在。
在RocketMQ
中,Consumer
端的兩種消費模式(Push/Pull
)都是基於拉模式來獲取訊息的,而在Push
模式只是對pull
模式的一種封裝,其本質實現為訊息拉取執行緒在從伺服器拉取到一批訊息後,然後提交到訊息消費執行緒池後,又「馬不停蹄」的繼續向伺服器再次嘗試拉取訊息。如果未拉取到訊息,則延遲一下又繼續拉取。在兩種基於拉模式的消費方式(Push/Pull
)中,均需要Consumer
端知道從Broker
端的哪一個訊息佇列中去獲取訊息。因此,有必要在Consumer
端來做負載均衡,即Broker
端中多個MessageQueue
分配給同一個ConsumerGroup
中的哪些Consumer
消費。
Consumer
端的心跳包傳送
在Consumer
啟動後,它就會通過定時任務不斷地向RocketMQ
叢集中的所有Broker
範例傳送心跳包(其中包含了,訊息消費分組名稱、訂閱關係集合、訊息通訊模式和使用者端id的值等資訊)。Broker
端在收到Consumer
的心跳訊息後,會將它維護在ConsumerManager
的本地快取變數—consumerTable
,同時並將封裝後的使用者端網路通道資訊儲存在本地快取變數—channelInfoTable
中,為之後做Consumer
端的負載均衡提供可以依據的後設資料資訊。
Consumer
端實現負載均衡的核心類—RebalanceImpl
在Consumer
範例的啟動流程中的啟動MQClientInstance
範例部分,會完成負載均衡服務執行緒—RebalanceService
的啟動(每隔20s執行一次)。
通過檢視原始碼可以發現,RebalanceService
執行緒的run()
方法最終呼叫的是RebalanceImpl
類的rebalanceByTopic()
方法,這個方法是實現Consumer
端負載均衡的核心。
rebalanceByTopic()
方法會根據消費者通訊型別為廣播模式
還是叢集模式
做不同的邏輯處理
所謂的長輪詢,就是Consumer
拉取訊息,如果對應的Queue
如果沒有資料,Broker
不會立即返回,而是把 PullReuqest
hold起來,等待 queue
訊息後,或者長輪詢阻塞時間到了,再重新處理該 queue
上的所有 PullRequest
//如果沒有拉到資料 case ResponseCode.PULL_NOT_FOUND: // broker 和 consumer 都允許 suspend,預設開啟 if (brokerAllowSuspend && hasSuspendFlag) { long pollingTimeMills = suspendTimeoutMillisLong; if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); //封裝一個PullRequest PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); //把PullRequest掛起來 this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; }
掛起的請求,有一個服務執行緒會不停地檢查,看queue
中是否有資料,或者超時。
PullRequestHoldService#run()
@Override public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); //檢查hold住的請求 this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
是因為使用了順序儲存、Page Cache和非同步刷盤。
1、我們在寫入commitlog的時候是順序寫入的,這樣比隨機寫入的效能就會提高很多。
2、寫入commitlog的時候並不是直接寫入磁碟,而是先寫入作業系統的PageCache。
3、最後由作業系統非同步將快取中的資料刷到磁碟。
二:RocketMQ
的基本架構 RocketMQ
一共有四個部分組成:NameServer
,Broker
,Producer 生產者
,Consumer 消費者
,它們對應了:發現
、發
、存
、收
,為了保證高可用,一般每一部分都是叢集部署的
NameServer
是一個無狀態的伺服器,角色類似於 Kafka
使用的 Zookeeper
,但比 Zookeeper
更輕量。
特點:
每個 NameServer
結點之間是相互獨立,彼此沒有任何資訊互動。
Nameserver
被設計成幾乎是無狀態的,通過部署多個結點來標識自己是一個偽叢集,Producer
在傳送訊息前從 NameServer
中獲取 Topic
的路由資訊也就是發往哪個 Broker
,Consumer
也會定時從 NameServer
獲取 Topic
的路由資訊,Broker
在啟動時會向 NameServer
註冊,並定時進行心跳連線,且定時同步維護的 Topic
到 NameServer
功能主要有兩個:
Broker
結點保持長連線。Topic
的路由資訊。訊息儲存和中轉角色,負責儲存和轉發訊息
Broker
內部維護著一個個 Consumer Queue
,用來儲存訊息的索引,真正儲存訊息的地方是 CommitLog
(紀錄檔檔案)
單個 Broker
與所有的 Nameserver
保持著長連線和心跳,並會定時將 Topic
資訊同步到 NameServer
,和 NameServer
的通訊底層是通過 Netty
實現的。
訊息生產者,業務端負責傳送訊息,由使用者自行實現和分散式部署。
Producer
由使用者進行分散式部署,訊息由Producer
通過多種負載均衡模式傳送到Broker
叢集,傳送低延時,支援快速失敗。
RocketMQ
提供了三種方式傳送訊息:同步
、非同步
和單向
同步傳送
:同步傳送指訊息傳送方發出資料後會在收到接收方發回響應之後才發下一個封包。一般用於重要通知訊息,例如重要通知郵件、行銷簡訊。非同步傳送
:非同步傳送指傳送方發出資料後,不等接收方發回響應,接著傳送下個封包,一般用於可能鏈路耗時較長而對響應時間敏感的業務場景,例如使用者視訊上傳後通知啟動轉碼服務。單向傳送
:單向傳送是指只負責傳送訊息而不等待伺服器迴應且沒有回撥函數觸發,適用於某些耗時非常短但對可靠性要求並不高的場景,例如紀錄檔收集訊息消費者,負責消費訊息,一般是後臺系統負責非同步消費。
Consumer
也由使用者部署,支援PUSH
和PULL
兩種消費模式,支援叢集消費和廣播消費,提供實時的訊息訂閱機制。
Pull
:拉取型消費者(Pull Consumer
)主動從訊息伺服器拉取資訊,只要批次拉取到訊息,使用者應用就會啟動消費過程,所以 Pull
稱為主動消費型
Push
:推播型消費者(Push Consumer
)封裝了訊息的拉取、消費進度和其他的內部維護工作,將訊息到達時執行的回撥介面留給使用者應用程式來實現。所以 Push
稱為被動消費型別
,但其實從實現上看還是從訊息伺服器中拉取訊息,不同於 Pull
的是 Push
首先要註冊消費監聽器,當監聽器處觸發後才開始消費訊息RocketMQ
是一個分散式訊息佇列,也就是訊息佇列
+分散式系統
作為訊息佇列,它是發-存-收
的一個模型,對應的就是Producer、Broker、Cosumer
;作為分散式系統,它要有伺服器端、使用者端、註冊中心,對應的就是Broker、Producer/Consumer、NameServer
主要的工作流程:RocketMQ
由NameServer
註冊中心叢集、Producer
生產者叢集、Consumer
消費者叢集和若干Broker
(RocketMQ
程序)組成:
Broker
在啟動的時候去向所有的NameServer
註冊,並保持長連線,每30s傳送一次心跳Producer
在傳送訊息的時候從NameServer
獲取Broker
伺服器地址,根據負載均衡演演算法選擇一臺伺服器來傳送訊息Conusmer
消費訊息的時候同樣從NameServer
獲取Broker
地址,然後主動拉取訊息來消費NameServer因為是無狀態,且不相互通訊的,所以只要叢集部署就可以保證高可用。
RocketMQ的高可用主要是在體現在Broker的讀和寫的高可用,Broker的高可用是通過叢集
和主從
實現的。
Broker可以設定兩種角色:Master和Slave,Master角色的Broker支援讀和寫,Slave角色的Broker只支援讀,Master會向Slave同步訊息。
也就是說Producer只能向Master角色的Broker寫入訊息,Cosumer可以從Master和Slave角色的Broker讀取訊息。
Consumer 的組態檔中,並不需要設定是從 Master 讀還是從 Slave讀,當 Master 不可用或者繁忙的時候, Consumer 的讀請求會被自動切換到從 Slave。有了自動切換 Consumer 這種機制,當一個 Master 角色的機器出現故障後,Consumer 仍然可以從 Slave 讀取訊息,不影響 Consumer 讀取訊息,這就實現了讀的高可用。
如何達到傳送端寫的高可用性呢?
在建立 Topic 的時候,把 Topic 的多個Message Queue 建立在多個 Broker 組上(相同 Broker 名稱,不同 brokerId機器組成 Broker 組),這樣當 Broker 組的 Master 不可用後,其他組Master 仍然可用, Producer 仍然可以傳送訊息 RocketMQ 目前還不支援把Slave自動轉成 Master ,如果機器資源不足,需要把 Slave 轉成 Master ,則要手動停止 Slave 色的 Broker ,更改組態檔,用新的組態檔啟動 Broker。
而訊息在master和slave之間的同步是根據raft協定來進行的:
1、在broker收到訊息後,會被標記為uncommitted狀態
2、然後會把訊息傳送給所有的slave
3、slave在收到訊息之後返回ack響應給master
4、master在收到超過半數的ack之後,把訊息標記為committed
5、傳送committed訊息給所有slave,slave也修改狀態為committed
Kafka採用Zookeeper作為註冊中心(也開始逐漸去Zookeeper),
RocketMQ不使用Zookeeper其實主要可能從這幾方面來考慮:
訊息丟失可能發生在生產者傳送訊息、MQ本身丟失訊息、消費者丟失訊息3個方面。
生產者丟失訊息的可能點在於程式傳送失敗拋異常了沒有重試處理,或者傳送的過程成功但是過程中網路閃斷MQ沒收到,訊息就丟失了。
由於同步傳送的一般不會出現這樣使用方式。
非同步傳送的場景下,一般分為兩個方式:非同步有回撥和非同步無回撥,無回撥的方式,生產者傳送完後不管結果可能就會造成訊息丟失,而通過非同步傳送+回撥通知+本地訊息表的形式我們就可以做出一個解決方案。
所以在生產階段,主要通過請求確認機制,來保證訊息的可靠傳遞。
以下單的場景舉例。
1、下單後先儲存本地資料和MQ訊息表,這時候訊息的狀態是傳送中,如果本地事務失敗,那麼下單失敗,事務回滾(訂單資料、MQ訊息記錄都不會儲存)。
2、下單成功,直接返回使用者端成功,非同步傳送MQ訊息。
3、MQ回撥通知訊息傳送結果,對應更新資料庫MQ傳送狀態。
4、JOB輪詢超過一定時間(時間根據業務設定)還未傳送成功的訊息去重試
在監控平臺設定或者JOB程式處理超過一定次數一直傳送不成功的訊息,告警,人工介入。
非同步回撥的形式是適合大部分場景下的一種解決方案。
如果生產者保證訊息傳送到MQ,而MQ收到訊息後還在記憶體中,這時候宕機了又沒來得及同步給從節點,就有可能導致訊息丟失。
比如RocketMQ:
RocketMQ分為同步刷盤和非同步刷盤兩種方式,預設的是非同步刷盤,就有可能導致訊息還未刷到硬碟上就丟失了,可以通過設定為同步刷盤的方式來保證訊息可靠性,這樣即使MQ掛了,恢復的時候也可以從磁碟中去恢復訊息。
比如Kafka也可以通過設定做到:
acks=all 只有參與複製的所有節點全部收到訊息,才返回生產者成功。這樣的話除非所有的節點都掛了,訊息才會丟失。 replication.factor=N,設定大於1的數,這會要求每個partion至少有2個副本 min.insync.replicas=N,設定大於1的數,這會要求leader至少感知到一個follower還保持著連線 retries=N,設定一個非常大的值,讓生產者傳送失敗一直重試
雖然我們可以通過設定的方式來達到MQ本身高可用的目的,但是都對效能有損耗,怎樣設定需要根據業務做出權衡。
所以儲存階段,可以通過設定可靠性優先的 Broker 引數來避免因為宕機丟訊息,簡單說就是可靠性優先的場景都應該使用同步。
圖:同步刷盤和非同步刷盤
消費者丟失訊息的場景1:消費者剛收到訊息,此時伺服器宕機,MQ認為消費者已經消費,不會重複傳送訊息,訊息丟失。
RocketMQ預設是需要消費者回復ack確認,而kafka需要手動開啟設定關閉自動offset。
消費方不返回ack確認,重發的機制根據MQ型別的不同傳送時間間隔、次數都不盡相同,如果重試超過次數之後會進入死信佇列,需要手工來處理了。(Kafka沒有這些)
消費者丟失訊息的場景2:消費者收到訊息,但消費業務邏輯出錯,消費失敗。
解決:利用前面提到的MQ本地表,消費者收到訊息且業務邏輯執行完畢後再更新MQ訊息的狀態(更新為已消費)
所以從Consumer角度分析,如何保證訊息被成功消費?
因為訊息佇列維護了消費的位置,邏輯執行失敗了,沒有確認,再去佇列拉取訊息,就還是之前的一條。
對分散式訊息佇列來說,同時做到確保一定投遞和不重複投遞是很難的,就是所謂的「有且僅有一次」 。RocketMQ擇了確保一定投遞,保證訊息不丟失,但有可能造成訊息重複。
處理訊息重複問題,主要有業務端自己保證,主要的方式有兩種:業務冪等和訊息去重。
業務冪等:第一種是保證消費邏輯的冪等性,也就是多次呼叫和一次呼叫的效果是一樣的。這樣一來,不管訊息消費多少次,對業務都沒有影響。
訊息去重:第二種是業務端,對重複的訊息就不再消費了。這種方法,需要保證每條訊息都有一個惟一的編號,通常是業務相關的,比如訂單號,消費的記錄需要落庫,而且需要保證和訊息確認這一步的原子性。
具體做法是可以建立一個消費記錄表,拿到這個訊息做資料庫的insert操作。給這個訊息做一個唯一主鍵(primary key)或者唯一約束,那麼就算出現重複消費的情況,就會導致主鍵衝突,那麼就不再處理這條訊息。
發生了訊息積壓,這時候就得想辦法趕緊把積壓的訊息消費完,就得考慮提高消費能力,一般有兩種辦法:
我們可以從以下幾個角度來考慮:
1、消費者出錯,肯定是程式或者其他問題導致的,如果容易修復,先把問題修復,讓consumer恢復正常消費。
2、如果時間來不及處理很麻煩,做轉發處理,寫一個臨時的consumer消費方案,先把訊息消費,然後再轉發到一個新的topic和MQ資源,這個新的topic的機器資源單獨申請,要能承載住當前積壓的訊息。
3、處理完積壓資料後,修復consumer,去消費新的MQ和現有的MQ資料,新MQ消費完成後恢復原狀。
最初,我們傳送的訊息記錄是落庫儲存了的,而轉發傳送的資料也儲存了,那麼我們就可以通過這部分資料來找到丟失的那部分資料,再單獨跑個指令碼重發就可以了。
如果轉發的程式沒有落庫,那就和消費方的記錄去做對比,只是過程會更艱難一點。
順序訊息是指訊息的消費順序和產生順序相同,在有些業務邏輯下,必須保證順序,比如訂單的生成、付款、發貨,這個訊息必須按順序處理才行。
順序訊息分為全域性順序訊息和部分順序訊息,全域性順序訊息指某個 Topic 下的所有訊息都要保證順序;
部分順序訊息只要保證每一組訊息被順序消費即可,比如訂單訊息,只要保證同一個訂單 ID 個訊息能按順序消費即可。
部分順序訊息相對比較好實現,生產端需要做到把同 ID 的訊息傳送到同一個 Message Queue ;在消費過程中,要做到從同一個Message Queue讀取的訊息順序處理——消費端不能並行處理順序訊息,這樣才能達到部分有序。
RocketMQ 預設情況下不保證順序,比如建立一個 Topic ,預設八個寫佇列,八個讀佇列,這時候一條訊息可能被寫入任意一個佇列裡;在資料的讀取過程中,可能有多個 Consumer ,每個 Consumer 也可能啟動多個執行緒並行處理,所以訊息被哪個 Consumer 消費,被消費的順序和寫人的順序是否一致是不確定的。
要保證全域性順序訊息, 需要先把 Topic 的讀寫佇列數設定為 一,然後Producer Consumer 的並行設定,也要是一。簡單來說,為了保證整個 Topic全域性訊息有序,只能消除所有的並行處理,各部分都設定成單執行緒處理 ,這時候就完全犧牲RocketMQ的高並行、高吞吐的特性了。
有兩種方案:
一般採用Cosumer端過濾,如果希望提高吞吐量,可以採用Broker過濾。
對訊息的過濾有三種方式:
根據Tag過濾:這是最常見的一種,用起來高效簡單
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
SQL 表示式過濾:SQL表示式過濾更加靈活
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); // 只有訂閱的訊息有這個屬性a, a >=0 and a <= 3 consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
Filter Server 方式:最靈活,也是最複雜的一種方式,允許使用者自定義函數進行過濾
電商的訂單超時自動取消,就是一個典型的利用延時訊息的例子,使用者提交了一個訂單,就可以傳送一個延時訊息,1h後去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
RocketMQ是支援延時訊息的,只需要在生產訊息的時候設定訊息的延時級別:
// 範例化一個生產者來產生延時訊息 DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); // 啟動生產者 producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // 設定延時等級3,這個訊息將在10s之後傳送(現在只支援固定的幾個時間,詳看delayTimeLevel) message.setDelayTimeLevel(3); // 傳送訊息 producer.send(message); }
但是目前RocketMQ支援的延時級別是有限的:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
RocketMQ怎麼實現延時訊息的:臨時儲存
+定時任務
。
Broker收到延時訊息了,會先傳送到主題(SCHEDULE_TOPIC_XXXX)的相應時間段的Message Queue中,然後通過一個定時任務輪詢這些佇列,到期後,把訊息投遞到目標Topic的佇列中,然後消費者就可以正常消費這些訊息。
事務訊息就是MQ提供的類似XA的分散式事務能力,通過事務訊息可以達到分散式事務的最終一致性。
半事務訊息:是指暫時還不能被 Consumer 消費的訊息,Producer 成功傳送到 Broker 端的訊息,但是此訊息被標記為 「暫不可投遞」 狀態,只有等 Producer 端執行完本地事務後經過二次確認了之後,Consumer 才能消費此條訊息。就是MQ收到了生產者的訊息,但是沒有收到二次確認,不能投遞的訊息。
實現原理如下:
死信佇列用於處理無法被正常消費的訊息,即死信訊息。
當一條訊息初次消費失敗,訊息佇列 RocketMQ 會自動進行訊息重試;
達到最大重試次數後,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該訊息,此時,訊息佇列 RocketMQ 不會立刻將訊息丟棄,而是將其傳送到該消費者對應的特殊佇列中,該特殊佇列稱為死信佇列。
死信訊息的特點:
死信佇列的特點:
RocketMQ 控制檯提供對死信訊息的查詢、匯出和重發的功能。
rocketmq-console 這個是官方提供了一個 WEB 專案,可以檢視 rocketmq資料和執行一些操作。但是這個監控介面又沒有許可權控制,並且還有一些消 耗效能的查詢操作,如果要提高效能,建議這個可以暫停
-XX:-UseBiasedLocking: 禁用偏向鎖
RocketMQ 推薦使用 G1 垃圾回收器
-Xms8g -Xmx8g -Xmn4g:這個就是很關鍵的一塊引數了,也是重點需要調整的,就是預設的堆大小是 8g 記憶體,新生代是 4g 記憶體。
如果是記憶體比較大,比如有 48g 的記憶體,所以這裡完全可以給他們翻幾倍,比如給堆記憶體 20g,其中新生代給 10g,甚至可以更多些,當然要留一些記憶體給作業系統來用
-XX:+UseG1GC -XX:G1HeapRegionSize=16m:這幾個引數也是至關重要的,這是選用了G1垃圾回收器來做分代回收,對新生代和老年代都是用G1來回收。這裡把G1的region大小設定為了16m,這個因為機器記憶體比較多,所以region 大小可以調大一些給到16m,不然用2m的region, 會導致region數量過多。
-XX:G1ReservePercent=25:這個引數是說,在 G1 管理的老年代裡預留 25%的空閒記憶體,保證新生代物件晉升到老年代的時候有足夠空間,避免老年代記憶體都滿了,新生代有物件要進入老年代沒有充足記憶體了。預設值是 10%,略微偏少,這裡 RocketMQ 給調大了一些。
-XX:initiatingHeapOccupancyPercent= :30:這個引數是說,當堆記憶體的使用率達到 30%之後就會自動啟動 G1 的並行垃圾回收,開始嘗試回收一些垃圾物件。預設值是 45%,這裡調低了一些,也就是提高了 GC 的頻率,但是避免了垃圾物件過多,一次垃圾回收耗時過長的問題。
-XX:-OmitStackTraceInFastThrow:這個引數是說,有時候 JVM 會拋棄-些異常堆疊資訊,因此這個引數設定之後,就是禁用這個特性,要把完整的異常堆疊資訊列印出來。
-XX:+AIwaysPreTouch:這個引數的意思是我們剛開始指定 JVM 用多少記憶體,不會真正分配給他,會在實際需要使用的時候再分配給他。所以使用這個引數之後,就是強制讓 JVM 啟動的時候直接分配我們指定的記憶體,不要等到使用記憶體的時候再分配。
-XX:-UseLargePages:這個引數的意思是禁用大記憶體頁,某些情況下會導致記憶體浪費或範例無法啟動。預設啟動。
# vim /etc/sysctl.conf
一個請求到 RocketMQ 的應用,一般會經過網路卡、核心空間、使用者空間
在作業系統級別,是可以做軟中斷聚合的優化。
網路卡佇列 CPU 繫結
緩衝區調整
佇列大小調整等
文: 一隻阿木木