RabbitMQ簡介以及應用

2020-08-14 11:06:38

一、簡要介紹

  • 開源AMQP實現,Erlang語言編寫,支援多種用戶端

  • 分佈式、高可用、持久化、可靠、安全

  • 支援多種協定:AMQP、STOMP、MQTT、HTTP

  • 適用於多系統之間的業務解耦的訊息中介軟體

二、基本概念

1、exchange:交換器,負責接收訊息,轉發訊息至系結的佇列,有四種類型:

  • direct:完全匹配的路由

  • topic:模式匹配的路由

  • fanout:廣播模式

  • headers:鍵值對匹配路由

Exchange屬性:

  • 持久化:如果啓用,那麼rabbit服務重新啓動之後仍然存在

  • 自動刪除:如果啓用,那麼交換器將會在其系結的佇列都被刪除掉之後自動刪除掉自身

2、Queue:佇列,rabbitmq的內部物件,用於儲存訊息,其屬性類似於Exchange,同樣可以設定是否持久化、自動刪除等。

消費者從Queue中獲取訊息並消費。多個消費者可以訂閱同一個Queue,這時Queue中的訊息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的訊息並處理。

3、Binding:系結,根據路由規則系結交換器與佇列

4、Routing:路由鍵,路由的關鍵字

三、訊息的可靠性

  • Message acknowledgment:訊息確認,在訊息確認機制 機製下,收到回執纔會刪除訊息,未收到回執而斷開了連線,訊息會轉發給其他消費者,如果忘記回執,會導致訊息堆積,消費者重新啓動後會重複消費這些訊息並重復執行業務邏輯。

  • Message durability:訊息持久化,設定訊息持久化可以避免絕大部分訊息丟失,比如rabbitmq服務重新啓動,但是採用非持久化可以提升佇列的處理效率。如果要確保訊息的持久化,那麼訊息對應的Exchange和Queue同樣要設定爲持久化。

  • Prefetch count,每次發送給消費者訊息的數量,預設爲1

另外,如果需要可靠性業務,需要設定持久化和ack機制 機製,如果系統高吞吐,可以設定爲非持久化、noack、自動刪除機制 機製。

四、簡單應用

模擬這樣一個業務場景,使用者下單成功後,需要給使用者增加積分,同時還需要給使用者發送下單成功的訊息,這是在電商業務中很常見的一個業務場景。

如果系統是微服務架構,可能使用者下單功能在訂單服務,給使用者增加積分的功能在積分服務,給使用者發送通知訊息的功能在通知服務,各個服務之間解耦,互不影響。那麼要實現上述的業務場景,訊息中介軟體rabbitmq是一個很好的選擇。

原因如下:

  • 高效能,它的實現語言是天生具備高併發高可用的erlang 語言

  • 支援訊息的持久化,即使伺服器掛了,也不會丟失訊息

  • 訊息應答(ack)機制 機製,消費者消費完訊息後發送一個訊息應答,rabbitmq纔會刪除訊息,確保訊息的可靠性

  • 支援高可用叢集

  • 靈活的路由

實現思路:

使用者下單成功後,rabbitmq發送一條訊息至EXCHANGE.ORDER_CREATE交換器,該交換器系結了兩個佇列,QUEUE.ORDER_INCREASESCOREQUEUE.ORDER_NOTIFY,消費者訂閱這兩個佇列分別用來處理增加積分、發送使用者通知。如果後續日誌系統還需要記錄下單的相關日誌,那麼我們只需要再定義一個佇列並將其系結到EXCHANGE.ORDER_CREATE即可。

下單發rabbitmq訊息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * @author: 會跳舞的機器人
 * @date: 2017/10/13 10:46
 * @description: 模擬使用者下單之後發送rabbitmq訊息
 */
public class OrderCreator {
    // 交換器名稱
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 訊息內容
    private static String msg = "create order success";

    /**
     * 模擬建立訂單後發送mq訊息
     */
    public void createOrder() {
        System.out.println("下單成功,開始發送rabbitmq訊息");

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");

        Connection connection;
        Channel channel;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 持久化
            boolean durable = true;
            // topic型別
            String type = "topic";
            // 宣告交換器,如果交換器不存在則建立之
            channel.exchangeDeclare(EXCHANGE, type, durable);

            String messgeId = UUID.randomUUID().toString();
            // deliveryMode>=2表示設定訊息持久化
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messgeId).build();
            // 發佈訊息
            String routingKey = "order_create";
            channel.basicPublish(EXCHANGE, routingKey, props, msg.getBytes("utf-8"));
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

積分系統訂閱訊息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: 會跳舞的機器人
 * @date: 2017/10/13 16:02
 * @description: rabbitmq消費者,模擬下單成功後給使用者增加積分
 */
public class IncreaseScoreConsumer implements Consumer {
    private Connection connection;
    private Channel channel;
    // 交換器名稱
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 增加積分佇列名稱
    private static final String QUEUENAME = "QUEUE.ORDER_INCREASESCORE";


    public void consume() {
        // 初始化rabbitmq連線資訊
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 宣告交換器
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // 宣告佇列
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // 交換器與佇列系結並設定routingKey
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // 消費訊息,callback是該類,關閉自動確認訊息,在完成業務邏輯後手動確認確認
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("《積分系統》收到訂單訊息:" + msg + ",給使用者增加積分......");
        // 手動確認訊息
        channel.basicAck(envelope.getDeliveryTag(), false);

        /**
         * channel.basicReject(envelope.getDeliveryTag(), false);該方法會丟棄掉佇列中的這條訊息
         * channel.basicReject(envelope.getDeliveryTag(), true);該方法會把訊息重新放回佇列
         * 一般系統會設定一個重試次數,如果超過重試次數,則會丟棄訊息,反之則會把訊息再放入佇列
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }

}

通知系統訂閱訊息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: 會跳舞的機器人
 * @date: 2017/10/13 16:20
 * @description: rabbitmq消費者,模擬下單成功後給使用者發送通知
 */
public class NotifyConsumer implements Consumer {
    private Connection connection;
    private Channel channel;

    // 交換器名稱
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 通知使用者下單成功通知佇列名稱
    private static final String QUEUENAME = "QUEUE.ORDER_NOTIFY";


    public void consume() {
        // 初始化rabbitmq連線資訊
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 宣告交換器
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // 宣告佇列
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // 交換器與佇列系結並設定routingKey
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // 消費訊息,callback是該類,關閉自動確認訊息,在完成業務邏輯後手動確認確認
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("《通知系統》收到訂單訊息:" + msg + ",開始給使用者發送通知......");
        // 手動確認訊息
        channel.basicAck(envelope.getDeliveryTag(), false);

        /**
         * channel.basicReject(envelope.getDeliveryTag(), false);該方法會丟棄掉佇列中的這條訊息
         * channel.basicReject(envelope.getDeliveryTag(), true);該方法會把訊息重新放回佇列
         * 一般系統會設定一個重試次數,如果超過重試次數,則會丟棄訊息,反之則會把訊息再放入佇列
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }
}

測試

package com.robot.rabbitmq;

/**
 * @author: 會跳舞的機器人
 * @date: 2017/10/13 16:27
 * @description:
 */
public class Test {
    public static void main(String[] args) {

        IncreaseScoreConsumer increaseScoreConsumer = new IncreaseScoreConsumer();
        increaseScoreConsumer.consume();

        NotifyConsumer notifyConsumer = new NotifyConsumer();
        notifyConsumer.consume();

        OrderCreator orderCreator = new OrderCreator();
        for (int i = 0; i < 3; i++) {
            orderCreator.createOrder();
        }
    }
}

輸出:

下單成功,開始發送rabbitmq訊息
《積分系統》收到訂單訊息:create order success,給使用者增加積分......
《通知系統》收到訂單訊息:create order success,開始給使用者發送通知......
下單成功,開始發送rabbitmq訊息
《積分系統》收到訂單訊息:create order success,給使用者增加積分......
《通知系統》收到訂單訊息:create order success,開始給使用者發送通知......
下單成功,開始發送rabbitmq訊息
《積分系統》收到訂單訊息:create order success,給使用者增加積分......
《通知系統》收到訂單訊息:create order success,開始給使用者發送通知...