前段時間寫過一篇:
# RabbitMQ:訊息丟失 | 訊息重複 | 訊息積壓的原因+解決方案+網上學不到的使用心得
很多人加了我好友,說很喜歡這篇文章,也問了我一些問題。
因為最近工作比較忙,隔了一段時間沒寫,忙完後專門花時間把RabbitMQ剩下的一個重要技術點通過案例的方式整理出來,就是延遲訊息的用法。
延遲訊息含義不解釋了,就是字面意思。
用法一共兩種方式,死信佇列和延遲外掛,兩種各有利弊,我會一一陳述並給出最佳用法。
死信佇列不要理解成很玄乎的東西,它就是普通佇列繫結了死信交換機,而且設定引數還是固定的,無需動腦,作用的話你想象成回收站就好了,被拒絕或超時的訊息就往這裡邊丟,然後還能繼續被消費,就這麼簡單。
宣告普通交換機、佇列、路由,這裡我們宣告兩個預備延遲的佇列,名稱分別包含5s和15min,用來區分延遲訊息是否達到預期效果。
我們接下來所有交換機和佇列都是以Direct模式來建立的,也就是對等方式,具體原因後面會講。
另外,注意這裡註釋的延遲交換機、佇列,都是為了特別說明,其實還是普通佇列,參考上面的原理圖解。
建立交換機、佇列、繫結關係。
聰明的小夥伴應該能發現,上面這段程式碼只有交換機和繫結佇列的關係,卻沒有建立佇列。
沒錯,接下來就是重點部分,建立佇列時,要繫結死信交換機,這樣就變成了一個死信佇列。
可以看到,5s和15min的佇列繫結的都是同一個死信交換機,只是路由規則、訊息過期時間TTL不同。
這樣,在專案啟動後,RabbitMQ就會建立出兩個具備不同過期時間的死信佇列,後面會有截圖專門給大家看。
繫結後的效果,在專案啟動後RabbitMQ會把交換機和佇列都建立出來,在控制檯就能看到。
普通佇列繫結死信交換機和對應的路由規則後,我們接下來就把死信交換機、路由規則、佇列建立出來即可,其實和建立普通佇列沒區別。
為了演示方便,我們的生產者和消費者是寫在同一個專案中的,所以組態檔沒有區別。但是線上上環境中,為了解耦生產者和消費者往往是分開的。
這裡可以發現,我們給RabbitMQ開啟了訊息確認機制,讀過開頭提過那篇文章的小夥伴應該知道,線上環境我們為了提高效能一般是不開啟確認機制的,這裡之所以開啟,是為了演示訊息的投遞情況,同時也為了特別講後面延遲外掛會出現的一個問題。
這裡加了諸如訊息唯一ID、訊息確認機制的寫法,單純為了展示給大家看,實際上你可以不加。
這裡注意,監聽的佇列也就是我們前面宣告的死信佇列,因為過期的訊息都通過繫結的死信交換機轉發到了裡面,如果對過程有疑惑,可以回到開頭的圖解那裡對著圖片來看。
分別建立了5s延遲和15min延遲的測試介面
外掛方式,要比死信佇列方式簡單得多,只需要安裝外掛,啟動外掛功能,然後建立延遲佇列即可。
這裡給出原始碼安裝方式和docker安裝方式,大家根據各自情況自己選。
下載外掛:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
本次演示下載的是3.8.0版本外掛,它能相容3.7.x和3.8.x的RabbitMQ
直通車:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
提醒一下,要下載和RabbitMQ對應的版本,否則延遲佇列不會生效。這裡面有些版本是相容低版本MQ的,可以點選進去具體檢視支援的版本。
將下載好的外掛上傳到RabbitMQ的plugins下:rabbitmq_server-3.7.24/plugins
然後開啟延遲佇列:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
這樣就可以開始愉快的使用啦,具體的操作和圖示在下面的docker安裝步驟裡展示出來,這倆的操作沒啥區別。
首先,還是一樣的步驟,把下載好的外掛上傳到RabbitMQ伺服器上,因為是docker方式,所以你還得把它上傳到docker容器中。
複製檔案到docker容器中:
進入容器檢視是否上傳成功
開啟延遲外掛,開啟成功後就是下面的提示效果。
重啟RabbitMQ,不重啟不會生效,如果重啟沒效果,你可能需要kill程序再啟動。
開啟控制檯介面,如果看到建立交換機的選項中有了x-delayed-message,就表示延遲外掛安裝和啟動成功。
這裡我專門畫了紅框,是因為這個設定在延遲外掛中有一個作用,可以加也可以不加,後面的踩坑手記會單獨解釋。
這裡只需注意一點,交換機建立時要設定為延遲交換機,也就是setDelayed(true)。
我們通過MessagePostProcessor這個勾點來設定持久化模式和延遲時間,專案中可能多個地方會用到,所以單獨抽取出來通過工具類獲取物件。
這個寫法是固定的,可以直接複製到自己的專案中使用,延遲時間自己定義。
直接監聽這個延遲佇列即可,沒有特別的地方。
這裡我們測試這個6s延遲的訊息是否成功
可以看到,剛好6s後消費了訊息,延遲效果沒問題。
其實死信佇列的坑點比外掛要少多了,但是死信佇列沒有外掛那麼簡單直接好理解。
1)、原理一定要弄明白,否則你連交換機和佇列怎麼建立怎麼繫結都不知道;
2)、繫結死信交換機時,x-dead-letter這些引數對應的值一定要寫對,尤其是路由,寫錯了會導致過期訊息不進入死信佇列,找半天原因都找不到;
延遲外掛的坑點就真的多了,筆者用死信佇列方式幾乎是1次就成功,外掛反倒折騰了我好幾個回合。
1)、延遲外掛的版本一定要下載對,和RabbitMQ本身版本對應,最好是點進去看下說明,一般都會告訴你相容哪個版本,如果下載錯了,我只能為你默哀;
2)、不要下載過高版本的RabbitMQ和外掛,你可能會瘋掉;
3)、開啟外掛後一定要重啟RabbitMQ,否則不生效,如果重啟也沒生效,你可以嘗試下kill掉MQ的程序,然後再啟動;
4)、開啟訊息確認機制後,你會發現延遲外掛很特殊的一個地方,就是每次投遞訊息都會進入returncallback回撥中。
我著重來說明下最後一個坑點,也談不上是坑點,只是你以後使用過程中很可能會產生疑惑。
還記得前文中yml檔案畫一個紅框的設定嗎,我們把那個引數註釋掉看看效果。
你會發現,一旦開啟了訊息確認機制,延遲訊息每次都會先進入returncallback回撥,然後才會投遞成功,你們可以自己試一試,每次都會這樣。
原因是什麼呢?
這裡就要提到延遲外掛的原理了,從前文建立延遲交換機那裡就可以看到,是給交換機設定了delayed:true,因為延遲訊息實際上是掛載到交換機上,不會馬上就通過路由投遞出去。
那麼我們再來看看,上面這個圖中returncallback回撥列印出來的返回資訊:replyText=NO_ROUTE,很明顯了吧,說是沒有路由,所以訊息確認失敗了,因為延遲外掛沒有馬上通過路由投遞。
那又有疑問了,沒有馬上投遞,為什麼會進入returncallback回撥呢?
下面這張圖是原始碼中的一段,告訴你了為什麼。
因為有個mandatory引數,如果不設定它的話,它是為null的,當為null的時候,傳遞的值是this.properties.isPublisherReturns()。
官方對mandatory的解釋如下:
Enable mandatory messages. If a mandatory message cannot be routed to a queue by the server, it will return an unroutable message with a Return method.
翻譯過來:
開啟強制性的訊息。如果強制訊息不能被伺服器路由到佇列,它將使用return方法返回一個不可路由的訊息。
所以,mandatory為true表示開啟強制投遞,為false表示不強制,而且這個值可以為null。
而我們前面yml檔案中開啟了訊息確認機制:publisher-returns: true
所以,每次一定會走returncallback回撥。
因此,我們要麼不開啟訊息確認機制,要麼就把mandatory設定為false,這樣延遲外掛的訊息就不會每次走一遍回撥了。
1)、延遲外掛方式更方便,但我建議首選死信佇列方式,因為死信佇列在RabbitMQ的使用中佔比還挺重的,它不僅可以用在延遲訊息,還可以用在其他很多地方,另外死信佇列是必學的,並且拿來即用免去了安裝外掛帶來的風險;
2)、死信佇列在延遲訊息這塊其實是有隱形BUG的,它在多個延遲時間場景同時存在的情況下有先後執行順序的問題,可能出現15min的延遲訊息在前面,導致5s的延遲訊息要等待15min的執行完了再執行,這是RabbitMQ本身的有序機制導致的,只對隊尾訊息判定,所以我們在使用時一定要做延遲訊息隔離,不同延遲場景要分開處理;
3)、延遲外掛就沒有上述死信佇列的問題,已經專門處理了複雜場景下的有序問題,所以一個專案在開始之初決定要使用RabbitMQ的時候,不管未來用不用得上,請務必就在安裝時順便也把延遲外掛也裝上,避免專案中期忽然想用時安裝外掛必須重啟RabbitMQ帶來的未知風險;
4)、大部分情況下,死信佇列完全足夠,但切記專案中不要頻繁使用延遲佇列,基本上用到的場景會很少,比如定時關單、定時釋放鎖資源等等,特定的對延遲時間準確性要求很高的場景才用,其他還是以分散式任務排程為主,延遲佇列太多會引起不必要的認知混亂;
5)、我個人的經驗,延遲訊息的場景下,交換機最好使用Direct對等模式,我們公司曾經出現過同事使用Topic模式,自以為路由規則匹配寫的沒問題,實際上導致規則衝突訊息被其他消費者給消費掉了,MQ流轉訊息本身是靜默處理的,所以他找死都找不到原因,直到後來被其他同事偶然發現才解決。所以對於這種單一場景最保險的方式還是Direct,一對一總不會有問題。
總結下來就以下幾點:
1)、不管用不用,在安裝RabbitMQ時就順便把延遲外掛也裝上;
2)、推薦以死信佇列方式為主;
3)、不要太多地方使用延遲佇列;
4)、交換機模式使用Direct對等。
最後,我會把本次案例的程式碼地址放在評論中,兩種實現方式都有,可以直接執行起來,想要學習的可以下載來看看。
原創文章純手打,覺得有一滴滴幫助就請舉手之勞點個推薦吧~
持續分享工作中的真實經驗和心得體會,喜歡的話就點個關注吧~