springBoot整合Rabbitmq

2020-08-11 15:28:02

好久沒用過rabbitmq了,方便記憶,想着整理整理.

在这里插入图片描述

1.安裝rabbitmq

我選擇的環境爲docker,還不瞭解docker的小夥伴建議去看看,非常贊哦.

1.安裝,我這裏選擇安裝management版的,management自帶web操作頁面的版本.

docker pull rabbitmq:3.7.15-management

2.啓動:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 
-v `$pwd`/data:/var/lib/rabbitmq --hostname myrabbit 
-e RABBITMQ_DEFAULT_USER=guest 
-e RABBITMQ_DEFAULT_PASS=guest 
rabbitmq:3.7.14-management 

3 .測試是否啓動成功.

docker ps //看到服務說明啓動成功
localhost:15672 //瀏覽器存取	

在这里插入图片描述
至此,rabbitmq安裝成功.

2.訊息服務基本介紹

簡單介紹一下訊息服務,加深印象.

爲什麼要使用訊息服務呢?
大多應用中,可以通過訊息中介軟體來提高系統的非同步通訊能力,解耦能力.

應用場景:
非同步處理 , 解耦 , 流量削峯

訊息服務中兩個重要的概念.
1.訊息代理(message broker)
2.目的地(destination)
當訊息發送者發送訊息後,將由訊息代理接管.訊息代理就保證我們的訊息到達指定的目的地.

訊息佇列主要有兩種形式的目的地:

1.佇列(queue):對等式的訊息通訊
2.主題(topic): 發佈/訂閱 式的訊息通訊

對等:
一對一,訊息發佈者發佈訊息後,訊息代理將其放到指定佇列中,訊息接收者
從佇列中獲取訊息內容,訊息被訊息接收者讀取之後將會被移除佇列.
訊息發佈者和接受者都是唯一的(指同一條訊息只會被一個訊息接收者接收)

發佈訂閱式:
訊息發佈者 發送訊息到主題,多個訊息接收者監聽這個主題,同時接收到達主題的訊息.

訊息服務分爲JMS和AMQP兩種:

JMS(Java Message Service):
基於JVM訊息代理的規範.如ActiveMQ是JMS的實現.
受java語言的限制,只允許在java語言中使用.

AMQP(Advanced Message Queuing Protocol)
高階訊息佇列協定,也是訊息代理的一個規範,相容JMS
我們本文所要描述的rabbitmq是AMQP的實現.

在这里插入图片描述

3.Rabbitmq基本介紹

RabbitMQ簡介:

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現

核心概念:

Message:
訊息,由訊息頭和訊息體組成. 訊息體是不透明的,而訊息頭則是由一系列的可選屬性組成的,這些屬性包括 routing-key (路由鍵) , priority (相對於其他訊息的優先權) ,delivery-mode(指出該訊息是否需要持久儲存)等.

Publisher:
訊息生產者,也是向交換器發佈訊息的用戶端應用程式.

Exchange:
交換器,用來接收訊息生產者發送的訊息,並將這些訊息路由給伺服器中的佇列.
Exchange有4中型別 : direct(預設) , fanout , topic , headers , 不同類型的交換器轉發訊息具有不同的策略.

Queue:
訊息佇列,用來儲存訊息知道發送給消費者 , 是訊息的容器 , 也是訊息的墳墓 ,
訊息會一直存放在訊息佇列中,知道消費者連線到這個佇列將其取走.

Binding:
系結,用於訊息佇列和交換器之間的關聯, 一個系結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交換器理解爲一個由系結構成的路由表.Exchange和Queue的系結是可以多對多的關係.

Connection:
網路連線.如tcp連線

Channel:
通道 , 多路複用連線中的一條獨立的雙向數據通道.通道是建立在真實的TCP連線內的虛擬連線,AMQP命令都是通過通道發出去的, 不管是發佈訊息 ,訂閱佇列還是接收訊息 , 這些動作都是通過通道完成的. 因爲對操作系統來說,建立和銷燬tcp都是非常昂貴的開銷,所以引入了通道的概念 , 以複用一條TCP連線.

Consumer:
訊息消費者, 表示一個從訊息佇列中獲得訊息的用戶端應用程式.

Virtual Host:
虛擬主機, 表示一批交換器 , 訊息佇列 和相關物件. 虛擬主機是共用相同的身份認證和加密環境的獨立伺服器域. 每個 虛擬主機本質上就是一個mini版的Rabbitmq伺服器, 擁有自己的佇列,交換器 , 系結 和 許可權機制 機製 . 虛擬主機 是AMQP概唸的基礎 , 必須在連線時指定 , RabbitMq預設的vhost 是 /

Broker:
表示訊息佇列伺服器的實體

在这里插入图片描述
在这里插入图片描述

4.rabbitmq中Exchange型別

Exchange分發訊息時根據型別的不同分發策略有所區別,目前共四種類型:
1.direct
2.fanout
3.topic
4.headers(headers匹配AMQP訊息的header而不是路由鍵,與direct完全一致,且效能較差,基本使用不到)

Direct Exchange:
訊息中的路由鍵(routing key)如果和Binding中的binding key 完全一致,交換器就將訊息發送到對應的訊息佇列中.它是完全匹配,單播的模式.

Fanout Exchange:
每個發到fanoutExchange 型別交換器的訊息都會分到交換器所系結的所有訊息佇列上.fanout交換器不會處理路由鍵,只是簡單地將佇列系結到交換器上.每個發送到交換器的訊息都會被髮送到與之系結的所有佇列上.廣播模式,轉發訊息是最快的.

Topic Exchange:
topic交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要系結到一個模式上.它將路由鍵和系結建的字串切分成單詞,這些單詞之間用點隔開.
簡言之就是匹配模式

4.springboot與rabbitmq的整合

整合大致分爲這幾步:
1.引入spring-boot-starter-amqp maven依賴
2.使用application.yml 進行參數設定
3.設定交換器與路由系結
4.訊息發佈與接收

在这里插入图片描述
1.引入依賴

<dependencies>
        <!--   web啓動器     -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--   rabbitmq所需依賴     -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--   單元測試     -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

2.設定yml

spring:
  application:
    name: rabbitmq-provider
  rabbitmq:
    #使用者名稱
    username: admin
    #密碼
    password: admin
    #rabbitmq的伺服器端口
    port: 5672
    #ip
    host: localhost
    #vhost路徑
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
server:
  port: 8083

3 .設定路由,交換器並系結

@Configuration
public class DirectRabbitmqConfig {

    /** 建立Queue01佇列*/
    @Bean
    public Queue getQueue01(){
        //參數1 佇列名
        // 參數2 是否持久化
        // 參數3 是否只能被當前建立的連線使用,並在連線關閉後銷燬佇列
        // 參數4 是否在佇列爲空後自動銷燬
        //參數五5 其他設定資訊
        return new Queue("Queue01",true);
    }
    
    /** 建立Direct型別的交換器*/
    @Bean
    public DirectExchange getDirectExchange01(){
        return new DirectExchange("DirectExchange01",true,true);
    }
    
    /** 將路由與交換器系結*/
    @Bean
    public Binding bingQueueWithDirectExchange01(){
        //將路由系結到交換器並設定匹配鍵
        return BindingBuilder.bind(getQueue01()).to(getDirectExchange01()).with("DirectRouting01");
    }
}

4.訊息的發佈與接收
最後階段,讓我們編寫介面進行訊息的發佈與接收.

@RestController
public class RabbitController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** 使用rabbitTemplate 的send方法發送訊息*/
    @GetMapping("/sendMessage/{number}")
    public ResponseEntity<String> sendMessage(@PathVariable("number") String number){
        //指定訊息型別
        MessageProperties build = MessagePropertiesBuilder.newInstance().setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build();
        //發送
        rabbitTemplate.send("TestExchange1","TestQueue1",new Message(number.getBytes(StandardCharsets.UTF_8),build));
        return ResponseEntity.ok("發送成功:"+number);
    }

    /** 使用rabbitTemplate 的convertAndSend方法發送訊息*/
    @GetMapping("/convertAndSendMessage/{number}")
    public ResponseEntity<String> convertAndSendMessage(@PathVariable("number") String number){
        rabbitTemplate.convertAndSend("TestExchange1","TestQueue1",number);
        return ResponseEntity.ok("發送成功:"+number);
    }
}

啓動服務,存取服務路徑
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
看到訊息佇列中已經就收到了訊息.訊息推播成功.
下面 下麪我們就編寫程式碼去接收訊息
建立監聽類,監聽指定佇列

@Component
@RabbitListener(queues = {"Queue01"})
public class MyRabbitListener {
    @RabbitHandler
    public void accept(String message){
        System.out.println("接收到訊息:"+message);
    }
}

再次重新啓動服務
在这里插入图片描述
訊息接收成功.
至此,rabbitmq與springboot的整合完成,其他型別exchange的api操作便不再多講,小夥伴們自行研究.

在这里插入图片描述