RabbitMQ訊息中介軟體技術精講

2020-08-13 09:46:14

第一章 課程介紹

1.3 業界主流訊息中介軟體介紹

ActiveMQ

  • 效能不好

ActiveMQ叢集模式

Kafka

  • 主要特點是基於Pull的模式來處理訊息消費,追求高吞吐量,一開始的目的就是用於日誌收集和傳輸。不支援事務,對訊息的重複、丟失、錯誤沒有嚴格要求,適合產生大量數據的網際網路服務的數據收集業務。

關注於效能,對數據可靠性要求不高

Kafka叢集模式

RocketMQ

  • 具有高吞吐量、高可用性、適合大規模分佈式系統應用的特點。起源於Kafka,對訊息的可靠傳輸及事務性做了優化
  • 不易維護
  • 商業版收費

RocketMQ叢集模式

RabbitMQ

  • 開源,效能好,穩定性好
  • 提供可靠性訊息投遞模式、返回模式
  • 與SpringAMQP完美地整合,API豐富
  • 叢集模式豐富,表達式設定,HA模式,映象佇列模型
  • 保證數據不丟失的前提下做到高可靠性,可用性
  • 效能和吞吐量不如Kafka,但是比ActiveMQ高

RabbitMQ叢集模式

第二章 RabbitMQ核心概念

2.3 RabbitMQ高效能的原因

  • Erlang語言最初在於交換機領域,這樣使得RabbitMQ在Broker之間進行數據互動的效能是非常優秀的
  • Erlang的優點:Erlang有着和原生Socket一樣的延遲

2.4 AMQP高階訊息佇列協定與模型

AMQP定義:是具有現代特徵的二進制協定。是應用層協定的一個開放標準,爲訊息導向中介層設計。

生產者投遞到Exchange上,消費者從Message Queue中取數據

2.5 AMQP核心概念講解

  • Server:又稱broker,接受用戶端的連線,實現AMQP實體服務
  • ````Connection```:應用程式與broker的連線
  • Channel:網路通道,幾乎所有的操作都在Channel中進行。Channel是進行訊息讀寫的通道,用戶端可建立多個Channel,每個Channel代表一個對談任務。類似與數據庫中的Session
  • Message:伺服器和應用程式之間傳送的數據。由PropertiesBidy組成。
    • Properties:可以對訊息進行修飾,比如訊息的優先順序、延遲等高階特性
    • Body:訊息體內容
  • Virtual Host:虛擬地址,用於進行邏輯隔離,最上層的訊息路由。一個Virtual Host裏面可以有若幹個Exchange和Queue,同一個Virtual Host裏面不能有相同名稱的Exchange或Queue。Vhost之於RabbitMQ就像虛擬機器之於物理伺服器一樣:它既能將同一個RabbitMQ的衆多客戶區分開來,又可以避免佇列和交換器的命名衝突
  • Exchange:交換機,接收訊息,根據路由鍵轉發訊息到系結的佇列
  • Binding:Exchange和Queue之間的虛擬鏈接,binding中可以包含routing key
  • Routing Key:路由規則,虛擬機器可用它來確定如何路由一個特定訊息
  • Queue:訊息佇列,儲存訊息並將它們轉發給消費者

2.6 RabbitMQ整體架構與訊息流轉

2.7 RabbitMq環境安裝

  • 服務的啓動:rabbitmq-server start &
  • 服務的停止:rabbitmqctl stop_app
  • 管理外掛:rabbitmq-plugins enable rabbitmq_management

5672是java端進行通訊的埠號;15672是管控台的埠號;25672是叢集通訊的埠號。

2.9 命令列與管理台結合講解

凡是可以在管理台執行的操作都可以通過命令列來操作。

高階操作

  • rabbitmqctl reset:移除所有數據,要在rabbitmqctl stop_app之後使用
  • rabbitmqctl change_cluster_node_type disc | ram:修改叢集節點的儲存形式
  • rabbitmqctl forget_cluster_node [--offline]:忘記節點(摘除節點)

2.10 生產者消費者模型構建

需要先執行消費者,因爲在消費者中宣告瞭佇列

如圖所示,有一個問題:

爲什麼生產者系結的routingKey和消費者監聽的queue名字一樣時(「test001」),訊息也可以路由過去?

RabbitMQ的生產者在發送訊息時,必須要指定一個exchange,如果exchange爲空的話(""),會走第一個exchange(AMQP default),這個exchange的路由規則就是按照routingKey的名字來尋找同名的queue,如果有,就把訊息路由過去;如果沒有,就把訊息刪除。

2.12 交換機詳解

Exchange:接收訊息,並根據路由鍵轉發訊息到所系結的佇列

交換機的屬性

  • Name:交換機名稱
  • Type:交換機型別:direct、topic、fanout、headers
  • Durability:是否需要持久化
  • Auto Delete:當最後一個系結到Exchange上的佇列刪除後,自動刪除該Exchange
  • Internal:當前Exchange是否用於RabbitMQ內部使用,預設爲False(不常使用)
  • Arguments:擴充套件參數,用於擴充套件AMQP協定自定製化

Direct Exchange

所有發送到Direct Exchange的訊息被轉發到RoutingKey中指定的Queue

Producer
// 3. 通過connection建立一個Channel
Channel channel = connection.createChannel();

// 4. 宣告
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";

// 5. 發送
String msg = "Direct Exchange Message";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
Consumer
// 3. 通過connection建立一個Channel
Channel channel = connection.createChannel();

// 4. 宣告
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String routingKey = "test.direct";
String queueName = "test_direct_queue";
// 宣告一個交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 宣告一個佇列
channel.queueDeclare(queueName, false, false, false, null);
// 建立一個系結關係
channel.queueBind(queueName, exchangeName, routingKey);

// 5. 建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

Topic Exchange

  • 所有發送到Topic Exchange的訊息被轉發到訂閱了RoutingKey中指定Topic的所有的Queue上

  • Exchange將RoutingKey和某Topic進行模糊匹配,此時佇列需要系結一個Topic

    模糊匹配可以使用萬用字元

    • #:匹配一個或多個詞:「log.#」可以匹配到log.info.oa
    • *:匹配不多不少一個詞:「log.*」只會匹配到log.erro

Producer
// 4. 宣告
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.delete.abc";

// 5. 發送
String msg = "Direct Exchange Message";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
Consumer
// 4. 宣告
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String routingKey = "user.#";
String queueName = "test_topic_queue";
// 宣告一個交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 宣告一個佇列
channel.queueDeclare(queueName, false, false, false, null);
// 建立一個系結關係
channel.queueBind(queueName, exchangeName, routingKey);

Consumer可以收到兩條訊息。

注意,如果要修改Consumer中的系結關係(例如修改成:channel.queueBind(queueName, exchangeName, "user.*);,需要先將原來的系結關係解綁,否則test_topic_queue和test_topic_exchange會有兩個系結關係( 「user.#」 和 「user.*」 )

Fanout Exchange

  • 不處理任何路由鍵,只需要簡單地將佇列系結到交換機上
  • 發送到交換機的訊息都會被轉發到與該交換機系結的所有佇列上
  • 轉發訊息的速度是最快的

Producer
// 4. 宣告
String exchangeName = "test_topic_exchange";

// 5. 發送
String msg = "Direct Exchange Message";
channel.basicPublish(exchangeName, "", null, msg.getBytes());
Consumer
// 4. 宣告
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String routingKey = "";            // 系結關係爲空串
String queueName = "test_fanout_queue";
// 宣告一個交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 宣告一個佇列
channel.queueDeclare(queueName, false, false, false, null);
// 建立一個系結關係
channel.queueBind(queueName, exchangeName, routingKey);

這裏Producer和Consumer中的routingKey可以隨便設定,不會有任何影響。

總結來說,對於不同的交換機型別,Producer的程式碼幾乎相同,只是Consumer端的exchangeType不同

2.15 系結、佇列、訊息、虛擬主機詳解

  • 系結:Exchange和Exchange、Queue之間的連線關係。系結中可以包含routingKey或者參數

  • 佇列:實際儲存訊息數據

    • Durability:是否持久化
    • Auto Delete:爲true時,當最後一個監聽被移除後,該Queue會自動被刪除
  • 訊息:伺服器和應用程式之間傳送的數據。由Properties和Payload組成

    • 常用屬性:delivery mode、headers(自定義屬性)

    • 其它屬性:

      • content_type、content_encoding、priority

      • correlation_id、reply_to、expiration、message_id

        correlation_id 通常和 reply_to 一起使用:用於RPC服務

        • replyTo:通常用來設定一個回撥佇列

        • correlationId:用來關聯請求和RPC呼叫後的恢復

      • timestamp、type、user_id、app_id、cluster_id

      Map<String, Object> headers = new HashMap<>();
      headers.put("蓉兒", "郭靖");
      headers.put("盈盈", "令狐沖");
      headers.put("趙敏", "張無忌");
      
      AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
          .deliveryMode(2)                    // 持久化訊息
          .contentEncoding("UTF-8")           // 字元集
          .expiration("10000")                // 10s 後如果這個訊息沒有被消費,則被自動移除
          .headers(headers)                   // 自定義屬性
          .build();
      
      // 4. 通過Channel發送數據
      for (int i = 0; i < 5; i++) {
          String msg = "Leihou RabbitMq";
          channel.basicPublish("", "test001", props, msg.getBytes());
      }
      
      while (true) {
          QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
          String msg = new String(delivery.getBody());
          Map<String, Object> headers = delivery.getProperties().getHeaders();
          System.out.println(msg + "------" + headers);
          // Envelope envelope = delivery.getEnvelope();
      }
      
  • 虛擬主機:虛擬地址,用於進行邏輯隔離,最上層的訊息路由

第三章 深入RabbitMQ高階特性

3.2 訊息如何保障100%的投遞成功

什麼是生產端的可靠性投遞?

  • 保障訊息的成功發出
  • 保障MQ節點的成功接收
  • 發送端收到MQ節點(Broker)確認應答
  • 完善的訊息進行補償機制 機製

網際網路大廠的解決方案:

  • 訊息落庫,對訊息狀態進行打標(訊息狀態的變化)
  • 訊息的延遲投遞,做二次確認,回撥檢查

方案一:訊息落庫,對訊息狀態進行打標

  • step1:業務數據入庫(BIZ DB),生成訊息也入庫(MSG DB),這個數據庫都要做持久化
  • step2:發送訊息
  • step3:Producer非同步監聽Broker返回的確認訊息
    • step4:從數據庫中把這條訊息抓取出來,對status進行更新(從0更新成1,訊息投遞成功)
    • step5:有一個初始timeout,當超過了這個timeout後status仍爲0,就從數據庫中把該條訊息抽取出來
    • step6:把訊息再次投遞,重複step3、4、5
    • step7:重試次數大於3時,把status設爲2,進行一些補償機制 機製

方案二:訊息的延遲投遞,做二次確認,回撥檢查

方案一在高併發的場景下不太合適,因爲需要做兩次數據庫的同步操作。

  • step1:先把訂單入庫後,再發送訊息
  • step2:發送延遲訊息(比如2分鐘後訊息纔到)
  • step3:消費端監聽並處理訊息
  • step4:消費端生成一條訊息(Send Confirm),投遞到MQ
  • step5:callback服務監聽(step4發送的服務),對訊息進行持久化的儲存(MSG DB)
  • step6:2分鐘後,step2的延遲訊息發送到MQ,Callback服務監聽延遲投遞的佇列,去檢查DB數據庫,檢視是否有Confirm
    • 如果沒有Confirm,Callback給Upstream發送一個RPC Resend命令。Upstream去查BIZ,再把這個訊息重新發出去

3.3 冪等性概念及業界主流解決方案

冪等性

借鑑數據庫的樂觀鎖機制 機製:比如執行一個更新庫存的SQL語句

UPDATE T_REPS SET count=count-1, version=version+1 where version=1

先查出來version在數據庫中的當前值,更新庫存時也更新version值,這樣如果有併發的更新請求過來,仍舊用的old version,就查不到數據了。

對一件事情進行操作,操作一次的結果和操作多次的結果是一樣的。

消費端-冪等性保障

在海量訂單產生的業務高峯期,如何避免訊息的重複消費問題

消費端實現冪等性,就意味着,我們的訊息永遠不會消費多次,即使我們收到了多條一樣的訊息

業界主流的冪等性操作:

  • 唯一ID + 指紋碼 機制 機製,利用數據庫主鍵去重
  • 利用Redis的原子性去實現

唯一ID + 指紋碼

SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指紋碼
  • 如果數量爲0,就執行insert操作
  • 如果數量爲1,就返回失敗

好處:實現簡單

壞處:高併發下有數據庫寫入的效能瓶頸

解決方案:跟進ID進行分庫分表進行演算法路由

利用Redis原子性實現

利用Redis進行冪等,需要考慮的問題:

  • 是否要進行數據落庫,如果落庫的話,關鍵解決的問題是數據庫和緩衝如何做到原子性
  • 如果不進行落庫,那麼都儲存到快取中,如何設定定時同步的策略

3.4 持久化

RabbitMQ的持久化分爲三個部分:

  • 交換器的持久化:在宣告交換器時將 durable 參數設定爲 true
    • 否則交換器的元數據會丟失,但訊息不會丟失,只是不能將訊息發送到這個交換器
  • 佇列的持久化:宣告佇列時將 durable 參數設定爲 true
    • 佇列的元數據會丟失,訊息也會丟失
  • 訊息的持久化:將訊息的投遞模式(BasicProperties 中的 deliveryMode 屬性)設定爲2
    • 確保RabbitMQ發生異常情況時,訊息不會丟失

單單隻設定佇列持久化,重新啓動之後訊息會丟失;單單隻設定訊息的持久化,重新啓動之後佇列消失,繼而訊息也
丟失。單單設定訊息持久化而不設定佇列的持久化顯得毫無意義。

將交換器、佇列、訊息都設定了持久化之後就能百分之百保證數據不丟失了嗎 ? 答案是否定的。

  • 從消費者來說,如果在訂閱消費佇列時將autoAck 參數設定爲true ,那麼當消費者接收到相關訊息之後,還沒來得及處理就看機了,這樣也算數據丟失。

    • 這種情況很好解決,將 autoAck 參數設定爲false , 並進行手動確認。
  • 在持久化的訊息正確存入RabbitMQ 之後,還需要有一段時間(雖然很短,但是不可忽視)才能 纔能存入磁碟之中。RabbitMQ 並不會爲每條訊息都進行同步存檔(呼叫內核的fsyncl方法)的處理,可能僅僅儲存到操作系統快取之中而不是物理磁碟之中。如果在這段時間內RabbitMQ 服務節點發生了異常情況,訊息儲存還沒來得及落盤,那麼這些訊息將會丟失。

    fsync 在Linux 中的意義在於同步數據到儲存裝置上。大多數塊裝置的數據都是通過快取進行的,將數據寫到檔案上通常將該數據由內核複製到快取中,如果快取尚未寫滿,則不將其排入輸出佇列上,而是等待其寫滿或者當內核需要重用該快取時,再將該快取排入輸出佇列,進而同步到裝置上。內核提供了fsync介面,使用者可以根據自己的需要通過此介面更新數據到儲存裝置上。

    • 可以使用RabbitMQ的映象佇列機制 機製來解決這個問題
  • 在發送端引入事務機制 機製或者發送方確認機制 機製保證訊息已經正確地發送並儲存到RabbitMQ中。

3.5 生產者確認

預設情況下生產者是不知道訊息有沒有正確地到達伺服器。如果在訊息到達伺服器之前己經丟失,持久化操作也解決不了這個問題。RabbitMQ 針對這個問題,提供了兩種解決方式:

  • 事務機制 機製
  • 發送方確認機制 機製(publisher confirm)

3.5.1 事務機制 機製

try {
    channel.txSelect();              // 將當前通道設定爲事務模式
    for(int i = 0; i < LOOP_TIMES; i++) { // 如果發送多條訊息,把發送和提交事務包進回圈中
        channel.basicPublish("exchange",
                         "routingKey",
                         MessageProperties.PERSISTENT_TEXT_PLAIN,
                         ("msg:" + i).getBytes());
        channel.txCommit();                // 提交事務
    }
}catch(Exception e) {
    e.printStackTrace();
    channel.txRollback();            // 事務回滾
}

使用事務機制 機製會極大影響RabbitMQ的效能,降低訊息吞吐量。

3.5.2 發送方確認機制 機製

事務機制 機製在一條訊息發送之後會 使發送端阻塞,以等待RabbitMQ 的迴應,之後才能 纔能繼續發送下一條訊息。相比之下, 發送方確認機制 機製最大的好處在於它是非同步的,一旦發佈一條訊息,生產者應用程式就可以在等通道返回確認的同時繼續發送下一條訊息,當訊息最終得到確認之後,生產者應用程式便可以通過回撥方法來處理該確認訊息,如果RabbitMQ 因爲自身內部錯誤導致訊息丟失,就會發送一條nack CBas i c . Nack) 命令,生產者應用程式同樣可以在回撥方法中處理該nack 命令。

如果佇列和訊息是持久化的,那麼確認訊息會在訊息寫入磁碟之後發出。

如何實現Confirm確認訊息

  1. 在channel上開啓確認模式:channel.confirmSelect()
  2. 在channel上新增監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對訊息進行重新發送、或記錄日誌等後續處理
// 3. 通過Connection建立一個新的Channel
Channel channel = connection.createChannel();

// 4. 指定我們的訊息投遞模式:訊息的確認模式
channel.confirmSelect();

String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";

// 5. 發送一條訊息
String msg = "hello, RabbitMQ send confirm message";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

// 6. 新增一個確認監聽
channel.addConfirmListener(new ConfirmListener() {
    /**
             *
             * @param deliveryTag 訊息的唯一標籤
             * @param multiple
             * @throws IOException
             */
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("----------------ack----------------");
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("----------------no ack----------------");
    }
});

Consumer沒有變化

3.5.3 注意

  • 事務機制 機製和 Publisher Confirm 機制 機製時互斥的
  • 事務機制 機製和 Publisher Confirm 機制 機製能保證正確發送至RabbitMQ的交換器,如果此交換器沒有匹配的佇列,那麼訊息也會丟失。此時需要發送方使用 mandatory 參數,詳見【3.6 Return返回訊息詳解】

3.6 Return返回訊息詳解

Return訊息機制 機製:

  • Return Listener用於處理一些不可路由的訊息
  • 某些情況下,在發送訊息時,當前的exchange不存在或者指定的路由key路由不到,這個時候我們需要監聽這種不可達的訊息,就要使用Return Listener
  • 有一個關鍵的設定項 mandatory:如果爲true,則監聽器會接收到路由不可達的訊息,然後進行後續處理;如果爲false,那麼broker端自動刪除該訊息

3.7 自定義消費者

之前的消費端使用while回圈

// 5. 建立消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    System.out.println("消費端接收" + new String(delivery.getBody()));
}

我們可以用一種更優雅的方式來實現消費者

// 5. 建立消費者
channel.basicConsume(queueName, true, new MyConsumer(channel));


class MyConsumer extends DefaultConsumer {

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("----------------------consume message");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

3.8 消費端的限流策略

什麼是消費端限流?

假設我們的RabbitMQ伺服器有上萬條未處理的訊息,我們隨便開啓一個消費者用戶端,巨量的訊息瞬間全部推播過來,但是我們單個用戶端無法同時處理這麼多數據,可能導致伺服器崩潰。

RabbitMQ提供了一種qos(服務品質保證)功能,即在非自動確認訊息的前提下,如果一定數目的訊息(通過基於consume或者channel設定Qos的值)未被確認前,不進行消費新的訊息。

所以autoack一定要設定爲false

/*
 * prefetchSize: 0,
 * prefetchCount: 1,一次最多推播給consumer 1 條訊息,收到回覆 回復後再推播下1條
 * global: false,這個限流作用在queue上,true表示作用再channel上
 */
channel.basicQos(0, 1, false);

// autoAck設定爲false
channel.basicConsume(queueName, false, new MyConsumer(channel));



public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        System.out.println("----------------------consume message");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));

       
        // 注意:這裏一定要ack,否則消費者會被block住
        // false表示不支援 批次簽收
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

3.10 消費端ACK與重回佇列機制 機製

消費端的手工ACK和NACK

  • 消費端進行消費的時候,如果由於業務異常我們可以進行日誌的記錄,然後進行補償
  • 如果由於伺服器宕機等嚴重問題,那我們就需要手工進行ACK保障消費端消費成功

消費端的重回佇列

消費端重回佇列是爲了對沒有處理成功的訊息,把訊息重新回遞給Broker。在實際應用中,一半都會***關閉重回佇列***。

Producer

for (int i = 0; i < 5; i++) {
    String msg = "hello, RabbitMQ send ack message " + i;

    Map<String, Object> headers = new HashMap<>();
    headers.put("num", i);

    AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
        .deliveryMode(2)
        .contentEncoding("UTF-8")
        .headers(headers)
        .build();

    channel.basicPublish(exchangeName, routingKey2, false, props, msg.getBytes());
}

channel.close();
connection.close();

Consumer

// autoAck設定爲false
channel.basicConsume(queueName, false, new MyConsumer(channel));


class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        System.out.println("----------------------consume message----------------------");
        System.err.println("body: " + new String(body));

        if((Integer)properties.getHeaders().get("num") == 1) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /*
             * 最後一個參數:requeue=true:消費失敗的這個訊息重回佇列,並且在佇列的尾部
             */
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        }else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
}

3.11 TTL訊息詳解

TTL:Time To Live的縮寫,也就是生存時間。RabbitMQ支援訊息的過期時間,在訊息發送時可以進行指定。也支援佇列的過期時間,從訊息入隊開始,只要超過了佇列的超時時間設定,那麼訊息會自動清除

如下程式碼是作用於訊息上的

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .expiration("10000")                // 10s 後如果這個訊息沒有被消費,則被自動移除
                .build();

如下程式碼是作用在佇列上,佇列上的所有訊息10s未被消費,都會被自動移除

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 10000);
channel.queueDeclare(queueName, true, false, false, arguments);

3.12 死信佇列DLX

利用DLX,當訊息在一個佇列中變成死信(dead message)之後,它能被重新publish到另一個Exchange(DLX)上

訊息變成死信有以下幾種情況:

  • 訊息被拒絕(basic.reject / basic.nack),並且 requeue=false
  • 訊息TTL過期
  • 佇列達到最大長度

DLX和一般的Exchange沒有任何區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。當這個佇列中有死信時,RabbitMQ會自動地將這個死信訊息重新發布到設定的Exchange上去,進而路由到另一個佇列。可以監聽這個佇列中訊息做相應的處理,這個特性可以彌補RabbitMQ 3.0之前支援的immediate參數的功能。

Producer

String msg = "hello, RabbitMQ send dlx message";
for (int i = 0; i < 1; i++) {
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)                    // 持久化訊息
        .contentEncoding("UTF-8")           // 字元集
        .expiration("10000")                // 10s 後如果這個訊息沒有被消費,則被自動移除
        .build();
    channel.basicPublish(exchangeName, routingKey2, false, props, msg.getBytes());
}

channel.close();
connection.close();

Consumer

3.13 訊息傳輸保障

RabbitMQ 支援 「最多一次」 和 「最少一次」。

「最少一次投遞需要考慮以下這幾個方面」:

  • 生產者開啓 Publisher Confirm 機制 機製,確保訊息可以可靠地傳輸到 RabbitMQ 中
  • 生產者配合使用 mandatory 參數 來確保訊息能夠從交換器路由到佇列中
  • 訊息和佇列都要進行持久化
  • 消費者手動 ack

RabbitMQ 沒有去重機制 機製保證 「恰好一次」,去重處理一般是在業務用戶端實現,比如引入 Global Unique Identifier。