MQ全稱爲Message Queue,訊息佇列是應用與應用之間的通訊方法
jms是java訊息通訊的一種應用程式介面,同jdbc一樣,只定義了介面規範,用於兩個應用程式之間進行非同步通訊。
amqp是面向協定的,應用程式之間進行通訊必須遵循這個協定,amqp不直接定義API介面,只在網路通訊層定義規範,通過網路協議規範傳輸格式。
名稱 | 介紹 |
---|---|
ActiveMQ | 基於JMS進行通訊 |
ZeroMQ | 基於C語言進行開發 |
RabbitMQ | 基於AMQP協定,erlang語言開發 |
RocketMQ | 基於JMS,阿裡巴巴旗下產品 |
Kafka | 類似MQ的產品,高吞吐量 |
windows版本
鏈接:https://pan.baidu.com/s/1sZjgiwpWF-XiWAzvH8oncw
提取碼:by2p
壓縮包內有教學,跟着教學安裝即可。
安裝完成後開啓瀏覽器,存取下面 下麪鏈接地址
初始化賬號:guest
初始化密碼:guest
登錄後就進入到RabbitMQ圖形管理介面
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
package com.dyh.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TestProducer {
public static void main(String[] args) throws IOException, TimeoutException {
// 建立連線工廠,並初始化參數
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 建立連線
Connection connection = connectionFactory.newConnection();
// 建立頻道
Channel channel = connection.createChannel();
/*
宣告佇列
queue:佇列名稱
durable:是否爲持久佇列,伺服器重新啓動後是否存在
exclusive:是否被此頻道獨佔
autoDelete:是否在不使用時自動刪除
arguments:其他參數
* */
channel.queueDeclare("queue01", true, false, false, null);
// 初始化要發送的訊息
String message = "hello rabbitMQ";
/*
通過佇列發送訊息
exchange:發送至哪個交換機,""空字串代表不選擇交換機
routingKey:路由祕鑰 當不選擇交換機時,路由祕鑰就是發送的佇列名稱
props:其他參數
body:發送資訊的位元組碼陣列
* */
channel.basicPublish("", "queue01", null, message.getBytes());
// 關閉連線
channel.close();
connection.close();
}
}
執行main方法後,可以看見控制檯多了一個佇列queue01,並且裏面有一條訊息
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
package com.dyh.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TestConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 建立連線工廠,並初始化參數
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 建立連線
Connection connection = connectionFactory.newConnection();
// 建立頻道
Channel channel = connection.createChannel();
/*
宣告佇列
queue:佇列名稱
durable:是否爲持久佇列,伺服器重新啓動後是否存在
exclusive:是否被此頻道獨佔
autoDelete:是否在不使用時自動刪除
arguments:其他參數
* */
channel.queueDeclare("queue01", true, false, false, null);
// 初始化消費者並監聽此頻道
DefaultConsumer defaultConsumer = new DefaultConsumer(channel);
/*
進行消費
queue:佇列名稱
autoAck:伺服器收到訊息後自動確認則爲true,如果手動確認則爲false
callback:消費者物件
* */
channel.basicConsume("queue01", true, defaultConsumer);
}
}
執行main方法後,檢視管理控制檯,發現訊息已被處理
注意!當開啓服務消費者後,執行緒會呈現阻塞狀態,不斷等待訊息進入佇列。
簡單模式從字面意思上來看就是最簡單的模式,容易上手,也是我們快速入門的案例使用的模式,但是工作中因爲業務邏輯不同,所以不會這樣直接使用。
工作佇列模式目的在於增加一個相同的服務節點,提高訊息任務處理的效率
多個消費端共同監聽同一個佇列時,消費者之間對於任務之間的關係是競爭關係
發佈訂閱模式引入了一個新的元件 交換機 它的作用是可以通過交換機對不同的佇列發送相同的訊息,而每一個佇列又可以執行不同的操作
比如使用者發送訊息後,首先會被交換機接收,隨後將訊息轉發至該交換機所系結的所有佇列,而每一個佇列都會執行不同的操作,這樣就實現了使用者發送了一條訊息,但是被交換機下所有佇列執行對應的操作。
雖然上面的訂閱模式可以實現將訊息進行分發,只要是交換機所系結的佇列都可以收到訊息,但是如果使用者只想往佇列1裡發送訊息,不想讓佇列2裡發送訊息怎麼辦呢?
這樣就引出裡新的模式,路由模式
通過指定交換機後,跟着再系結一個路由key,我們可以理解爲一個標記,目的是爲了告訴交換機,你只能向系結了這個路由key的佇列發送這條訊息。這樣就實現了條件過濾
當然一個佇列可以系結多個key
比如
使用者發送一條訊息並且攜帶key1,那麼根據下圖,兩個佇列都系結了key1,那麼兩個佇列都可以收到使用者訊息並處理。
使用者發送一條訊息並攜帶key2,那麼根據下圖,只有佇列2系結了key2,所以只用佇列2可以收到使用者訊息並處理。
雖然路由模式可以系結多個路由key,並通過路由key進行佇列的選擇,但是如果業務過於龐大,系結佇列也會顯得非常麻煩,並且維護其他也不夠方便,所以又引入了更簡單的主題模式(萬用字元模式)
我們不在通過使用路由key的系結來進行佇列的選擇,而使用萬用字元的方式進行佇列選擇,這樣在設定上就顯得更爲輕鬆。
路由key改變爲一個或多個詞,多個詞中間必須使用.
連線
例如:order.insert
order.delet
萬用字元路由規則;
萬用字元 | 說明 |
---|---|
# | 匹配一個或多個詞 |
* | 匹配一個詞 |
例如
order.*
:字首爲order,後面只能有一個詞 比如order.insert
order.delete
order.#
:字首爲order,後面可以是任意詞 比如 order.detail
order.detail.insert
order.detail.info.insert
*.order
:後綴爲order,前面必須有一個詞 比如user.order
#.order
:後綴爲order,前面可以有0個或多個詞
這樣就實現了使用萬用字元替代路由key,實現了更加靈活的設定