MQ(message queue),從字面意思上看,本質是個佇列,FIFO 先入先出,只不過佇列中存放的內容是message 而已,還是一種跨程序的通訊機制,用於上下游傳遞訊息。在網際網路架構中,MQ 是一種非常常見的上下游「邏輯解耦+物理解耦」的訊息通訊服務。使用了 MQ 之後,訊息傳送上游只需要依賴 MQ,不用依賴其他服務。
關於文章全部範例程式碼:RabbitMQ_Study
①:流量消峰
如果訂單系統最多能處理10000次/s的訂單,這個處理能力應付正常時段下單綽綽有餘,正常時段我們下單一秒後就能返回結果。但是在高峰期,如果有兩萬次下單作業系統是處理不了的(服務處理慢不說,有可能會把響應方的服務搞宕機),但是我們能限制訂單超過一萬後不允許使用者下單。假設使用訊息佇列做緩衝,我們可以取消這個限制,把一秒內下的訂單分散成一段時間來處理,這時有些使用者可能在下單十幾秒後才能收到下單成功的操作,但是比不能下單的體驗要好。
②:應用解耦
以電商應用為例,應用中有訂單系統、庫存系統、物流系統、支付系統。使用者建立訂單後,如果耦合呼叫庫存系統、物流系統、支付系統,任何一個子系統出了故障,都會造成下單操作異常。當轉變成基於訊息佇列的方式後,系統間呼叫的問題會減少很多,比如物流系統因為發生故障,需要幾分鐘來修復。在這幾分鐘的時間裡,物流系統要處理的請求資訊被快取在訊息佇列中,使用者的下單操作可以正常完成。當物流系統恢復後,繼續處理訂單資訊即可,終端使用者感受不到物流系統的故障,提升系統的可用性。
③:非同步處理
有些服務間呼叫是非同步的,例如 A 呼叫 B,B 需要花費很長時間執行,但是 A 需要知道 B 什麼時候可以執行完,以前一般有兩種方式,A 過一段時間去呼叫 B 的查詢 api 查詢。或者 A 提供一個 callback api, B 執行完之後呼叫 api 通知 A 服務。這兩種方式都不是很優雅,使用訊息匯流排,可以很方便解決這個問題,A 呼叫 B 服務後,只需要監聽 B 處理完成的訊息,當 B 處理完成後,會傳送一條訊息給 MQ,MQ 會將此訊息轉發給 A 服務。這樣 A 服務既不用迴圈呼叫 B 的查詢 api,也不用提供 callback api。同樣 B 服務也不用做這些操作。A 服務還能及時的得到非同步處理成功的訊息。
RabbitMQ 是一個在AMQP基礎上實現的,可複用的企業訊息系統。它可以用於大型軟體系統各個模組之間的高效通訊,支援高並行,支援可延伸。
你可以把它當做一個快遞站點,當你要傳送一個包裹時,你把你的包裹放到快遞站,快遞員最終會把你的快遞送到收件人那裡,按照這種邏輯 RabbitMQ 是一個快遞站,一個快遞員幫你傳遞快件。RabbitMQ 與快遞站的主要區別在於,它不處理快件而是接收,儲存和轉發訊息資料。
AMQP:即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協定,是應用層協定的一個開放標準,為訊息導向中介層設計。基於此協定的使用者端與訊息中介軟體可傳遞訊息,並不受使用者端/中介軟體不同產品,不同的開發語言等條件的限制。
訊息佇列:MQ 全稱為Message Queue, 訊息佇列。是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊。佇列的使用除去了接收和傳送應用程式同時執行的要求。在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。
①:生產者(Producer)
產生資料傳送訊息的程式是生產者
②:交換機(Exchange)
交換機是 RabbitMQ 非常重要的一個部件,一方面它接收來自生產者的訊息,另一方面它將訊息推播到佇列中。交換機必須確切知道如何
處理它接收到的訊息,是將這些訊息推播到特定佇列還是推播到多個佇列,亦或者是把訊息丟棄,這個得有交換機型別決定
③:佇列(Queue)
佇列是 RabbitMQ 內部使用的一種資料結構,儘管訊息流經 RabbitMQ 和應用程式,但它們只能儲存在佇列中。佇列僅受主機的記憶體和
磁碟限制的約束,本質上是一個大的訊息緩衝區。許多生產者可以將訊息傳送到一個佇列,許多消費者可以嘗試從一個佇列接收資料。這就是
我們使用佇列的方式
④:消費者(Consumer)
消費與接收具有相似的含義。消費者大多時候是一個等待接收訊息的程式。請注意生產者,消費者和訊息中介軟體很多時候並不在同一機器上。
同一個應用程式既可以是生產者又是可以是消費者。
Broker: 接收和分發訊息的應用,RabbitMQ Server就是Message Broker;簡單來說就是訊息佇列伺服器實體Virtual host:出於多租戶和
安全因素設計的,把AMQP的基本元件劃分到一個虛擬的分組中,類似於網路中的namespace概念。當多個不同的使用者使用同一個
RabbitMQ server提供的服務時,可以劃分出多個vhost,每個使用者在自己的vhost建立exchange/queue等 Connection: publisher/consumer和broker之間的TCP連線 Channel: 訊息通道,如果每一次存取RabbitMQ都建立一個Connection,在訊息量大的時候建立TCPConnection的開銷將是巨大的,效率也較
低。Channel是在connection內部建立的邏輯連線,如果應用程式支援多執行緒,通常每個thread建立單獨的channel進行通訊,AMQP
method包含了channel id幫助使用者端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的
Connection極大減少了作業系統建立TCP connection的開銷。 Exchange: message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發訊息到queue中去。
常用的型別有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast) Queue: 訊息佇列載體,每個訊息都會被投入到一個或多個佇列;訊息最終被送到這裡等待consumer取走 Routing Key: 路由關鍵字,exchange根據這個關鍵字進行訊息投遞。 Binding: exchange和queue之間的虛擬連線,binding中可以包含routing key,Binding資訊被儲存到exchange中的查詢表中,用於message
的分發依據;它的作用就是把exchange和queue按照路由規則繫結起來。 producer: 訊息生產者,就是投遞訊息的程式。 consumer: 訊息消費者,就是接受訊息的程式。
這裡我以RabbitMQ 3.9 的版本來進行本文的講解,這裡我們需要準備幾個檔案
RabbitMQ 3.9.13 Erlang 23.3.4.11(版本相容) rabbitmq_delayed_message_exchange-3.8.0.ez
# 安裝 erlang 環境 rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm # 安裝 socat 環境 yum -y install socat # 安裝 RabbitMQ 服務 rpm -ivh rabbitmq-server-3.9.13-1.el7.noarch.rpm # 檢查是否安裝 yum list | grep rabbitmq yum list | grep erlang
注:socat支援多協定,用於協定處理,埠轉發,rabbitmq依賴於socat,因此在安裝rabbitmq前要安裝socat。由於預設的CentOS-Base.repo源中沒有socat,所以 $ yum install socat會出現以下錯誤:No package socat available
補充命令: # 檢視所以的已開啟的埠 firewall-cmd --zone=public --list-ports # 開啟15672埠(--permanent代表永久生效,重啟系統不會失效) firewall-cmd --zone=public --add-port=15672/tcp --permanent # 防火牆關閉 systemctl stop firewalld # 啟動RabbitMQ服務 systemctl start rabbitmq-server 或 /sbin/service rabbitmq-server start 或 service rabbitmq-server start # 新增開機啟動RabbitMQ服務 chkconfig rabbitmq-server on # 開啟web管理介面(可以更方便快速的對RabbitMQ進行操作) rabbitmq-plugins enable rabbitmq_management # 停止RabbitMQ服務 systemctl stop rabbitmq-server 或 /sbin/service rabbitmq-server stop 或 service rabbitmq-server stop
注:web管理介面應用的操作
rabbitmqctl stop_app 停止web頁面
rabbitmqctl start_app 啟動web頁面
若大家用的是最新版本安裝可能不太一樣,請前往:RabbitMQ安裝
關於RabbitMQ中的使用者角色【tags】 其他(none): 不能登入管理控制檯(啟用management plugin的情況下,以下相同) 普通管理者(management): 使用者可以通過AMQP做的任何事外加以下許可權 列出自己可以通過AMQP登入的virtual hosts 檢視自己的virtual hosts中的queues, exchanges 和 bindings 檢視和關閉自己的channels 和 connections 檢視有關自己的virtual hosts的「全域性」的統計資訊,包含其他使用者在這些virtual hosts中的活動 決策制定者(policymaker): management的許可權外加以下許可權 檢視、建立和刪除自己的virtual hosts所屬的policies和parameters 監控者(monitor/monitoring): management的許可權外加以下許可權 列出所有virtual hosts,包括他們不能登入的virtual hosts 檢視其他使用者的connections和channels 檢視節點級別的資料如clustering和memory使用情況 檢視真正的關於所有virtual hosts的全域性的統計資訊 超級管理員(administrator): policymaker和monitoring的許可權外加以下許可權 建立和刪除virtual hosts 檢視、建立和刪除users 檢視建立和刪除permissions 關閉其他使用者的connections
# 檢視RabbitMQ裡的所有使用者 rabbitmqctl list_users # 檢視預設guest使用者的許可權 rabbitmqctl list_user_permissions {username} 【RabbitMQ中的使用者管理】 rabbitmqctl add_user {username} {password} # 該命令將建立一個 non-administrative 使用者 rabbitmqctl delete_user {username} # 表示刪除一個使用者,該命令將指示RabbitMQ broker去刪除指定的使用者 rabbitmqctl change_password {username} {newpassword} # 表示修改指定的使用者的密碼 rabbitmqctl clear_password {username} # 表示清除指定使用者的密碼 # 執行此操作後的使用者,將不能用密碼登入,但是可能通過已經設定的SASL EXTERNAL的方式登入。 rabbitmqctl authenticate_user {username} {password} # 表示指引RabbitMQ broker認證該使用者和密碼 rabbitmqctl set_user_tags {username} {tag ...} # 表示設定使用者的角色,{tag}可以是零個,一個,或者是多個。並且已經存在的tag也將會被移除 【RabbitMQ中的許可權控制】 在上面我們新增完相關的使用者後,就可以對其使用者分配相關vhost的許可權了。 vhost對於Rabbit就像虛擬機器器之於物理伺服器一樣,它們通過在各個範例間提供邏輯上分離, 允許你為不同的應用程式安全保密地執行資料。 而在RabbitMQ中相應的許可權分為讀、寫、設定三部分: 讀:有關消費訊息的任何操作,包括「清除」整個佇列(同樣需要繫結操作的成功) 寫:釋出訊息(同樣需要繫結操作的成功) 設定:佇列和交換機的建立和刪除 知道了RabbitMQ許可權相關的設定後,我們就可以根據具體情況來設定相應的資訊。 RabbitMQ的許可權是以vhost為分隔的,我們需要確定一個vhost來確定相關的許可權設定,預設的vhost是「/」 rabbitmqctl add_vhost {vhost} # {vhost} 表示待建立的虛擬主機項的名稱 rabbitmqctl delete_vhost {vhost} # 表示刪除一個vhost。刪除一個vhost將會刪除該vhost的所有exchange、queue、binding、使用者許可權、引數和策略。 rabbitmqctl list_vhosts {vhostinfoitem ...} # 表示列出所有的vhost。其中 {vhostinfoitem} 表示要展示的vhost的欄位資訊,展示的結果將按照{vhostinfoitem}指定的欄位 # 順序展示。這些欄位包括: name(名稱) 和 tracing (是否為此vhost啟動跟蹤)。 # 如果沒有指定具體的欄位項,那麼將展示vhost的名稱。 rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read} # 表示設定使用者許可權。 {vhost} 表示待授權使用者存取的vhost名稱,預設為 "/"; {user} 表示待授權反問特定vhost的使用者名稱稱; # {conf}表示待授權使用者的設定許可權,是一個匹配資源名稱的正規表示式; {write} 表示待授權使用者的寫許可權,是一個匹配資源名稱 # 的正規表示式; {read}表示待授權使用者的讀許可權,是一個資源名稱的正規表示式。 # rabbitmqctl set_permissions -p / admin "^mip-.*" ".*" ".*" # 例如上面例子,表示授權給使用者 "admin" 具有所有資源名稱以 "mip-" 開頭的 設定許可權;所有資源的寫許可權和讀許可權。 rabbitmqctl clear_permissions [-p vhost] {username} # 表示設定使用者拒絕存取指定指定的vhost,vhost預設值為 "/" rabbitmqctl list_permissions [-p vhost] # 表示列出具有許可權存取指定vhost的所有使用者、對vhost中的資源具有的操作許可權。預設vhost為 "/"。 # 注意,空字串表示沒有任何許可權。
實際操作說明: rabbitmqctl list_users # 檢視RabbitMQ裡的所有使用者 rabbitmqctl list_vhosts # 檢視RabbitMQ裡的所有vhosts rabbitmqctl list_permissions # 檢視RabbitMQ裡所有使用者的許可權 rabbitmqctl list_user_permissions guest # 檢視RabbitMQ裡guest使用者的許可權 rabbitmqctl add_vhost test # 建立的一個虛擬主機項為 test 的名稱 rabbitmqctl add_user admin 123 # 建立一個使用者為admin 密碼為123 rabbitmqctl set_user_tags admin administrator # 設定admin的角色為超級管理員(administrator) rabbitmqctl set_permissions -p test admin ".*" ".*" ".*" # 設定admin在test的vhost中,並設定全部檔案的讀寫操作 rabbitmqctl list_permissions -p test # 檢視test中的vhost裡的使用者
systemctl stop rabbitmq-server # 停止RabbitMQ服務 yum list | grep rabbitmq # 檢視RabbitMQ安裝的相關列表 yum -y remove rabbitmq-server.noarch # 解除安裝RabbitMQ已安裝的相關內容 yum list | grep erlang # 檢視erlang安裝的相關列表 yum -y remove erlang-* yum remove erlang.x86_64 # 解除安裝erlang已安裝的相關內容 rm -rf /usr/lib64/erlang rm -rf /var/lib/rabbitmq rm -rf /usr/local/erlang rm -rf /usr/local/rabbitmq # 刪除有關的所有檔案
本小節將使用Java編寫兩個程式來模擬簡單佇列,用生產者(Producer)傳送訊息到RabbitMQ佇列後,再由消費者(Consumer)來監控RabbitMQ傳送來的佇列資訊;簡單佇列就是一個生產者傳送訊息到佇列,監聽那個佇列的一個消費者獲取訊息並處理
<dependencies> <!--RabbitMQ使用者端座標--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency> </dependencies> <build> <plugins> <!--設定maven編譯版本--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source><!--原始碼使用的JDK--> <target>1.8</target><!--target需要生成的目標class檔案的編譯版本--> <encoding>UTF-8</encoding><!--字元集編碼,防止中文亂碼--> <failOnError>true</failOnError><!--指示即使存在編譯錯誤,構建是否仍將繼續--> <failOnWarning>false</failOnWarning><!--指示即使存在編譯警告,構建是否仍將繼續--> <showDeprecation>false</showDeprecation><!--設定是否顯示使用不推薦API的源位置--> <showWarnings>false</showWarnings><!--設為true若要顯示編譯警告,請執行以下操作--> <meminitial>128M</meminitial><!--編譯器使用的初始化記憶體--> <maxmem>512M</maxmem><!--編譯器使用的最大記憶體--> </configuration> </plugin> </plugins> </build>
package cn.xw.helloWorld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author AnHui OuYang * @version 1.0 * created at 2022-03-04 17:42 */ public class Producer { //簡單佇列名稱 public static final String QUEUE_NAME = "helloWorldQueue"; public static void main(String[] args) throws IOException, TimeoutException { //建立一個連線工廠 ConnectionFactory factory = new ConnectionFactory(); //設定RabbitMQ服務的IP、賬號、密碼、Vhost虛擬主機(預設 "/" 則不需要設定) factory.setHost("192.168.31.51"); factory.setUsername("admin"); factory.setPassword("123"); factory.setVirtualHost("test"); //通過工廠物件獲取一個連線 Connection connection = factory.newConnection(); //通過連線來獲取一個通道 Channel channel = connection.createChannel(); //宣告一個佇列 //引數一:佇列名稱 //引數二:佇列裡的訊息是否持久化,預設訊息儲存在記憶體中,預設false //引數三:該佇列是否只供一個消費者進行消費的獨佔佇列,則為 true(僅限於此連線),false(預設,可以多個消費者消費) //引數四:是否自動刪除 最後一個消費者斷開連線以後 該佇列是否自動刪除 true 自動刪除,預設false //引數五:構建佇列的其它屬性,看下面擴充套件引數 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //傳送的訊息 byte[] msg = "這是一個簡單訊息".getBytes(StandardCharsets.UTF_8); //傳送訊息 //引數一:將傳送到RabbitMQ的哪個交換機上 //引數二:路由的key是什麼(直接交換機找到路由後,通過路由key來確定最終的佇列) //引數三:其它引數 //引數四:傳送到佇列的具體資訊 channel.basicPublish("", QUEUE_NAME, null, msg); System.out.println("訊息傳送完成!"); } }
package cn.xw.helloWorld; import com.rabbitmq.client.*; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @author AnHui OuYang * @version 1.0 * created at 2022-03-05 15:12 */ public class Consumer { //簡單佇列名稱 public static final String QUEUE_NAME = "helloWorldQueue"; public static void main(String[] args) throws IOException, TimeoutException { //建立一個連線工廠 ConnectionFactory factory = new ConnectionFactory(); //設定RabbitMQ服務的IP、賬號、密碼、Vhost虛擬主機(預設 "/" 則不需要設定) factory.setHost("192.168.31.51"); factory.setUsername("admin"); factory.setPassword("123"); factory.setVirtualHost("test"); //通過工廠物件獲取一個連線 Connection connection = factory.newConnection(); //通過連線來獲取一個通道 Channel channel = connection.createChannel(); System.out.println("消費者開始監聽佇列訊息...."); //推播的訊息如何進行消費的介面回撥 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("獲取佇列資訊:" + new String(message.getBody(), StandardCharsets.UTF_8)); } }; //取消消費的一個回撥介面 如在消費的時候佇列被刪除掉了 CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); } }; //消費者消費訊息 //引數一:消費哪個佇列 //引數二:消費成功之後是否要自動應答 true 代表自動應答 false 手動應答 //引數三:接受佇列訊息的回撥介面 //引數四:取消消費的回撥介面 channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
編寫好上面的消費者程式碼和生產者程式碼後我們就可以進行Demo演示了,首先執行生產者傳送訊息後我們再執行消費者程式碼
隨後執行完消費者後會列印具體的佇列訊息
注:必須先執行生產者,因為執行消費者後會發現在RabbitMQ中找不到指定Queue佇列,這時就會出現異常;但是為了不報錯也可以在消費者程式碼裡面也建立佇列,所有,生產者消費者都可以建立佇列
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'helloWorldQueue' in vhost 'test', class-id=60, method-id=20)
x-dead-letter-exchange:
死信交換器
x-dead-letter-routing-key:
死信訊息的可選路由鍵
x-expires:
佇列在指定毫秒數後被刪除
x-message-ttl:
毫秒為單位的訊息過期時間,佇列級別
x-ha-policy:
建立HA佇列,此引數已失效
x-ha-nodes:
HA佇列的分佈節點,此引數已失效
x-max-length:
佇列的訊息條數限制。限制加入queue中訊息的條數。先進先出原則,超過後,後面的訊息會頂替前面的訊息。
x-max-length-bytes:
訊息容量限制,該引數和x-max-length目的一樣限制佇列的容量,但是這個是靠佇列大小(bytes)來達到限制。
x-max-priority:
最大優先值為255的佇列優先排序功能
x-overflow:
設定佇列溢位行為。這決定了當達到佇列的最大長度時訊息會發生什麼。
有效值是drop-head、reject-publish或reject-publish-dlx。
x-single-active-consumer:
表示佇列是否是單一活動消費者,true時,註冊的消費組內只有一個消費者消費訊息,
其他被忽略,false時訊息迴圈分發給所有消費者(預設false)
x-queue-mode:
將佇列設定為延遲模式,在磁碟上保留儘可能多的訊息,以減少RAM的使用;
如果未設定,佇列將保留記憶體快取以儘可能快地傳遞訊息
x-queue-master-locator:
在叢集模式下設定映象佇列的主節點資訊
工作佇列(又稱任務佇列)的主要思想是避免立即執行資源密集型任務,而不得不等待它完成。相反我們安排任務在之後執行。我們把任務封裝為訊息並將其傳送到佇列。在後臺執行的工作程序將彈出任務並最終執行作業。當有多個工作執行緒時,這些工作執行緒將一起處理這些任務。
生產者生產了1萬個訊息傳送到佇列中,這時為了提高處理效率往往設定了多個消費者同時監聽訊息佇列並處理訊息
/** * @author AnHui OuYang * @version 1.0 * created at 2022-03-05 16:24 */ public class ChannelUtil { public static Channel getChannel() { //通道初始化 Channel channel = null; try { //建立一個連線工廠 ConnectionFactory factory = new ConnectionFactory(); //設定RabbitMQ服務的IP、賬號、密碼、Vhost虛擬主機(預設 "/" 則不需要設定) factory.setHost("192.168.31.51"); factory.setUsername("admin"); factory.setPassword("123"); factory.setVirtualHost("test"); //通過工廠物件獲取一個連線 Connection connection = factory.newConnection(); //通過連線來獲取一個通道 channel = connection.createChannel(); } catch (Exception e) { e.printStackTrace(); } return channel; } }
public class ProducerA { //工作佇列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //建立佇列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //迴圈傳送訊息 for (int i = 0; i < 1000; i++) { byte[] msg = ("這是一個編號為:" + i + " 的待處理的訊息").getBytes(StandardCharsets.UTF_8); channel.basicPublish("", QUEUE_NAME, null, msg); } System.out.println("訊息傳送完成!"); } }
public class ConsumerA { //工作佇列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //建立佇列 以防啟動消費者發現佇列不存在報錯 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("消費者A開始監聽佇列訊息...."); //消費者消費訊息 channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> { System.out.println("A消費者獲取佇列資訊並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } } public class ConsumerB { //工作佇列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //建立佇列 以防啟動消費者發現佇列不存在報錯 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("消費者B開始監聽佇列訊息...."); //消費者消費訊息 channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> { System.out.println("B消費者獲取佇列資訊並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } }
建立完生產者和消費者後首先啟動兩個消費者,然後啟動生產者,生產者傳送訊息,被兩個消費者監聽並消費
在上面的程式碼中我們會發現兩個消費者消費訊息的順序是輪詢的(A1,B2,A3,B4......);這也是預設的消費規則,但是在日常生產環境中並不會用此模式來進行佇列訊息的消費。
每個消費者服務完成一個任務可能需要的時間長短不一樣,如果其中一個消費者處理一個任務時並僅只完成了部分就突然掛掉了,會發生什麼情況。RabbitMQ一旦向消費者傳遞了一條訊息,便立即將該訊息標記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的訊息。以及後續傳送給該消費的訊息,因為它無法接收到。為了保證訊息在傳送過程中不丟失,rabbitmq引入訊息應答機制,
訊息應答就是:消費者在接收到訊息並且處理該訊息之後,告訴rabbitmq它已經處理了,rabbitmq可以把該訊息刪除了
訊息傳送後立即被認為已經傳送成功,這種模式需要在高吞吐量和資料傳輸安全性方面做權衡,因為這種模式如果訊息在消費者接收到之前,消費者那邊出現連線或者channel關閉,那麼訊息就丟失了,當然另一方面這種模式在消費者那邊可以傳遞過載的訊息,沒有對傳遞的訊息數量進行限制,當然這樣有可能使得消費者這邊由於接收太多還來不及處理的訊息,導致這些訊息的積壓,最終使得記憶體耗盡,最終這些消費者執行緒被作業系統殺死,所以這種模式僅適用在消費者可以高效並以某種速率能夠處理這些訊息的情況下使用(就是業務處理簡單的訊息)。
自動應答:佇列向消費者傳送訊息後,消費者接收到訊息就算成功應答了,隨後佇列將會刪除對應的佇列訊息;
上面案例全部採用的是自動應答,所以我們要想實現訊息消費過程中不丟失,需要把自動應答改為手動應答,這樣確保從訊息佇列來一個訊息給消費者,等消費者消費完畢以後再告知RabbitMQ已處理完,然後RabbitMQ才會傳送下一條訊息個消費者處理,保證訊息不丟失
注:basicConsume訊息接收方法中的autoAck引數必須為false才可以顯示為手動確認
手動應答分為三種情況:
②:手動拒絕 basicReject(long deliveryTag, boolean requeue):
拒絕deliveryTag對應的訊息,第二個引數是否requeue,true則重新入佇列,否則丟棄或者進入死信佇列。
③:手動不確認 basicNack(long deliveryTag, boolean multiple, boolean requeue)
不確認deliveryTag對應的訊息,第二個引數是否應用於多訊息,第三個引數是否requeue,與basic.reject區別就是同時支援多個訊息,
可以nack該消費者先前接收未ack的所有訊息。nack後的訊息也會被自己消費到。
③:手動恢復 basicRecover(boolean requeue)
是否恢復訊息到佇列,true則重新入佇列,並且儘可能的將之前recover的訊息投遞給其他消費者消費,而不是自己再次消費。
false則訊息會重新被投遞給自己。
④:手動應答 basicAck(long deliveryTag, boolean multiple)
如果消費者在處理訊息的過程中,出了錯,就沒有什麼辦法重新處理這條訊息,所以在平時都是處理訊息成功後,再確認訊息;
當autoAck=false時,RabbitMQ會等待消費者手動發回ack訊號後,才從記憶體(和磁碟,如果是持久化訊息的話)中移除訊息。
它採用訊息確認機制,消費者就有足夠的時間處理訊息(任務),不用擔心處理訊息過程中消費者程序掛掉後訊息丟失的問題,
因為RabbitMQ會一直持有訊息直到消費者手動呼叫channel.basicAck為止。對於RabbitMQ伺服器端而言,如果伺服器端一直沒
有收到消費者的ack訊號,並且消費此訊息的消費者已經斷開連線,則伺服器端會安排該訊息重新進入佇列,等待投遞給下一個
消費者(也可能還是原來的那個消費者)。這裡我們啟動了手動確認後,就必須呼叫channel.basicAck方法進行確認,
否則的話RabbitMQ會一直進行等待,當我們這個消費者關閉後,RabbitMQ會將該條訊息再發給對應的消費者進行消費,
直到有消費者對該條訊息進行消費並應答完成。
引數說明:
deliveryTag:對應訊息的ID;通過message.getEnvelope().getDeliveryTag()獲取
requeue:是否重新入列,true代表拒絕應答後會重新返回佇列,false則直接刪除或者進入死信佇列
multiple:是否批次應答,true代表批次應答
假設有個佇列依次排列為 1、2、3...10 (1最先出隊,10最後出隊);
當為true,傳送1~5訊息給消費者處理完都未確認,當到第6時執行應答方法,並且multiple為true,則代表1~6都被被批次應答
當為false,傳送1~5訊息給消費者處理完都未確認,當到第6時執行應答方法,並且multiple為true,則代表只要6被應答
//生產者只管發任務訊息,程式碼不變,消費者程式碼優化更改以下,多個消費者程式碼也和這一樣 public class ConsumerB { //工作佇列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //建立佇列 以防啟動消費者發現佇列不存在報錯 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("消費者B開始監聽佇列訊息...."); //應答方式 true自動應答 false手動應答(若是手動應答必須設定false) boolean autoAck = false; //消費者消費訊息requeue channel.basicConsume(QUEUE_NAME, autoAck, (consumerTag, message) -> { try { //這裡我就一句列印語句,沒有複雜邏輯,正常這裡有複雜業務 System.out.println("B消費者獲取佇列資訊並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); int i = 1/0; //手動確認應答 不批次應答 channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); //出現異常手動進行不應答;並且放入佇列中(reject或者使用uack方式都可以,或者本次訊息不處理了可以通過recover重新放到佇列) channel.basicReject(message.getEnvelope().getDeliveryTag(), true); } }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } }
注:"在手動應答的情況下,如果channel.basicAck收到確認前的程式碼有問題,會丟擲異常,導致無法進行手動確認,一般消費者也不會連線中斷,那麼該訊息就一直無法被處理,連被其它消費者處理的機會都沒有,所以一般我們會進行try-catch處理,處理成功則手動確認,失敗或有異常則拒絕。"
在生產過程中,難免會發生伺服器宕機的事情,RabbitMQ也不例外,可能由於某種特殊情況下的異常而導致RabbitMQ宕機從而重啟,那麼這個時候對於訊息佇列裡的資料,包括交換機、佇列以及佇列中存在訊息恢復就顯得尤為重要了。RabbitMQ本身帶有持久化機制,包括交換機、佇列以及訊息的持久化。持久化的主要機制就是將資訊寫入磁碟,當RabbitMQ服務宕機重啟後,從磁碟中讀取存入的持久化資訊,恢復資料。
預設不是持久化的,在伺服器重啟之後,交換機會消失。我們在管理臺的Exchange頁簽下檢視交換機,可以看到使用上述方法宣告的交換機,Features一列是空的,即沒有任何附加屬性。
我們可以看到第三個引數durable,如果為true時則表示要做持久化,當服務重啟時,交換機依然存在,所以使用該方法宣告的交換機是下面這個樣子的:
與交換機的持久化相同,佇列的持久化也是通過durable引數實現的(設定後佇列也會有個D),看一下方法的定義:
queueDeclare(String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String, Object> arguments)
boolean durable:
引數跟交換機方法的引數一樣,true表示做持久化,當RabbitMQ服務重啟時,佇列依然存在
boolean exclusive(補充):
排它佇列。如果一個佇列被宣告為排他佇列,那麼這個佇列只能被第一次宣告它的連線所見,並在連線斷開的時候自動刪除。
這裡有三點需要說明:
1:排它佇列是基於連線可見的,同一連線的不同通道是可以同時存取同一連線建立的排它佇列
2:如果一個連線已經宣告了一個排它佇列,其它連線是不允許建立同名的排它佇列的,這個與普通佇列不同
3:即使該佇列是持久化的,一旦連線關閉或者使用者端退出,該排它佇列都會被自動刪除的,這種佇列適用於一
個使用者端傳送讀取訊息的應用場景
boolean autoDelete(補充):
自動刪除,如果該佇列沒有任何訂閱的消費者的話,該佇列會被自動刪除。這種佇列適用於臨時佇列
訊息的持久化是指當訊息從交換機傳送到佇列之後,被消費者消費之前,伺服器突然宕機重啟,訊息仍然存在。訊息持久化的前提是佇列持久化,假如佇列不是持久化,那麼訊息的持久化毫無意義。通過如下程式碼設定訊息的持久化:
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) BasicProperties props設定訊息持久化方式: 引數實現類: public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties { private String contentType; // 訊息的內容型別,如:text/plain private String contentEncoding; // 訊息內容編碼 private Map<String,Object> headers; // 設定訊息的header,型別為Map<String,Object> private Integer deliveryMode; // 1(nopersistent)非持久化,2(persistent)持久化 private Integer priority; // 訊息的優先順序 private String correlationId; // 關聯ID private String replyTo; // 用於指定回覆的佇列的名稱 private String expiration; // 訊息的失效時間 private String messageId; // 訊息ID private Date timestamp; // 訊息的時間戳 private String type; // 型別 private String userId; // 使用者ID private String appId; // 應用程式ID private String clusterId; // 叢集ID } deliveryMode是設定訊息持久化的引數,等於1不設定持久化,等於2設定持久化; 我們平時不會使用BasicProperties類而是使用MessageProperties,通過這個類來獲取具體設定 設定 MessageProperties.PERSISTENT_TEXT_PLAIN 代表: public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain",null,null,2,0, null, null, null,null, null, null, null,null, null);
//也可以通過這種方式設定;傳送訊息的引數設定 expiration過期時間 deliveryMode 訊息持久化方式
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().expiration("10000").deliveryMode(2).build();
保證在伺服器重啟的時候可以保持不丟失相關資訊,重點解決伺服器的異常崩潰而導致的訊息丟失問題。但是,將所有的訊息都設定為持久化,會嚴重影響RabbitMQ的效能,寫入硬碟的速度比寫入記憶體的速度慢的不只一點點。對於可靠性不是那麼高的訊息可以不採用持久化處理以提高整體的吞吐率,在選擇是否要將訊息持久化時,需要在可靠性和吞吐量之間做一個權衡。
在上面的案例中,RabbitMQ 分發訊息採用的輪訓分發,但是在某種場景下這種策略並不是很好,比方說有兩個消費者在處理任務,其中有個消費者A處理任務的速度非常快,而另外一個消費者B處理速度卻很慢,這個時候我們還採用輪訓分發的話就會發現消費者A早早的處理完後空閒在那,而消費者B還在處理,這時消費者A等待消費者B處理完任務後A消費者才會得到下一個任務訊息;這就會浪費空閒消費者A發伺服器資源;但RabbitMQ 並不知道這種情況它依然很公平的進行分發。
為了避免這種情況,我們可以設定引數 channel.basicQos(1);
意思就是說如果消費者對這個任務還沒有處理完或者我還沒有應答你,你先別分配給我,我目前只能處理一個任務,然後 rabbitmq 就會把該任務分配給沒有那麼忙的那個空閒消費者,當然如果所有的消費者都沒有完成手上任務,佇列還在不停的新增新任務,佇列有可能就會遇到佇列被撐滿的情況,這個時候就只能新增新的 worker(消費者服務)或者改變其它儲存任務的策略。
說好聽點就是不公平分發,其實它叫預取值,後面說明,預取值就是通道中可以允許未確認訊息的最大值,如果是1,那處理快的就很快處理完可以處理下一條,慢的還得繼續處理,不接受訊息,實現不公平分發。
我們還需要設定手動應答,因為自動應答,會發現雖然實現不公平分發,但是還是一樣的,每個消費者消費的資料量很大可能是一樣的,因為自動應答是一旦傳送到消費者代表完成,後續還會繼續給這個消費者傳送,但是手動應答則會發現,我消費的慢,會等消費者消費完才會被分配下一個訊息處理;所以消費快的消費者會消費更多的訊息。
消費者A消費者B程式碼改造:
public class ConsumerA { //工作佇列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //建立佇列 以防啟動消費者發現佇列不存在報錯 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //設定0代表輪詢分發、1不公平分發、大於1代表預取值 channel.basicQos(1); System.out.println("消費者A(處理資源很快)開始監聽佇列訊息...."); //應答方式 true自動應答 false手動應答(若是手動應答必須設定false) boolean autoAck = false; //消費者消費訊息requeue channel.basicConsume(QUEUE_NAME, autoAck, (consumerTag, message) -> { try { //這裡我就一句列印語句,沒有複雜邏輯,正常這裡有複雜業務 System.out.println("A消費者獲取佇列資訊並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); Thread.sleep(3000); //3秒才能處理完一個任務訊息 //手動確認應答 不批次應答 channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } catch (IOException | InterruptedException e) { e.printStackTrace(); //出現異常手動進行不應答;並且放入佇列中 channel.basicReject(message.getEnvelope().getDeliveryTag(), true); } }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } } //消費者A(處理資源很快)開始監聽佇列訊息.... //A消費者獲取佇列資訊並處理:這是一個編號為:0 的待處理的訊息 //A消費者獲取佇列資訊並處理:這是一個編號為:2 的待處理的訊息 //A消費者獲取佇列資訊並處理:這是一個編號為:3 的待處理的訊息 //A消費者獲取佇列資訊並處理:這是一個編號為:4 的待處理的訊息 //A消費者獲取佇列資訊並處理:這是一個編號為:6 的待處理的訊息 //A消費者獲取佇列資訊並處理:這是一個編號為:7 的待處理的訊息 //A消費者獲取佇列資訊並處理:這是一個編號為:8 的待處理的訊息 public class ConsumerB { //工作佇列名稱 public static final String QUEUE_NAME = "workQueue"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //建立佇列 以防啟動消費者發現佇列不存在報錯 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //設定0代表輪詢分發、1不公平分發、大於1代表預取值 channel.basicQos(1); System.out.println("消費者B(處理資源很慢)開始監聽佇列訊息...."); //應答方式 true自動應答 false手動應答(若是手動應答必須設定false) boolean autoAck = false; //消費者消費訊息requeue channel.basicConsume(QUEUE_NAME, autoAck, (consumerTag, message) -> { try { //這裡我就一句列印語句,沒有複雜邏輯,正常這裡有複雜業務 System.out.println("B消費者獲取佇列資訊並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); Thread.sleep(10000); //10秒才能處理完一個任務訊息 //手動確認應答 不批次應答 channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } catch (IOException | InterruptedException e) { e.printStackTrace(); //出現異常手動進行不應答;並且放入佇列中 channel.basicReject(message.getEnvelope().getDeliveryTag(), true); } }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } } //消費者B(處理資源很慢)開始監聽佇列訊息.... //B消費者獲取佇列資訊並處理:這是一個編號為:1 的待處理的訊息 //B消費者獲取佇列資訊並處理:這是一個編號為:5 的待處理的訊息 //B消費者獲取佇列資訊並處理:這是一個編號為:9 的待處理的訊息
總結不公平分發就是,在消費者有收到確認機制後並設定不公平分發就代表哪個消費者先消費完後任務,RabbitMQ佇列會先為它分配下一個任務訊息,反之慢的消費者等消費完也可以拿到新訊息處理
本身佇列傳送給消費者的訊息是非同步傳送的,所以在任何時候,消費者連線佇列時的channel上肯定不止一個訊息,另外來自消費者的手動確認本質上也是非同步的。因此這裡就存在一個未確認的訊息緩衝區,因此希望開發人員能限制此緩衝區的大小,以避免緩衝區裡面無限制的未確認訊息問題。這個時候就可以通過使用basicQos方法設定「預取計數」值來完成的。該值定義通道上允許的未確認訊息的最大數量。一旦數量達到設定的數量,RabbitMQ 將停止再往通道上傳遞更多訊息,除非至少有一個未處理的訊息被確認後RabbitMQ才會再往通道上傳送一條任務訊息;
假設在通道上有未確認的訊息 5、6、7,8,並且通道的預取計數設定為 4,此時 RabbitMQ 將不會再往該通道上再傳遞任何訊息,除非至少有一個未應答的訊息被ack。比方說tag=6這個訊息剛剛被確認ACK,RabbitMQ將會感知這個tag=6被確認並再往通道傳送一條訊息。
訊息應答和 QoS 預取值對使用者吞吐量有重大影響。通常,增加預取將提高向消費者傳遞訊息的速度。雖然自動應答傳輸訊息速率是最佳的,但是,在這種情況下已傳遞但尚未處理的訊息的數量也會增加,從而增加了消費者的RAM消耗(隨機存取記憶體),應該小心使用具有無限預處理的自動確認模式或手動確認模式,消費者消費了大量的訊息如果沒有確認的話,會導致消費者連線節點的記憶體消耗變大,所以找到合適的預取值是一個反覆試驗的過程,不同的負載該值取值也不同,100 到 300 範圍內的值通常可提供最佳的吞吐量,並且不會給消費者帶來太大的風險。預取值為 1 是最保守的。當然這將使吞吐量變得很低,特別是消費者連線延遲很嚴重的情況下,特別是在消費者連線等待時間較長的環境中。對於大多數應用來說,稍微高一點的值將是最佳的。
生產者將通道設定成confirm模式,一旦通道進入confirm模式,所有在該通道上面釋出的訊息都將會被指派一個唯一的ID(從1開始),一旦訊息被投遞到所匹配的佇列之後,broker就會傳送一個確認指令給生產者(包含訊息的唯一ID),這就使得生產者知道訊息已經正確到達目的佇列了,如果訊息和佇列設定的是可持久化的,那麼向生產者確認訊息之前會先將訊息寫入磁碟之後再發出,broker回傳給生產者的確認訊息中delivery-tag域包含了確認訊息的序列號,此外broker也可以設定basic.ack的multiple域(批次確認),表示當前序列號及這個序列號之前的所有訊息都會一併確認。
confirm模式最大的好處在於它是非同步的,一旦釋出一條訊息,生產者應用程式就可以在等通道返回確認的同時繼續傳送下一條訊息,當訊息最終得到確認之後,生產者便可以通過回撥方法來處理該確認訊息,如果 RabbitMQ 因為自身內部錯誤導致訊息丟失,就會傳送一條nack訊息,生產者程式同樣可以在回撥方法中處理該 nack 訊息。
釋出確認預設是沒有開啟的,如果要開啟需要呼叫方法confirmSelect,每當你要想使用釋出確認,都需要在channel上呼叫該方法
這是一種簡單的確認方式,它是一種同步確認釋出的方式,也就是釋出者釋出一個訊息之後只有等RabbitMQ回撥確認方法,釋出者並且也接受到RabbitMQ的確認時,後續的訊息才能繼續釋出,waitForConfirmsOrDie(long)這個方法只有在訊息被確認的時候才返回,如果在指定時間範圍內這個訊息沒有被確認那麼它將丟擲異常。
這種確認方式有一個最大的缺點就是:釋出速度特別的慢,因為如果沒有確認釋出的訊息就會阻塞所有後續訊息的釋出,這種方式最多提供每秒不超過數百條釋出訊息的吞吐量。當然對於某些應用程式來說這可能已經足夠了。
public class ProducerA { //單個釋出確認 public static final String SINGLE_RELEASE_CONFIRMATION = "singleReleaseConfirmation";
public static void main(String[] args) throws IOException, InterruptedException { long begin = System.currentTimeMillis(); //記錄開始時間 //獲取通道 Channel channel = ChannelUtil.getChannel(); //建立一個通道 channel.queueDeclare(SINGLE_RELEASE_CONFIRMATION, true, false, false, null); //開啟發布確認功能 channel.confirmSelect(); //迴圈傳送訊息 for (int i = 0; i < 1000; i++) { String str = "單個釋出確認資訊" + i; System.out.println("開始傳送資訊:" + i); //釋出資訊 channel.basicPublish("", SINGLE_RELEASE_CONFIRMATION, MessageProperties.PERSISTENT_TEXT_PLAIN, str.getBytes(StandardCharsets.UTF_8)); //驗證是否傳送成功(等待確認) //channel.waitForConfirms(3000); 傳送三秒後沒得到回覆將斷定未傳送過去 boolean b = channel.waitForConfirms(); if (b) { System.out.println("傳送成功了:" + i); } } long end = System.currentTimeMillis(); //記錄結尾時間 System.out.println("單個釋出確認用時:" + (end - begin)); //單個釋出確認用時:2278 } }
與單個等待確認訊息相比,先發布一批訊息然後一起確認可以極大的提高吞吐量,當然這種方式的缺點就是:當發生故障導致釋出出現問題時,不知道那一批確認的是哪個訊息出現問題了,我們必須將整個批次處理儲存在記憶體中,以記錄重要的資訊而後重新發布訊息。當然這種方案仍然是同步的,也一樣阻塞訊息的釋出。
public class ProducerA { //批次確認 public static final String BATCH_CONFIRMATION = "batchConfirmation";public static void main(String[] args) throws IOException, InterruptedException { long begin = System.currentTimeMillis(); //記錄開始時間 //獲取通道 Channel channel = ChannelUtil.getChannel(); //建立一個通道 channel.queueDeclare(BATCH_CONFIRMATION, true, false, false, null); //開啟發布確認功能 channel.confirmSelect(); //定義每次批次處理多少訊息進行確認 int batchNumber = 100; //迴圈傳送訊息 for (int i = 0; i < 1000; i++) { String str = "批次釋出確認資訊" + i; System.out.println("開始傳送資訊:" + i); //釋出資訊 channel.basicPublish("", BATCH_CONFIRMATION, MessageProperties.PERSISTENT_TEXT_PLAIN, str.getBytes(StandardCharsets.UTF_8)); //驗證是否傳送成功(等待確認) 用求餘的方式來判斷每輪100個 //channel.waitForConfirms(3000); 傳送三秒後沒得到回覆將斷定未傳送過去 if ((i + 1) % batchNumber == 0) { if (channel.waitForConfirms()) { System.out.println("批次傳送成功了 範圍為:" + (i - (batchNumber - 1)) + " ~ " + i); } } } long end = System.currentTimeMillis(); //記錄結尾時間 System.out.println("批次釋出確認用時:" + (end - begin)); //批次釋出確認用時:454 } }
非同步確認雖然程式設計邏輯比上兩個要複雜,但是價效比最高,無論是可靠性還是效率都沒得說,他是利用回撥函數來達到訊息可靠性傳遞的,這個中介軟體也是通過函數回撥來保證是否投遞成功,下面就讓我們來詳細講解非同步確認是怎麼實現的。
public class ProducerA { //非同步釋出確認 public static final String ASYNC_RELEASE_CONFIRMATION = "asyncReleaseConfirmation"; public static void main(String[] args) throws IOException { long begin = System.currentTimeMillis(); //記錄開始時間 //獲取通道 Channel channel = ChannelUtil.getChannel(); //建立一個通道 channel.queueDeclare(ASYNC_RELEASE_CONFIRMATION, true, false, false, null); //開啟發布確認功能 channel.confirmSelect(); //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Start //執行緒安全有序的一個雜湊表Map,適用於高並行的情況 //1.輕鬆的將序號與訊息進行關聯 2.輕鬆批次刪除條目 只要給到序列號 3.支援並行存取 ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); ConfirmCallback ackCallback = (deliveryTag, multiple) -> { //這個是回撥成功的,回撥成功後把集合中的資料刪除,最終就代表失敗的多少 if (multiple) { ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = outstandingConfirms.headMap(deliveryTag, true); longStringConcurrentNavigableMap.clear(); } else { outstandingConfirms.remove(deliveryTag); } System.out.println("~~~~ 回撥成功的資料:" + deliveryTag + " 是否批次確認:" + multiple); }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.out.println("~~~~ 回撥失敗的資料:" + deliveryTag); }; //新增監聽器,監聽返回(監聽器一定要再傳送訊息之前就建立和監聽) 引數1:回撥成功 引數2:回撥失敗 channel.addConfirmListener(ackCallback, nackCallback); //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ End //迴圈傳送訊息 (因為是非同步 這裡正常傳送不用管它) for (int i = 0; i < 10000; i++) { String str = "非同步釋出確認資訊" + i; //記錄要傳送的資料新增到集合中 outstandingConfirms.put(channel.getNextPublishSeqNo(),str); //釋出資訊 channel.basicPublish("", ASYNC_RELEASE_CONFIRMATION, MessageProperties.PERSISTENT_TEXT_PLAIN, str.getBytes(StandardCharsets.UTF_8)); } long end = System.currentTimeMillis(); //記錄結尾時間 System.out.println("非同步釋出確認用時:" + (end - begin)); //非同步釋出確認用時:337 } }
交換機(Exchange)接收訊息,並根據路由鍵(Routing Key)轉發訊息到繫結的佇列
RabbitMQ訊息傳遞模型的核心思想是: 生產者生產的訊息從不會直接傳送到佇列。實際上,通常生產者甚至都不知道這些訊息傳遞傳遞到了哪些佇列中。相反,生產者只能將訊息傳送到交換機(exchange),交換機工作的內容非常簡單,一方面它接收來自生產者的訊息,另一方面將它們推入佇列。交換機必須確切知道如何處理收到的訊息。是應該把這些訊息放到特定佇列(需要有提前繫結路由鍵)還是說把它們群發到許多佇列中還是說應該丟棄它們。這就的由交換機的型別來決定。
binding其實是exchange和queue之間的橋樑,它告訴我們exchange和那個佇列進行了繫結關係。
直接(direct), 主題(topic) ,標題(headers) , 扇出(fanout)
每當我們消費者連線到RabbitMQ時,我們都需要一個全新的空佇列(因為這個佇列需要繫結到交換機上),為此我們可以建立一個具有隨機名稱的佇列,或者能讓伺服器為我們選擇一個隨機佇列名稱那就更好了。其次一旦我們斷開了消費者的連線,佇列將被自動刪除。
我們沒使用Exchange,但仍能夠將訊息傳送到佇列。之前能實現的原因是因為我們使用的是預設交換,我們通過空字串("")進行標識
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
當初使用:channel.basicPublish("", "當初寫佇列名稱", null,"傳送的訊息");
第一個引數是交換機的名稱:空字串表示預設或無名稱交換機
訊息能路由傳送到佇列中其實是由routingKey(binding key)繫結key指定的,那時key都填寫佇列名稱,所有直接被繫結到對應佇列,
可以說使用的是直接交換機(direct)
扇出交換機是最基本的交換機型別,它所能做的事情非常簡單---廣播訊息。扇出交換機會把能接收到的訊息全部傳送給繫結在自己身上的佇列。因為廣播不需要「思考」,所以扇形交換機處理訊息的速度也是所有的交換機型別裡面最快的。
建立交換機方法:exchangeDeclare(String exchange,BuiltinExchangeType type,boolean durable,
boolean autoDelete,boolean internal,Map<String, Object> arguments)
exchange: 交換機名稱
type: 交換機型別,direct、topic、 fanout、 headers
durable: 是否需要持久化
autoDelete: 當最後一個繫結到Exchange上的佇列刪除後,自動刪除該Exchange
internal: 當前Exchange是否用於RabbitMQ內部使用,預設為False
arguments: 擴充套件引數,用於擴充套件AMQP協定客製化化使用
注:推薦在編寫生產者時建立交換機,在編寫消費者時應該建立佇列,並且佇列繫結交換機,啟動時先啟動交換機
public class FanoutConsumerA { //交換機名稱 public static final String FANOUT_DEMO_EXCHANGE = "fanoutDemoExchange"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告exchange交換機 並設定釋出訂閱模式(扇出模式)防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(FANOUT_DEMO_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null); //建立一個臨時佇列 String queueName = channel.queueDeclare().getQueue(); //把佇列繫結到指定交換機上 channel.queueBind(queueName, FANOUT_DEMO_EXCHANGE, ""); //接收佇列訊息 channel.basicConsume(queueName, false, (consumerTag, message) -> { System.out.println("A臨時消費者獲取佇列資訊並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } } public class FanoutConsumerB { //交換機名稱 public static final String FANOUT_DEMO_EXCHANGE = "fanoutDemoExchange"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告exchange交換機 並設定釋出訂閱模式(扇出模式)防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(FANOUT_DEMO_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null); //建立一個臨時佇列 String queueName = channel.queueDeclare().getQueue(); //把佇列繫結到指定交換機上 channel.queueBind(queueName, FANOUT_DEMO_EXCHANGE, ""); //接收佇列訊息 channel.basicConsume(queueName, false, (consumerTag, message) -> { System.out.println("B臨時消費者獲取佇列資訊並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } }
public class FanoutProducer { //交換機名稱 public static final String FANOUT_DEMO_EXCHANGE = "fanoutDemoExchange"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告exchange交換機 並設定釋出訂閱模式(扇出模式) channel.exchangeDeclare(FANOUT_DEMO_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null); //迴圈傳送訊息 for (int i = 0; i < 10; i++) { String str = "非同步釋出確認資訊" + i; //釋出資訊 channel.basicPublish(FANOUT_DEMO_EXCHANGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, str.getBytes(StandardCharsets.UTF_8)); } System.out.println("訊息傳送完畢!"); } }
直連交換機是一種帶路由功能的交換機,一個佇列會繫結到一個交換機上,一個交換機身上可以繫結多個佇列;當生產者傳送訊息給交換機時,交換機會根據binding在交換機上的routing_key來查詢路由,最終被送到指定的佇列裡;當一個交換機繫結多個佇列,就會被送到對應的佇列去處理。
下面我將以一個案例的方式來使用直接交換機,如下圖:有一個紀錄檔交換機(LogExchange),它負責的功能是將生產者傳送的紀錄檔資訊交到對應的佇列中,佇列分別為基本紀錄檔佇列(BasicLogQueue)、錯誤佇列(ErrQueue)、通知佇列(NotifyQueue);其中基本紀錄檔佇列記錄日常執行紀錄檔錯誤佇列記錄重大問題資訊,因為錯誤紀錄檔需要告知管理員,所有將錯誤紀錄檔又傳送到通知佇列來傳送郵件告知
public class BasicLogConsumer { //交換機名稱 public static final String LOG_EXCHANGE = "LogExchange"; //佇列名稱 public static final String BASIC_LOG_QUEUE = "BasicLogQueue"; //路由繫結關係 Routing Key public static final String BASIC_LOG_KEY = "BasicLogKey"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告exchange交換機 並設定為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(LOG_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //建立一個基本紀錄檔佇列 channel.queueDeclare(BASIC_LOG_QUEUE, true, false, false, null); //佇列繫結到交換機上,並通過路由key來對應兩者的連線 channel.queueBind(BASIC_LOG_QUEUE, LOG_EXCHANGE, BASIC_LOG_KEY); //接收佇列訊息 channel.basicConsume(BASIC_LOG_QUEUE, true, (consumerTag, message) -> { System.out.println("基本紀錄檔佇列裡獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } }
public class ErrConsumer { //交換機名稱 public static final String LOG_EXCHANGE = "LogExchange"; //佇列名稱 public static final String ERR_QUEUE = "ErrQueue"; //路由繫結關係 Routing Key public static final String ERR_KEY = "ErrKey"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告exchange交換機 並設定為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(LOG_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //建立一個基本紀錄檔佇列 channel.queueDeclare(ERR_QUEUE, true, false, false, null); //佇列繫結到交換機上,並通過路由key來對應兩者的連線 channel.queueBind(ERR_QUEUE, LOG_EXCHANGE, ERR_KEY); //接收佇列訊息 channel.basicConsume(ERR_QUEUE, true, (consumerTag, message) -> { System.out.println("錯誤紀錄檔佇列裡獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } }
public class NotifyConsumer { //交換機名稱 public static final String LOG_EXCHANGE = "LogExchange"; //佇列名稱 public static final String NOTIFY_QUEUE = "NotifyQueue"; //路由繫結關係 Routing Key public static final String ERR_KEY = "ErrKey"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告exchange交換機 並設定為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(LOG_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //建立一個基本紀錄檔佇列 channel.queueDeclare(NOTIFY_QUEUE, true, false, false, null); //佇列繫結到交換機上,並通過路由key來對應兩者的連線 channel.queueBind(NOTIFY_QUEUE, LOG_EXCHANGE, ERR_KEY); //接收佇列訊息 channel.basicConsume(NOTIFY_QUEUE, true, (consumerTag, message) -> { System.out.println("接收到錯誤紀錄檔並處理任務郵件傳送,錯誤紀錄檔內容為:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } }
public class DirectProducer { //交換機名稱 public static final String LOG_EXCHANGE = "LogExchange"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告exchange交換機 並設定為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(LOG_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //待傳送的訊息 HashMap<String, List<String>> sendMsg = new HashMap<>(); List<String> errMsg = Arrays.asList("[1001]系統存在重大問題,可能會發生宕機!!", "[1002]電腦受到蠕蟲病毒攻擊!!"); List<String> basicMsg = Arrays.asList("[2001]尊敬的螞蟻小哥歡迎登入系統", "[2002]螞蟻小哥已退出賬號"); sendMsg.put("ErrKey", errMsg); sendMsg.put("BasicLogKey", basicMsg); //迴圈傳送訊息任務 for (Map.Entry<String, List<String>> msg : sendMsg.entrySet()) { String key = msg.getKey();//路由key List<String> messages = msg.getValue();//待傳送訊息 for (String message : messages) { channel.basicPublish(LOG_EXCHANGE, key, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } } System.out.println("訊息傳送完成!!!"); } }
從上面可以看出若exchange的繫結型別是direct,但是它繫結的多個佇列的key如果都相同,在這種情況下雖然繫結型別是direct但是它表現的就和fanout有點類似了,就跟廣播差不多。
適用場景:有優先順序的任務,根據任務的優先順序把訊息傳送到對應的佇列,這樣可以指派更多的資源去處理高優先順序的佇列。
之前我們使用只能進行隨意廣播的fanout扇出交換機,但只能群發給每個佇列,不能傳送到指定某個佇列,但是使用了direct交換機,就可以實現有選擇性地傳送到指定佇列了。儘管使用direct交換機,但是它仍然存在侷限性,如果我們希望一條訊息傳送給多個佇列,那麼這個交換機需要繫結上非常多的routing_key,假設每個交換機上都繫結一堆的routing_key連線到各個佇列上。那麼訊息的管理就會異常地困難。所以RabbitMQ提供了一種主題交換機,傳送到主題交換機上的訊息需要攜帶指定規則的routing_key,主題交換機會根據這個規則將資料傳送到對應的(多個)佇列上;可以理解為模糊匹配。
主題交換機的routing_key需要有一定的規則,交換機和佇列的binding_key需要採用*.#.*.....的格式,每個部分用 . 分開。
其中:
* (星號):可以代替一個單詞
#(井號):可以替代零個或多個單詞
上圖是一個佇列繫結關係圖,我們來看看他們之間資料接收情況是怎麼樣的:
佇列繫結交換機的Key 匹配規則
quick.orange.rabbit 被佇列 Q1 Q2 接收到
lazy.orange.elephant 被佇列 Q1 Q2 接收到
quick.orange.fox 被佇列 Q1 接收到
lazy.brown.fox 被佇列 Q2 接收到
lazy.pink.rabbit 雖然滿足兩個繫結規則但兩個規則都是在Q2佇列,所有隻有Q2接收一次
quick.brown.fox 不匹配任何繫結不會被任何佇列接收到會被丟棄
quick.orange.male.rabbit 是四個單詞不匹配任何繫結會被丟棄
lazy.orange.male.rabbit 是四個單詞但匹配 Q2
注:當一個佇列繫結鍵是#,那麼這個佇列將接收所有資料,就有點像fanout扇出交換機了;如果佇列繫結鍵當中沒有#和*出現,那麼該佇列繫結型別就是direct直接交換機了;下面我將以上圖的案例來程式碼實現:
public class CAConsumer { //交換機名稱 public static final String TOPIC_EXCHANGE = "TopicExchange"; //佇列Q1名稱 public static final String Q1 = "Q1Queue"; //路由繫結關係 Routing Key public static final String Q1_KEY = "*.orange.*"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告exchange交換機 並設定為主題交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null); //建立一個 Q1 佇列 channel.queueDeclare(Q1, true, false, false, null); //佇列繫結到交換機上,並通過主題路由key來對應兩者的連線 channel.queueBind(Q1, TOPIC_EXCHANGE, Q1_KEY); //接收佇列訊息 channel.basicConsume(Q1, true, (consumerTag, message) -> { System.out.println("Q1獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } }
public class CBConsumer { //交換機名稱 public static final String TOPIC_EXCHANGE = "TopicExchange"; //佇列Q2名稱 public static final String Q2 = "Q2Queue"; //路由繫結關係 Routing Key 1 public static final String Q2_KEY_A = "*.*.rabbit"; //路由繫結關係 Routing Key 2 public static final String Q2_KEY_B = "lazy.#"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告exchange交換機 並設定為主題交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null); //建立一個 Q2 佇列 channel.queueDeclare(Q2, true, false, false, null); //佇列繫結到交換機上,並通過主題路由key來對應兩者的連線(這裡設定了2個連線) channel.queueBind(Q2, TOPIC_EXCHANGE, Q2_KEY_A); channel.queueBind(Q2, TOPIC_EXCHANGE, Q2_KEY_B); //接收佇列訊息 channel.basicConsume(Q2, true, (consumerTag, message) -> { System.out.println("Q2獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } }
public class TopicProducer { //交換機名稱 public static final String TOPIC_EXCHANGE = "TopicExchange"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告exchange交換機 並設定為主題交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null); //訊息任務準備 HashMap<String, String> sendMsg = new HashMap<>(); sendMsg.put("quick.orange.rabbit", "被佇列 Q1 Q2 接收到"); sendMsg.put("lazy.orange.elephant", "被佇列 Q1 Q2 接收到"); sendMsg.put("quick.orange.fox", "被佇列 Q1 接收到"); sendMsg.put("lazy.brown.fox", "被佇列 Q2 接收到"); sendMsg.put("lazy.pink.rabbit", "雖然滿足兩個繫結規則但兩個規則都是在Q2佇列,所有隻要Q2接收一次"); sendMsg.put("quick.brown.fox", "不匹配任何繫結不會被任何佇列接收到會被丟棄"); sendMsg.put("quick.orange.male.rabbit", "是四個單詞不匹配任何繫結會被丟棄"); sendMsg.put("lazy.orange.male.rabbit", "是四個單詞但匹配 Q2"); //迴圈傳送訊息任務 for (Map.Entry<String, String> msg : sendMsg.entrySet()) { String routKey = msg.getKey(); //主題路由key String message = msg.getValue();//訊息任務 channel.basicPublish(TOPIC_EXCHANGE, routKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); } } }
死信佇列(DLX,Dead-Letter-Exchange)就是無法被消費的訊息,一般來說producer將訊息投遞到broker或者直接到queue裡了,consumer從queue取出訊息進行消費,但某些時候由於特定的原因導致queue中的某些訊息無法被消費,這樣的訊息如果沒有後續的處理,就變成了死信,有死信自然就有了死信佇列。當訊息在一個佇列中變成死信後,它能被重新發布到另一個Exchange中,通過Exchange分發到另外的佇列;本質就是該訊息不會再被任何消費端消費(但你可以自定義某消費者單獨處理這些死信)。
應用場景:為了保證訂單業務的訊息資料不丟失,需要使用到RabbitMQ的死信佇列機制,當訊息消費發生異常時,將訊息投入死信佇列中;比如說: 使用者在商城下單成功並點選去支付後在指定時間未支付時自動失效
「死信」訊息會被RabbitMQ進行特殊處理,如果設定了死信佇列資訊,那麼該訊息將會被丟進死信佇列中,如果沒有設定,則該訊息將會被丟棄。
①:訊息被拒絕(basic.reject/basic.nack),且 requeue = false(代表不重新回到佇列)
②:訊息因TTL過期(就是任務訊息上攜帶過期時間)
③:訊息佇列的訊息數量已經超過最大佇列長度,先入隊的訊息會被丟棄變為死信
普通消費者程式碼:
public class TTLConsumer { //宣告普通的交換機名稱 public static final String NORMAL_EXCHANGE = "NormalExchange"; //宣告死信交換機名稱 public static final String DLX_EXCHANGE = "DLXExchange"; //宣告普通佇列名稱 public static final String Normal_Queue = "NormalQueue"; //宣告死信佇列名稱 public static final String DLX_QUEUE = "DLXQueue"; //宣告路由繫結關係 Routing Key 普通交換機到普通佇列 public static final String NORMAL_KEY = "NormalKey"; //宣告路由繫結關係 Routing Key 死信交換機到死信佇列 public static final String DLX_KEY = "DLXKey"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告普通exchange交換機 並設定為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //宣告死信exchange交換機 並設定為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //宣告死信佇列 channel.queueDeclare(DLX_QUEUE, true, false, false, null); //死信佇列繫結死信交換機routingKey channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_KEY); //引數設定 Map<String, Object> arguments = new HashMap<>(); //正常佇列設定死信交換機 引數key是固定值 arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); //正常佇列設定死信交換機到死信佇列繫結Routing Key 引數key是固定值 arguments.put("x-dead-letter-routing-key", DLX_KEY); //宣告普通佇列 channel.queueDeclare(Normal_Queue, true, false, false, arguments); //普通佇列繫結普通交換機routingKey channel.queueBind(Normal_Queue, NORMAL_EXCHANGE, NORMAL_KEY); System.out.println("初始化完成,等待接收訊息"); //接收佇列訊息 channel.basicConsume(Normal_Queue, true, (consumerTag, message) -> { System.out.println("如同佇列獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } }
死信消費者程式碼:
public class DLXConsumer { //宣告死信交換機名稱 public static final String DLX_EXCHANGE = "DLXExchange"; //宣告死信佇列名稱 public static final String DLX_QUEUE = "DLXQueue"; //宣告路由繫結關係 Routing Key 死信交換機到死信佇列 public static final String DLX_KEY = "DLXKey"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告死信exchange交換機 並設定為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(DLX_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //宣告死信佇列 channel.queueDeclare(DLX_QUEUE, true, false, false, null); //死信佇列繫結死信交換機routingKey channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_KEY); System.out.println("初始化完成,等待接收訊息"); //接收佇列訊息 channel.basicConsume(DLX_QUEUE, true, (consumerTag, message) -> { System.out.println("死信佇列裡獲取的任務並處理:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); }); } }
生產者程式碼編寫:
public class DLXProducer { //宣告普通的交換機名稱 public static final String NORMAL_EXCHANGE = "NormalExchange"; //宣告路由繫結關係 Routing Key 普通交換機到普通佇列 public static final String NORMAL_KEY = "NormalKey"; public static void main(String[] args) throws IOException { //呼叫自己的工具類獲取通道 Channel channel = ChannelUtil.getChannel(); //宣告普通exchange交換機 並設定為直接交換機;防止消費者先啟動報錯,找不到交換機 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null); //傳送訊息的引數設定 expiration過期時間10秒 deliveryMode 訊息持久化方式 AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").deliveryMode(2).build(); //迴圈傳送訊息 for (int i = 0; i < 5; i++) { String str = "測試佇列任務 " + i; //釋出資訊 channel.basicPublish(NORMAL_EXCHANGE, NORMAL_KEY, properties, str.getBytes(StandardCharsets.UTF_8)); } System.out.println("訊息傳送完畢!"); } }
編寫好程式碼以後執行普通消費者和死信消費者程式碼檢視建立的佇列及交換機的狀況
測試(為了可以更好的演示效果,關閉普通消費者和死信消費者):
生產者傳送5條訊息到普通 佇列中,此時普通佇列裡面存在10條未消費資訊:
訊息達到過期時間後會從普通佇列推播到死信佇列裡(因為提前設定了訊息變死信後傳送到死信交換機)
接下來我們就可以啟動死信消費者來消費這一批死信佇列裡的任務訊息
程式碼優化:剔除生產者程式碼中的訊息過期時間,並在普通消費者裡面設定佇列最大長度
//引數設定 Map<String, Object> arguments = new HashMap<>(); //正常佇列設定死信交換機 引數key是固定值 arguments.put("x-dead-letter-exchange", DLX_EXCHANGE); //正常佇列設定死信交換機到死信佇列繫結Routing Key 引數key是固定值 arguments.put("x-dead-letter-routing-key", DLX_KEY); //設定正常佇列的長度限制 為3 arguments.put("x-max-length",3);
注:因為佇列引數改變,需要先刪除原佇列,並啟動消費者,建立出帶佇列長度的佇列
程式碼優化:剔除普通消費者裡面設定佇列最大長度,並優化普通消費者訊息接收程式碼
//接收佇列訊息 channel.basicConsume(Normal_Queue, false, (consumerTag, message) -> { //獲取的任務訊息 String msg = new String(message.getBody(), StandardCharsets.UTF_8); //手動不確認,拒收,並丟去佇列 if ("測試佇列任務 3".equals(msg)) { //出現異常手動進行不應答;並且不放入佇列中 channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("如同佇列獲取的任務並處理:" + msg); } }, consumerTag -> { System.out.println("監聽的佇列出現異常;可能佇列被刪除!"); });
普通佇列:它是一種佇列,佇列意味著內部的元素是有序的,元素出隊和入隊是有方向性的,元素從一端進入,從另一端取出。
延時佇列:最重要的特性就體現在它的延時屬性上,跟普通的佇列不一樣的是,普通佇列中的元素總是等著希望被早點取出處理,而延時佇列中的元素則是希望被在指定時間得到取出和處理,所以延時佇列中的元素是都是帶時間屬性的,通常來說是需要被處理的訊息或者任務。簡單來說,延時佇列就是用來存放需要在指定時間以後被處理的元素的佇列(到達設定的延遲時間後再推給消費者進行任務處理)。
那麼什麼時候需要用延時佇列呢?考慮一下以下場景:
①:訂單在十分鐘之內未支付則自動取消。
②:新建立的店鋪,如果在十天內都沒有上傳過商品,則自動傳送訊息提醒。
③:賬單在一週內未支付,則自動結算。
④:使用者註冊成功後,如果三天內沒有登陸則進行簡訊提醒。
⑤:使用者發起退款,如果三天內沒有得到處理則通知相關運營人員。
⑥:預定會議後,需要在預定的時間點前十分鐘通知各個與會人員參加會議。
TTL是RabbitMQ中一個訊息或者佇列的屬性,表明一條訊息或者該佇列中的所有訊息的最大存活時間,單位是毫秒。換句話說,如果一條訊息設定了TTL屬性或者進入了設定TTL屬性的佇列,那麼這條訊息如果在TTL設定的時間內沒有被消費,則會成為「死信」。如果同時設定了佇列的TTL和訊息的TTL,那麼較小的那個值將會被使用。
設定這個TTL值有兩種方式(佇列設定、訊息設定):
第一種是在建立佇列的時候設定佇列的 "x-message-ttl" 屬性,如下:
Map<String, Object> arguments = new HashMap<>();
//設定訊息延遲10秒;投遞到該佇列的訊息超過10秒直接丟棄
arguments.put("x-message-ttl",10000);
//建立佇列,並指定引數
channel.queueDeclare(Normal_Queue, true, false, false, arguments);
第二種方式針對每條訊息設定TTL:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
//這條訊息的過期時間也被設定成了10s , 超過10秒未處理則執行到此訊息後被丟棄
builder.expiration("10000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
區別的:第一種在佇列上設定TTL屬性,那麼一旦訊息過期,就會被佇列丟棄;而第二種方式,訊息即使過期,也不一定會被馬上丟棄,
因為訊息是否過期是在即將投遞到消費者之前判定的,如果當前佇列有嚴重的訊息積壓情況,則已過期的訊息也許還能存活較長時間。
另外,還需要注意的一點是,如果不設定TTL,表示訊息永遠不會過期,如果將TTL設定為0,則表示除非此時可以直接投遞該訊息到消費者,
否則該訊息將會被丟棄。
看到這裡也代表基本的RabbitMQ已經知道了,下面可以看一看下篇的SpringBoot整合RabbitMQ,下篇有延遲佇列的詳細說明。