RabbitMQ之訊息模式簡單易懂,超詳細分享~~~

2022-09-13 12:02:10

前言

上一篇對RabbitMQ的流程和相關的理論進行初步的概述,如果小夥伴之前對訊息佇列不是很瞭解,那麼在看理論時會有些困惑,這裡以訊息模式為切入點,結合理論細節和程式碼實踐的方式一起來學習。

正文

常用的模式有Simple、Work、Fanout、Direct、Topic、Headers,可以通過設定交換機型別和設定引數來實現各個模式;接下來就分別進行實操演示吧。

以下演示都是通過管理員的賬號進行。其實每種模式其實很大一部分操作都是一樣的,所以公共部分不會重複截圖說明,不過會針對不同的設定進行說明。

1. 簡單模式(Simple)

簡單模式顧名思義就是簡單,不用設定太多的東西,如下圖所示:

上圖解析:

  • P:表示生產者,負責推播訊息;

  • C:表示消費者,負責接收訊息;

  • 中間紅色部分:代表的是佇列(Queue);

小夥伴可能會奇怪,這裡沒有交換機嗎?

其實是有的,上一篇說流程的時候,訊息肯定是要通過交換機轉發到佇列中的,這裡沒有指定,那是因為用到了預設的交換機,具體看以下演示。

1.1 Web管理介面進行演示

對於Web介面演示來說,只需要將訊息能生產、投遞、消費即可,我們不用去弄一個生產者和消費者,生產者和消費者都是業務處理邏輯用的,所以通常都是根據業務需求就行實現的;話不多說開始演示吧。

根據上圖所示,我們只需要建立一個佇列即可,然後就可以進行訊息模擬傳送和消費了。

此時並沒有指定交換機繫結,點選佇列名看詳情中的Bindings,有一個預設的交換機已經和佇列進行繫結

佇列詳情頁面的說明,在上篇文章中就已經標註了,這裡就不再贅述。

有了繫結關係之後,就可以在預設的交換機頁面開始模擬轉發訊息;首先進入Exchanges管理頁面,點選預設交換機(AMQP default)進入詳情開始釋出訊息:

訊息傳送成功之後就會在佇列介面看到訊息情況:

佇列裡面有了訊息之後,就可以模擬消費者進行訊息消費,點選佇列名進入詳情,可在詳情也模擬消費:

如上所示,簡單模式整個消費流程就通過Web頁面模擬完了。但在消費訊息時,提供了Ack Mode模式(訊息確認模式)選擇來進行消費,可選擇的模式如下:

  • Nack message requeue true:獲取訊息,但是不會向Server做ack應答確認(即不告訴伺服器訊息被消費了),訊息重新入隊。即佇列中的訊息不會被刪除掉;
  • Automatic Ack:獲取訊息,向Server做應答確認(即會告訴伺服器訊息被消費了),訊息不重新入隊,將會從佇列中刪除;
  • Reject requeue true:拒絕獲取訊息(即拒絕處理訊息),訊息重新入隊;
  • Reject requeue false:拒絕獲取訊息(即拒絕處理訊息),訊息不重新入隊,將會被刪除;

到這關於簡單模式下的介面演示就結束了,其中描述的細節內容是共用的,在其他模式下的操作也類似,後續不做重複說明。

1.2 程式碼進行演示

這裡就用控制檯的方式,一步一步的實現。這裡需要引入Nuget包:RabbitMQ.Client。生產者的整體程式碼如下:

接下來就一步一步來偵錯,看看訊息是怎麼一步一步發出去的;

  • 建立連線

    剛開始沒有任何連線,如下:

    程式碼繼續下一步,連線就有了:

    此時就可以理解為網路連線上了,但通道還沒有建立出來,如下:

  • 根據連線建立通道

    通道根據連線進行建立,目的是為了提高傳輸效率,共用一個連線,不然頻繁的建立和銷燬連線會佔資源,影響效能

  • 定義佇列

    有連線和通道之後理論就可以直接發訊息了,但直接通訊會相互依賴比較強,達不到解耦合的效果。所以需要定義一個佇列將訊息存放到裡面,使用者端想用了自己來消費就行,另外佇列還可以達到一定的削峰作用

    建立佇列的時候需要傳幾個引數,分別意思如下:

    引數1:queue, 佇列的名稱;

    引數2:durable, 佇列是否持久化;如果為true,伺服器重啟之後佇列還在,不會被清除;否則就被清掉。

    引數3:exclusive ,是否排他,即是否私有的,如果為true,會對當前佇列加鎖,其他的通道不能存取,並且連線自動關閉;

    引數4:autoDelete, 是否自動刪除,當最後一個消費者斷開連線之後是否自動刪除訊息;

    引數5:arguments, 用來設定佇列附加引數,如設定佇列的有效期、佇列的訊息生命週期、訊息的最大長度等;

  • 傳送訊息

    經過以上步驟,就可以傳送訊息了,如上沒有定義交換機,那就是繫結了預設交換機。

    傳送時的幾個引數意思如下:

    引數1:exchange,交換機,這裡沒有指定交換機。

    引數2:routingKey,路由key,即指定佇列,簡單模式下,路由key預設就是佇列名

    引數3:basicProperties, 設定其他相關屬性

    引數4:body ,需要傳送的訊息內容

以上的生產者完成了,現在再來一個消費者演示一下訊息消費,消費者整體程式碼如下:

效果如下:

在整個過程中,還是會先建立連線,建立通道,指定佇列; 不同的是增加對接收資料的處理。

是不是用起來比較簡單方便,主要是在複雜專案中,訊息佇列的作用真的很大。來,接著往下說說其他模式。

2. 工作模式(Work)

工作模式是考慮到多個消費者情況下,訊息如何被消費的,主要有兩種方案,輪詢分發和公平分發;

  • 輪詢分發:消費者依次輪著消費訊息,直到訊息消費完為止,按均分配。
  • 公平分發:根據消費者能力進行分發,即處理快的消費就多,處理慢的就消費就少,能者多勞。

上圖解析:

  • P:表示生產者,負責推播訊息;
  • C1、C2:表示多個消費者,都可以從同一個佇列接收訊息;
  • 中間紅色部分:代表的是佇列(Queue);

這兩種方式需要多個消費者,在介面不太好模擬,所以就直接上程式碼演示了。

2.1 輪詢分發

在簡單模式基礎上稍微改動一下程式碼即可。在生產者中發多條訊息出來,然後啟用多個消費者就可以進行模擬如下:

生產者程式碼整體如下:

消費者程式碼整體如下:

兩個消費者的程式碼都是一樣,沒有變動; 兩個消費者的都是在消費同一個佇列的訊息,可以先啟動兩個消費者,然後在啟動生產者,效果如下:

由上可見,預設情況下其實採用的是輪詢方式。

2.2 公平分發

在實際業務中,有些業務處理比較耗時,有些處理耗時不長,如上案例,假如奇數訊息需要處理比較耗時,那麼對應的消費者就壓力比較大。這種情況可以通過公平分發的方式進行業務處理,處理快點的就多處理點。

如何才能知道消費者處理業務完成呢?消費者處理完成之後主動上報是最好不過的,所以只需要在消費者端將自動確認機制改為手動確認即可,即:業務處理完成之後,手動上報確認狀態。

生產者的程式碼不需要變動,只需要稍微改改消費者程式碼即可,這裡模擬兩個不同處理能力的消費者:

  • 消費者1處理一條訊息業務需要500毫秒;
  • 消費者2處理一條訊息業務需要2秒;

消費者1整體程式碼如下:

消費者2整體程式碼如下,主要是模擬時間不一樣:

先啟動兩個消費者,再啟動生產者,看看消費情況,如下:

以上演示就是公平分發模式的演示,其中有兩個關鍵的步驟:

  • 設定每次消費的訊息條數,可以根據實際業務情況設定,這裡設定的是每次取一條。通過 channel.BasicQos進行設定。
  • 將訊息確認模式改為自動模式,這樣就可以根據實際業務處理情況反饋確認資訊,伺服器就會將訊息處理掉,即刪除訊息。所以一般在實際業務場景大都會推薦使用手動確認的方式,這樣避免業務未處理導致訊息就被伺服器給清掉的情況。

3. 釋出訂閱模式(Fanout)

Fanout模式是一種釋出訂閱模式,是一種廣播機制,不需要指定路由Key。這種模式的交換機就會將訊息廣播到繫結的所有佇列上去,只要有消費者訂閱對應的佇列,就會收到訊息。如下圖:

上圖解析:

  • P:表示生產者,負責推播訊息;
  • X:表示交換機,圖中表示一個交換機繫結了多個佇列;
  • C1、C2:表示多個消費者,都可以從同一個佇列接收訊息;
  • 中間紅色部分:代表的是佇列(Queue);

在這種模式下,訊息會一次性被多個消費者消費。

3.1 Web管理介面進行演示
  • 先建立一個Fanout模式的交換機;

  • 建立兩個佇列(根據實際需要建立多個),並將佇列繫結到上一步建立的交換機上;

    這裡演示就建立兩個佇列,分別是FanoutQ1和FanoutQ2

    在佇列詳情頁或交換機詳情頁都可以進行交換機和佇列的繫結,這裡分別在FanoutQ1和FanoutQ2佇列詳情頁進行繫結,如下:

    同樣的方式繫結FanoutQ2, 繫結完成之後,可以在交換機詳情頁看到對應的繫結佇列:

  • 通過交換機上投遞訊息,看效果;

    此時通過FanoutExchange交換機投遞訊息,繫結到此交換機上的佇列都能收到:

    檢視佇列訊息情況,可以看到兩個佇列都接收到訊息了。

3.2 程式碼進行演示

其實程式碼和操作Web一樣,只是用程式碼實現生產者和消費者而已。

  • 生產者的程式碼

    因為Fanout交換機不用關注RoutingKey,所以在釋出訊息時,第二個引數不需要傳遞RoutingKey。

  • 消費者1的程式碼

    消費者比較關注的是交換機需要和生產者指定的是同一個,佇列和交換機有繫結。

  • 消費者2的程式碼,其實和消費者1基本一樣,只是定義的佇列不一樣

  • 先啟動消費者,然後啟動生產者,看效果

    這裡是控制檯程式,為了顯示方便就先啟動消費者,後期的生產者,實際應用場景先啟動誰都行。

4. 路由模式(Direct)

Direct模式是在Fanout基礎增加RoutingKey條件, 即交換機不會將訊息現全部投遞到所有佇列,而是隻投遞到對應RoutingKey下的佇列。如圖:

上圖解析:

  • P:表示生產者,負責推播訊息;
  • X:表示交換機,指定型別為direct,圖中表示一個交換機繫結了多個佇列;
  • 中間箭頭上的error、info、warning代表具體的RoutingKey;
  • C1、C2:表示多個消費者,都可以從同一個佇列接收訊息;
  • 中間紅色部分:代表的是佇列(Queue);

Direct模式其實在實際應用場景中用的比較多的,預設的Exhanges也是Direct模式, 很多關於訊息佇列的框架,預設也是採用這種模式,主要原因是根據RoutingKey精確的處理對應的業務,不會由於考慮不周到,導致訊息處理有不確定性,效能相對也不錯

4.1 Web管理介面進行演示
  • 先建立一個Direct模式的交換機;

  • 建立兩個佇列,然後將佇列繫結到上一步建立的交換機上,並指定對應的RoutingKey;

    建立佇列完成之後,還需要繫結到交換機上,上一種模式演示的是從佇列詳情中維護繫結關係,這次從交換機詳情中進行演示,如下:

    繫結第1個佇列,指定RoutingKey是order,模擬處理訂單的:

    繫結第2個佇列,指定RoutingKey是msg,模擬處理訊息的:

    有了繫結關係,就可以測試驗證了。

    注:這裡的RoutingKey可以根據實際情況隨意指定的。

  • 向交換機上投遞訊息,看效果;

    模擬釋出一個RoutingKey為order的訊息:

    再發佈一個訊息,指定RoutingKey為msg,如下:

    兩個訊息都發布成功,而是指定對應的RoutingKey進行投遞,所以現在繫結到此交換機上兩個佇列中分別有一條資料,檢視佇列的訊息概況:

    可以進入佇列詳情,通過GetMessage獲取到具體的訊息內容,這裡就不截圖了,上面已經演示過。

4.2 程式碼進行演示

在Fanout的程式碼基礎上稍微改動即可,主要改動點就是改變交換機的型別,並在佇列和交換機繫結時設定對應的RoutingKey,釋出訊息的時候指定交換機和RoutingKey。

  • 生產者程式碼

  • 消費者1程式碼,主要是繫結佇列時指定的RoutingKey為order

  • 消費者2程式碼,主要是繫結佇列時指定的RoutingKey為msg

  • 執行起來看效果:

    如上演示效果,和Web演示一樣,只有精確匹配到RoutingKey才能消費到對應的訊息資料。

5. 主題模式(Topic)

Topic模式是在Direct模式基礎增加模糊匹配RoutingKey,Direct精確匹配RoutingKey,Topic可以通*或#進行模糊匹配,從而把訊息投遞到對應的佇列中,如圖:

上圖解析:

  • P:表示生產者,負責推播訊息;
  • X:表示交換機,指定型別為topic,圖中表示一個交換機繫結了多個佇列;
  • 中間箭頭上的文字代表模糊匹配的RoutingKey;其中*表示匹配RoutingKey中的一個詞,#號表示匹配RoutingKey的零個或多個詞,匹配符需要與點號(.)搭配使用。 *.orange.test.# 範例中orange算一個詞,test算一個詞,即通過點號(.)分開的就稱為一個詞。
  • C1、C2:表示多個消費者,都可以從同一個佇列接收訊息;
  • 中間紅色部分:代表的是佇列(Queue);
5.1 Web管理介面進行演示
  • 先建立一個Topic模式的交換機;

  • 建立兩個佇列,然後將佇列繫結到上一步建立的交換機上,並指定對應的RoutingKey;

    將佇列繫結到交換機上,這裡還是在交換機詳情中進行演示,如下:

    同樣的步驟分別對TopicQ1和TopicQ2進行規則繫結,如下:

    現在的TopicExchange的繫結關係如下:

    有了關係之後就可以進行驗證效果了。

  • 向交換機上投遞訊息,會根據RoutingKey的模糊匹配規則將訊息投遞到對應的佇列中,看效果;

    先指定order.create.test釋出訊息,看看會匹配哪些佇列:

    TopicQ2接收到訊息,匹配到路由規則order.#

    再指定RoutingKey 為order.update 釋出一個訊息,如下:

    TopicQ1和TopicQ2都收到訊息了,匹配到路由規則order.#和order.*,如下:

    再指定RoutingKey 為order釋出一個訊息,就會匹配到order.#,這裡就不截圖了。

    以上測試說明:在Topic型別交換機和佇列繫結關係時,可以指定RoutingKey的匹配規則,星號、#號、點號搭配使用,其中*表示匹配RoutingKey中的一個詞,#號表示匹配RoutingKey的零個或多個詞

    更多情況,小夥伴們自己動手試試。

5.2 程式碼進行演示

在Direct的程式碼基礎上稍微改動即可,主要改動點就是改變交換機的型別,並在佇列和交換機繫結時設定對應的RoutingKey,這裡的RoutingKey是一個規則,是星號、#號、點號和每個詞的組合,釋出訊息的時候指定交換機和RoutingKey,RoutingKey會去匹配繫結的規則。

  • 生產者程式碼

  • 消費者1程式碼,指定路由匹配規則為order.#

  • 消費者2程式碼,指定路由匹配規則為order.*

  • 演示效果,將生產者和消費者都啟動

    如上圖,和Web演示一樣,#號匹配0個和多個詞,*號只能匹配一個詞。 符號可以與詞任意組合,小夥伴可以根據業務情況自行發揮。

6. 引數模式(Headers)

Headers模式不是通過RoutingKey進行匹配投遞訊息,而是匹配請求頭中所帶的鍵值進行訊息投遞,所以建立佇列是需要設定繫結的頭部資訊,有兩種模式:全部匹配和部分匹配。

  • 全部匹配:x-match=all,表示所有的鍵值都匹配了才行。
  • 部分匹配:x-match=any,表示只要其中有鍵值對匹配就行。
5.1 Web管理介面進行演示
  • 先建立一個Headers模式的交換機;

  • 建立兩個佇列,然後將佇列繫結到上一步建立的交換機上,可以指定Headers的引數;

    將佇列繫結到交換機上,這裡還是在交換機詳情中進行演示,如下:

    這裡不使用RoutingKey的方式,而是通過設定引數的形式進行繫結,後續投遞訊息的時候就匹配引數,如果能匹配上,就將訊息投遞到對應的佇列。

    繫結HeaderQ1佇列:

    同樣的方式繫結HeaderQ2佇列,只是只新增了一個鍵值對,order:111,最後HeaderExchange交換機的繫結關係如下:

    關係繫結好之後就可以進行測試效果了。

  • 向交換機上投遞訊息,會根據檢查Headers引數的條件是否符合,若符合將訊息投遞到對應的佇列中,看效果;

    設定兩個引數進行釋出,如下:

    可以看到兩個佇列都匹配到了,因為order和msg鍵值對匹配到HeaderQ1,order的鍵值對匹配到HeaderQ2,如果只設定一個order簡直對呢:

    此時只有HeaderQ2才能精確匹配,HeaderQ1沒有全部匹配,所以對應佇列沒有收到訊息,如下:

    由此可見,在介面上沒有指定x-match繫結的話,預設是all,就是要全部匹配才投遞訊息到對應佇列

    這裡繼續新增一個HeaderQ3的佇列,建立方式和上面不一樣,只是在繫結交換機的時候增加x-match 為 any,如下:

    繫結成功之後,現在關係如下,其中order的鍵值對是在每個繫結中都有,如下:

    測試發訊息之前,把之前訊息都清空了,也就是佇列中的訊息都是空的,這次我們再指定order為111的引數進行釋出訊息,看看有哪些佇列收到訊息呢:

    訊息發出後,之後HeaderQ2和HeaderQ3收到訊息,HeaderQ2只有一個order引數,精確匹配上了,HeaderQ3有多個引數,但設定了x-match為any,所以只要匹配其中一個即可。 HeaderQ1多個引數需要全部匹配才行,所以沒有接收到訊息:

5.2 程式碼進行演示

Headers模式是根據引數進行匹配,不是通過RoutingKey,所以只需要在繫結佇列時設定好引數,在傳送訊息的時候也設定好引數,這樣就會根據匹配原則去匹配引數,如果匹配上,訊息就投遞到對應的佇列,供消費者進行消費。這裡為了演示比較清晰一點,使用一個生產者,三個消費者的形式進行演示。

  • 生產者

    程式碼中關鍵的部分是指定交換機的型別為Headers,然後模擬了兩類引數的形式投遞訊息,方便用於測試引數匹配模式的測試。

  • 消費者1,繫結引數為order:111和msg:222

  • 消費者2,和消費者1代表基本一樣,只是繫結引數不一樣;

  • 消費者3,代表基本和消費者1一樣,只是在引數指定的時候增加了x-match來指定匹配模式,這裡指定為any,也就是隻要其中有部分匹配上也可以消費到訊息。

  • 演示效果,啟動生產者和消費者:

    如上圖,消費者3指定了匹配模式為部分匹配,所以可以接收到一個引數的訊息,而消費者1需要精確匹配,所以不能接收到。

關於常用的訊息模式就聊到這吧,小夥伴們可以根據自己的業務場景進行使用,相關演示程式碼的地址如下:

碼雲:https://gitee.com/CodeZoe/dot-net-core-study-demo/tree/main/RabbitMQDemo

總結

RabbitMQ提供了多種模式應對各種業務,但匹配條件越是模糊或者引數化,那效能相對比較弱。再回顧一些常用的訊息佇列元件,是不是很多都是預設使用Direct模式,精確匹配的RoutingKey可以針對具體不同業務處理,匹配效能也相對比較高; 當然其他模式小夥伴也可以針對業務情況進行使用。

後續的文章將繼續分享RabbitMQ訊息確認機制、死信佇列、磁碟監控等相關知識點的應用,關注「Code綜藝圈」,和我一起學習吧。