(一)RabbitMQ的基礎部分

2020-08-07 23:33:58

(一)RabbitMQ的基礎部分

訊息中介軟體概述

什麼是訊息中介軟體

MQ全稱爲Message Queue,訊息佇列是應用程式和應用程式之間的通訊方法。

  • 爲什麼使用MQ

    在專案中,可將一些無需即時返回且耗時的操作提取出來,進行非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高系統吞吐量

  • 開發中訊息佇列通常有如下應用場景:

    1、任務非同步處理

    將不需要同步處理的並且耗時長的操作由訊息佇列通知訊息接收方進行非同步處理。提高了應用程式的響應時間。

    2、應用程式解耦合

    MQ相當於一箇中介,生產方通過MQ與消費方互動,它將應用程式進行解耦合。

    3、削峯填谷

    如訂單系統,在下單的時候就會往數據庫寫數據。但是數據庫只能支撐每秒1000左右的併發寫入,併發量再高就容易宕機。低峯期的時候併發也就100多個,但是在高峯期時候,併發量會突然激增到5000以上,這個時候數據庫肯定卡死了。
    在这里插入图片描述

    訊息被MQ儲存起來了,然後系統就可以按照自己的消費能力來消費,比如每秒1000個數據,這樣慢慢寫入數據庫,這樣就不會卡死數據庫了。

在这里插入图片描述

但是使用了MQ之後,限制消費訊息的速度爲1000,但是這樣一來,高峯期產生的數據勢必會被積壓在MQ中,高峯就被「削」掉了。但是因爲訊息積壓,在高峯期過後的一段時間內,消費訊息的速度還是會維持在1000QPS,直到消費完積壓的訊息,這就叫做「填谷」

在这里插入图片描述

AMQP 和 JMS

MQ是訊息通訊的模型;實現MQ的大致有兩種主流方式:AMQP、JMS。

AMQP

AMQP是一種協定,更準確的說是一種binary wire-level protocol(鏈接協定)。這是其和JMS的本質差別,AMQP不從API層進行限定,而是直接定義網路交換的數據格式。

AMQP,即 Advanced Message Queuing Protocol(高階訊息佇列協定),是一個網路協定,是應用層協定的一個開放標準,爲訊息導向中介層設計。基於此協定的用戶端與訊息中介軟體可傳遞訊息,並不受用戶端/中介軟體不同產品,不同的開發語言等條件的限制。2006年,AMQP 規範發佈。類比HTTP。

在这里插入图片描述

JMS

JMS即Java訊息服務(JavaMessage Service)應用程式介面,是一個Java平臺中關於訊息導向中介軟體(MOM)的API,用於在兩個應用程式之間,或分佈式系統中發送訊息,進行非同步通訊。

AMQP 與 JMS 區別

  • JMS是定義了統一的介面,來對訊息操作進行統一;AMQP是通過規定協定來統一數據互動的格式
  • JMS限定了必須使用Java語言;AMQP只是協定,不規定實現方式,因此是跨語言的。
  • JMS規定了兩種訊息模式;而AMQP的訊息模式更加豐富

訊息佇列產品

市場上常見的訊息佇列有如下:

  • ActiveMQ:基於JMS
  • ZeroMQ:基於C語言開發
  • RabbitMQ:基於AMQP協定,erlang語言開發,穩定性好
  • RocketMQ:基於JMS,阿裡巴巴產品
  • Kafka:類似MQ的產品;分佈式訊息系統,高吞吐量

RabbitMQ

RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高階訊息佇列協定)協定實現的訊息佇列,它是一種應用程式之間的通訊方法,訊息佇列在分佈式系統開發中應用非常廣泛。

RabbitMQ官方地址:http://www.rabbitmq.com/

RabbitMQ提供了6種模式(現在更新爲7種了):簡單模式,work模式,Publish/Subscribe發佈與訂閱模式,Routing路由模式,Topics主題模式,RPC遠端呼叫模式(遠端呼叫,不太算MQ;暫不作介紹);

官網對應模式介紹:https://www.rabbitmq.com/getstarted.html

在这里插入图片描述

RabbitMQ的基礎架構圖:

在这里插入图片描述

RabbitMQ 中的相關概念:

Broker:接收和分發訊息的應用,RabbitMQ Server就是 Message Broker

Virtual host:出於多租戶和安全因素設計的,把 AMQP 的基本元件劃分到一個虛擬的分組中,類似於網路中的 namespace 概念。當多個不同的使用者使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個使用者在自己的 vhost 建立 exchange/queue 等

Connection:publisher/consumer 和 broker 之間的 TCP 連線

Channel:如果每一次存取 RabbitMQ 都建立一個 Connection,在訊息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel 是在 connection 內部建立的邏輯連線,如果應用程式支援多執行緒,通常每個thread建立單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助用戶端和message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作爲輕量級的 Connection 極大減少了操作系統建立 TCP connection 的開銷

Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發訊息到queue 中去。常用的型別有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:訊息最終被送到這裏等待 consumer 取走

Binding:exchange 和 queue 之間的虛擬連線,binding 中可以包含 routing key。Binding 資訊被儲存到 exchange 中的查詢表中,用於 message 的分發依據

RabbitMQ入門

準備工作:

建立maven工程

pom.xml檔案中新增如下依賴:

    <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>

RabbitMQ幾種常用的模式:

簡單模式

模型圖:
在这里插入图片描述

在上圖的模型中,有以下概念:

  • P:生產者,也就是要發送訊息的程式
  • C:消費者:訊息的接受者,會一直等待訊息到來。
  • queue:訊息佇列,圖中紅色部分。類似一個郵箱,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息。

編寫生產者:

public class HelloWorld_Producer {

    public static void main(String[] args) throws IOException, TimeoutException {


        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //宣告(建立)佇列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive,
         * boolean autoDelete, Map<String, Object> arguments)
         *參數:
         * 1.queue:佇列名稱
         * 2.durable:是否持久化,當mq重新啓動之後,還在
         * 3.exclusive:是否獨佔。只能有一個消費監聽這個佇列
         *             當Connection關閉時,是否刪除佇列
         * 4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
         * 5.arguments:參數
         *
         */
        //如果沒有一個名字叫做hello_word的佇列,則會建立該佇列,如果有則不會建立
        channel.queueDeclare("hello_world",true,false,false,null);

        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         *
         * 1.exchange:交換機的名字
         * 2.routingKey:路由名稱
         * 3.props:設定資訊
         * 4.body:發送的訊息數據
         */

        String body="hello rabbitmq~";
        //發送訊息
        channel.basicPublish("","hello_world",null,body.getBytes());

        //釋放資源
        channel.close();
        connection.close();

    }
}

編寫消費者:

public class HelloWorld_Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();


        //宣告(建立)佇列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive,
         * boolean autoDelete, Map<String, Object> arguments)
         *參數:
         * 1.queue:佇列名稱
         * 2.durable:是否持久化,當mq重新啓動之後,還在
         * 3.exclusive:是否獨佔。只能有一個消費監聽這個佇列
         *             當Connection關閉時,是否刪除佇列
         * 4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
         * 5.arguments:參數
         *
         */
        //如果沒有一個名字叫做hello_word的佇列,則會建立該佇列,如果有則不會建立

        channel.queueDeclare("hello_world",true,false,false,null);

        /**
         *
         *  basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         *  1.queue:佇列的名稱
         *  2.autoAck:是否自動確認
         *  3.callback:回撥物件
         *
         */

        //接收訊息
        Consumer consumer=new DefaultConsumer(channel){

            /**
             * 回撥方法,當收到訊息之後,會自動執行該方法
             *
             * @param consumerTag: 標識
             * @param envelope:獲取一些資訊,交換機,路由key
             * @param properties:設定資訊
             * @param body:數據
             * @throws IOException
             */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag======"+consumerTag);
                System.out.println("交換機的一些資訊====="+envelope.getExchange());
                System.out.println("路由key的一些訊息====="+envelope.getRoutingKey());
                System.out.println("properties===="+properties);
                System.out.println("body========="+new String(body));

            }
        };
        channel.basicConsume("hello_world",true,consumer);


        //關閉資源? 不要關閉資源,保持監聽即可
    }


}

測試結果:
在这里插入图片描述

Work queues工作佇列模式

模型圖:
在这里插入图片描述

Work Queues與入門程式的簡單模式相比,多了一個或一些消費端,多個消費端共同消費同一個佇列中的訊息。

應用場景:對於 任務過重或任務較多情況使用工作佇列可以提高任務處理的速度。

編寫生產者:

public class WorkQueues_Producer {

    public static void main(String[] args) throws IOException, TimeoutException {


        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //宣告(建立)佇列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive,
         * boolean autoDelete, Map<String, Object> arguments)
         *參數:
         * 1.queue:佇列名稱
         * 2.durable:是否持久化,當mq重新啓動之後,還在
         * 3.exclusive:是否獨佔。只能有一個消費監聽這個佇列
         *             當Connection關閉時,是否刪除佇列
         * 4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
         * 5.arguments:參數
         *
         */
        //如果沒有一個名字叫做hello_word的佇列,則會建立該佇列,如果有則不會建立
        channel.queueDeclare("work_queues",true,false,false,null);

        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         *
         * 1.exchange:交換機的名字
         * 2.routingKey:路由名稱
         * 3.props:設定資訊
         * 4.body:發送的訊息數據
         */


        for (int i=1;i<=10;i++){

            String body="序號"+i+"--work!!! rabbitmq~";

            //發送訊息
            channel.basicPublish("","work_queues",null,body.getBytes());
        }

        //釋放資源
        channel.close();
        connection.close();

    }
}

編寫消費者1:

public class WorkQueues1_Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();


        //宣告(建立)佇列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive,
         * boolean autoDelete, Map<String, Object> arguments)
         *參數:
         * 1.queue:佇列名稱
         * 2.durable:是否持久化,當mq重新啓動之後,還在
         * 3.exclusive:是否獨佔。只能有一個消費監聽這個佇列
         *             當Connection關閉時,是否刪除佇列
         * 4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
         * 5.arguments:參數
         *
         */
        //如果沒有一個名字叫做hello_word的佇列,則會建立該佇列,如果有則不會建立

        channel.queueDeclare("work_queues",true,false,false,null);

        /**
         *
         *  basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         *  1.佇列的名稱
         *  2.是否自動確認
         *  3.回撥物件
         *
         */

        //接收訊息
        Consumer consumer=new DefaultConsumer(channel){

            /**
             * 回撥方法,當收到訊息之後,會自動執行該方法
             *
             * @param consumerTag: 標識
             * @param envelope:獲取一些資訊,交換機,路由key
             * @param properties:設定資訊
             * @param body:數據
             * @throws IOException
             */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body========="+new String(body));

            }
        };
        channel.basicConsume("work_queues",true,consumer);


        //關閉資源? 不要關閉資源
    }


}

編寫消費者2:

public class WorkQueues2_Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();


        //宣告(建立)佇列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive,
         * boolean autoDelete, Map<String, Object> arguments)
         *參數:
         * 1.queue:佇列名稱
         * 2.durable:是否持久化,當mq重新啓動之後,還在
         * 3.exclusive:是否獨佔。只能有一個消費監聽這個佇列
         *             當Connection關閉時,是否刪除佇列
         * 4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
         * 5.arguments:參數
         *
         */
        //如果沒有一個名字叫做hello_word的佇列,則會建立該佇列,如果有則不會建立

        channel.queueDeclare("work_queues",true,false,false,null);

        /**
         *
         *  basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         *  1.佇列的名稱
         *  2.是否自動確認
         *  3.回撥物件
         *
         */

        //接收訊息
        Consumer consumer=new DefaultConsumer(channel){

            /**
             * 回撥方法,當收到訊息之後,會自動執行該方法
             *
             * @param consumerTag: 標識
             * @param envelope:獲取一些資訊,交換機,路由key
             * @param properties:設定資訊
             * @param body:數據
             * @throws IOException
             */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body========="+new String(body));

            }
        };
        channel.basicConsume("work_queues",true,consumer);


        //關閉資源? 不要關閉資源
    }


}

測試結果:

消費者1:
在这里插入图片描述

消費者2:
在这里插入图片描述

小結:

我們通過測試結果可以發現:

在一個佇列中如果有多個消費者,那麼消費者之間對於同一個訊息的關係是競爭的關係,但是他們是交替消費

Pub/Sub 發佈/訂閱模式

範例圖:
在这里插入图片描述

前面2個案例中,只有3個角色:

  • P:生產者,也就是要發送訊息的程式
  • C:消費者:訊息的接受者,會一直等待訊息到來。
  • queue:訊息佇列,圖中紅色部分

而在訂閱模型中,多了一個exchange角色,而且過程略有變化:

  • P:生產者,也就是要發送訊息的程式,但是不再發送到佇列中,而是發給X(交換機)
  • C:消費者,訊息的接受者,會一直等待訊息到來。
  • Queue:訊息佇列,接收訊息、快取訊息。
  • Exchange:交換機,圖中的X。一方面,接收生產者發送的訊息。另一方面,知道如何處理訊息,例如遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄。到底如何操作,取決於Exchange的型別。Exchange有常見以下3種類型:
    • Fanout:廣播,將訊息交給所有系結到交換機的佇列
    • Direct:定向,把訊息交給符合指定routing key 的佇列
    • Topic:萬用字元,把訊息交給符合routing pattern(路由模式) 的佇列

Exchange(交換機)只負責轉發訊息,不具備儲存訊息的能力,因此如果沒有任何佇列與Exchange系結,或者沒有符合路由規則的佇列,那麼訊息會丟失!

發佈訂閱模式:
1、每個消費者監聽自己的佇列。
2、生產者將訊息發給broker,由交換機將訊息轉發到系結此交換機的每個佇列,每個系結交換機的佇列都將接收
到訊息

編寫生產者:

public class PubSub_Producer {

    public static void main(String[] args) throws IOException, TimeoutException {


        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //建立交換機
        /**
         *
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable,
         * boolean autoDelete, boolean internal, Map<String, Object> arguments)
         *
         * 1.exchange:交換機的名稱
         * 2.type:交換機型別
         *     DIRECT("direct"), 定向
         *     FANOUT("fanout"), 扇形(廣播),發送訊息到每一個與之系結的佇列
         *     TOPIC("topic"), 萬用字元的方式
         *     HEADERS("headers"); 參數匹配
         *
         * 3.durable:是否持久化
         * 4.autoDelete:自動刪除
         * 5.internal:內部使用。一般爲false
         * 6.arguments:參數
         */

        String exchangeName="test_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);

        //建立佇列
        String queue1Name="test_fanout_queue1";
        String queue2Name="test_fanout_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        //系結佇列和交換機
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * 1.queue:佇列名稱
         * 2.exchange:交換機名稱
         * 3.routingKey:路由鍵,系結規則
         *        如果交換機型別爲fanout,routingKey設定爲"",代表之後
         *        會將訊息分發給每一個與之系結的佇列
         *
         */
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        String body="日誌資訊: 李四呼叫了findAll方法----日誌級別info----";
        //發送訊息
        channel.basicPublish(exchangeName,"",null , body.getBytes());

        //釋放資源
        channel.close();
        connection.close();

    }
}

編寫消費者1:

public class PubSub1_Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();


        String queue1Name="test_fanout_queue1";
        String queue2Name="test_fanout_queue2";

        /**
         *
         *  basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         *  1.佇列的名稱
         *  2.是否自動確認
         *  3.回撥物件
         *
         */

        //接收訊息
        Consumer consumer=new DefaultConsumer(channel){

            /**
             * 回撥方法,當收到訊息之後,會自動執行該方法
             *
             * @param consumerTag: 標識
             * @param envelope:獲取一些資訊,交換機,路由key
             * @param properties:設定資訊
             * @param body:數據
             * @throws IOException
             */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body========="+new String(body));
                System.out.println("=====將日誌資訊列印到控制檯=====");

            }
        };
        channel.basicConsume(queue1Name,true,consumer);


        //關閉資源? 不要關閉資源
    }


}

編寫消費者2:

public class PubSub2_Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();


        String queue1Name="test_fanout_queue1";
        String queue2Name="test_fanout_queue2";

        /**
         *
         *  basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         *  1.佇列的名稱
         *  2.是否自動確認
         *  3.回撥物件
         *
         */

        //接收訊息
        Consumer consumer=new DefaultConsumer(channel){

            /**
             * 回撥方法,當收到訊息之後,會自動執行該方法
             *
             * @param consumerTag: 標識
             * @param envelope:獲取一些資訊,交換機,路由key
             * @param properties:設定資訊
             * @param body:數據
             * @throws IOException
             */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body========="+new String(body));
                System.out.println("=====將日誌資訊存入數據庫=====");

            }
        };
        channel.basicConsume(queue2Name,true,consumer);


        //關閉資源? 不要關閉資源
    }


}

測試結果:

我們可以看到該交換機系結了兩個佇列,且沒有routing Key
在这里插入图片描述

消費者1:
在这里插入图片描述

消費者2:
在这里插入图片描述

Routing路由模式

模式說明

路由模式特點:

  • 佇列與交換機的系結,不能是任意系結了,而是要指定一個RoutingKey(路由key)
  • 訊息的發送方在 向 Exchange發送訊息時,也必須指定訊息的 RoutingKey
  • Exchange不再把訊息交給每一個系結的佇列,而是根據訊息的Routing Key進行判斷,只有佇列的Routingkey與訊息的 Routing key完全一致,纔會接收到訊息

在这里插入图片描述

圖解:

  • P:生產者,向Exchange發送訊息,發送訊息時,會指定一個routing key。
  • X:Exchange(交換機),接收生產者的訊息,然後把訊息遞交給 與routing key完全匹配的佇列
  • C1:消費者,其所在佇列指定了需要routing key 爲 error 的訊息
  • C2:消費者,其所在佇列指定了需要routing key 爲 info、error、warning 的訊息

比較:

在編碼上與 Publish/Subscribe發佈與訂閱模式 的區別是交換機的型別爲:Direct,還有佇列系結交換機的時候需要指定routing key。

編寫生產者:

public class Routing_Producer {

    public static void main(String[] args) throws IOException, TimeoutException {


        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //建立交換機
        /**
         *
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable,
         * boolean autoDelete, boolean internal, Map<String, Object> arguments)
         *
         * 1.exchange:交換機的名稱
         * 2.type:交換機型別
         *     DIRECT("direct"), 定向
         *     FANOUT("fanout"), 扇形(廣播),發送訊息到每一個與之系結的佇列
         *     TOPIC("topic"), 萬用字元的方式
         *     HEADERS("headers"); 參數匹配
         *
         * 3.durable:是否持久化
         * 4.autoDelete:自動刪除
         * 5.internal:內部使用。一般爲false
         * 6.arguments:參數
         */

        //路由模式的交換機的模式爲: direct
        String exchangeName="test_direct";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);

        //建立佇列
        String queue1Name="test_direct_queue1";
        String queue2Name="test_direct_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        //系結佇列和交換機
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * 1.queue:佇列名稱
         * 2.exchange:交換機名稱
         * 3.routingKey:路由鍵,系結規則
         *        如果交換機型別爲fanout,routingKey設定爲"",代表之後
         *        會將訊息分發給每一個與之系結的佇列
         *
         */
        //佇列1系結error
        channel.queueBind(queue1Name,exchangeName,"error");
        //佇列2系結info、error、warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");

        String body="日誌資訊: 李四呼叫了findAll方法----日誌級別info----";
        //發送訊息
        channel.basicPublish(exchangeName,"info",null , body.getBytes());

        //釋放資源
        channel.close();
        connection.close();

    }
}

編寫消費者1:

public class Routing1_Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();


        String queue1Name="test_direct_queue1";
        String queue2Name="test_direct_queue2";

        /**
         *
         *  basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         *  1.佇列的名稱
         *  2.是否自動確認
         *  3.回撥物件
         *
         */

        //接收訊息
        Consumer consumer=new DefaultConsumer(channel){

            /**
             * 回撥方法,當收到訊息之後,會自動執行該方法
             *
             * @param consumerTag: 標識
             * @param envelope:獲取一些資訊,交換機,路由key
             * @param properties:設定資訊
             * @param body:數據
             * @throws IOException
             */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body========="+new String(body));
                System.out.println("=====將日誌資訊存入數據庫=====");

            }
        };
        channel.basicConsume(queue1Name,true,consumer);


        //關閉資源? 不要關閉資源
    }


}

編寫消費者2:

public class Routing2_Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();


        String queue1Name="test_direct_queue1";
        String queue2Name="test_direct_queue2";

        /**
         *
         *  basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         *  1.佇列的名稱
         *  2.是否自動確認
         *  3.回撥物件
         *
         */

        //接收訊息
        Consumer consumer=new DefaultConsumer(channel){

            /**
             * 回撥方法,當收到訊息之後,會自動執行該方法
             *
             * @param consumerTag: 標識
             * @param envelope:獲取一些資訊,交換機,路由key
             * @param properties:設定資訊
             * @param body:數據
             * @throws IOException
             */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body========="+new String(body));
                System.out.println("=====將日誌資訊列印到控制檯=====");

            }
        };
        channel.basicConsume(queue2Name,true,consumer);


        //關閉資源? 不要關閉資源
    }


}

測試:

路由模式下交換機切爲direct

我們將佇列1與交換機系結時的routingKey設爲error

我們將佇列2與交換機系結時的routingKey設有info、error、warning

開始測試:

我們把生產者發送訊息時的routingKey設定爲 info

結果:

消費者1:

在这里插入图片描述

消費者2:

在这里插入图片描述

我們把生產者發送訊息時的routingKey設定爲 error

結果:

消費者1:
在这里插入图片描述

消費者2:

在这里插入图片描述

小結:

Routing模式要求佇列在系結交換機時要指定routing key,訊息會轉發到符合routing key的佇列。

Topics萬用字元模式

模式說明

Topic型別與Direct相比,都是可以根據RoutingKey把訊息路由到不同的佇列。只不過Topic型別Exchange可以讓佇列在系結Routing key 的時候使用萬用字元

Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以」.」分割,例如: savior.insert

萬用字元規則:

#:匹配一個或多個詞

*:匹配不多不少恰好1個詞

舉例:

savior.#:能夠匹配savior.insert.abc 或者 savior.insert

savior.*:只能匹配savior.insert

在这里插入图片描述

在这里插入图片描述

圖解:

  • 紅色Queue:系結的是usa.# ,因此凡是以 usa.開頭的routing key 都會被匹配到
  • 黃色Queue:系結的是#.news ,因此凡是以 .news結尾的 routing key 都會被匹配

編寫生產者:


public class Topic_Producer {

    public static void main(String[] args) throws IOException, TimeoutException {


        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //建立交換機
        /**
         *
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable,
         * boolean autoDelete, boolean internal, Map<String, Object> arguments)
         *
         * 1.exchange:交換機的名稱
         * 2.type:交換機型別
         *     DIRECT("direct"), 定向
         *     FANOUT("fanout"), 扇形(廣播),發送訊息到每一個與之系結的佇列
         *     TOPIC("topic"), 萬用字元的方式
         *     HEADERS("headers"); 參數匹配
         *
         * 3.durable:是否持久化
         * 4.autoDelete:自動刪除
         * 5.internal:內部使用。一般爲false
         * 6.arguments:參數
         */

        //topic 萬用字元模式下交換機爲 topic
        String exchangeName="test_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);

        //建立佇列
        String queue1Name="test_topic_queue1";
        String queue2Name="test_topic_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        //系結佇列和交換機
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * 1.queue:佇列名稱
         * 2.exchange:交換機名稱
         * 3.routingKey:路由鍵,系結規則
         *        如果交換機型別爲fanout,routingKey設定爲"",代表之後
         *        會將訊息分發給每一個與之系結的佇列
         *
         */
        //佇列1 系結的routingKey爲 #.error 、order.*
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
        //佇列2 系結的routingKey爲 *.*
        channel.queueBind(queue2Name,exchangeName,"*.*");

        String body="日誌資訊: 李四呼叫了delete方法----日誌級別error----";
        //發送訊息
        channel.basicPublish(exchangeName,"order.error",null , body.getBytes());

        //釋放資源
        channel.close();
        connection.close();

    }
}

編寫消費者1:

public class Topic1_Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();


        String queue1Name="test_topic_queue1";
        String queue2Name="test_topic_queue2";

        /**
         *
         *  basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         *  1.佇列的名稱
         *  2.是否自動確認
         *  3.回撥物件
         *
         */

        //接收訊息
        Consumer consumer=new DefaultConsumer(channel){

            /**
             * 回撥方法,當收到訊息之後,會自動執行該方法
             *
             * @param consumerTag: 標識
             * @param envelope:獲取一些資訊,交換機,路由key
             * @param properties:設定資訊
             * @param body:數據
             * @throws IOException
             */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body========="+new String(body));
                System.out.println("=====將日誌資訊存入數據庫=====");

            }
        };
        channel.basicConsume(queue1Name,true,consumer);


        //關閉資源? 不要關閉資源
    }


}

編寫消費者2:

public class Topic2_Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連線工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主機地址;預設爲 localhost
        connectionFactory.setHost("192.168.237.128");
        //連線埠;預設爲 5672
        connectionFactory.setPort(5672);
        //虛擬主機名稱;預設爲 /
        connectionFactory.setVirtualHost("/savior");
        //連線使用者名稱;預設爲guest
        connectionFactory.setUsername("root");
        //連線密碼;預設爲guest
        connectionFactory.setPassword("root");

        //建立新連線
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();


        String queue1Name="test_topic_queue1";
        String queue2Name="test_topic_queue2";

        /**
         *
         *  basicConsume(String queue, boolean autoAck, Consumer callback)
         *
         *  1.佇列的名稱
         *  2.是否自動確認
         *  3.回撥物件
         *
         */

        //接收訊息
        Consumer consumer=new DefaultConsumer(channel){

            /**
             * 回撥方法,當收到訊息之後,會自動執行該方法
             *
             * @param consumerTag: 標識
             * @param envelope:獲取一些資訊,交換機,路由key
             * @param properties:設定資訊
             * @param body:數據
             * @throws IOException
             */

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body========="+new String(body));
                System.out.println("=====將日誌資訊列印到控制檯=====");

            }
        };
        channel.basicConsume(queue2Name,true,consumer);


        //關閉資源? 不要關閉資源
    }


}

測試:

將發送訊息時的將路由鍵routingKey設爲 order.error

佇列1 系結的routingKey爲 #.error 、order.*

佇列2 系結的routingKey爲 .

消費者1:

在这里插入图片描述

消費者2:

在这里插入图片描述

小結:

Topic主題模式可以實現 Publish/Subscribe發佈與訂閱模式Routing路由模式 的功能;只是Topic在設定routing key 的時候可以使用萬用字元,顯得更加靈活。

模式總結:

RabbitMQ工作模式:
1、簡單模式 HelloWorld
一個生產者、一個消費者,不需要設定交換機(使用預設的交換機)

2、工作佇列模式 Work Queue
一個生產者、多個消費者(競爭關係),不需要設定交換機(使用預設的交換機)

3、發佈訂閱模式 Publish/subscribe
需要設定型別爲fanout的交換機,並且交換機和佇列進行系結,當發送訊息到交換機後,交換機會將訊息發送到系結的佇列

4、路由模式 Routing
需要設定型別爲direct的交換機,交換機和佇列進行系結,並且指定routing key,當發送訊息到交換機後,交換機會根據routing key將訊息發送到對應的佇列

5、萬用字元模式 Topic
需要設定型別爲topic的交換機,交換機和佇列進行系結,並且指定萬用字元方式的routing key,當發送訊息到交換機後,交換機會根據routing key將訊息發送到對應的佇列

Spring 整合RabbitMQ

搭建生產者工程

建立maven專案,新增依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spring-rabbitmq-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>



</project>

設定整合

在resource資料夾下建立rabbitmq.properties連線參數等組態檔;

rabbitmq.host=192.168.237.128
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.virtual-host=/savior

在resource資料夾下建立spring-rabbitmq-producer.xml 的spring組態檔;

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--載入組態檔-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定義rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <!--定義管理交換機、佇列-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定義持久化佇列,不存在則自動建立;不系結到交換機則系結到預設交換機
    預設交換機型別爲direct,名字爲:"",路由鍵爲佇列的名稱
    -->
    <!--
        id:bean的名稱
        name:queue的名稱
        auto-declare:自動建立
        auto-delete:自動刪除。 最後一個消費者和該佇列斷開連線後,自動刪除佇列
        exclusive:是否獨佔
        durable:是否持久化
    -->

    <rabbit:queue id="spring_queue" name="spring_queue"    auto-declare="true"/>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~廣播;所有佇列都能收到訊息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>

    <!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

    <!--定義廣播型別交換機;並系結上述兩個佇列-->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange"  auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding  queue="spring_fanout_queue_1"  />
            <rabbit:binding queue="spring_fanout_queue_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!--<rabbit:direct-exchange name="aa" >
        <rabbit:bindings>
            &lt;!&ndash;direct 型別的交換機系結佇列  key :路由key  queue:佇列名稱&ndash;&gt;
            <rabbit:binding queue="spring_queue" key="xxx"></rabbit:binding>
        </rabbit:bindings>

    </rabbit:direct-exchange>-->

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~萬用字元;*匹配一個單詞,#匹配多個單詞 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
    <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star"  auto-declare="true"/>
    <!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
    <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
    <!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
    <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>

    <rabbit:topic-exchange id="spring_topic_exchange"  name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="savior.*"  queue="spring_topic_queue_star"/>
            <rabbit:binding pattern="savior.#" queue="spring_topic_queue_well"/>
            <rabbit:binding pattern="savior.#" queue="spring_topic_queue_well2"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--定義rabbitTemplate物件操作可以在程式碼中方便發送訊息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>

發送訊息

建立測試類,在裏面建立發送訊息

利用rabbitTemplate呼叫convertAndSend根據不同模式的需求選取不同參數的過載方法

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

    @Autowired
    RabbitTemplate rabbitTemplate;


    //簡單模式下的發訊息
    @Test
    public void testHelloWorld(){

        rabbitTemplate.convertAndSend("spring_queue","hello world spring~~~~");

    }

    //發佈訂閱模式下 fanout  的發訊息
    @Test
    public void testFanout(){

        rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout~~~~");
    }


    //萬用字元模式下 topic  的發訊息
    @Test
    public void testTopic(){

        rabbitTemplate.convertAndSend("spring_topic_exchange","savior.niu.bi","savior come on from spring~~`");

    }

}

搭建消費者工程

新增依賴、建立組態檔rabbitmq.properties 這兩個同上生產者

建立spring-rabbitmq-consumer.xml (spring組態檔)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--載入組態檔-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定義rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <bean id="springQueueListener" class="com.duanping.rabbitmq.listener.SpringQueueListener"/>
    <bean id="fanoutListener1" class="com.duanping.rabbitmq.listener.FanoutListener1"/>
<!--    <bean id="fanoutListener2" class="com.duanping.rabbitmq.listener.FanoutListener2"/>-->
<!--    <bean id="topicListenerStar" class="com.duanping.rabbitmq.listener.TopicListenerStar"/>
    <bean id="topicListenerWell" class="com.duanping.rabbitmq.listener.TopicListenerWell"/>
    <bean id="topicListenerWell2" class="com.duanping.rabbitmq.listener.TopicListenerWell2"/>-->

    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
        <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/><!--
        <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
        <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
        <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
        <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>-->
    </rabbit:listener-container>
</beans>

建立訊息監聽器

簡單模式監聽器
public class SpringQueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println(new String(message.getBody()));
    }
}

廣播模式監聽器1
public class FanoutListener1 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println(new String(message.getBody()));
    }
}

Spring Boot整合RabbitMQ

在Spring專案中,可以使用Spring-Rabbit去操作RabbitMQ
https://github.com/spring-projects/spring-amqp

尤其是在spring boot專案中只需要引入對應的amqp啓動器依賴即可,方便的使用RabbitTemplate發送訊息,使用註解接收訊息。

一般在開發過程中

生產者工程:

  1. application.yml檔案設定RabbitMQ相關資訊;

  2. 在生產者工程中編寫設定類,用於建立交換機和佇列,並進行系結

  3. 注入RabbitTemplate物件,通過RabbitTemplate物件發送訊息到交換機

消費者工程:

  1. application.yml檔案設定RabbitMQ相關資訊

  2. 建立訊息處理類,用於接收佇列中的訊息並進行處理

搭建生產者工程

利用springboot快速構建工程

然後在pom.xml中新增以下依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

設定RabbitMQ

組態檔
spring:
  rabbitmq:
    host: 192.168.237.128
    port: 5672
    username: root
    password: root
    virtual-host: /savior

系結交換機和佇列

建立RabbitMQ佇列與交換機系結的設定類

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME="boot_topic_exchange";
    public static final String QUEUE_NAME="boot_queue";

    //交換機

    @Bean("bootExchange")
    public Exchange bootExchange(){

        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    //佇列

    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //佇列和交換機系結關係 Binding
    /*
      1.知道哪個佇列
      2.知道哪個交換機
      3.routing Key
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){

        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();

    }


}

發送訊息

@SpringBootTest
class DemoApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Test
    public void test1(){

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.nice","springboot rabbitMQ~~~");
    }


}

搭建消費者工程

新增依賴和生產者相同

建立application.yml

spring:
  rabbitmq:
    host: 192.168.237.128
    port: 5672
    username: root
    password: root
    virtual-host: /savior


訊息監聽處理類

@Component
public class RabbitMQListener {

    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message){
        System.out.println(new String(message.getBody()));
    }
}