SpringBoot整合RabbitMQ實現六種工作模式

2022-07-28 12:04:30

RabbitMQ主要有六種工作模式,本文整合SpringBoot分別介紹工作模式的實現。

前提概念

生產者

訊息生產者或者傳送者,使用P表示:

佇列

訊息從生產端傳送到消費端,一定要通過佇列轉發,使用queue_name表示:

消費者

消費的消費者或者接收者,使用C表示,如果有多個消費者也可以用C1C2表示:

SpringBoot整合RabbitMQ基本設定

  1. 新增maven依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>
  1. 新增application.yml 設定
spring:
  rabbitmq:
    host: 192.168.3.19
    port: 5672
    username: admin
    password: 123456
  1. 訊息生產

生產端傳送訊息,呼叫RabbitTemplate傳送訊息,比如:

@Autowired
private RabbitTemplate rabbitTemplate;

public String send() {
  rabbitTemplate.convertAndSend("routingKey","send message");
}
  1. 消費訊息

消費訊息使用佇列監聽註解@RabbitListener,新增佇列名稱就能消費傳送到佇列上的訊息了:

@RabbitListener(queuesToDeclare = @Queue("queue_name"))
public void consume(String message) {
  // 接收訊息
}

1. 簡單(simple)模式

最簡單的訊息傳送

特點

  • 生產者是消費者是一一對應,也叫做對等模式,生產者傳送訊息經過佇列直接傳送給消費者。
  • 生產者和消費者在傳送和接收訊息時,只需要指定佇列名稱,而不需要指定Exchange 交換機。

程式碼範例

生產訊息:

@GetMapping("/simple-send")
public String simpleSend() {
  rabbitTemplate.convertAndSend("simple","this is news");
  return "ok";
}

消費訊息

@RabbitListener(queuesToDeclare = @Queue("simple"))
public void consume(String message) {
  System.out.println(message);
}

輸出:

this is news

無需建立交換機和繫結佇列,只需要匹配傳送端和消費端的佇列名稱就能成功傳送訊息。

2. 工作模式

在多個消費者之間分配任務

特點

  • 工作模式簡單模式差不多,只需要生產端、消費端、佇列。
  • 不同在於一個生產者、一個佇列對應多個消費者,也就是一對多的關係。
  • 在多個消費者之間分配訊息(競爭消費者模式),類似輪詢傳送訊息,每個訊息都只發給一個消費者。

程式碼範例

  • 生產訊息:
@GetMapping("/work-send")
public String simpleSend() {
  rabbitTemplate.convertAndSend("work","this is news");
  return "ok";
}
  • 消費訊息:
@RabbitListener(queuesToDeclare = @Queue("work"))
public void consume(String message) {
  System.out.println("first:" + message);
}

@RabbitListener(queuesToDeclare = @Queue("work"))
public void consumeSecond(String message) {
  System.out.println("second:" + message);
}

建立一個生產者,兩個消費者,傳送兩條訊息,兩個消費者分別接收到訊息,輸出:

first:this is news
second:this is news

兩個消費者,輪流消費訊息。類似nginx負載均衡

3. 釋出訂閱模式

一次向多個消費者傳送訊息

特點

  • 釋出訂閱類似廣播訊息,每個訊息可以同時傳送給訂閱該訊息的消費者,
  • 上圖中的X表示交換機,使用的扇形交換機(fanout),它將傳送的訊息傳送到所有繫結交換機的佇列。

程式碼範例

  • 建立佇列、交換機以及繫結:
@Bean
public FanoutExchange fanoutExchange() {
  return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");
}

@Bean
public Queue psFirstQueue() {
  return new Queue("psFirstQueue");
}

@Bean
public Queue psSecondQueue() {
  return new Queue("psSecondQueue");
}

@Bean
public Queue psThirdQueue() {
  return new Queue("psThirdQueue");
}

@Bean
public Binding routingFirstBinding() {
  return BindingBuilder.bind(psFirstQueue()).to(fanoutExchange());
}

@Bean
public Binding routingSecondBinding() {
  return BindingBuilder.bind(psSecondQueue()).to(fanoutExchange());
}

@Bean
public Binding routingThirdBinding() {
  return BindingBuilder.bind(psThirdQueue()).to(fanoutExchange());
}
  • 上面定義一個交換機fanoutExchange
  • 分別繫結三個佇列psFirstQueuepsSecondQueuepsThirdQueue
  • 佇列繫結交換機不需要routingKey,直接繫結即可。

  • 生產端:
@GetMapping("/publish-sub-send")
public String publishSubSend() {
  rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, "publish/subscribe hello");
  return "ok";
}

無需指定routingKey,設定為null

  • 消費端:
@RabbitListener(queues = "psFirstQueue")
public void pubsubQueueFirst(String message) {
  System.out.println("【first】:" + message);
}

@RabbitListener(queues = "psSecondQueue")
public void pubsubQueueSecond(String message) {
  System.out.println("【second】:" + message);
}

@RabbitListener(queues = "psThirdQueue")
public void pubsubQueueThird(String message) {
  System.out.println("【third】:" + message);
}
  • 輸出:
【first】: publish/subscribe hello
【second】: publish/subscribe hello
【third】: publish/subscribe hello

傳送一條訊息,繫結的佇列都能接收到訊息。

4. 路由模式

根據routingKey有選擇性的接收訊息

特點

  • 每個佇列根據不同routingKey繫結交換機
  • 訊息傳送到交換機後通過routingKey傳送給特定的佇列,然後傳到消費者消費。
  • 交換由扇形交換機(fanout)改成直連交換機(direct)。

程式碼範例

  • 建立佇列、交換機以及繫結:
@Bean
public Queue routingFirstQueue() {
    return new Queue("routingFirstQueue");
}

@Bean
public Queue routingSecondQueue() {
    return new Queue("routingSecondQueue");
}

@Bean
public Queue routingThirdQueue() {
    return new Queue("routingThirdQueue");
}

@Bean
public DirectExchange routingExchange() {
    return new DirectExchange("routingExchange");
}

@Bean
public Binding routingFirstBind() {
    return BindingBuilder.bind(routingFirstQueue()).to(routingExchange()).with("firstRouting");
}

@Bean
public Binding routingSecondBind() {
    return BindingBuilder.bind(routingSecondQueue()).to(routingExchange()).with("secondRouting");
}

@Bean
public Binding routingThirdBind() {
    return BindingBuilder.bind(routingThirdQueue()).to(routingExchange()).with("thirdRouting");
}
  • 建立一個交換機,根據不同的路由規則匹配不同的佇列routingExchange,根據不同的routingKey繫結不同的佇列:
  • firstRouting路由鍵繫結routingFirstQueue佇列。
  • secondRouting路由鍵繫結routingSecondQueue佇列。
  • thirdRouting路由鍵繫結routingThirdQueue佇列。

  • 生產訊息:
@GetMapping("/routing-first")
public String routingFirst() {
    // 使用不同的routingKey 轉發到不同的佇列
    rabbitTemplate.convertAndSend("routingExchange","firstRouting"," first routing message");
    rabbitTemplate.convertAndSend("routingExchange","secondRouting"," second routing message");
    rabbitTemplate.convertAndSend("routingExchange","thirdRouting"," third routing message");
    return "ok";
}
  • 消費訊息:
@RabbitListener(queues = "routingFirstQueue")
public void routingFirstListener(String message) {
    System.out.println("【routing first】" + message);
}

@RabbitListener(queues = "routingSecondQueue")
public void routingSecondListener(String message) {
    System.out.println("【routing second】" + message);
}

@RabbitListener(queues = "routingThirdQueue")
public void routingThirdListener(String message) {
    System.out.println("【routing third】" + message);
}

輸出:

【routing first】first routing message
【routing second】second routing message
【routing third】third routing message

分析:

rabbitTemplate.convertAndSend("routingExchange","firstRouting"," first routing message");

訊息從生產者指定firstRouting路由鍵,找到對應的繫結佇列routingFirstQueue,就被routingFirstQueue佇列消費了。

5. 主題模式

基於某個主題接收訊息

特點

路由模式傳送的訊息,是需要指定固定的routingKey,如果想要針對一類路由。
比如:

  • 只接收以.com 結尾的訊息。
  • www.開頭的訊息。

主題模式就派上場了,路由模式主題模式類似,路由模式是設定特定的routingKey繫結唯一的佇列,而主題模式的是使用萬用字元匹配一個或者多個佇列。

程式碼範例

  • 建立交換機和佇列:
@Bean
public Queue topicFirstQueue() {
    return new Queue("topicFirstQueue");
}

@Bean
public Queue topicSecondQueue() {
    return new Queue("topicSecondQueue");
}

@Bean
public Queue topicThirdQueue() {
    return new Queue("topicThirdQueue");
}

@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topicExchange");
}
  • 使用萬用字元繫結交換機和交換機:
@Bean
public Binding topicFirstBind() {
    // .com 為結尾
    return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with("*.com");
}

@Bean
public Binding topicSecondBind() {
    // www.為開頭
    return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()).with("www.#");
}

萬用字元有兩種,*#,

  • * 表示可以匹配一個
  • # 表示可以匹配多個

比如:

  • #.com表示接收多個.com結尾的欄位。

    • 例如: taobao.comwww.taobao.comwww.jd.com
  • *.com表示接收一個.com結尾的欄位。

    • 例如: taobao.comjd.com
    • 多個欄位是無法匹配的,比如www.taobao.comcn.taobao.com
  • www.#可以匹配多個www開頭的欄位。

    • 例如www.taobaowww.jd
  • www.*可以匹配一個www開頭的欄位。

    • 例如:www.taobaowww.jd
    • 多個欄位是無法匹配的,比如www.taobao.comwww.jd.com
  • 生產訊息:

@GetMapping("/topic-first-send")
public String topicFirstSend() {
    rabbitTemplate.convertAndSend("topicExchange","www.taobao.com","www.taobao.com");
    rabbitTemplate.convertAndSend("topicExchange","taobao.com","taobao.com");
    rabbitTemplate.convertAndSend("topicExchange","www.jd","www.jd");
    return "topic ok";
}
  • 消費訊息:
@RabbitListener(queues = "topicFirstQueue")
public void topicFirstListener(String message) {
    System.out.println("【topic first】" + message);
}

@RabbitListener(queues = "topicSecondQueue")
public void topicSecondListener(String message) {
    System.out.println("【topic second】" + message);
}
  • 輸出:
【topic second】www.taobao.com
【topic first】taobao.com
【topic second】www.jd

www.#可以匹配多個以www.開頭的路由鍵,例如www.taobao.comwww.jd。而*.com只能匹配一個以.com結尾的路由鍵,例如taobao.com,而無法匹配www.taobao.com

6. RPC模式

訊息有返回值

特點

  • PRC模式和上面的幾種模式唯一不同的點在於,該模式可以收到消費端的返回值
  • 生成端接收消費端的返回值。

程式碼範例

  • 消費端新增返回值:
@RabbitListener(queuesToDeclare =@Queue("rpcQueue"))
public String rpcListener(String message) {
  System.out.println("【rpc接收訊息】" + message);
  return "rpc 返回" + message;
}
  • 生產端傳送訊息:
@GetMapping("/rpc-send")
	public void rpcSend() {
		Object receive = rabbitTemplate.convertSendAndReceive("rpcQueue","rpc send message");
		System.out.println("【傳送訊息訊息】" + receive);
	}
  • 輸出:
【rpc接收訊息】rpc send message
【傳送端接收訊息】rpc 返回rpc send message

交換機型別

上面的 訂閱釋出模式路由模式以及主題模式使用到了不同的交換機,分別是:

  • 直連交換機 Direct
  • 扇形交換機 Fanout
  • 主題交換器 Topic

Direct Exchange(直連)

直連交換機被應用在路由模式下,該交換機需要通過特定的routingKey來繫結佇列,交換機只有接收到了匹配的routingKey才會將訊息轉發到對應的佇列中,否則就不會轉發訊息。

路由模式使用直連交換機,該模式下根據routingKey繫結特定的佇列。

Fanout Exchange(扇形)

扇形交換機沒有路由鍵的概念,只需將佇列繫結在交換機上,傳送到交換機上的訊息會轉發到交換機所以繫結的佇列裡面,類似廣播,只要開啟收音機都能接收到廣播訊息。扇形交換機應用於釋出訂閱模式

Topic Exchange(主題)

主題模式是將路由鍵根據一個主題進行分類,和直連模式不同的是,直連模式繫結特定的路由鍵,而主題模式使用萬用字元繫結路由鍵,繫結鍵有兩種:

  • * 表示可以匹配僅一個
  • # 表示可以匹配零個或多個

總結

整合SpringBoot實現RabbitMQ六種工作模式,並詳細講解RabbitMQ六種工作模式:

  • 簡單模式
    • 無需建立交換機,匹配生產端和消費的routingKey即可。
  • 工作模式
    • 多個消費端公平競爭同一個訊息。
  • 釋出訂閱模式
    • 一次向多個消費者傳送訊息。
  • 路由模式
    • 根據特定的路由鍵轉發訊息。
  • 主題模式
    • 根據萬用字元,匹配路由鍵轉發訊息。
  • RPC模式
    • 生產端接收消費端傳送的返回值。

原始碼範例

參考