RabbitMQ的基本原理和模型

2020-08-07 18:09:35

RabbitMQ的基本原理與模型

1.爲什麼使用MQ

MQ是一箇中介軟體

(1)提升系統的響應速度、非同步操作

(2)提升系統的穩定性

(3)服務之間的解耦

(4)佇列FIFO

(5)消除峯值

2.爲什麼選擇RabbitMQ

Rocket和Kafka適合在大型專案中使用

中小型專案我們選擇ActiveMQ和RabbitMQ

img

單論功能功能齊全,可以將訊息持久化到數據庫中

但是RabbitMQ高併發支援比較好(得益於erlang語言)

基於AMQP協定的,跨語言存取的

雖然ActiveMQ功能強大,任何語言也可以存取,但是Java最優

RabbitMQ的優點:

1、使用簡單,功能強大。

2、基於AMQP協定。

3、社羣活躍,文件完善。

4、高併發效能好,這主要得益於Erlang語言。

5、Spring Boot預設已整合RabbitMQ

MQ的基本原理

在这里插入图片描述

3.安裝與啓動RabbitMQ

1.安裝erlang

因爲RabbitMQ是基於erlang語言編寫的,所以要想使用RabbitMQ,我們首先要在電腦上安裝erlang的環境

一鍵安裝安裝包,將安裝包下的bin設定到系統環境變數的path中

檢查是否成功設定環境

erl -version

2.安裝RabbitMQ

一鍵安裝用戶端

在快速啓動欄找到start啓動

在RabbitMQ的Sbin檔案下開啓cmd,啓用命令安裝RabbitMQ的視覺化外掛

rabbitmq-plugins.bat enable rabbitmq_management

輸入網址檢視是否安裝

http://127.0.0.1:15672/

在这里插入图片描述

使用者名稱和密碼都輸入預設的guest

4.使用java操作Rabbit

java操作RabbitMQ一共有7種模型,我們這裏只介紹5種比較常見的。

1.概述

1.基本模型

1.Hello World

img

一個生產者,預設轉換機,一個佇列,一個消費者

2.Work queues

img

一個生產者,一個佇列,多個消費者

一條訊息只能在一個消費者中進行消費

預設是平均分配訊息,我們可以做到按勞分配

2.訂閱模型

之前都是一個訊息被一個消費者消費

某些業務場景下,一個訊息要被多個消費者消費(羣聊)

通過設定不同的訂閱模型實現

之前訊息直接發到佇列中

現在要通過交換機Exchange進行路由,只要滿足路由條件,都被分發到對應的佇列中

佇列都有系結的消費者,這時候就可以實現一個訊息多個消費者消費

關鍵點:

(1)交換機

​ Fanout:廣播 發送給所有系結到這個交換機的佇列

​ Direct:根據指定的routing key(路由),將訊息發送到指定的一個或者多個佇列中

​ Topic:和Direct幾乎一樣,只是routing key支援萬用字元

​ # 匹配任意多個單詞 * 匹配任意一個單詞

(2)路由的設定

3.fanout廣播模型

img

生產者發送一條訊息到交換機,每一條佇列都可以通過交換機拉取這條訊息,然後分別傳遞給與佇列系結的消費者

4.Direct路由

img 路由模式

1.HelloWorld模型

一個生產者,預設交換機,一個消費者

專案測試

1.建立普通maven專案

2.導包

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.4.1</version>
</dependency>

3.編寫連線到RabbitEQ的工具類

public class ConnectionUtil {
    /**
     * 建立與RabbitMQ的連線的工具類
     */
    public static Connection getConnection() throws Exception{
        //定義連線工廠
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //設定服務地址
        connectionFactory.setHost("127.0.0.1");
        //設定埠
        connectionFactory.setPort(5672);
        //設定使用者名稱,密碼,VHost
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        //通過工廠獲取連線
        Connection connection=connectionFactory.newConnection();
        return connection;
    }
}

4.編寫生產者類

/**
 * 該模組實現的是Hello World模型的互動方式。
 * 該類的作用是模擬一個生產者,連線到RabbitMQ,向佇列中提供一條Message;
 */
public class producer {
    //定義一個常數
    private static final String QUEUE_NAME="queue_hello";

    public static void main(String[] args) throws Exception{
        //獲取連線
        Connection connection= ConnectionUtil.getConnection();
        //建立通道
        Channel channel=connection.createChannel();
        /**
         * 宣告一個佇列
         * @param queue 佇列名稱
         * @param durable 是否持久化佇列
         * @param exclusive 專有佇列,佇列是否獨佔此連線
         * @param autoDelete 是否自動刪除此佇列(沒有在使用的時候)
         * @param arguments 佇列的其他參數
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //模擬一條訊息
        String message = "helloworld小明" + System.currentTimeMillis();
        /**
         * 發佈一條訊息
         * @param exchange 發佈到哪個交換機
         * @param routingKey 路由的key
         * @param props 訊息的其他屬性 - routing headers 等等
         * @param body 訊息內容
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        //釋放資源
        channel.close();
        connection.close();
    }

}

5.編寫消費者類

public class Consumer {
    //定義一個常數,要與生產者中的值一樣,用來作爲標識
    private static final String QUEUE_NAME="queue_hello";

    public static void main(String[] args) throws Exception{
        //建立連線
        Connection connection= ConnectionUtil.getConnection();
        //建立通道
        Channel channel=connection.createChannel();
        /**
         * 宣告佇列,佇列一旦在MQ中宣告瞭,不需要反覆 反復宣告
         */


        DefaultConsumer consumer=new DefaultConsumer(channel){
            /**
             * 訊息接收的時候自動呼叫
             * @param consumerTag 消費者的標籤
             * @param envelope 訊息的封裝,訊息的相關資訊都封裝到了這個物件中
             * @param properties
             * @param body 訊息內容
             */
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                //將封裝的訊息body用message接收
                String message = new String(body);
                System.out.println(message);
            }
        };
        /**
         * 消費訊息
         * @param queue 佇列名稱
         * @param autoAck 自動回執
         * @param callback 回撥  監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
         */
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

訊息確認機制 機製ACK

如果訊息在消費過程中(拋出異常)出現問題,導致訊息沒有正常消費,然而訊息佇列中訊息已經刪除,導致訊息丟失。所以我們可以通過訊息確認機制 機製防止這種事情發生,只有訊息正常消費,向佇列回執成功的資訊,佇列接收到回執,再刪除訊息,否則訊息一直存在。

關閉自動回執

//channel.basicConsume(QUEUE_NAME,true,consumer); 
channel.basicConsume(QUEUE_NAME,false,consumer);

手動回執

//將封裝的訊息body用message接收
	String message = new String(body);
	System.out.println(message);
//第一個參數:訊息的標籤  第二個參數:true回執所有訊息,false僅回執當前訊息
	channel.basicAck(envelope.getDeliveryTag(),false);

2.work-queue模型

一個生產者,一個佇列,多個消費者

public class Consumer2 {

    private static final String QUEUE_NAME = "queue_work";

    public static void main(String[] args)throws Exception {
        //建立連線
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();
        //宣告佇列   佇列一旦在MQ中存中了,不需要重複宣告的

        //當前消費者只能預先拉取一條訊息進行消費,只有消費完了才能 纔能拉取另一條
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            /**
             * 訊息接收的時候自動呼叫
             * @param consumerTag 消費者的標籤
             * @param envelope 訊息的封裝,訊息的相關資訊都封裝到了這個物件中
             * @param properties
             * @param body 訊息內容
             */
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                //模擬500毫秒,時間越短,效能越高
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body);

                //int i = 1/0;//異常

                System.out.println("Consumer2:"+message);
                //手動回執  第一個參數 訊息的標籤    第二個參數  true回執所有訊息   false 僅僅回執當前訊息
                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        };



        /**
         * 消費訊息
         * @param queue 佇列名稱
         * @param autoAck 自動回執
         * @param callback 回撥  監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
         */
        //channel.basicConsume(QUEUE_NAME,true,consumer);
        //關閉訊息的自動確認
        channel.basicConsume(QUEUE_NAME,false,consumer);


    }

生產者向交換機傳遞一條訊息,訊息通過交換機系結到單個佇列,多個消費者通過輪詢的方式分別從佇列中獲取訊息。也可以通過在訊息中設定單次只能拉取一條訊息,必須等訊息消費完畢了過後才能 纔能拉取另一條,然後關閉訊息 自動確認,加入手動回執,這樣就能按效能、執行速度分配。我們這裏使用了一個執行緒來模擬消費者的效能。

3.Fanout模型

生產者:

public class producer {
    //定義一個常數,指定交換機的名稱
    private static final String EXCHANGE_NAME="exchange_fanout";
    private static final String QUEUE_NAME ="queue_fanout_1";

    public static void main(String[] args)throws Exception {
        //建立連線
        Connection connection= ConnectionUtil.getConnection();
        //建立通道
        Channel channel=connection.createChannel();
        /**
         *  定義一個交換機
         * @param exchange 交換機的名稱
         * @param type 交換機的型別
         */
        channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT);
        //定義一條訊息
        String message="今天天氣真真好!";
        /**
         * 發送一條訊息
         *
         * 訊息發送給交換機,佇列是儲存訊息的地方
         * 如果我只定義了交換機,交換機並沒有系結任何的佇列
         * 現在我向交換機中發送訊息,訊息丟失
         *
         * @param exchange 交換機名稱
         * @param routingKey 路由
         * @param props 訊息其他的屬性 - routing headers 等
         * @param body 訊息內容
         */
        channel.basicPublish("exchange_fanout","",null,message.getBytes());
        System.out.println("發送了一條訊息:"+message);

        //釋放資源
        channel.close();
        connection.close();
    }

後面消費者定義的佇列必須要和這個交換機系結,不然訊息就會丟失。

public class consumer {
    //定義交換機的名字
    private static final String EXCHANGE_NAME="exchange_fanout";
    //定義佇列的名字
    private static final String QUEUE_NAME ="queue_fanout_1";

    public static void main(String[] args) throws Exception{
        //建立連線
        Connection connection= ConnectionUtil.getConnection();
        //建立通道
        final Channel channel = connection.createChannel();
        //宣告一個佇列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /**
         *  系結佇列到交換機
         * @param queue 佇列名稱
         * @param exchange 交換機名稱
         * @param routingKey 路由
         */
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        //當前消費者只能預先拉取一條訊息進行消費,只有消費完了才能 纔能拉取另一條
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            /**
             * 訊息接收的時候自動呼叫
             * @param consumerTag 消費者的標籤
             * @param envelope 訊息的封裝,訊息的相關資訊都封裝到了這個物件中
             * @param properties
             * @param body 訊息內容
             */
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {

                //模擬500毫秒,時間越短,效能越高
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                String message = new String(body);

                //int i = 1/0;//異常

                System.out.println("Consumer1:"+message);
                //手動回執  第一個參數 訊息的標籤    第二個參數  true回執所有訊息   false 僅僅回執當前訊息
                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        };

        /**
         * 消費訊息
         * @param queue 佇列名稱
         * @param autoAck 自動回執
         * @param callback 回撥  監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
         */
        //channel.basicConsume(QUEUE_NAME,true,consumer);
        //關閉訊息的自動確認
        channel.basicConsume(QUEUE_NAME,false,consumer);


    }
}

4.Direct路由模型

生產者系結交換機,交換機中設定路由,消費者中系結的佇列只有與交換機中路由相對應纔可以接受到訊息

生產者:

public class Producer {

    private static final String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args)throws Exception {

        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();

        /**
         *  定義一個交換機
         * @param exchange 交換機的名稱
         * @param type 交換機的型別
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);


        String message = "小明註冊成功!----" + System.currentTimeMillis() ;
        /**
         * 發送一條訊息
         *
         * 訊息發送給交換機,佇列是儲存訊息的地方
         * 如果我只定義了交換機,交換機並沒有系結任何的佇列
         * 現在我向交換機中發送訊息,訊息丟失
         *
         * @param exchange 交換機名稱
         * @param routingKey 路由
         * @param props 訊息其他的屬性 - routing headers 等
         * @param body 訊息內容
         */
        channel.basicPublish(EXCHANGE_NAME,"register.sms",null,message.getBytes());
        System.out.println("發送了一條訊息:"+message);


        //釋放資源
        channel.close();
        connection.close();

    }

消費者:

public class Consumer2 {

    private static final String QUEUE_NAME = "queue_direct_sms";
    private static final String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args)throws Exception {


        //建立連線
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        final Channel channel = connection.createChannel();
        //宣告佇列   佇列一旦在MQ中存中了,不需要重複宣告的
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /**
         *  系結佇列到交換機
         * @param queue 佇列名稱
         * @param exchange 交換機名稱
         * @param routingKey 路由
         */
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"register.sms");

        //當前消費者只能預先拉取一條訊息進行消費,只有消費完了才能 纔能拉取另一條
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            /**
             * 訊息接收的時候自動呼叫
             * @param consumerTag 消費者的標籤
             * @param envelope 訊息的封裝,訊息的相關資訊都封裝到了這個物件中
             * @param properties
             * @param body 訊息內容
             */
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {

                //模擬500毫秒,時間越短,效能越高
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                String message = new String(body);

                //int i = 1/0;//異常

                System.out.println("sms:"+message);
                //手動回執  第一個參數 訊息的標籤    第二個參數  true回執所有訊息   false 僅僅回執當前訊息
                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        };



        /**
         * 消費訊息
         * @param queue 佇列名稱
         * @param autoAck 自動回執
         * @param callback 回撥  監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
         */
        //channel.basicConsume(QUEUE_NAME,true,consumer);
        //關閉訊息的自動確認
        channel.basicConsume(QUEUE_NAME,false,consumer);


    }

}

5.Topic萬用字元

與Direct路由類似,但是使用了萬用字元來實現多個佇列存取同一個交換機,其中,*可以匹配一個單詞,#可以匹配任意多個單詞

生產者:

public class Producer {

    private static final String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args)throws Exception {

        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel = connection.createChannel();

        /**
         *  定義一個交換機
         * @param exchange 交換機的名稱
         * @param type 交換機的型別
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);


        String message = "小明註冊成功!----" + System.currentTimeMillis() ;
        /**
         * 發送一條訊息
         *
         * 訊息發送給交換機,佇列是儲存訊息的地方
         * 如果我只定義了交換機,交換機並沒有系結任何的佇列
         * 現在我向交換機中發送訊息,訊息丟失
         *
         * @param exchange 交換機名稱
         * @param routingKey 路由
         * @param props 訊息其他的屬性 - routing headers 等
         * @param body 訊息內容
         */
        channel.basicPublish(EXCHANGE_NAME,"register.employee.user.email",null,message.getBytes());
        System.out.println("發送了一條訊息:"+message);


        //釋放資源
        channel.close();
        connection.close();

    }

消費者:

public class Consumer {

    private static final String QUEUE_NAME = "queue_topic_email";
    private static final String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args)throws Exception {


        //建立連線
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        final Channel channel = connection.createChannel();
        //宣告佇列   佇列一旦在MQ中存中了,不需要重複宣告的
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /**
         *  系結佇列到交換機
         * @param queue 佇列名稱
         * @param exchange 交換機名稱
         * @param routingKey 路由
         */
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"register.*.email");

        //當前消費者只能預先拉取一條訊息進行消費,只有消費完了才能 纔能拉取另一條
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel){
            /**
             * 訊息接收的時候自動呼叫
             * @param consumerTag 消費者的標籤
             * @param envelope 訊息的封裝,訊息的相關資訊都封裝到了這個物件中
             * @param properties
             * @param body 訊息內容
             */
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {

                //模擬500毫秒,時間越短,效能越高
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                String message = new String(body);

                //int i = 1/0;//異常

                System.out.println("email:"+message);
                //手動回執  第一個參數 訊息的標籤    第二個參數  true回執所有訊息   false 僅僅回執當前訊息
                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        };



        /**
         * 消費訊息
         * @param queue 佇列名稱
         * @param autoAck 自動回執
         * @param callback 回撥  監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
         */
        //channel.basicConsume(QUEUE_NAME,true,consumer);
        //關閉訊息的自動確認
        channel.basicConsume(QUEUE_NAME,false,consumer);


    }

}
   e.printStackTrace();
            }

            String message = new String(body);

            //int i = 1/0;//異常

            System.out.println("email:"+message);
            //手動回執  第一個參數 訊息的標籤    第二個參數  true回執所有訊息   false 僅僅回執當前訊息
            channel.basicAck(envelope.getDeliveryTag(),false);

        }
    };



    /**
     * 消費訊息
     * @param queue 佇列名稱
     * @param autoAck 自動回執
     * @param callback 回撥  監聽佇列,佇列中有訊息的時候,呼叫回撥的方法進行訊息的行爲
     */
    //channel.basicConsume(QUEUE_NAME,true,consumer);
    //關閉訊息的自動確認
    channel.basicConsume(QUEUE_NAME,false,consumer);


}

}