官網
簡單來說:RabbitMQ 是開源的、訊息導向中介層。RabbitMQ伺服器是用Erlang語言編寫的。
官網安裝介紹(windows)
【注意】: 在 windows 操作系統安裝 RabbitMQ 之前,需要安裝 Erlang。
安裝過程可以參考部落格:Windows下RabbitMQ安裝及設定
登陸成功後:
可以看出上述管理介面有許多資訊(Exchanges、Queues 等等),這裏我們先不看它。
RabbitMQ 是一個訊息代理:它接受、儲存、轉發訊息(二進制 blob 數據)。
RabbitMQ 和訊息傳遞通常使用一些術語
【注意】:
接下來是用Java編寫兩個程式:
如下圖:
新建一個 Maven 專案。
<!-- rabbitmq -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version>
</dependency>
<!-- slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
</dependency>
<!-- slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
如果不新增 「slf4j-simple」、「slf4j-api」依賴,會報錯:
發佈者將連線到 RabbitMQ,發送一條訊息,然後退出。如下圖:
public class Send {
// 佇列名
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
// 1. 設定 RabbitMQ 地址
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
// 2. 建立一個通道
Channel channel = connection.createChannel()) {
// 3. 宣告一個佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "Hello World";
// 4. 發送訊息到佇列中去
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println(" [x] Sent '" + msg + "'");
// 5. 關閉通道和連線
//channel.close();
//connection.close();
}
}
}
上述程式碼流程:
【注意】:在建立連線和通道的 Java 程式碼中,我這裏使用了 try-with-resources 語句。
try-with-resources 是 Java7 中一個新的例外處理機制 機製,它能夠很優雅地關閉在 try-catch 語句塊中使用的資源。try-with-resources 語句中使用的類都應實現 java.lang.AutoCloseable 介面
可以瞭解一下:Java使用Try with resources自動關閉資源
如:
private static void printFileJava7() throws IOException {
try(FileInputStream input = new FileInputStream("file.txt")) {
int data = input.read();
while(data != -1){
System.out.print((char) data);
data = input.read();
}
}
}
當 try 語句塊執行結束時,FileInputStream 會被自動關閉。這是因爲 FileInputStream 實現了java中的java.lang.AutoCloseable 介面。所有實現了這個介面的類都可以在try-with-resources結構中使用。
當 try-with-resources 結構中拋出一個異常,同時 FileInputStream 被關閉時(呼叫了其close方法)也拋出一個異常,try-with-resources 結構中拋出的異常會向外傳播,而 FileInputStream 被關閉時拋出的異常被抑制了。
消費者偵聽來自 RabbitMQ 的訊息並將其列印,如下圖:
public class Recv {
// 佇列名
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 確保改佇列已經存在
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (comsumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
【注意】:
啓動 Send 類,登陸管理介面。
新增了一個 hello 的佇列,並且還有一條 message。生產者已經把訊息發送到 RabbitMQ 中去了,等待消費者接受。
再啓動 Recv 類
message 爲 0,表示:消費者已接受
而且後臺也列印出了此條訊息