spring-boot學習:二十、spring-boot整合RabbitMQ

2020-08-13 16:12:30

由於JMS存在跨語言跨平臺的缺陷,所以出現了AMQP(Advanced Message Queuing Protocol),一個提供統一訊息服務的應用層標準高階訊息佇列協定,代表RabbitMQ

一、下載安裝RabbitMQ

  1. 安裝
    從https://www.rabbitmq.com/download.html下載RabbitMQ
    由於RabbitMQ是用Erlang語言編寫的,安裝RabbitMQ需要先安裝Erlang執行平臺
    http://www.erlang.org/downloads

    設定環境變數:
    ERLANG_HOME:D:\software\erl10.4 (安裝後可能已自動設定)
    path加入%ERLANG_HOME%\bin\erl.exe

    重新啓動計算機

  2. 啓動RabbitMQ
    cmd視窗需要管理員許可權執行,否則啓動時報錯(發生系統錯誤 5)
    在这里插入图片描述
    也可以在服務列表中啓動

  3. 安裝管理介面management ui
    參考:https://www.rabbitmq.com/management.html
    運營命令:rabbitmq-plugins enable rabbitmq_management

  4. 存取http://127.0.0.1:15672/
    在这里插入图片描述

  5. 設定使用者許可權
    1) 通過命令rabbitmqctl.bat list_users檢視使用者
    在这里插入图片描述
    2)新增使用者
    rabbitmqctl.bat add_user root root
    在这里插入图片描述
    3)設定角色
    rabbitmqctl.bat set_user_tags root administrator

4)設定許可權
rabbitmqctl.bat set_permissions -p / root 「." ".」 「.*」

5)使用賬號登錄
在这里插入图片描述
6)刪除使用者
rabbitmqctl delete_user username

7) 修改改密碼
rabbimqctl change_password username newpassword

二、RabbitMQ詳解
參考:
https://www.rabbitmq.com/getstarted.html
https://www.cnblogs.com/dongkuo/p/6001791.html 依託於官方文件詳細解釋
https://www.jianshu.com/p/80eefec808e5

0. 概念
訊息佇列服務一般由生產者、訊息佇列和消費者組成,RabbitMQ加入了交換機 (Exchange)的概念,這樣生產者和佇列就沒有直接聯繫, 轉而變成生產者把訊息給交換器, 交換器根據排程策略再把訊息再給佇列。
1)Server(Broker):接收用戶端連線,實現AMQP協定的訊息佇列和路由功能的進程;
2)Virtual Host:虛擬主機的概念,類似許可權控制組,一個Virtual Host裡可以有多個Exchange和Queue;
3)Exchange:交換機,接收生產者發送的訊息,並根據Routing Key將訊息路由到伺服器中的佇列Queue。
4)ExchangeType:交換機型別決定了路由訊息行爲,RabbitMQ中有四種類型Exchange,分別是fanout、direct、topic、headers
5)Message Queue:訊息佇列,用於儲存還未被消費者消費的訊息;
6)Message:由Header和body組成,Header是由生產者新增的各種屬性的集合,包括Message是否被持久化、優先順序是多少、由哪個Message Queue接收等;body是真正需要發送的數據內容;
7)BindingKey:系結關鍵字,將一個特定的Exchange和一個特定的Queue系結起來。

1. 最簡單的訊息發送Hello World
在这里插入图片描述
功能:一個生產者P發送訊息到佇列Q,一個消費者C接收
生產者:
通過連線工廠ConnectionFactory建立連線connection,使用連線建立通道channel, channel宣告佇列queue,channel使用queue發送訊息

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel());
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

消費者:跟生產者一樣,建立連線,建立channel,宣告queue,然後監聽queue

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

2. 工作佇列work queues
在这里插入图片描述

功能:一個生產者(Boss)向佇列發訊息(任務),多個消費者(worker)從佇列接受訊息(任務)
特點:
1)一條訊息只會被一個消費者接收;
2)訊息是平均分配給消費者的;
3)消費者只有在處理完某條訊息後,纔會收到下一條訊息。

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
預設情況下,RabbitMQ將按順序向消費者發送訊息,平均每個消費者收到相同數量的訊息,這種分發訊息叫做輪詢。可以通過三個以上的worker(消費者)來進行測試。

由於worker執行任務需要一定時間,爲了確保訊息不丟失,RabbitMQ 支援
1)訊息確認機制 機製

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");
  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);//可使用sleep模擬
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};

2)訊息持久化

// 將第二個參數設爲true,表示宣告一個需要持久化的佇列。
// 需要注意的是,若你已經定義了一個非持久的,同名字的佇列,要麼將其先刪除(不然會報錯),要麼換一個名字。
channel.queueDeclare("hello", true, false, false, null);

// 修改了第三個參數,這是表明訊息需要持久化
channel.basicPublish("",  "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

3. 發佈/訂閱Publish/Subscribe
在这里插入图片描述

生產者是把訊息發送到了交換機(exchange)中,然後交換機負責(決定)將訊息發送到(哪一個)訊息佇列中。
前面的兩個案例其實是經過了預設交換機(Default Exchange,用空字串表示),每一個被建立的佇列都會被自動的系結到預設交換機上,並且路由鍵就是佇列的名字。路由鍵用來指定交換機將訊息發到指定的佇列。

交換機有4種不同的型別,分別是direct,fanout,topic,headers:
direct:要求和它系結的佇列帶有一個路由鍵K,若有一個帶有路由鍵R的訊息到達了交換機,交換機會將此訊息路由到路由鍵K = R的佇列。預設交換機便是該型別。
fanout:會路由每一條訊息到所有和它系結的佇列,忽略路由鍵。即廣播形式。

4. Routing模式
當有些訊息只能部分消費者消費時,可使用Routing模式;佇列系結交換機,同時帶上routing key,以多次呼叫佇列系結方法,呼叫時,佇列名和交換機名都相同,而routing key不同,這樣可以使一個佇列帶有多個routing key。

5. Topic
萬用字元匹配訊息發送佇列,routing key由多個關鍵詞組成,詞與詞之間由點號(.)隔開,規定*表示任意的一個詞,#號表示任意的0個或多個詞。

6. RPC
參考文件

三、springboot rabbitmq詳解
參考:
https://docs.spring.io/spring-amqp/docs/2.1.6.RELEASE/reference/html/#sending-messages
https://www.cnblogs.com/ityouknow/p/6120544.html

1.pom.xml

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. application.properties

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123

3. 註冊Queue,RabbitConfig.java

@Configuration
public class RabbitConfig {
	@Bean
	public Queue helloQueue() {
		return new Queue("hello");
	}
}

4. 生產者,RabbitMQProducer.java

@Component
public class RabbitMQProducer {
	@Autowired
	private AmqpTemplate amqpTemplate;

	public void sendMsg(String routingKey, String msg) {
		amqpTemplate.convertAndSend(routingKey, msg);
	}
}

5. 測試

@Component
@OpenAPI
public class ApiTest {
	@Autowired
	private RabbitMQProducer rabbitMQProducer;
	
	@OpenAPIMethod(methodName = "testRabbitMQSender")
	public Object testRabbitMQSender(final String msg) throws Exception {
		rabbitMQProducer.sendMsg("hello",msg);
		return null;
	}
}

發送訊息:http://10.0.0.57:9001/api/testRabbitMQSender?msg=Hello%20World
檢視控制檯,建立了一個叫hello的queue,積壓了1條訊息
在这里插入图片描述
訊息內容:
在这里插入图片描述
6. 消費者

@Component
public class RabbitMQConsumer {
	@RabbitListener(queues = "hello")
	public void consumerHelloQueueMessage(String message){
        System.out.println("收到hello-queue報文:"+message);
    }
}

啓動消費者後,會立即收到積壓的訊息
收到hello-queue報文:Hello World

7. 測試多個消費者監聽同一個通道,如1個hello queue,3個消費者
發送9條訊息,列印日誌日下:

收到hello-queue報文:Hello World
收到hello-queue3報文:Hello World
收到hello-queue2報文:Hello World
收到hello-queue報文:Hello World
收到hello-queue報文:Hello World
收到hello-queue2報文:Hello World
收到hello-queue3報文:Hello World
收到hello-queue3報文:Hello World
收到hello-queue2報文:Hello World

結果表示RabbitMQ會輪詢發送訊息給消費者,一條訊息只能被一個消費者接收

8. Direct Exchange

@Configuration
public class RabbitConfig {

	@Bean
	public Queue helloQueue() {
		return new Queue("hello");
	}
	
	@Bean
	public DirectExchange helloExchange() {
		return new DirectExchange("helloExchange");
	}
	
	@Bean
	public Binding helloBinding() {
		return BindingBuilder.bind(helloQueue()).to(helloExchange()).with("helloDirect");
	}
}

註冊了一個叫helloExchange的Direct Exchange
在这里插入图片描述
檢視helloExchange 詳情:direct型別,系結了hello queue
在这里插入图片描述
檢視hello queue,除了系結預設exchange還系結了helloExchange
在这里插入图片描述

  • 測試demo1

    一個生產者,一個helloExchage,一個叫hello的queue,一個路由helloDirect,四個消費者(三個監聽叫hello的queue,一個監聽路由helloDirect對應hello queue且系結了helloExchage交換器)

    發送訊息:

    public void sendMsg(String exchange, String routingKey, String msg) {
    	amqpTemplate.convertAndSend(exchange, routingKey, msg);
    }
    
    public Object testRabbitMQSender(final String msg) throws Exception {
    	for(int i=0; i<9; i++) {
    		rabbitMQProducer.sendMsg("helloExchange", "helloDirect",msg);
    	}
    	return null;
    }
    

    消費訊息:

    @RabbitListener(queues = "hello")
    public void consumerHelloQueueMessage(String message){
    	System.out.println("收到hello-queue報文:"+message);
    }
    
    @RabbitListener(queues = "hello")
    public void consumerHelloQueueMessage2(String message){
    	System.out.println("收到hello-queue2報文:"+message);
    }
    
    @RabbitListener(queues = "hello")
    public void consumerHelloQueueMessage3(String message){
    	System.out.println("收到hello-queue3報文:"+message);
    }
    
    // 當RabbitMQ中不存在系結關係時,自動生成相應的系結
    // 如不存在helloExchange、hello queue或helloDirect任意一個,都會自動生成並系結關係
    @RabbitListener(bindings = @QueueBinding(
    		value = @Queue(value = "hello", durable = "true"),
    		exchange = @Exchange(value = "helloExchange", durable = "true"),
    		key = "helloDirect"
    	)
    )
    public void consumerQueueMessage(String message){
    	System.out.println("收到helloExchage->hello queue->helloDirect route報文:"+message);
    }
    

    測試結果:四個消費者均勻收到訊息

    收到hello-queue2報文:Hello World
    收到hello-queue報文:Hello World
    收到hello-queue3報文:Hello World
    收到helloExchage->hello queue->helloDirect route報文:Hello World
    收到hello-queue2報文:Hello World
    收到hello-queue報文:Hello World
    收到hello-queue3報文:Hello World
    收到hello-queue報文:Hello World
    收到helloExchage->hello queue->helloDirect route報文:Hello World
    

    結果說明:
    關聯示意圖如下
    在这里插入图片描述
    當P通過helloExchange發送訊息,經過路由helloDirect到達hello queue,然後均勻下發給所監聽的C,所以四個消費者都收到了訊息

  • 測試demo2
    在測試demo1的基礎上增加一個hello2 queue, 並與helloExchange系結,路由key爲helloDirect2,一個消費者監聽,示意圖如下:
    在这里插入图片描述
    當P通過helloExchange發送訊息,經過路由helloDirect2到達hello2 queue,監聽的消費者收到訊息

9. Fanout Exchange
以廣播模式,給 Fanout 交換機發送訊息,系結了這個交換機的所有佇列都收到這個訊息。

// 定義exchange、queue、binding
@Bean
public Queue fanoutQueue1() {
	return new Queue("fanout1");
}

@Bean
public Queue fanoutQueue2() {
	return new Queue("fanout2");
}

@Bean
public Queue fanoutQueue3() {
	return new Queue("fanout3");
}

@Bean
public FanoutExchange fanoutExchange() {
	return new FanoutExchange("fanoutExchange");
}

@Bean
public Binding fanoutBinding() {
	return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}

@Bean
public Binding fanoutBinding2() {
	return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}

@Bean
public Binding fanoutBinding3() {
	return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
}
// 發送訊息
public Object testRabbitMQSender(final String msg) throws Exception {
	rabbitMQProducer.sendMsg("fanoutExchange", "",msg);
	return null;
}
// 消費訊息
@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = "fanout1", durable = "true"),
		exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
	)
)
public void consumerFanoutMessage1(String message){
	System.out.println("收到fanoutExchange->fanout報文1:"+message);
}

@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = "fanout2", durable = "true"),
		exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
	)
)
public void consumerFanoutMessage2(String message){
	System.out.println("收到fanoutExchange->fanout報文2:"+message);
}

@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = "fanout3", durable = "true"),
		exchange = @Exchange(value = "fanoutExchange", durable = "true", type = ExchangeTypes.FANOUT)
	)
)
public void consumerFanoutMessage3(String message){
	System.out.println("收到fanoutExchange->fanout報文3:"+message);
}

執行結果:

收到fanoutExchange->fanout報文1:Hello World fanout
收到fanoutExchange->fanout報文3:Hello World fanout
收到fanoutExchange->fanout報文2:Hello World fanout

10. Topic Exchange
使用萬用字元(*和#)路由

* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
// 定義exchange、queue、binding
@Bean
public Queue topicQueue1() {
	return new Queue("topic.q1");
}

@Bean
public Queue topicQueue2() {
	return new Queue("topic.q2");
}

@Bean
public Queue topicQueue3() {
	return new Queue("topic.q3");
}

@Bean
public TopicExchange topicExchange() {
	return new TopicExchange("topicExchange");
}

@Bean
public Binding topicBinding1() {
	return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.q1");
}

@Bean
public Binding topicBinding2() {
	return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
}

@Bean
public Binding topicBinding3() {
	return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("topic.*");
}
// 消費者
@RabbitListener(queues = "topic.q1")
public void consumerTopicMessage(String message){
	System.out.println("收到topic.q1報文:"+message);
}

@RabbitListener(queues = "topic.q2")
public void consumerTopicMessage2(String message){
	System.out.println("收到topic.q2報文:"+message);
}

@RabbitListener(queues = "topic.q3")
public void consumerTopicMessage3(String message){
	System.out.println("收到topic.q3報文:"+message);
}
// 發送訊息
1)rabbitMQProducer.sendMsg("topicExchange", "topic.q1",msg);
列印日誌:(滿足topic.q1、topic.#、topic.*的監聽)
收到topic.q1報文:Hello World topic
收到topic.q2報文:Hello World topic
收到topic.q3報文:Hello World topic


2)rabbitMQProducer.sendMsg("topicExchange", "topic.q2",msg);
列印日誌:(滿足topic.#、topic.*的監聽)
收到topic.q2報文:Hello World topic
收到topic.q3報文:Hello World topic

3)rabbitMQProducer.sendMsg("topicExchange", "topic.q2.x",msg);
列印日誌:(滿足topic.#的監聽)
收到topic.q2報文:Hello World topic

常見異常
1. 未定義queue,直接監聽queue報錯:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[hello2]
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'hello2' in vhost '/', class-id=50, method-id=10)

解決方法:
1) 控制檯手動建立queue
2)設定queue

@Configuration
public class RabbitConfig {

	@Bean
	public Queue helloQueue() {
		return new Queue("hello");
	}
}