上一篇對RabbitMQ的流程和相關的理論進行初步的概述,如果小夥伴之前對訊息佇列不是很瞭解,那麼在看理論時會有些困惑,這裡以訊息模式為切入點,結合理論細節和程式碼實踐的方式一起來學習。
常用的模式有Simple、Work、Fanout、Direct、Topic、Headers,可以通過設定交換機型別和設定引數來實現各個模式;接下來就分別進行實操演示吧。
以下演示都是通過管理員的賬號進行。其實每種模式其實很大一部分操作都是一樣的,所以公共部分不會重複截圖說明,不過會針對不同的設定進行說明。
簡單模式顧名思義就是簡單,不用設定太多的東西,如下圖所示:
上圖解析:
P:表示生產者,負責推播訊息;
C:表示消費者,負責接收訊息;
中間紅色部分:代表的是佇列(Queue);
小夥伴可能會奇怪,這裡沒有交換機嗎?
其實是有的,上一篇說流程的時候,訊息肯定是要通過交換機轉發到佇列中的,這裡沒有指定,那是因為用到了預設的交換機,具體看以下演示。
對於Web介面演示來說,只需要將訊息能生產、投遞、消費即可,我們不用去弄一個生產者和消費者,生產者和消費者都是業務處理邏輯用的,所以通常都是根據業務需求就行實現的;話不多說開始演示吧。
根據上圖所示,我們只需要建立一個佇列即可,然後就可以進行訊息模擬傳送和消費了。
此時並沒有指定交換機繫結,點選佇列名看詳情中的Bindings,有一個預設的交換機已經和佇列進行繫結:
佇列詳情頁面的說明,在上篇文章中就已經標註了,這裡就不再贅述。
有了繫結關係之後,就可以在預設的交換機頁面開始模擬轉發訊息;首先進入Exchanges管理頁面,點選預設交換機(AMQP default)進入詳情開始釋出訊息:
訊息傳送成功之後就會在佇列介面看到訊息情況:
佇列裡面有了訊息之後,就可以模擬消費者進行訊息消費,點選佇列名進入詳情,可在詳情也模擬消費:
如上所示,簡單模式整個消費流程就通過Web頁面模擬完了。但在消費訊息時,提供了Ack Mode模式(訊息確認模式)選擇來進行消費,可選擇的模式如下:
到這關於簡單模式下的介面演示就結束了,其中描述的細節內容是共用的,在其他模式下的操作也類似,後續不做重複說明。
這裡就用控制檯的方式,一步一步的實現。這裡需要引入Nuget包:RabbitMQ.Client。生產者的整體程式碼如下:
接下來就一步一步來偵錯,看看訊息是怎麼一步一步發出去的;
建立連線
剛開始沒有任何連線,如下:
程式碼繼續下一步,連線就有了:
此時就可以理解為網路連線上了,但通道還沒有建立出來,如下:
根據連線建立通道
通道根據連線進行建立,目的是為了提高傳輸效率,共用一個連線,不然頻繁的建立和銷燬連線會佔資源,影響效能。
定義佇列
有連線和通道之後理論就可以直接發訊息了,但直接通訊會相互依賴比較強,達不到解耦合的效果。所以需要定義一個佇列將訊息存放到裡面,使用者端想用了自己來消費就行,另外佇列還可以達到一定的削峰作用。
建立佇列的時候需要傳幾個引數,分別意思如下:
引數1:queue, 佇列的名稱;
引數2:durable, 佇列是否持久化;如果為true,伺服器重啟之後佇列還在,不會被清除;否則就被清掉。
引數3:exclusive ,是否排他,即是否私有的,如果為true,會對當前佇列加鎖,其他的通道不能存取,並且連線自動關閉;
引數4:autoDelete, 是否自動刪除,當最後一個消費者斷開連線之後是否自動刪除訊息;
引數5:arguments, 用來設定佇列附加引數,如設定佇列的有效期、佇列的訊息生命週期、訊息的最大長度等;
傳送訊息
經過以上步驟,就可以傳送訊息了,如上沒有定義交換機,那就是繫結了預設交換機。
傳送時的幾個引數意思如下:
引數1:exchange,交換機,這裡沒有指定交換機。
引數2:routingKey,路由key,即指定佇列,簡單模式下,路由key預設就是佇列名
引數3:basicProperties, 設定其他相關屬性
引數4:body ,需要傳送的訊息內容
以上的生產者完成了,現在再來一個消費者演示一下訊息消費,消費者整體程式碼如下:
效果如下:
在整個過程中,還是會先建立連線,建立通道,指定佇列; 不同的是增加對接收資料的處理。
是不是用起來比較簡單方便,主要是在複雜專案中,訊息佇列的作用真的很大。來,接著往下說說其他模式。
工作模式是考慮到多個消費者情況下,訊息如何被消費的,主要有兩種方案,輪詢分發和公平分發;
上圖解析:
這兩種方式需要多個消費者,在介面不太好模擬,所以就直接上程式碼演示了。
在簡單模式基礎上稍微改動一下程式碼即可。在生產者中發多條訊息出來,然後啟用多個消費者就可以進行模擬如下:
生產者程式碼整體如下:
消費者程式碼整體如下:
兩個消費者的程式碼都是一樣,沒有變動; 兩個消費者的都是在消費同一個佇列的訊息,可以先啟動兩個消費者,然後在啟動生產者,效果如下:
由上可見,預設情況下其實採用的是輪詢方式。
在實際業務中,有些業務處理比較耗時,有些處理耗時不長,如上案例,假如奇數訊息需要處理比較耗時,那麼對應的消費者就壓力比較大。這種情況可以通過公平分發的方式進行業務處理,處理快點的就多處理點。
如何才能知道消費者處理業務完成呢?消費者處理完成之後主動上報是最好不過的,所以只需要在消費者端將自動確認機制改為手動確認即可,即:業務處理完成之後,手動上報確認狀態。
生產者的程式碼不需要變動,只需要稍微改改消費者程式碼即可,這裡模擬兩個不同處理能力的消費者:
消費者1整體程式碼如下:
消費者2整體程式碼如下,主要是模擬時間不一樣:
先啟動兩個消費者,再啟動生產者,看看消費情況,如下:
以上演示就是公平分發模式的演示,其中有兩個關鍵的步驟:
Fanout模式是一種釋出訂閱模式,是一種廣播機制,不需要指定路由Key。這種模式的交換機就會將訊息廣播到繫結的所有佇列上去,只要有消費者訂閱對應的佇列,就會收到訊息。如下圖:
上圖解析:
在這種模式下,訊息會一次性被多個消費者消費。
先建立一個Fanout模式的交換機;
建立兩個佇列(根據實際需要建立多個),並將佇列繫結到上一步建立的交換機上;
這裡演示就建立兩個佇列,分別是FanoutQ1和FanoutQ2
在佇列詳情頁或交換機詳情頁都可以進行交換機和佇列的繫結,這裡分別在FanoutQ1和FanoutQ2佇列詳情頁進行繫結,如下:
同樣的方式繫結FanoutQ2, 繫結完成之後,可以在交換機詳情頁看到對應的繫結佇列:
通過交換機上投遞訊息,看效果;
此時通過FanoutExchange交換機投遞訊息,繫結到此交換機上的佇列都能收到:
檢視佇列訊息情況,可以看到兩個佇列都接收到訊息了。
其實程式碼和操作Web一樣,只是用程式碼實現生產者和消費者而已。
生產者的程式碼
因為Fanout交換機不用關注RoutingKey,所以在釋出訊息時,第二個引數不需要傳遞RoutingKey。
消費者1的程式碼
消費者比較關注的是交換機需要和生產者指定的是同一個,佇列和交換機有繫結。
消費者2的程式碼,其實和消費者1基本一樣,只是定義的佇列不一樣
先啟動消費者,然後啟動生產者,看效果
這裡是控制檯程式,為了顯示方便就先啟動消費者,後期的生產者,實際應用場景先啟動誰都行。
Direct模式是在Fanout基礎增加RoutingKey條件, 即交換機不會將訊息現全部投遞到所有佇列,而是隻投遞到對應RoutingKey下的佇列。如圖:
上圖解析:
Direct模式其實在實際應用場景中用的比較多的,預設的Exhanges也是Direct模式, 很多關於訊息佇列的框架,預設也是採用這種模式,主要原因是根據RoutingKey精確的處理對應的業務,不會由於考慮不周到,導致訊息處理有不確定性,效能相對也不錯。
先建立一個Direct模式的交換機;
建立兩個佇列,然後將佇列繫結到上一步建立的交換機上,並指定對應的RoutingKey;
建立佇列完成之後,還需要繫結到交換機上,上一種模式演示的是從佇列詳情中維護繫結關係,這次從交換機詳情中進行演示,如下:
繫結第1個佇列,指定RoutingKey是order,模擬處理訂單的:
繫結第2個佇列,指定RoutingKey是msg,模擬處理訊息的:
有了繫結關係,就可以測試驗證了。
注:這裡的RoutingKey可以根據實際情況隨意指定的。
向交換機上投遞訊息,看效果;
模擬釋出一個RoutingKey為order的訊息:
再發佈一個訊息,指定RoutingKey為msg,如下:
兩個訊息都發布成功,而是指定對應的RoutingKey進行投遞,所以現在繫結到此交換機上兩個佇列中分別有一條資料,檢視佇列的訊息概況:
可以進入佇列詳情,通過GetMessage獲取到具體的訊息內容,這裡就不截圖了,上面已經演示過。
在Fanout的程式碼基礎上稍微改動即可,主要改動點就是改變交換機的型別,並在佇列和交換機繫結時設定對應的RoutingKey,釋出訊息的時候指定交換機和RoutingKey。
生產者程式碼
消費者1程式碼,主要是繫結佇列時指定的RoutingKey為order
消費者2程式碼,主要是繫結佇列時指定的RoutingKey為msg
執行起來看效果:
如上演示效果,和Web演示一樣,只有精確匹配到RoutingKey才能消費到對應的訊息資料。
Topic模式是在Direct模式基礎增加模糊匹配RoutingKey,Direct精確匹配RoutingKey,Topic可以通*或#進行模糊匹配,從而把訊息投遞到對應的佇列中,如圖:
上圖解析:
先建立一個Topic模式的交換機;
建立兩個佇列,然後將佇列繫結到上一步建立的交換機上,並指定對應的RoutingKey;
將佇列繫結到交換機上,這裡還是在交換機詳情中進行演示,如下:
同樣的步驟分別對TopicQ1和TopicQ2進行規則繫結,如下:
現在的TopicExchange的繫結關係如下:
有了關係之後就可以進行驗證效果了。
向交換機上投遞訊息,會根據RoutingKey的模糊匹配規則將訊息投遞到對應的佇列中,看效果;
先指定order.create.test釋出訊息,看看會匹配哪些佇列:
TopicQ2接收到訊息,匹配到路由規則order.#
再指定RoutingKey 為order.update 釋出一個訊息,如下:
TopicQ1和TopicQ2都收到訊息了,匹配到路由規則order.#和order.*,如下:
再指定RoutingKey 為order釋出一個訊息,就會匹配到order.#,這裡就不截圖了。
以上測試說明:在Topic型別交換機和佇列繫結關係時,可以指定RoutingKey的匹配規則,星號、#號、點號搭配使用,其中*表示匹配RoutingKey中的一個詞,#號表示匹配RoutingKey的零個或多個詞。
更多情況,小夥伴們自己動手試試。
在Direct的程式碼基礎上稍微改動即可,主要改動點就是改變交換機的型別,並在佇列和交換機繫結時設定對應的RoutingKey,這裡的RoutingKey是一個規則,是星號、#號、點號和每個詞的組合,釋出訊息的時候指定交換機和RoutingKey,RoutingKey會去匹配繫結的規則。
生產者程式碼
消費者1程式碼,指定路由匹配規則為order.#
消費者2程式碼,指定路由匹配規則為order.*
演示效果,將生產者和消費者都啟動
如上圖,和Web演示一樣,#號匹配0個和多個詞,*號只能匹配一個詞。 符號可以與詞任意組合,小夥伴可以根據業務情況自行發揮。
Headers模式不是通過RoutingKey進行匹配投遞訊息,而是匹配請求頭中所帶的鍵值進行訊息投遞,所以建立佇列是需要設定繫結的頭部資訊,有兩種模式:全部匹配和部分匹配。
先建立一個Headers模式的交換機;
建立兩個佇列,然後將佇列繫結到上一步建立的交換機上,可以指定Headers的引數;
將佇列繫結到交換機上,這裡還是在交換機詳情中進行演示,如下:
這裡不使用RoutingKey的方式,而是通過設定引數的形式進行繫結,後續投遞訊息的時候就匹配引數,如果能匹配上,就將訊息投遞到對應的佇列。
繫結HeaderQ1佇列:
同樣的方式繫結HeaderQ2佇列,只是只新增了一個鍵值對,order:111,最後HeaderExchange交換機的繫結關係如下:
關係繫結好之後就可以進行測試效果了。
向交換機上投遞訊息,會根據檢查Headers引數的條件是否符合,若符合將訊息投遞到對應的佇列中,看效果;
設定兩個引數進行釋出,如下:
可以看到兩個佇列都匹配到了,因為order和msg鍵值對匹配到HeaderQ1,order的鍵值對匹配到HeaderQ2,如果只設定一個order簡直對呢:
此時只有HeaderQ2才能精確匹配,HeaderQ1沒有全部匹配,所以對應佇列沒有收到訊息,如下:
由此可見,在介面上沒有指定x-match繫結的話,預設是all,就是要全部匹配才投遞訊息到對應佇列。
這裡繼續新增一個HeaderQ3的佇列,建立方式和上面不一樣,只是在繫結交換機的時候增加x-match 為 any,如下:
繫結成功之後,現在關係如下,其中order的鍵值對是在每個繫結中都有,如下:
測試發訊息之前,把之前訊息都清空了,也就是佇列中的訊息都是空的,這次我們再指定order為111的引數進行釋出訊息,看看有哪些佇列收到訊息呢:
訊息發出後,之後HeaderQ2和HeaderQ3收到訊息,HeaderQ2只有一個order引數,精確匹配上了,HeaderQ3有多個引數,但設定了x-match為any,所以只要匹配其中一個即可。 HeaderQ1多個引數需要全部匹配才行,所以沒有接收到訊息:
Headers模式是根據引數進行匹配,不是通過RoutingKey,所以只需要在繫結佇列時設定好引數,在傳送訊息的時候也設定好引數,這樣就會根據匹配原則去匹配引數,如果匹配上,訊息就投遞到對應的佇列,供消費者進行消費。這裡為了演示比較清晰一點,使用一個生產者,三個消費者的形式進行演示。
生產者
程式碼中關鍵的部分是指定交換機的型別為Headers,然後模擬了兩類引數的形式投遞訊息,方便用於測試引數匹配模式的測試。
消費者1,繫結引數為order:111和msg:222
消費者2,和消費者1代表基本一樣,只是繫結引數不一樣;
消費者3,代表基本和消費者1一樣,只是在引數指定的時候增加了x-match來指定匹配模式,這裡指定為any,也就是隻要其中有部分匹配上也可以消費到訊息。
演示效果,啟動生產者和消費者:
如上圖,消費者3指定了匹配模式為部分匹配,所以可以接收到一個引數的訊息,而消費者1需要精確匹配,所以不能接收到。
關於常用的訊息模式就聊到這吧,小夥伴們可以根據自己的業務場景進行使用,相關演示程式碼的地址如下:
碼雲:https://gitee.com/CodeZoe/dot-net-core-study-demo/tree/main/RabbitMQDemo
RabbitMQ提供了多種模式應對各種業務,但匹配條件越是模糊或者引數化,那效能相對比較弱。再回顧一些常用的訊息佇列元件,是不是很多都是預設使用Direct模式,精確匹配的RoutingKey可以針對具體不同業務處理,匹配效能也相對比較高; 當然其他模式小夥伴也可以針對業務情況進行使用。
後續的文章將繼續分享RabbitMQ訊息確認機制、死信佇列、磁碟監控等相關知識點的應用,關注「Code綜藝圈」,和我一起學習吧。