RabbitMQ:訊息丟失 | 訊息重複 | 訊息積壓的原因+解決方案+網上學不到的使用心得

2022-07-08 15:01:25

前言

首先說一點,企業中最常用的實際上既不是RocketMQ,也不是Kafka,而是RabbitMQ。

RocketMQ很強大,但主要是阿里推廣自己的雲產品而開源出來的一款訊息佇列,其實中小企業用RocketMQ的沒有想象中那麼多。

深層次的原因在於兔寶在中小企業普及更早,經受的考驗也更久,很容易產生「回頭客」,當初隨RabbitMQ成長的一批人才如今大部分都已成為企業中的中堅骨幹,技術選型親睞RabbitMQ的機率就更高。

至於Kafka,主要還是用在巨量資料和紀錄檔採集方面,除了一些公司有特定的需求會使用外,對訊息收發準確率要求較高的公司依然是以RabbitMQ作為企業級訊息佇列的首選。

工作這麼多年我自身的感受是,RabbitMQ經久不衰,除非後續其他訊息中介軟體有與眾不同的使用體驗,否則依然是RabbitMQ的佔有率更高。

所以準備進入軟體行業的小夥伴,我建議有必要系統的先把RabbitMQ學好,然後再學習其他訊息中介軟體擴充套件視野,他們的原理大同小異,是可以觸類旁通的。


兩個概念

RabbitMQ避免訊息丟失的方法主要是利用訊息確認機制和手動簽收機制,所以有必要把這兩個概念搞清楚。

1、訊息確認機制

主要是生產者使用的機制,用來確認訊息是否被成功消費。

設定如下:

spring: 
    rabbitmq:
        address: 192.168.x.x:xxxx
        virtual-host: /
        username: guest
        password: guest
        connection-timeout: 5000
        publisher-confirms: true # 訊息成功確認
        publisher-returns: true # 訊息失敗確認
        template: 
            mandatory: true # 手動簽收機制

這樣,當你實現RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback這兩個介面的方法後,就可以針對性地進行訊息確認的紀錄檔記錄,之後做進一步的訊息傳送補償,以達到接近100%投遞的目的。

虛擬碼如下:

@Component
@Slf4j
public class RabbitMQSender implements RabbitTemplate.ConfirmCallback, 
RabbitTemplate.ReturnCallback {
    
    /**
     * 傳送訊息
     */
    public void sendOrder(Order order) {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        
        // 傳送訊息
        rabbitTemplate.convertAndSend(xx, xx, order, xx);
    }
    
    
    /**
     * 成功接收後的回撥
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
    
        // 如果成功接收了,這裡可以對紀錄檔表的訊息收發狀態做更新。
        // ....
        
    }
    
    
    /**
     * 失敗後的回撥
     */
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
    
        // 如果失敗了,這裡可以對紀錄檔表的訊息收發狀態做更新,之後通過任務排程去補償傳送。
        // ....
        
    }
}

2、訊息簽收機制

RabbitMQ的訊息是自動簽收的,你可以理解為快遞簽收了,那麼這個快遞的狀態就從傳送變為已簽收,唯一的區別是快遞公司會對物流軌跡有記錄,而MQ簽收後就從佇列中刪除了。

企業級開發中,RabbitMQ我們基本都開啟手動簽收方式,這樣可以有效避免訊息的丟失。

前文中已經在生產者開啟了手動簽收機制,那麼作為消費方,也要設定手動簽收。

設定如下:

spring: 
    rabbitmq:
        address: 192.168.x.x:xxxx
        virtual-host: /
        username: guest
        password: guest
        connection-timeout: 5000
        listener: 
            simple: 
                concurrency: 5 # 並行數量
                max-concurrency: 10 # 最大並行數量
                acknowledge-mode: manual # 開啟手動簽收
                prefetch: 1 # 限制每次只消費一個(一個執行緒),上面設定5,也就是能一次接收5個

消費監聽時,手動簽收就一行程式碼,虛擬碼如下:

@RabbitListener(xxx)
public void onOrderMessage(@Payload Order order, Channel channel, 
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    
    // ....
    
    // 手動簽收
    channel.basicAck(tag, false);
    
}

訊息丟失

兩個概念搞清楚後,就可以來學習訊息丟失的問題和處理方案了。

1、出現原因

訊息丟失的原因無非有三種:

1)、訊息發出後,中途網路故障,伺服器沒收到;

2)、訊息發出後,伺服器收到了,還沒持久化,伺服器宕機;

3)、訊息發出後,伺服器收到了,消費方還未處理業務邏輯,服務卻掛掉了,而訊息也自動簽收,等於啥也沒幹。

這三種情況,(1) 和 (2)是由於生產方未開啟訊息確認機制導致,(3)是由於消費方未開啟手動簽收機制導致。

2、解決方案

1)、生產方傳送訊息時,要try...catch,在catch中捕獲異常,並將MQ傳送的關鍵內容記錄到紀錄檔表中,紀錄檔表中要有訊息傳送狀態,若傳送失敗,由定時任務定期掃描重發並更新狀態;

2)、生產方publisher必須要加入確認回撥機制,確認成功傳送並簽收的訊息,如果進入失敗回撥方法,就修改資料庫訊息的狀態,等待定時任務重發;

3)、消費方要開啟手動簽收ACK機制,消費成功才將訊息移除,失敗或因異常情況而尚未處理,就重新入隊。

其實這就是前面闡述兩個概念時已經講過的內容,也是接近100%訊息投遞的企業級方案之一,主要目的就是為了解決訊息丟失的問題。


訊息重複

1、出現原因

訊息重複大體上有兩種情況會出現:

1)、訊息消費成功,事務已提交,簽收時結果伺服器宕機或網路原因導致簽收失敗,訊息狀態會由unack轉變為ready,重新傳送給其他消費方;

2)、訊息消費失敗,由於retry重試機制,重新入隊又將訊息傳送出去。

2、解決方案

網上大體上能蒐羅到的方法有三種:

1)、消費方業務介面做好冪等;

2)、訊息紀錄檔表儲存MQ傳送時的唯一訊息ID,消費方可以根據這個唯一ID進行判斷避免訊息重複;

3)、消費方的Message物件有個getRedelivered()方法返回Boolean,為TRUE就表示重複傳送過來的。

我這裡只推薦第一種,業務方法冪等這是最直接有效的方式,(2)還要和資料庫產生互動,(3)有可能導致第一次消費失敗但第二次消費成功的情況被砍掉。


訊息積壓

1、出現原因

訊息積壓出現的場景一般有兩種:

1)、消費方的服務掛掉,導致一直無法消費訊息;

2)、消費方的服務節點太少,導致消費能力不足,從而出現積壓,這種情況極可能就是生產方的流量過大導致。

2、解決方案

1)、既然消費能力不足,那就擴充套件更多消費節點,提升消費能力;

2)、建立專門的佇列消費服務,將訊息批次取出並持久化,之後再慢慢消費。

(1)就是最直接的方式,也是訊息積壓最常用的解決方案,但有些企業考慮到伺服器成本壓力,會選擇第(2)種方案進行迂迴,先通過一個獨立服務把要消費的訊息存起來,比如存到資料庫,之後再慢慢處理這些訊息即可。


使用心得

這裡單獨講一下本人在工作中使用RabbitMQ的一些心得,希望能有所幫助。

1)、訊息丟失、訊息重複、訊息積壓三個問題中,實際上主要解決的還是訊息丟失,因為大部分公司遇不到訊息積壓的場景,而稍微有水準的公司核心業務都會解決冪等問題,所以幾乎不存在訊息重複的可能;

2)、訊息丟失的最常見企業級方案之一就是定時任務補償,因為不論是SOA還是微服務的架構,必然會有分散式任務排程的存在,自然也就成為MQ最直接的補償方式,如果MQ一定要實現100%投遞,這種是最普遍的方案。但我實際上不推薦中小企業使用該方案,因為憑空增加維護成本,而且沒有一定規模的專案完全沒必要,大家都小看了RabbitMQ本身的效能,比如我們公司,支撐一個三甲醫院,也就是三臺8核16G伺服器的叢集,上線至今3年毫無壓力;

3)、不要迷信網上和培訓機構講解的生產者訊息確認機制,也就是前面兩個概念中講到的ConfirmCallback和ReturnCallback,這種機制十分降低MQ效能,我們團隊曾遇到過一次流量高峰期帶來的MQ傳輸及消費效能大幅降低的情況,後來發現是訊息確認機制導致,關閉後立馬恢復正常,從此以後都不再使用這種機制,MQ執行十分順暢。同時我們會建立後臺管理實現人工補償,通過識別業務狀態判斷消費方是否處理了業務邏輯,畢竟這種情況都是少數,效能和運維成本,在這一塊我們選擇了效能;

4)、我工作這些年使用RabbitMQ沒見過自動簽收方式,一定是開啟手動簽收;

5)、手動簽收方式你在網上看到的教學幾乎都是處理完業務邏輯之後再手動簽收,但實際上這種用法是不科學的,在分散式的架構中,MQ用來解耦和轉發是非常常見的,如果是支付業務,往往在回撥通知中通過MQ轉發到其他服務,其他服務如果業務處理不成功,那麼手動簽收也不執行,這個訊息又會入隊發給其他消費者,這樣就可能在流量洪峰階段因為偶然的業務處理失敗造成堵塞,甚至標題所講的三種問題同時出現,這樣就會得不償失。

不科學的用法:在處理完業務邏輯後再手動簽收,否則不簽收,就好比客人進店了你得買東西,否則不讓走。

@RabbitListener(xxx)
public void onOrderMessage(@Payload Order order, Channel channel, 
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    
    // 處理業務
    doBusiness(order);
    
    // 手動簽收
    channel.basicAck(tag, false);
    
}

科學的用法:不論業務邏輯是否處理成功,最終都要將訊息手動簽收,MQ的使命不是保證客人進店了必須消費,不消費就不讓走,而是客人能進來就行,哪怕是隨便看看也算任務完成。

@RabbitListener(xxx)
public void onOrderMessage(@Payload Order order, Channel channel, 
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    
    try {
        // 處理業務
        doBusiness(order);
    } catch(Exception ex) {
        // 記錄紀錄檔,通過後臺管理或其他方式人工處理失敗的業務。
    } finally {
        // 手動簽收
        channel.basicAck(tag, false);
    }
    
}

可能有人會問你這樣不是和自動簽收沒區別嗎,NO,你要知道如果自動簽收,出現訊息丟失你連記錄紀錄檔的可能都沒有。

另外,為什麼一定要這麼做,因為MQ是中介軟體,本身就是輔助工具,就是一個滴滴司機,保證給你送到順便說個再見就行,沒必要還下車給你搬東西。

如果強加給MQ過多壓力,只會造成本身業務的畸形。我們使用MQ的目的就是解耦和轉發,不再做多餘的事情,保證MQ本身是流暢的、職責單一的即可。


總結

本篇主要講了RabbitMQ的三種常見問題及解決方案,同時分享了一些作者本人工作中使用的心得,我想網上是很難找到的,如果哪一天用到了,不妨再開啟看看,也許能避免一些生產環境可能出現的問題。

我總結下來就是三點:

1)、訊息100%投遞會增加運維成本,中小企業視情況使用,非必要不使用;

2)、訊息確認機制影響效能,非必要不使用;

3)、消費者先保證訊息能簽收,業務處理失敗可以人工補償。

工作中怕的永遠不是一個技術不會使用,而是遇到問題不知道有什麼解決思路。


分享

多年工作及學習過程中在雲筆記中記錄了很多內容,我閒暇之餘都做了下整理,本篇也是其中之一,有感興趣的朋友可以私信我獲取,什麼時候用到了翻開說不定就能節省很多時間。



原創文章純手打,覺得有一滴滴幫助就請舉手之勞點個推薦吧~

持續分享工作中的真實經驗和心得體會,喜歡的話就點個關注吧~