訊息服務中介軟體可以提升系統非同步通訊,擴充套件解耦能力
兩個重要概念:
協定 | JMS | AMQP |
---|---|---|
英文 | Java Message Service Java | Advanced Message Queuing Protocol |
中文 | 訊息服務 | 高階訊息佇列協定 |
實現 | ActiveMQ、HornetMQ | RabbitMQ |
定義 | JAVA API | 網路線級協定 |
跨語言 | 否 | 是 |
跨平臺 | 否 | 是 |
訊息模型 | peer-2-peer、Pub/Sub | 5 種 |
支援訊息型別 | 多訊息型別 | byte[] |
支援 | spring-jms | spring-rabbit |
發送訊息 | @JmsTemplate | @RabbitTemplate |
監聽訊息 | @JmsListener | @RabbitListener |
開啓支援 | @EnableJms | @EnableRabbit |
自動設定 | JmsAutoConfiguration | RabbitAutoConfiguration |
https://www.rabbitmq.com/
RabbitMQ 是由 erlang 開發的 AMQP(Advanced Message Queuing Protocol) 實現
核心概念
[外連圖片轉存失敗,源站可能有防盜鏈機制 機製,建議將圖片儲存下來直接上傳(img-qul9BCQd-1597024380528)(img/rabbitmq.jpg)]
生產者把訊息發佈到 Exchange 上,
訊息最終達到佇列並被消費者接收,
而 Binding 決定交換器的訊息應該發送到哪個佇列
[外連圖片轉存失敗,源站可能有防盜鏈機制 機製,建議將圖片儲存下來直接上傳(img-8gnBrRxr-1597024380530)(img/rabbitmq-2.jpg)]
安裝啓動 RabbitMQ
# 安裝帶有管理介面
docker pull rabbitmq:management
# 用戶端:5672 管理介面:15672
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq rabbitmq:management
管理介面
http://localhost:15672/
賬號:guest
密碼:guest
引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
自動設定類:RabbitAutoConfiguration
設定:RabbitProperties
給 Rabbit 發送和接收訊息:RabbitTemplate
系統管理元件:AmqpAdmin
RabbitMQ 中新建:
queue: message
exchange: exchange.message
Routing key : message
自定義物件序列化規則
package com.example.demo.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyAMQPConfig {
// 以json的方式序列化物件
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
點對點發送訊息測試
package com.example.demo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;
@SpringBootTest
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
// 單播:發送數據
@Test
public void testSendRabbitMQ() {
Map<String, Object> map = new HashMap<>();
map.put("name", "Tom");
map.put("age", 23);
rabbitTemplate.convertAndSend("exchange.message", "message", map);
}
// 單播:接收數據
@Test
public void testReceiveRabbitMQ() {
Object obj = rabbitTemplate.receiveAndConvert("message");
System.out.println(obj);
// {name=Tom, age=23}
}
}
開啓 RabbitMQ
package com.example.demo;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableRabbit
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
監聽佇列數據
package com.example.demo.service;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
public class MessageService {
// 接收訊息數據
@RabbitListener(queues={"message"})
public void listenMessage(Map<String, Object> map){
System.out.println("收到訊息: " + map);
}
// 接收訊息頭和訊息體
@RabbitListener(queues={"message"})
public void receiveMessage(Message message){
System.out.println("收到訊息: " + message.getMessageProperties());
System.out.println("收到訊息: " + message.getBody());
}
}
使用範例
package com.example.demo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;
@SpringBootTest
public class RabbitMQTest {
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange(){
// 建立交換器
amqpAdmin.declareExchange(new DirectExchange("exchange.admin"));
// 建立queque
amqpAdmin.declareQueue(new Queue("queue.admin"));
// 建立系結規則
amqpAdmin.declareBinding(new Binding("queue.admin", Binding.DestinationType.QUEUE, "exchange.admin", "admin", null));
System.out.println("建立成功");
}
}