由於JMS存在跨語言跨平臺的缺陷,所以出現了AMQP(Advanced Message Queuing Protocol),一個提供統一訊息服務的應用層標準高階訊息佇列協定,代表RabbitMQ
一、下載安裝RabbitMQ
安裝
從https://www.rabbitmq.com/download.html下載RabbitMQ
由於RabbitMQ是用Erlang語言編寫的,安裝RabbitMQ需要先安裝Erlang執行平臺
http://www.erlang.org/downloads
設定環境變數:
ERLANG_HOME:D:\software\erl10.4 (安裝後可能已自動設定)
path加入%ERLANG_HOME%\bin\erl.exe
重新啓動計算機
啓動RabbitMQ
cmd視窗需要管理員許可權執行,否則啓動時報錯(發生系統錯誤 5)
也可以在服務列表中啓動
安裝管理介面management ui
參考:https://www.rabbitmq.com/management.html
運營命令:rabbitmq-plugins enable rabbitmq_management
存取http://127.0.0.1:15672/
設定使用者許可權
1) 通過命令rabbitmqctl.bat list_users檢視使用者
2)新增使用者
rabbitmqctl.bat add_user root root
3)設定角色
rabbitmqctl.bat set_user_tags root administrator
4)設定許可權
rabbitmqctl.bat set_permissions -p / root 「." ".」 「.*」
5)使用賬號登錄
6)刪除使用者
rabbitmqctl delete_user username
7) 修改改密碼
rabbimqctl change_password username newpassword
二、RabbitMQ詳解
參考:
https://www.rabbitmq.com/getstarted.html
https://www.cnblogs.com/dongkuo/p/6001791.html 依託於官方文件詳細解釋
https://www.jianshu.com/p/80eefec808e5
0. 概念
訊息佇列服務一般由生產者、訊息佇列和消費者組成,RabbitMQ加入了交換機 (Exchange)的概念,這樣生產者和佇列就沒有直接聯繫, 轉而變成生產者把訊息給交換器, 交換器根據排程策略再把訊息再給佇列。
1)Server(Broker):接收用戶端連線,實現AMQP協定的訊息佇列和路由功能的進程;
2)Virtual Host:虛擬主機的概念,類似許可權控制組,一個Virtual Host裡可以有多個Exchange和Queue;
3)Exchange:交換機,接收生產者發送的訊息,並根據Routing Key將訊息路由到伺服器中的佇列Queue。
4)ExchangeType:交換機型別決定了路由訊息行爲,RabbitMQ中有四種類型Exchange,分別是fanout、direct、topic、headers
5)Message Queue:訊息佇列,用於儲存還未被消費者消費的訊息;
6)Message:由Header和body組成,Header是由生產者新增的各種屬性的集合,包括Message是否被持久化、優先順序是多少、由哪個Message Queue接收等;body是真正需要發送的數據內容;
7)BindingKey:系結關鍵字,將一個特定的Exchange和一個特定的Queue系結起來。
1. 最簡單的訊息發送Hello World
功能:一個生產者P發送訊息到佇列Q,一個消費者C接收
生產者:
通過連線工廠ConnectionFactory建立連線connection,使用連線建立通道channel, channel宣告佇列queue,channel使用queue發送訊息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel());
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
消費者:跟生產者一樣,建立連線,建立channel,宣告queue,然後監聽queue
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
2. 工作佇列work queues
功能:一個生產者(Boss)向佇列發訊息(任務),多個消費者(worker)從佇列接受訊息(任務)
特點:
1)一條訊息只會被一個消費者接收;
2)訊息是平均分配給消費者的;
3)消費者只有在處理完某條訊息後,纔會收到下一條訊息。
By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
預設情況下,RabbitMQ將按順序向消費者發送訊息,平均每個消費者收到相同數量的訊息,這種分發訊息叫做輪詢。可以通過三個以上的worker(消費者)來進行測試。
由於worker執行任務需要一定時間,爲了確保訊息不丟失,RabbitMQ 支援
1)訊息確認機制 機製
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);//可使用sleep模擬
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
2)訊息持久化
// 將第二個參數設爲true,表示宣告一個需要持久化的佇列。
// 需要注意的是,若你已經定義了一個非持久的,同名字的佇列,要麼將其先刪除(不然會報錯),要麼換一個名字。
channel.queueDeclare("hello", true, false, false, null);
// 修改了第三個參數,這是表明訊息需要持久化
channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
3. 發佈/訂閱Publish/Subscribe
生產者是把訊息發送到了交換機(exchange)中,然後交換機負責(決定)將訊息發送到(哪一個)訊息佇列中。
前面的兩個案例其實是經過了預設交換機(Default Exchange,用空字串表示),每一個被建立的佇列都會被自動的系結到預設交換機上,並且路由鍵就是佇列的名字。路由鍵用來指定交換機將訊息發到指定的佇列。
交換機有4種不同的型別,分別是direct,fanout,topic,headers:
direct:要求和它系結的佇列帶有一個路由鍵K,若有一個帶有路由鍵R的訊息到達了交換機,交換機會將此訊息路由到路由鍵K = R的佇列。預設交換機便是該型別。
fanout:會路由每一條訊息到所有和它系結的佇列,忽略路由鍵。即廣播形式。
4. Routing模式
當有些訊息只能部分消費者消費時,可使用Routing模式;佇列系結交換機,同時帶上routing key,以多次呼叫佇列系結方法,呼叫時,佇列名和交換機名都相同,而routing key不同,這樣可以使一個佇列帶有多個routing key。
5. Topic
萬用字元匹配訊息發送佇列,routing key由多個關鍵詞組成,詞與詞之間由點號(.)隔開,規定*表示任意的一個詞,#號表示任意的0個或多個詞。
6. RPC
參考文件
三、springboot rabbitmq詳解
參考:
https://docs.spring.io/spring-amqp/docs/2.1.6.RELEASE/reference/html/#sending-messages
https://www.cnblogs.com/ityouknow/p/6120544.html
1.pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123
3. 註冊Queue,RabbitConfig.java
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
}
4. 生產者,RabbitMQProducer.java
@Component
public class RabbitMQProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMsg(String routingKey, String msg) {
amqpTemplate.convertAndSend(routingKey, msg);
}
}
5. 測試
@Component
@OpenAPI
public class ApiTest {
@Autowired
private RabbitMQProducer rabbitMQProducer;
@OpenAPIMethod(methodName = "testRabbitMQSender")
public Object testRabbitMQSender(final String msg) throws Exception {
rabbitMQProducer.sendMsg("hello",msg);
return null;
}
}
發送訊息:http://10.0.0.57:9001/api/testRabbitMQSender?msg=Hello%20World
檢視控制檯,建立了一個叫hello的queue,積壓了1條訊息
訊息內容:
6. 消費者
@Component
public class RabbitMQConsumer {
@RabbitListener(queues = "hello")
public void consumerHelloQueueMessage(String message){
System.out.println("收到hello-queue報文:"+message);
}
}
啓動消費者後,會立即收到積壓的訊息
收到hello-queue報文:Hello World
7. 測試多個消費者監聽同一個通道,如1個hello queue,3個消費者
發送9條訊息,列印日誌日下:
收到hello-queue報文:Hello World
收到hello-queue3報文:Hello World
收到hello-queue2報文:Hello World
收到hello-queue報文:Hello World
收到hello-queue報文:Hello World
收到hello-queue2報文:Hello World
收到hello-queue3報文:Hello World
收到hello-queue3報文:Hello World
收到hello-queue2報文:Hello World
結果表示RabbitMQ會輪詢發送訊息給消費者,一條訊息只能被一個消費者接收
8. Direct Exchange
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
@Bean
public DirectExchange helloExchange() {
return new DirectExchange("helloExchange");
}
@Bean
public Binding helloBinding() {
return BindingBuilder.bind(helloQueue()).to(helloExchange()).with("helloDirect");
}
}
註冊了一個叫helloExchange的Direct Exchange
檢視helloExchange 詳情:direct型別,系結了hello queue
檢視hello queue,除了系結預設exchange還系結了helloExchange
測試demo1
一個生產者,一個helloExchage,一個叫hello的queue,一個路由helloDirect,四個消費者(三個監聽叫hello的queue,一個監聽路由helloDirect對應hello queue且系結了helloExchage交換器)
發送訊息:
public void sendMsg(String exchange, String routingKey, String msg) {
amqpTemplate.convertAndSend(exchange, routingKey, msg);
}
public Object testRabbitMQSender(final String msg) throws Exception {
for(int i=0; i<9; i++) {
rabbitMQProducer.sendMsg("helloExchange", "helloDirect",msg);
}
return null;
}
消費訊息:
@RabbitListener(queues = "hello")
public void consumerHelloQueueMessage(String message){
System.out.println("收到hello-queue報文:"+message);
}
@RabbitListener(queues = "hello")
public void consumerHelloQueueMessage2(String message){
System.out.println("收到hello-queue2報文:"+message);
}
@RabbitListener(queues = "hello")
public void consumerHelloQueueMessage3(String message){
System.out.println("收到hello-queue3報文:"+message);
}
// 當RabbitMQ中不存在系結關係時,自動生成相應的系結
// 如不存在helloExchange、hello queue或helloDirect任意一個,都會自動生成並系結關係
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "hello", durable = "true"),
exchange = @Exchange(value = "helloExchange", durable = "true"),
key = "helloDirect"
)
)
public void consumerQueueMessage(String message){
System.out.println("收到helloExchage->hello queue->helloDirect route報文:"+message);
}
測試結果:四個消費者均勻收到訊息
收到hello-queue2報文:Hello World
收到hello-queue報文:Hello World
收到hello-queue3報文:Hello World
收到helloExchage->hello queue->helloDirect route報文:Hello World
收到hello-queue2報文:Hello World
收到hello-queue報文:Hello World
收到hello-queue3報文:Hello World
收到hello-queue報文:Hello World
收到helloExchage->hello queue->helloDirect route報文:Hello World
結果說明:
關聯示意圖如下
當P通過helloExchange發送訊息,經過路由helloDirect到達hello queue,然後均勻下發給所監聽的C,所以四個消費者都收到了訊息
測試demo2
在測試demo1的基礎上增加一個hello2 queue, 並與helloExchange系結,路由key爲helloDirect2,一個消費者監聽,示意圖如下:
當P通過helloExchange發送訊息,經過路由helloDirect2到達hello2 queue,監聽的消費者收到訊息
9. Fanout Exchange
以廣播模式,給 Fanout 交換機發送訊息,系結了這個交換機的所有佇列都收到這個訊息。
// 定義exchange、queue、binding
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout1");
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout2");
}
@Bean
public Queue fanoutQueue3() {
return new Queue("fanout3");
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding fanoutBinding() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding3() {
return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
}
// 發送訊息
public Object testRabbitMQSender(final String msg) throws Exception {
rabbitMQProducer.sendMsg("fanoutExchange", "",msg);
return null;
}
// 消費訊息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout1", durable = "true"),
exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
)
)
public void consumerFanoutMessage1(String message){
System.out.println("收到fanoutExchange->fanout報文1:"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout2", durable = "true"),
exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
)
)
public void consumerFanoutMessage2(String message){
System.out.println("收到fanoutExchange->fanout報文2:"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout3", durable = "true"),
exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
)
)
public void consumerFanoutMessage3(String message){
System.out.println("收到fanoutExchange->fanout報文3:"+message);
}
執行結果:
收到fanoutExchange->fanout報文1:Hello World fanout
收到fanoutExchange->fanout報文3:Hello World fanout
收到fanoutExchange->fanout報文2:Hello World fanout
10. Topic Exchange
使用萬用字元(*和#)路由
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
// 定義exchange、queue、binding
@Bean
public Queue topicQueue1() {
return new Queue("topic.q1");
}
@Bean
public Queue topicQueue2() {
return new Queue("topic.q2");
}
@Bean
public Queue topicQueue3() {
return new Queue("topic.q3");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.q1");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
}
@Bean
public Binding topicBinding3() {
return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("topic.*");
}
// 消費者
@RabbitListener(queues = "topic.q1")
public void consumerTopicMessage(String message){
System.out.println("收到topic.q1報文:"+message);
}
@RabbitListener(queues = "topic.q2")
public void consumerTopicMessage2(String message){
System.out.println("收到topic.q2報文:"+message);
}
@RabbitListener(queues = "topic.q3")
public void consumerTopicMessage3(String message){
System.out.println("收到topic.q3報文:"+message);
}
// 發送訊息
1)rabbitMQProducer.sendMsg("topicExchange", "topic.q1",msg);
列印日誌:(滿足topic.q1、topic.#、topic.*的監聽)
收到topic.q1報文:Hello World topic
收到topic.q2報文:Hello World topic
收到topic.q3報文:Hello World topic
2)rabbitMQProducer.sendMsg("topicExchange", "topic.q2",msg);
列印日誌:(滿足topic.#、topic.*的監聽)
收到topic.q2報文:Hello World topic
收到topic.q3報文:Hello World topic
3)rabbitMQProducer.sendMsg("topicExchange", "topic.q2.x",msg);
列印日誌:(滿足topic.#的監聽)
收到topic.q2報文:Hello World topic
常見異常
1. 未定義queue,直接監聽queue報錯:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[hello2]
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'hello2' in vhost '/', class-id=50, method-id=10)
解決方法:
1) 控制檯手動建立queue
2)設定queue
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
}