SpringBoot學習筆記-10:第十章-SpringBoot 與訊息

2020-08-10 09:53:24

第十章-SpringBoot 與訊息

JMS&AMQP 簡介

訊息服務中介軟體可以提升系統非同步通訊,擴充套件解耦能力

兩個重要概念:

  • 訊息代理 message broker
  • 目的地 destination
    • 佇列 queue :
      • 對等訊息通訊 point-to-point
      • 唯一的發送者和接收者
    • 主體 topic
      • 發佈 publish/訂閱 subscribe 訊息通訊
      • 多接收者
協定 JMS AMQP
英文 Java Message Service Java Advanced Message Queuing Protocol
中文 訊息服務 高階訊息佇列協定
實現 ActiveMQ、HornetMQ RabbitMQ
定義 JAVA API 網路線級協定
跨語言
跨平臺
訊息模型 peer-2-peer、Pub/Sub 5 種
支援訊息型別 多訊息型別 byte[]
支援 spring-jms spring-rabbit
發送訊息 @JmsTemplate @RabbitTemplate
監聽訊息 @JmsListener @RabbitListener
開啓支援 @EnableJms @EnableRabbit
自動設定 JmsAutoConfiguration RabbitAutoConfiguration

RabbitMQ 基本概念簡介

https://www.rabbitmq.com/

RabbitMQ 是由 erlang 開發的 AMQP(Advanced Message Queuing Protocol) 實現

核心概念

  • Message 訊息(訊息頭+訊息體)
  • Publisher 訊息發佈者
  • Exchange 交換器 4 種類型
    • direct(預設)
    • fanout
    • topic
    • headers
  • Queue 訊息佇列
  • Binding 系結
  • Connection 網路連線
  • Channel 通道
  • Consumer 消費者
  • Virtual Host 虛擬主機 預設/
  • Broker 訊息佇列伺服器實體

[外連圖片轉存失敗,源站可能有防盜鏈機制 機製,建議將圖片儲存下來直接上傳(img-qul9BCQd-1597024380528)(img/rabbitmq.jpg)]

RabbitMQ 執行機制 機製

生產者把訊息發佈到 Exchange 上,
訊息最終達到佇列並被消費者接收,
而 Binding 決定交換器的訊息應該發送到哪個佇列

[外連圖片轉存失敗,源站可能有防盜鏈機制 機製,建議將圖片儲存下來直接上傳(img-8gnBrRxr-1597024380530)(img/rabbitmq-2.jpg)]

RabbitMQ 安裝測試

安裝啓動 RabbitMQ

# 安裝帶有管理介面
docker pull rabbitmq:management

# 用戶端:5672 管理介面:15672
docker run -d -p 5672:5672 -p 15672:15672  --name myrabbitmq rabbitmq:management

管理介面
http://localhost:15672/

賬號:guest
密碼:guest

RabbitTemplate 發送接受訊息&序列化機制 機製

引入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

自動設定類:RabbitAutoConfiguration
設定:RabbitProperties
給 Rabbit 發送和接收訊息:RabbitTemplate
系統管理元件:AmqpAdmin

RabbitMQ 中新建:

queue: message
exchange: exchange.message
Routing key : message

自定義物件序列化規則

package com.example.demo.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyAMQPConfig {

    // 以json的方式序列化物件
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

點對點發送訊息測試

package com.example.demo;


import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 單播:發送數據
    @Test
    public void testSendRabbitMQ() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", "Tom");
        map.put("age", 23);

        rabbitTemplate.convertAndSend("exchange.message", "message", map);
    }

    // 單播:接收數據
    @Test
    public void testReceiveRabbitMQ() {
        Object obj = rabbitTemplate.receiveAndConvert("message");
        System.out.println(obj);
        // {name=Tom, age=23}
    }
}

@RabbitListener&@EnableRabbit

開啓 RabbitMQ

package com.example.demo;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
@EnableRabbit
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

監聽佇列數據

package com.example.demo.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
public class MessageService {
    // 接收訊息數據
    @RabbitListener(queues={"message"})
    public void listenMessage(Map<String, Object> map){
        System.out.println("收到訊息: " + map);
    }

    // 接收訊息頭和訊息體
    @RabbitListener(queues={"message"})
    public void receiveMessage(Message message){
        System.out.println("收到訊息: " + message.getMessageProperties());
        System.out.println("收到訊息: " + message.getBody());
    }
}

AmqpAdmin 管理元件的使用

使用範例

package com.example.demo;


import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
public class RabbitMQTest {

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange(){
        // 建立交換器
        amqpAdmin.declareExchange(new DirectExchange("exchange.admin"));

        // 建立queque
        amqpAdmin.declareQueue(new Queue("queue.admin"));

        // 建立系結規則
        amqpAdmin.declareBinding(new Binding("queue.admin", Binding.DestinationType.QUEUE, "exchange.admin", "admin", null));

        System.out.println("建立成功");
    }
}