RabbitMQ 入門

2020-08-09 21:19:18

1. RabbitMQ 簡介

官網
簡單來說:RabbitMQ 是開源的、訊息導向中介層。RabbitMQ伺服器是用Erlang語言編寫的。

2. RabbitMQ 安裝

官網安裝介紹(windows)
【注意】: 在 windows 操作系統安裝 RabbitMQ 之前,需要安裝 Erlang。

在这里插入图片描述
安裝過程可以參考部落格:Windows下RabbitMQ安裝及設定

2.1 判斷 RabbitMQ 是否安裝成功?

  1. 啓動 RabbitMQ 服務(可在工作管理員中檢視):rabbitmq-server.bat(安裝過程中已執行)
    在这里插入图片描述
  2. 登陸管理介面:http://localhost:15672(瀏覽器存取 )
    在这里插入图片描述
    預設使用者名稱和密碼都是 guest

登陸成功後:
在这里插入图片描述
可以看出上述管理介面有許多資訊(Exchanges、Queues 等等),這裏我們先不看它。

3. HelloWorld

RabbitMQ 是一個訊息代理:它接受、儲存、轉發訊息(二進制 blob 數據)。

3.1 常用術語

RabbitMQ 和訊息傳遞通常使用一些術語

  • P:生產者。發送訊息的程式
  • queue_name:佇列。存放訊息
  • C:消費者。接受訊息的程式

【注意】:

  1. 生產者、消費者和代理不必駐留在同一主機上;
  2. 一個應用程式也可以既是生產者又是消費者。

3.2 HelloWorld

3.2.1 demo 結構圖

在这里插入图片描述

3.2.2 demo 功能描述

接下來是用Java編寫兩個程式:

  1. 發送單個訊息的生產者;
  2. 接收訊息並將其列印出來的消費者

如下圖:

  1. P:生產者
  2. Box:佇列
  3. C:消費者

在这里插入图片描述

3.2.3 開發環境

  1. IDEA2019.1
  2. Maven3.5.3
  3. Jdk1.8

3.2.4 編碼

新建一個 Maven 專案。

3.2.4.1 新增依賴:

<!-- rabbitmq -->
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.5.0</version>
</dependency>
<!-- slf4j-simple -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-simple</artifactId>
  <version>1.7.26</version>
</dependency>
<!-- slf4j-api -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>1.7.26</version>
</dependency>

如果不新增 「slf4j-simple」、「slf4j-api」依賴,會報錯:
在这里插入图片描述

3.2.4.2 訊息生產者

發佈者將連線到 RabbitMQ,發送一條訊息,然後退出。如下圖:
在这里插入图片描述

public class Send {
    // 佇列名
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        // 1. 設定 RabbitMQ 地址
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             // 2. 建立一個通道
             Channel channel = connection.createChannel()) {
            // 3. 宣告一個佇列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "Hello World";
            // 4. 發送訊息到佇列中去
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println(" [x] Sent '" + msg + "'");
            // 5. 關閉通道和連線
            //channel.close();
            //connection.close();
        }
    }
}

上述程式碼流程:

  1. 設定 RabbitMQ 地址:可以是本機,也可以是遠端主機
  2. 建立一個新的連線
  3. 建立一個通道
  4. 宣告一個佇列:在RabbitMQ中,佇列宣告是冪等性的。如果不存在,就建立,如果存在,不會對已經存在的佇列產生任何影響
  5. 關閉通道和連線:已被註釋。因爲使用了 try-with-resources 語句,並且 Connection 和 Channel 都實現了 java.io.Closeable 介面(繼承了 AutoCloseable 介面),所以,它會自動爲我們關閉通道和連線,無須手動顯示關閉。

【注意】:在建立連線和通道的 Java 程式碼中,我這裏使用了 try-with-resources 語句。

3.2.4.3 try-with-resources 語句

try-with-resources 是 Java7 中一個新的例外處理機制 機製,它能夠很優雅地關閉在 try-catch 語句塊中使用的資源。try-with-resources 語句中使用的類都應實現 java.lang.AutoCloseable 介面

可以瞭解一下:Java使用Try with resources自動關閉資源

如:

private static void printFileJava7() throws IOException {
    try(FileInputStream input = new FileInputStream("file.txt")) {
        int data = input.read();
        while(data != -1){
            System.out.print((char) data);
            data = input.read();
        }
    }   
}

當 try 語句塊執行結束時,FileInputStream 會被自動關閉。這是因爲 FileInputStream 實現了java中的java.lang.AutoCloseable 介面。所有實現了這個介面的類都可以在try-with-resources結構中使用。

當 try-with-resources 結構中拋出一個異常,同時 FileInputStream 被關閉時(呼叫了其close方法)也拋出一個異常,try-with-resources 結構中拋出的異常會向外傳播,而 FileInputStream 被關閉時拋出的異常被抑制了。

3.2.4.4 訊息消費者

消費者偵聽來自 RabbitMQ 的訊息並將其列印,如下圖:
在这里插入图片描述

public class Recv {
    // 佇列名
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 確保改佇列已經存在
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (comsumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

【注意】:

  1. 消費者這邊監聽的佇列一定要存在
  2. 這裏不僅沒使用 try-with-resources 語句,而且還沒有顯示地關閉連線和通道,這是因爲:讓消費者一直保持非同步監聽的活動狀態。只要有佇列中有訊息,便能監聽到
  3. 我們將告訴伺服器將佇列中的訊息傳遞給我們。因爲它將非同步地向我們推播訊息,所以我們提供了一個物件形式的回撥,它將緩衝訊息,直到我們準備好使用它們。這就是 DeliverCallback 子類所做的。

3.2.4.5 測試

  1. 啓動 Send 類,登陸管理介面。
    在这里插入图片描述
    新增了一個 hello 的佇列,並且還有一條 message。生產者已經把訊息發送到 RabbitMQ 中去了,等待消費者接受。

  2. 再啓動 Recv 類
    在这里插入图片描述
    message 爲 0,表示:消費者已接受
    在这里插入图片描述
    而且後臺也列印出了此條訊息