RabbitMQ
主要有六種工作模式,本文整合SpringBoot
分別介紹工作模式的實現。
訊息生產者或者傳送者,使用P
表示:
訊息從生產端傳送到消費端,一定要通過佇列轉發,使用queue_name
表示:
消費的消費者或者接收者,使用C
表示,如果有多個消費者也可以用C1
、C2
表示:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
spring:
rabbitmq:
host: 192.168.3.19
port: 5672
username: admin
password: 123456
生產端傳送訊息,呼叫RabbitTemplate
傳送訊息,比如:
@Autowired
private RabbitTemplate rabbitTemplate;
public String send() {
rabbitTemplate.convertAndSend("routingKey","send message");
}
消費訊息使用佇列監聽註解@RabbitListener
,新增佇列名稱就能消費傳送到佇列上的訊息了:
@RabbitListener(queuesToDeclare = @Queue("queue_name"))
public void consume(String message) {
// 接收訊息
}
最簡單的訊息傳送
對等模式
,生產者傳送訊息經過佇列直接傳送給消費者。Exchange
交換機。生產訊息:
@GetMapping("/simple-send")
public String simpleSend() {
rabbitTemplate.convertAndSend("simple","this is news");
return "ok";
}
消費訊息
@RabbitListener(queuesToDeclare = @Queue("simple"))
public void consume(String message) {
System.out.println(message);
}
輸出:
this is news
無需建立交換機和繫結佇列,只需要匹配傳送端和消費端的佇列名稱就能成功傳送訊息。
在多個消費者之間分配任務
工作模式
和簡單模式
差不多,只需要生產端、消費端、佇列。多個消費者
,也就是一對多的關係。@GetMapping("/work-send")
public String simpleSend() {
rabbitTemplate.convertAndSend("work","this is news");
return "ok";
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void consume(String message) {
System.out.println("first:" + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void consumeSecond(String message) {
System.out.println("second:" + message);
}
建立一個生產者,兩個消費者,傳送兩條訊息,兩個消費者分別接收到訊息,輸出:
first:this is news
second:this is news
兩個消費者,輪流消費訊息。類似nginx負載均衡
。
一次向多個消費者傳送訊息
X
表示交換機,使用的扇形交換機
(fanout),它將傳送的訊息傳送到所有繫結交換機的佇列。@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");
}
@Bean
public Queue psFirstQueue() {
return new Queue("psFirstQueue");
}
@Bean
public Queue psSecondQueue() {
return new Queue("psSecondQueue");
}
@Bean
public Queue psThirdQueue() {
return new Queue("psThirdQueue");
}
@Bean
public Binding routingFirstBinding() {
return BindingBuilder.bind(psFirstQueue()).to(fanoutExchange());
}
@Bean
public Binding routingSecondBinding() {
return BindingBuilder.bind(psSecondQueue()).to(fanoutExchange());
}
@Bean
public Binding routingThirdBinding() {
return BindingBuilder.bind(psThirdQueue()).to(fanoutExchange());
}
fanoutExchange
。psFirstQueue
、psSecondQueue
、psThirdQueue
。routingKey
,直接繫結即可。@GetMapping("/publish-sub-send")
public String publishSubSend() {
rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, "publish/subscribe hello");
return "ok";
}
無需指定routingKey
,設定為null
。
@RabbitListener(queues = "psFirstQueue")
public void pubsubQueueFirst(String message) {
System.out.println("【first】:" + message);
}
@RabbitListener(queues = "psSecondQueue")
public void pubsubQueueSecond(String message) {
System.out.println("【second】:" + message);
}
@RabbitListener(queues = "psThirdQueue")
public void pubsubQueueThird(String message) {
System.out.println("【third】:" + message);
}
【first】: publish/subscribe hello
【second】: publish/subscribe hello
【third】: publish/subscribe hello
傳送一條訊息,繫結的佇列都能接收到訊息。
根據
routingKey
有選擇性的接收訊息
routingKey
繫結交換機routingKey
傳送給特定的佇列,然後傳到消費者消費。扇形交換機
(fanout)改成直連交換機
(direct)。@Bean
public Queue routingFirstQueue() {
return new Queue("routingFirstQueue");
}
@Bean
public Queue routingSecondQueue() {
return new Queue("routingSecondQueue");
}
@Bean
public Queue routingThirdQueue() {
return new Queue("routingThirdQueue");
}
@Bean
public DirectExchange routingExchange() {
return new DirectExchange("routingExchange");
}
@Bean
public Binding routingFirstBind() {
return BindingBuilder.bind(routingFirstQueue()).to(routingExchange()).with("firstRouting");
}
@Bean
public Binding routingSecondBind() {
return BindingBuilder.bind(routingSecondQueue()).to(routingExchange()).with("secondRouting");
}
@Bean
public Binding routingThirdBind() {
return BindingBuilder.bind(routingThirdQueue()).to(routingExchange()).with("thirdRouting");
}
routingExchange
,根據不同的routingKey
繫結不同的佇列:firstRouting
路由鍵繫結routingFirstQueue
佇列。secondRouting
路由鍵繫結routingSecondQueue
佇列。thirdRouting
路由鍵繫結routingThirdQueue
佇列。@GetMapping("/routing-first")
public String routingFirst() {
// 使用不同的routingKey 轉發到不同的佇列
rabbitTemplate.convertAndSend("routingExchange","firstRouting"," first routing message");
rabbitTemplate.convertAndSend("routingExchange","secondRouting"," second routing message");
rabbitTemplate.convertAndSend("routingExchange","thirdRouting"," third routing message");
return "ok";
}
@RabbitListener(queues = "routingFirstQueue")
public void routingFirstListener(String message) {
System.out.println("【routing first】" + message);
}
@RabbitListener(queues = "routingSecondQueue")
public void routingSecondListener(String message) {
System.out.println("【routing second】" + message);
}
@RabbitListener(queues = "routingThirdQueue")
public void routingThirdListener(String message) {
System.out.println("【routing third】" + message);
}
輸出:
【routing first】first routing message
【routing second】second routing message
【routing third】third routing message
分析:
rabbitTemplate.convertAndSend("routingExchange","firstRouting"," first routing message");
訊息從生產者指定
firstRouting
路由鍵,找到對應的繫結佇列routingFirstQueue
,就被routingFirstQueue
佇列消費了。
基於某個主題接收訊息
路由模式
傳送的訊息,是需要指定固定的routingKey
,如果想要針對一類路由。
比如:
.com
結尾的訊息。www.
開頭的訊息。主題模式
就派上場了,路由模式
和主題模式
類似,路由模式
是設定特定的routingKey
繫結唯一的佇列,而主題模式
的是使用萬用字元
匹配一個或者多個
佇列。
@Bean
public Queue topicFirstQueue() {
return new Queue("topicFirstQueue");
}
@Bean
public Queue topicSecondQueue() {
return new Queue("topicSecondQueue");
}
@Bean
public Queue topicThirdQueue() {
return new Queue("topicThirdQueue");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
萬用字元
繫結交換機和交換機:@Bean
public Binding topicFirstBind() {
// .com 為結尾
return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with("*.com");
}
@Bean
public Binding topicSecondBind() {
// www.為開頭
return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()).with("www.#");
}
萬用字元
有兩種,*
和#
,
*
表示可以匹配一個
。#
表示可以匹配多個
。比如:
#.com
表示接收多個
以.com
結尾的欄位。
taobao.com
、www.taobao.com
、www.jd.com
。*.com
表示接收一個
以.com
結尾的欄位。
taobao.com
、jd.com
。www.taobao.com
、cn.taobao.com
。www.#
可以匹配多個
以www
開頭的欄位。
www.taobao
、www.jd
。www.*
可以匹配一個
以www
開頭的欄位。
www.taobao
、www.jd
。www.taobao.com
、www.jd.com
。生產訊息:
@GetMapping("/topic-first-send")
public String topicFirstSend() {
rabbitTemplate.convertAndSend("topicExchange","www.taobao.com","www.taobao.com");
rabbitTemplate.convertAndSend("topicExchange","taobao.com","taobao.com");
rabbitTemplate.convertAndSend("topicExchange","www.jd","www.jd");
return "topic ok";
}
@RabbitListener(queues = "topicFirstQueue")
public void topicFirstListener(String message) {
System.out.println("【topic first】" + message);
}
@RabbitListener(queues = "topicSecondQueue")
public void topicSecondListener(String message) {
System.out.println("【topic second】" + message);
}
【topic second】www.taobao.com
【topic first】taobao.com
【topic second】www.jd
www.#
可以匹配多個以www.
開頭的路由鍵,例如www.taobao.com
、www.jd
。而*.com
只能匹配一個以.com
結尾的路由鍵,例如taobao.com
,而無法匹配www.taobao.com
。
訊息有返回值
PRC
模式和上面的幾種模式唯一不同的點在於,該模式可以收到消費端的返回值
。@RabbitListener(queuesToDeclare =@Queue("rpcQueue"))
public String rpcListener(String message) {
System.out.println("【rpc接收訊息】" + message);
return "rpc 返回" + message;
}
@GetMapping("/rpc-send")
public void rpcSend() {
Object receive = rabbitTemplate.convertSendAndReceive("rpcQueue","rpc send message");
System.out.println("【傳送訊息訊息】" + receive);
}
【rpc接收訊息】rpc send message
【傳送端接收訊息】rpc 返回rpc send message
上面的 訂閱釋出模式
、路由模式
以及主題模式
使用到了不同的交換機,分別是:
直連交換機
被應用在路由模式
下,該交換機需要通過特定的routingKey
來繫結佇列,交換機只有接收到了匹配的routingKey
才會將訊息轉發到對應的佇列中,否則就不會轉發訊息。
路由模式
使用直連交換機
,該模式下根據routingKey
繫結特定的佇列。
扇形交換機
沒有路由鍵的概念,只需將佇列繫結在交換機上,傳送到交換機上的訊息會轉發到交換機所以繫結的佇列裡面,類似廣播,只要開啟收音機都能接收到廣播訊息。扇形交換機
應用於釋出訂閱模式
。
主題模式
是將路由鍵根據一個主題進行分類,和直連模式
不同的是,直連模式
繫結特定
的路由鍵,而主題模式
使用萬用字元繫結路由鍵,繫結鍵有兩種:
*
表示可以匹配僅一個
。#
表示可以匹配零個或多個
。整合SpringBoot
實現RabbitMQ
六種工作模式,並詳細講解RabbitMQ
六種工作模式:
routingKey
即可。