Rabbitmq 學習

2020-08-14 17:13:02

訊息中介軟體 RabbitMQ 技術精講

  1. 基礎篇 - RabbitMQ核心概念及基礎API應用
  2. 高階篇 - RabbitMQ高階特性
  3. 實戰篇 - Spring家族整合(SpringAMQP、Spring Boot、Spring Cloud Stream)
  4. 叢集篇 - RabbitMQ叢集架構
  5. 擴充套件篇 - SET化架構設計與大廠MQ元件設計及實現思路

主流中介軟體介紹

衡量訊息中介軟體的指標:服務效能,數據儲存,叢集架構

  • ActiveMQ
  • Kafka
  • RocketMQ
  • RabbitMQ

第一天:RabbitMQ核心概念及基礎API應用

初識RabbitMQ

RabbitMQ是一個開源的訊息代理和佇列伺服器,用來通過普通協定在完全不同的應用之間共用數據,
RabbitMQ是使用Erlang語言來編寫的,並且RabbitMQ是基於AMQP協定的。

哪些大廠在用RabbitMQ,爲什麼?

滴滴、美團、頭條、去哪兒…

  • 開源、效能優秀,穩定性保障
  • 提供可靠性訊息投遞模式(confirm)、返回模式(return)
  • 與SpringAMQP完美的整合、API豐富
  • 叢集模式豐富,表達式設定,HA模式,映象佇列模型
  • 保證數據不丟失的前提做到高可靠性、可用性

RabbitMQ高效能原因?

  1. Erlang語言最初在於交換機領域的架構模式,這樣使得 RabbitMQ 在 Broker 之間進行數據互動的效能是非常優秀的。
  2. Erlang的優點:Erlang有着和原生Socket一樣的延遲

什麼是AMQP高階訊息佇列協定?

AMQP:Advanced Message Queuing Protocol(高階訊息佇列協定)

AMQP定義:是具有現代特徵的二進制協定。是一個提供統一訊息服務的應用層標準高階訊息佇列協定,是應用層協定的一個開放標準,爲訊息導向中介層設計。

AMQP協定模型

AMQP協定棧

  1. Modle Layer:位於協定最高層,主要定義了一些供用戶端呼叫的命令,用戶端可以利用這些命令實現自己的業務邏輯,例如,用戶端可以通過queue.declare宣告一個佇列,利用consume命令獲取一個佇列中的訊息。
  2. Session Layer:主要負責將用戶端的命令發送給伺服器,在將伺服器端的應答返回給用戶端,主要爲用戶端與伺服器之間通訊提供可靠性、同步機制 機製和錯誤處理。
  3. Transport Layer:主要傳輸二進制數據流,提供幀的處理、通道複用、錯誤檢測和數據表示。

AMQP協定模型

核心概念

  • Server:又稱作Broker,訊息伺服器範例,用於接受用戶端的連線,實現AMQP實體服務;
  • Connection:連線,應用程式與Broker的網路連線;
  • Channel:網路通道,幾乎所有的操作都在Channel中進行,Channel是進行訊息讀寫的通道;
    用戶端可建立多個Channel,每個Channel代表一個對談任務;
  • Message:訊息,伺服器和應用程式之間傳送的數據,有Properties和Body組成。
    Properties可以對訊息進行修飾,比如訊息的優先順序、延遲等高階特性;Body則是訊息體內容,即我們要傳輸的數據;
  • Virtual Host:虛擬地址,是一個邏輯概念,用於進行邏輯隔離,是最上層的訊息路由;
  • Exchange:交換機,用於接收訊息,可根據路由鍵將訊息轉發到系結的佇列;
  • Binding:Exchange和Queue之間的虛擬連線,binding中可以包含routing key;
  • Routing Key:一個路由規則,虛擬機器可用它來確定如何路由一個特定的訊息;
  • Queue:也稱作Message Queue,即訊息佇列,用於儲存訊息並將他們轉發給消費者;
  • Producer:訊息生產者,就是投遞訊息的程式;
  • Consumer:訊息消費者,就是接受訊息的程式;

RabbitMQ整體架構

RabbitMQ訊息是如何流轉

RabbitMQ安裝

兩種方式

  1. 解壓安裝
  2. rpm安裝(簡單)

需要用到的軟體包

  • erlang-23.0.2-1.el7.x86_64.rpm
  • socat-1.7.3.2-5.el7.lux.x86_64.rpm
  • rabbitmq-server-3.8.5-1.el7.noarch.rpm

注意:erlang 和 RabbitMQ 版本要對應

rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm

啓動 / 關閉 RabbitMQ
centos 7

systemctl start rabbitmq-server
lsof -i:5672
systemctl stop rabbitmq-server

預設使用者
guest / guest,只能本地存取

開啓 guest 遠端存取
將 rabbitmq.conf 上傳到 /etc/rabbitmq 目錄下,並修改設定

loopback_users.guest = false

埠號

  • amqp: 5672(RabbitMQ伺服器端程式埠)
  • http: 15672(RabbitMQ管控台程式埠)

優化啓動和連線效能

如果安裝完成後,啓動特別慢,或者Java連線mq非常慢(導致Socket連線異常)

  • 將本地IP地址(RabbitMQ伺服器IP地址)和 hostname 進行系結
vim /etc/hosts
192.168.229.116 Cloud01

命令列與管控台

常用命令

# 關閉應用
rabbitmqctl stop_app
# 啓動應用
rabbitmqctl start_app
# 檢視節點狀態
rabbitmqctl status

# 新增使用者
rabbitmqctl add_user username password
# 設定使用者爲管理員
rabbitmqctl set_user_tags username administrator
# 設定使用者許可權
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
# 檢視所有使用者
rabbitmqctl list_users
# 刪除使用者
rabbitmqctl delete_user username
# 清除使用者許可權
rabbitmqctl clear_permissions -p vhostpath username

# 建立虛擬主機
rabbitmqctl add_vhost vhostpath
# 檢視所有的虛擬主機
rabbitmqctl list_vhosts
# 檢視虛擬主機上所有許可權
rabbitmqctl list_permissions -p vhostpath
# 刪除虛擬主機
rabbitmqctl delete_vhost vhostpath

# 檢視所有佇列
rabbitmqctl list_queues
# 清除佇列裡的訊息
rabbitmqctl -p vhostpath purge_queue blue

#================== 高階操作 ====================
# 移除所有數據,要在rabbitmqctl stop_app 之後使用
rabbitmqctl reset
# 組成叢集命令, [--ram] 指定節點的儲存形式
rabbitmqctl join_cluster <clusternode> [--ram]
# 檢視叢集狀態
rabbitmqctl cluster_status
# 修改叢集節點的儲存形式
rabbitmqctl change_cluster_node_type disc | ram
# 忘記節點(摘除節點):叢集失敗或者故障轉移
rabbitmqctl forget_cluster_node [--offline]
# 修改節點名稱
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2] ...

管控台

  1. 開啓管控台外掛
rabbitmq-plugins enable rabbitmq_management
  1. 存取管控台:http://0.0.0.0:15672

編碼實現訊息生產與消費

Exchange 交換機

Exchange:接收訊息,並根據路由鍵轉發訊息所系結的佇列

交換機屬性

  • Name:交換機名稱
  • Type:交換機型別(Direct、Topic、Fanout、Headers)
  • Durability:是否需要持久化,true爲持久化
  • Auto Delete:當最後一個系結到Exchange上的佇列刪除後,自動刪除該 Exchange
  • Internal:當前Exchange是否用於RabbitMQ內部使用,預設爲false
  • Arguments:擴充套件參數,用於擴充套件AMQP協定自制 自製定化使用

Direct Exchange(直連交換機)

所有發送到 Direct Exchange 的訊息被轉發到 RoutingKey 中指定的 Queue。

Topic Exchange(主題交換機)

Exchange將RoutingKey和某個Topic進行模糊匹配,此時佇列需要系結一個Topic。

注意:可以使用萬用字元進行模糊匹配
「#」 匹配一個或多個詞
" 匹配一個詞
「log.#」 能夠匹配到 「log.info.oa」
"log.
」 能夠匹配到 「log.erro」

Fanout Exchange(扇形交換機)

扇型交換機(funout exchange)將訊息路由給系結到它身上的所有佇列,而不理會系結的路由鍵。如果N個佇列系結到某個扇型交換機上,當有訊息發送給此扇型交換機時,交換機會將訊息的拷貝分別發送給這所有的N個佇列。扇型用來交換機處理訊息的廣播路由(broadcast routing)。

Default Exchange(預設交換機)

預設交換機(defaultexchange)實際上是一個由訊息代理預先宣告好的沒有名字(名字爲空字串)的直連交換機(direct exchange)。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每個新建佇列(queue)都會自動系結到預設交換機上,系結的路由鍵(routing key)名稱與佇列名稱相同。

Binding 系結

  • Exchange和Exchange、Queue之間的連線關係
  • Binding中可以包含RoutingKey或者參數

Queue 佇列

  • 訊息佇列,實際儲存訊息數據
  • Durability:是否持久化,Durable:是,Transient:否
  • Auto Delete:yes,表示當最後一個監聽被移除後,該Queue會自動被刪除

Message 訊息

  • 伺服器和應用程式之間傳送的數據
  • 本質上就是一段數據,由 Properties 和 Payload(Body)組成
  • 常用屬性:delivery mode、headers(自定義屬性)
  • 其他屬性:
    • content_type、content_encoding、priority(優先順序0-9)
    • correlation_id、reply_to、expiration、message_id
    • timestamp、type、user_id、app_id、cluster_id

Virtual Host 虛擬主機

  • 虛擬地址,用於進行邏輯隔離,最上層的訊息路由
  • 一個Virtual Host裏面可以有若幹個Exchange和Queue
  • 同一個Virtual Host裏面不能有相同名稱的Exchange或Queue

第二天 RabbitMQ高階特性

Confirm確認訊息

什麼是Confirm機制 機製

  • 訊息的確認,生產者投遞訊息後,如果Broker收到訊息,則會給生產者一個應答
  • 生產者接收應答,用來確定這條訊息是否正常地發送到Broker,這種方式也是訊息可靠性投遞的核心保障!

Confirm機制 機製流程

  1. Producer發送訊息到Broker,Broker接收到訊息後,產生回送響應
  2. Producer中有一個Confirm Listener非同步監聽響應應答

實現Confirm機制 機製

  1. 在channel上開啓確認模式:channel.confirmSelect()
  2. 在channel上新增監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對訊息進行重新發送、或記錄日誌等後續處理!

Return返回訊息

什麼是Return機制 機製

  • Return Listener 用於處理一些不可路由的訊息!
  • 生產端通過指定一個Exchange和Routingke,把訊息送達到某一個佇列中,然後消費端監聽佇列,進行消費處理!
  • 但如果我們在發送訊息時,當前Exchange不存在或者Routingkey路由不到,如果我們要監聽這種不可達的訊息,就要用到Return Listener!

Return機制 機製流程

實現Return機制 機製

  1. 新增return監聽:addReturnListener,生產端去監聽這些不可達的訊息,做一些後續處理,比如說,記錄下訊息日誌,或者及時去跟蹤記錄,有可能重新設定一下就好了
  2. 在基礎的API中的一個關鍵的設定項:Mandatory
    • 如果爲true,則監聽器會接收到路由不可達的訊息,然後進行後續處理
    • 如果爲false,那麼broker端自動刪除該訊息

自定義消費者使用

比如發送了100條訊息,最原始的消費方式是通過一個回圈,挨個的獲取訊息

  • 使用自定義的Consumer,它更方便,解耦性更強,程式碼也更優雅,也是在實際工作中最常用的使用方式
  • 自定義Consumer實現只需要繼承 DefaultConsumer 類,重寫 handleDelivery 方法即可!

自定義Consumer實現

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    /**
     *
     * @param consumerTag 內部生成的消費者標籤
     * @param envelope 包含屬性:deliveryTag(標籤), redeliver, exchange, routingKey
     *                 redeliver是一個標記,如果設爲true,表示訊息之前可能已經投遞過了,現在是重新投遞訊息到監聽佇列的消費者
     * @param properties 訊息屬性
     * @param body 訊息內容
     * @throws IOException
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("==========Consume Message===========");
        // 輸出訊息內容
        System.out.println("message: " + new String(body, "UTF-8"));
        System.out.println("consumerTag: " + consumerTag);
        System.out.println("envelope: " + envelope.getDeliveryTag()
                + "," + envelope.getExchange()
                + "," + envelope.getRoutingKey()
                + "," + envelope.isRedeliver());
        System.out.println("properties: " + properties);
    }
}

說明:與Spring整合之後,就不需要使用了,而只需要一個 監聽的註解 就可以直接搞定!!!

消費端的限流策略

什麼是消費端限流?

訊息過載場景

  • 假設我們有這樣的場景,首先Rabbitmq伺服器有上萬條未處理的訊息,我們隨便開啓一個消費者用戶端,會造成巨量的訊息瞬間全部推播過來,然而我們單個用戶端無法同時處理這麼多數據!此時很有可能導致伺服器崩潰,嚴重的可能導致線上的故障。
  • 還有一些其他的場景,比如說單個生產者一分鐘產生了幾千條數據,但是單個消費者一分鐘可能只能處理100條,這個時候生產端和用戶端肯定是不平衡的。通常生產端是沒辦法做限制的,所以消費端肯定需要做一些限流措施,否則如果超出最大負載,可能導致消費端效能下降,伺服器卡頓甚至崩潰等一系列嚴重後果!因此,我們需要限流

消費端限流機制 機製

RabbitMQ提供了一種Qos (服務品質保證)功能,即在非自動確認訊息的前提下,如果一定數目的訊息 (通過基於consume或者channel設定Qos的值) 未被確認前,不消費新的訊息!

不能設定自動簽收功能(autoAck = false)
如果訊息未被確認,就不會到達消費端,目的就是給消費端減壓

限流設定API

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);

  • prefetchSize: 單條訊息的大小限制,通常設定爲0,表示不做限制
  • prefetchCount: 一次最多能處理多少條訊息,實際工作中一般設定爲1
  • global: 設定true應用於channel級別,設定false表示Consumer級別,一般設定false

prefetchSize和global這兩項,RabbitMQ沒有實現,暫且不研究
prefetchCount在 autoAck=false 的情況下生效,即在自動應答的情況下該值無效

手工ACK

void basicAck(Integer deliveryTag,boolean multiple)

呼叫這個方法就會主動回送給Broker一個應答,表示這條訊息我處理完了,你可以給我下一條了。參數multiple表示是否批次簽收,由於我們是一次處理一條訊息,所以設定爲 false。

實現消費端限流

訊息堆積?(生產中的解決方案)
首先是先加大消費端機器,將訊息逐漸的消費完成
是什麼原因導致訊息堆積?
生產端過快
消費端消費過慢
程式上來優化,JVM優化…
重複消費?
冪等消費設計

消費端ACK與重回佇列機制 機製

消費端的手工ACK和NACK

當我們設定autoACK=false 時,就可以使用手工ACK方式了,其實手工方式包括了手工ACK與NACK

當我們手工 ACK 時,會發送給Broker一個應答,代表訊息處理成功,Broker就可回送響應給生產端。
NACK 則表示訊息處理失敗,如果設定了重回佇列,Broker端就會將沒有成功處理的訊息重新發送。

使用方式

消費端消費時,如果由於業務異常,我們可以手工 NACK 記錄日誌,然後進行補償

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

如果由於伺服器宕機等嚴重問題,我們就需要手工 ACK 保障消費端消費成功

void basicAck(long deliveryTag, boolean multiple)

消費端重回佇列

  • 重回佇列是爲了對沒有處理成功的訊息,將訊息重新投遞給Broker
  • 重回佇列會把消費失敗的訊息重新新增到佇列的尾端,供消費端繼續消費
  • 一般在實際應用中,都會關閉重回佇列,即設定爲false
    • 實際應用中而是採用死信佇列,做後續的處理(補償機制 機製)

重回佇列實現

TTL訊息

什麼是TTL?

  • TTL(Time To Live),即生存時間
  • RabbitMQ支援訊息的過期時間,在訊息發送時可以進行指定
  • RabbitMQ支援爲每個佇列設定訊息的超時時間,從訊息入佇列開始計算,只要超過了佇列的超時時間設定,那麼訊息會被自動清除

死信佇列

什麼是死信佇列?

DLX - 死信佇列(Dead-Letter-Exchange)
利用DLX,當訊息在一個佇列中變成死信 (dead message) 之後,它能被重新Publish到另一個Exchange中,這個Exchange就是DLX。

死信佇列的產生場景(訊息變爲死信)

  • 訊息被拒絕(basic.reject / basic.nack),並且requeue = false(不重回佇列)
  • 訊息TTL過期
  • 佇列達到最大長度

死信佇列的處理過程

  • DLX亦爲一個普通的Exchange,它能在任何佇列上被指定,實際上就是設定某個佇列的屬性
  • 當某佇列中有死信時,RabbitMQ會自動地將該訊息重新發布到設定的Exchange(DLX),進而被路由到另一個佇列(死信佇列)
  • 可以監聽這個佇列中的訊息做相應的處理,該特性可以彌補RabbitMQ 3.0以前支援的immediate參數的功能

死信佇列的設定

  • 設定死信佇列的exchange和queue,然後進行系結
    • Exchange:dlx.exchange
    • Queue:dlx.queue
    • RoutingKey:#
  • 正常宣告交換機、佇列、系結,只不過我們需要在佇列加上一個參數即可
arguments.put("x-dead-letter-exchange","dlx.exchange");

這樣訊息在過期、requeue、 佇列在達到最大長度時,訊息就可以直接路由到死信佇列!

// 業務方法
public void consume () {
	
	boolean ack = true;
	
	try {
		// 消費訊息
		// 若出現異常
	} catch (Exception e) {
		ack = false;
	}

	if (!ack) {
		// 訊息變爲死信
		// 這條訊息會被自動發佈到 DLX 上
		channel.basicNack("", false, false);
	} else {
		channel.basicAck();
	}
}

訊息如何保障100%的投遞成功方案

什麼是生產端的可靠性投遞?

  • 保障訊息的成功發出
  • 保障MQ節點的成功接收
  • 發送端收到MQ節點(Broker)確認應答
  • 完善的訊息進行補償機制 機製

在實際生產中,很難保障前三點的完全可靠,比如在極端的環境中,生產者發送訊息失敗了,發送端在接受確認應答時突然發生網路閃斷等等情況,很難保障可靠性投遞,所以就需要有第四點完善的訊息補償機制 機製。

網際網路大廠解決方案

  • 訊息落庫,對訊息狀態進行打標(中小型企業常見方案)
  • 訊息的延遲投遞,做二次確認,回撥檢查(大廠實現方案)

冪等性概念及業界主流解決方案

什麼是冪等性?

使用者對於同一操作發起的一次請求或者多次請求的結果是一致的

消費端冪等性保障

在海量訂單產生的業務高峯期,如何避免訊息的重複消費問題?

在業務高峯期最容易產生訊息重複消費問題,當Consumer消費完訊息時,在給Producer返回ack時由於網路中斷,導致Producer未收到確認資訊,該條訊息就會重新發送並被Consumer消費,但實際上該消費者已成功消費了該條訊息,這就造成了重複消費。

消費端實現冪等性,就意味着我們的訊息永遠不會被多次消費,即使我們收到了很多一樣的訊息。

解決訊息重複消費問題

業界主流的解決方案

  1. 唯一ID + 指紋碼
  2. Redis原子性

第三天 Spring家族整合

SpringAMQP整合

  1. RabbitAdmin:使用者管理元件
    • RabbitAdmin底層實現就是從Spring容器中獲取Exchange、Binding、RoutingKey以及Queue的@Bean宣告
    • 然後使用RabbitTemplate的execute方法執行對應的宣告、修改、刪除等一系列RabbitMQ基礎功能操作

設定類

@Configuration
public class RabbitConfig {
    @Bean
    public ConnectionFactory connectionFactory () {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setAddresses("192.168.229.111:5672");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin (ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // autoStartup必須要設定爲true,否則Spring容器不會載入RabbitAdmin類
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
@SpringBootTest
class RabbitmqSpringApplicationTests {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    /**
     * 交換機名稱
     */
    private final String DIRECT_EXCHANGE = "test_direct_exchange";
    private final String TOPIC_EXCHANGE = "test_topic_exchange";
    private final String FANOUT_EXCHANGE = "test_fanout_exchange";
    /**
     * 佇列名稱
     */
    private final String DIRECT_EXCHANGE_QUEUE = "test_direct_queue";
    private final String TOPIC_EXCHANGE_QUEUE = "test_topic_queue";
    private final String FANOUT_EXCHANGE_QUEUE = "test_fanout_queue";

    @Test
    public void testRabbitAdmin () {
        // 宣告三個交換機
        rabbitAdmin.declareExchange(new DirectExchange(DIRECT_EXCHANGE, false, false));
        rabbitAdmin.declareExchange(new TopicExchange(TOPIC_EXCHANGE, false, false));
        rabbitAdmin.declareExchange(new FanoutExchange(FANOUT_EXCHANGE, false, false));

        // 宣告三個佇列
        rabbitAdmin.declareQueue(new Queue(DIRECT_EXCHANGE_QUEUE, false));
        rabbitAdmin.declareQueue(new Queue(TOPIC_EXCHANGE_QUEUE, false));
        rabbitAdmin.declareQueue(new Queue(FANOUT_EXCHANGE_QUEUE, false));

        // 系結佇列和交換機
        rabbitAdmin.declareBinding(new Binding(DIRECT_EXCHANGE_QUEUE, Binding.DestinationType.QUEUE,
                DIRECT_EXCHANGE, "direct.#", null));

        // 直接建立佇列和交換機,並系結關係
        rabbitAdmin.declareBinding(BindingBuilder
                // 直接建立佇列
                .bind(new Queue(TOPIC_EXCHANGE_QUEUE, false))
                // 直接建立交換機
                .to(new TopicExchange(TOPIC_EXCHANGE, false,false))
                // 指定路由key
                .with("topic.#"));

        rabbitAdmin.declareBinding(BindingBuilder
                // 直接建立佇列
                .bind(new Queue(FANOUT_EXCHANGE_QUEUE, false))
                // 直接建立交換機
                .to(new FanoutExchange(FANOUT_EXCHANGE, false,false)));
    }
}
  • SpringAMQP宣告
/**
* 宣告一個 Topic Exchange
* @return
*/
@Bean
public TopicExchange topicExchange () {
   return new TopicExchange(TOPIC_EXCHANGE, true, false);
}

/**
* 宣告一個 QueueA
* @return
*/
@Bean
public Queue queueA () {
   return new Queue(TOPIC_EXCHANGE_QUEUE_A, true);
}
/**
* 將佇列A系結到交換機
* @return
*/
@Bean
public Binding bindingA () {
   return BindingBuilder.bind(queueA()).to(topicExchange()).with(TOPIC_EXCHANGE_QUEUE_A_ROUTING_KEY);
}
  1. RabbitTemplate:訊息模板

    RabbitAdmin類可以很好的操作RabbitMQ,在Spring中直接進行注入即可

@Bean
public RabbitTemplate rabbitTemplate (ConnectionFactory connectionFactory) {
   return new RabbitTemplate(connectionFactory);
}
@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 發送訊息
*/
@Test
public void testSendMessage () {
   // 建立訊息
   MessageProperties messageProperties = new MessageProperties();
   messageProperties.setContentType("text/plain");
   Message msg = new Message("Hello World".getBytes(), messageProperties);
   // 發送訊息
   rabbitTemplate.send("spring.amqp.topic.exchange", "c.msg", msg);
}

/**
* 簡單訊息發送
*/
@Test
public void testSendMessage2 () {
   String msg = "Hello yuan deng ta...";
   rabbitTemplate.convertAndSend("spring.amqp.topic.exchange", "b.msg", msg);
}

/**
* 帶屬性訊息發送
*/
@Test
public void testSendMessage3 () {
   // 建立訊息
   MessageProperties messageProperties = new MessageProperties();
   messageProperties.getHeaders().put("desc", "訊息描述...");
   messageProperties.getHeaders().put("type", "自定義訊息型別");
   Message msg = new Message("Hello yuan deng ta".getBytes(), messageProperties);

   // 發送訊息
   rabbitTemplate.convertAndSend("spring.amqp.topic.exchange", "a.msg", msg, message -> {
       System.out.println("------新增額外的設定------");
       message.getMessageProperties().getHeaders().put("desc", "訊息詳細描述...");
       message.getMessageProperties().getHeaders().put("attr", "新增屬性");
       return message;
   });
}
  1. SimpleMessageListenerContainer:訊息容器

簡單訊息監聽器

  • 這個類非常強大,我們可以對他進行很多設定,對於消費者的設定項,這個類都可以滿足
  • 監聽佇列(多個佇列)、自動啓動、自動宣告功能
  • 設定事務特性、事務管理器、事務屬性、事務容器(併發)、是否開啓事務、回滾訊息等
  • 設定消費者數量、最小最大數量、批次消費
  • 設定訊息確認和自動確認模式、是否重回佇列、異常捕獲handler函數
  • 設定消費者標籤生成策略、是否獨佔模式、消費者屬性等
  • 設定具體的監聽器、訊息轉換器等等
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer (
    ConnectionFactory connectionFactory,
    Queue queueA, Queue queueB, Queue queueC) {
    SimpleMessageListenerContainer simpleMessageListenerContainer =
        new SimpleMessageListenerContainer(connectionFactory);
    // 監聽多個佇列
    simpleMessageListenerContainer.setQueues(queueA, queueB, queueC);
    // 當前消費者數量
    simpleMessageListenerContainer.setConcurrentConsumers(1);
    // 最大消費者數量
    simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
    // 是否重回佇列
    simpleMessageListenerContainer.setDefaultRequeueRejected(false);
    // 簽收機制 機製
    simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
    // 消費者標籤
    simpleMessageListenerContainer.setConsumerTagStrategy(queue -> {
        return queue + "_" + UUID.randomUUID().toString().substring(0,6);
    });

    // 監聽方式處理訊息
    simpleMessageListenerContainer.setMessageListener(message -> {
        System.out.println("消費者:" + new String(message.getBody()));
    });

    return simpleMessageListenerContainer;
}
  1. MessageListenerAdapter:訊息適配器

除了使用監聽方式處理訊息,還可以使用訊息監聽適配器方式來處理訊息

自定義訊息處理委派類

  • 預設方法名 handleMessage
  • 自定義方法名
  • 佇列系結自定義方法名
/**
* 自定義訊息委派類 {@link MessageDelegate}
* @author Kevin
*/
public class MessageDelegate {

   /**
    * handleMessage 預設方法名
    * body
    * @param body
    */
   public void handleMessage (byte[] body) {
       System.out.println("預設方法,訊息內容:" + new String(body));
   }
   
   /**
    * 自定義方法名
    * @param body
    */
   public void consumeMessage (byte[] body) {
       System.out.println("自定義方法,訊息內容:" + new String(body));
   }

   public void consumeMessage (String message) {
       System.out.println("自定義方法,訊息內容:" + message);
   }

   /**
    * 佇列系結的方法名
    * @param message
    */
   public void queueAMethod (String message) {
       System.out.println("queueAMethod,訊息內容:" + message);
   }

   public void queueBMethod (String message) {
       System.out.println("queueBMethod,訊息內容:" + message);
   }
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer (
   ConnectionFactory connectionFactory,
   Queue queueA, Queue queueB, Queue queueC) {
   SimpleMessageListenerContainer simpleMessageListenerContainer =
       new SimpleMessageListenerContainer(connectionFactory);
   // 監聽多個佇列
   simpleMessageListenerContainer.setQueues(queueA, queueB, queueC);
   // 當前消費者數量
   simpleMessageListenerContainer.setConcurrentConsumers(1);
   // 最大消費者數量
   simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
   // 是否重回佇列
   simpleMessageListenerContainer.setDefaultRequeueRejected(false);
   // 簽收機制 機製
   simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
   // 消費者標籤
   simpleMessageListenerContainer.setConsumerTagStrategy(queue -> {
       return queue + "_" + UUID.randomUUID().toString().substring(0,6);
   });

   // 適配方式,這裏委派類使用預設的方法來處理訊息 handleMessage
   MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
   // 指定訊息轉換器
   messageListenerAdapter.setMessageConverter(new TextMessageConverter());
   
   // 1. 指定自定義方法名
   messageListenerAdapter.setDefaultListenerMethod("consumeMessage");
   
   // 2. 佇列名 和 方法名 匹配系結
   Map<String, String> queueOrTagToMethodName = new HashMap<>();
   queueOrTagToMethodName.put(TOPIC_EXCHANGE_QUEUE_A, "queueAMethod");
   queueOrTagToMethodName.put(TOPIC_EXCHANGE_QUEUE_B, "queueBMethod");
   messageListenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
   
   simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);

   return simpleMessageListenerContainer;
}
  1. MessageConverter :訊息轉換器

我們在進行發送訊息的時候,正常情況下訊息體爲二進制的數據方式進行傳輸,如果希望內部幫我們進行轉換,或者指定自定義的轉換器,就需要用到MessageConverter

  • 自定義訊息轉換器:MessageConverter,一般來講都需要實現這個介面
    • 重寫下面 下麪兩個方法:
      • toMessage:Java物件轉換爲Message
      • fromMessage:Message物件轉換爲Java物件
  • Json轉換器:Jackson2JsonMessageConverter,可以進行Java物件的轉換功能
  • DefaultJackson2JavaTypeMapper對映器:可以進行Java物件的對映關係(可以轉換多個Java物件)
  • 自定義二進制轉換器:比如圖片型別、PDF、PPT、串流媒體

Spring Boot 整合

引入Starter

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

生產端

@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Confirm 確認訊息
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("CorrelationData: " + correlationData);
            System.out.println("ack: " + ack);
            if (!ack) {
                System.err.println("例外處理...");
            }
        }
    };

    /**
     * return 返回訊息
     */
    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
                                    String replyText, String exchange, String routingKey) {
            System.out.println("return exchange: " + exchange + ", routingKey: " + routingKey
                    + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    /**
     * 發送訊息
     * @param msg 訊息主體
     * @param properties 訊息屬性
     */
    public void send (Object msg, Map<String, Object> properties) {
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        Message message = MessageBuilder.createMessage(msg, messageHeaders);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        CorrelationData correlationData = new CorrelationData();
        // 全域性唯一 id + 時間戳
        // message id
        correlationData.setId(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("spring.boot.topic.exchange", "boot.msg", message, correlationData);
    }

    /**
     * 發送 Order 訊息
     * @param order 物件
     */
    public void sendOrder (Order order) {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("spring.boot.order.exchange", "order.*", order, correlationData);
    }
}

消費端 @RabbitListener 監聽

不需要再使用 @Bean 方式來建立 Exchange、Queue、Binding

監聽並且建立佇列、交換機、routingkey

// 通過 ${} 方式讀取 Spring boot 組態檔中的參數
@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                    durable = "${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                    type = "${spring.rabbitmq.listener.order.exchange.type}",
                    durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
        )
    )
@RabbitHandler
public void onMessage (@Payload Order order,
                       Channel channel,
                       @Headers Map<String, Object> headers) throws IOException {
    System.err.println("Order 訊息內容:" + order);
    Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
    // 手工ACK
    channel.basicAck(deliveryTag, false);
}

參數設定化

spring:
  rabbitmq:
    addresses: 192.168.229.111:5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 15000
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 10
      # 自定義設定
      order:
        exchange:
          name: spring.boot.order.exchange
          durable: true
          type: topic
          ignoreDeclarationExceptions: true
        queue:
          name: spring.boot.order.queue
          durable: true
        key: order.*

Spring Cloud Stream 整合

@StreamListener

叢集架構

RabbitMQ叢集架構模式

構建一個高可靠的映象叢集