我們在微服務中一個命令會逐漸呼叫各個微服務,但如果一一呼叫不僅需要微服務實時同步互動還會浪費效率
所以我們通常會採用MQ,也就是訊息佇列Message Queue來處理這個問題
下面我們會通過幾個方法介紹訊息佇列:
首先我們先來介紹訊息佇列的各個資訊
首先我們需要先去了解同步通訊:
我們給出一個同步通訊的簡單例子:
我們對上圖進行簡單解釋:
/*
使用者使用支付服務,支付服務需要經過一系列操作之後才能返回結果給使用者
具體服務:支付服務->訂單服務->倉儲服務->簡訊服務->...
*/
// 那麼就會存在以下問題:
// 1.假設我們每個服務耗時1s,那麼多個服務累計在一起,耗時逐漸增多使用者得到結果的速度會變慢
// 2.如果我們需要新增新的服務,那麼我們需要在原函數中新增該服務的呼叫方法,會修改原有程式碼,導致修改困難
// 3.並且當前面的操作進行過程中,後面的操作手中仍存有該流程的資源無法釋放,導致資源損耗需要噹噹前服務結束後才可釋放
// 4.最可怕的是,當其中有一個服務出現錯誤,那麼整條服務鏈就會出現錯誤,導致後面的服務無法執行,導致使用者無法得到結果!!!
我們可以很明顯的感覺到同步通訊的優點:
但是缺點也非常的多:
我們同樣給出非同步通訊的概念:
那麼非同步通訊的優點其實很明顯:
但是缺點同樣明顯:
我們來認識一下市面上常見的訊息佇列:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社群 | Rabbit | Apache | 阿里 | Apache |
開發語言 | Erlang | Java | Java | Scala&Java |
協定支援 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協定 | 自定義協定 |
可用性 | 高 | 一般 | 高 | 高 |
單機吞吐量 | 一般 | 差 | 高 | 非常高 |
訊息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內 |
訊息可靠性 | 高 | 一般 | 高 | 一般 |
我們給出一些訊息佇列選擇的建議:
我們主要去學習RabbitMQ的基本使用
我們如果要去使用RabbitMQ,首先需要先進行外掛安裝:
# docker拉取映象(docker在之前的文章中已經介紹過了~)
docker pull rabbitmq:3-management
docker run \ # docker啟動容器
-e RABBITMQ_DEFAULT_USER=root \ # 設定環境:mq使用者名稱
-e RABBITMQ_DEFAULT_PASS=123321 \ # 設定環境:mq密碼
--name mq \ # mq名稱
--hostname mq1 \ # mq主機名(單機部署可以省略,叢集部署需要)
-p 15672:15672 \ # 開放埠號:管理平臺埠,ui介面
-p 5672:5672 \ # 開放埠號:訊息佇列埠,作為Broker的核心埠
-d \
rabbitmq:3-management
首先我們需要知道最基本的訊息佇列模型:
他們的用途分別是:
其基本流程圖為:
那麼下面我們就來完成一個基本的RabbitMQ的小專案(只需瞭解):
/*
釋出者
具體邏輯為:建立連線->建立Channel->宣告佇列->傳送訊息->關閉連線和channel
*/
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立連線
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設定連線引數,分別是:主機名、埠號、vhost、使用者名稱、密碼
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立連線
Connection connection = factory.newConnection();
// 2.建立通道Channel
Channel channel = connection.createChannel();
// 3.建立佇列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.傳送訊息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("傳送訊息成功:【" + message + "】");
// 5.關閉通道和連線
channel.close();
connection.close();
}
}
/*
訂閱者
具體邏輯為:建立連線->建立Channel->宣告佇列->訂閱訊息
*/
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立連線
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設定連線引數,分別是:主機名、埠號、vhost、使用者名稱、密碼
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立連線
Connection connection = factory.newConnection();
// 2.建立通道Channel
Channel channel = connection.createChannel();
// 3.建立佇列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.訂閱訊息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.處理訊息
String message = new String(body);
System.out.println("接收到訊息:【" + message + "】");
}
});
System.out.println("等待接收訊息。。。。");
}
}
到這裡我們已經基本瞭解了RabbitMQ的使用,讓我們進入下一章節!
SpringAMQP是針對MQ的API更新,也就是使用簡單的API去完成上述複雜的RabbitMQ使用過程
在正式接收SpringAMQP之前,我們需要先去了解一下RabbitMQ的五種常見訊息模型:
首先我們需要去了解AMQP:
那麼我們再去了解SpringAMQP:
其實簡單來說SpringAMQP為我們提供了三個功能:
我們利用SpringAMQP來實現簡單訊息佇列:
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# 應當在Publisher釋出者和Consumer訂閱者兩個子工程下均設定地址
spring:
rabbitmq:
host: 192.168.150.101 # 主機名
port: 5672 # 埠
virtual-host: / # 虛擬主機
username: itcast # 使用者名稱
password: 123321 # 密碼
// 注意:在Publisher工程下的test模組下書寫該傳送訊息的test程式碼
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 佇列名稱
String queueName = "simple.queue";
// 訊息
String message = "hello, spring amqp!";
// 傳送訊息
rabbitTemplate.convertAndSend(queueName, message);
}
}
// 注意:在Consumer訂閱者下的Listener檔案(自己建立)下建立該監聽類,需設定為Bean
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// 能夠被Spring掃描到
@Component
public class SpringRabbitListener {
// 核心點:監聽simple.queue佇列
@RabbitListener(queues = "simple.queue")
// 釋出者釋出什麼型別,監聽者就接收什麼型別並做出對應處理
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消費者接收到訊息:【" + msg + "】");
}
}
我們先來簡單介紹一下工作訊息佇列:
我們來使用SpringAMQP來實現工作訊息佇列:
/**
* workQueue
* 向佇列中不停傳送訊息,模擬訊息堆積。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 佇列名稱
String queueName = "simple.queue";
// 訊息
String message = "hello, message_";
// 我們這裡模擬傳送了五十條訊息,平均每20ms傳送一條
for (int i = 0; i < 50; i++) {
// 傳送訊息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
// 第一個訂閱者平均每20ms獲得一個訊息
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消費者1接收到訊息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
// 第二個訂閱者平均每200ms獲得一個訊息
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消費者2........接收到訊息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
/*
但是由於兩個訂閱者均未設定閾值
所以他們並不會在結束後才去拿去訊息
而是依次去獲取訊息,也就意味著不管他們何時結束自己的訊息,他們都平分獲取25條訊息
20ms訂閱者1拿訊息並處理,40ms訂閱者2拿訊息並處理,60ms訂閱者1拿訊息並處理,80ms訂閱者2拿到訊息但並不能處理,依次迴圈
結論:
- 兩者均拿到25條訊息
- 訂閱者1在980ms時結束所有的訊息獲取,並結束所有訊息處理
- 訂閱者2在1000ms時結束所有的訊息獲取,但是還需要在5000ms(大概哈)才能完全處理訊息
*/
spring:
rabbitmq:
listener:
simple: # 佇列名稱
prefetch: 1 # 每次只能獲取一條訊息,處理完成才能獲取下一個訊息
我們首先來詳細介紹一下發布訂閱(廣播)的結構:
我們同樣採用SpringAQMP來實現釋出訂閱廣播:
@Test
public void testFanoutExchange() {
// 佇列名稱
String exchangeName = "qiuluo.fanout";
// 訊息
String message = "hello, everyone!";
// 第一個引數是交換機名稱,因為目前的publisher只能傳送資訊給交換機,由交換機來決定傳遞給哪個訊息佇列
// 第二個引數是key值選擇,我們會在後面用到
// 第三個引數是所傳遞的資訊
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
// 和之前一樣,Consumer從訊息佇列那裡獲取資訊
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消費者1接收到Fanout訊息:【" + msg + "】");
}
// 和之前一樣,Consumer從訊息佇列那裡獲取資訊
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消費者2接收到Fanout訊息:【" + msg + "】");
}
// 在consumer中建立一個類,宣告佇列和交換機
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
/**
* 宣告交換機
* @return Fanout型別交換機
*/
@Bean
public FanoutExchange fanoutExchange(){
// 採用@Bean的形式將其設定為Bean
return new FanoutExchange("itcast.fanout");
}
/**
* 第1個佇列
*/
@Bean
public Queue fanoutQueue1(){
// 採用@Bean的形式將其設定為Bean
return new Queue("fanout.queue1");
}
/**
* 繫結佇列和交換機
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
// 採用BindingBuilder的bind,to方法進行交換機與佇列的繫結即可(固定形式)
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2個佇列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 繫結佇列和交換機
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
我們同樣來簡單介紹一下發布訂閱路由:
我們下面採用SpringAMQP的註解宣告方式來實現釋出訂閱路由:
@Test
public void testSendDirectExchange() {
// 交換機名稱
String exchangeName = "qiuluo.direct";
// 訊息
String message = "紅色警報!";
// 傳送訊息
// 這裡就用到了第二個引數,就是key值
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
// 採用@RabbitListener註解的bindings引數,在裡面需要表明value(佇列名稱),exchange(交換機名稱),key(佇列的key值)
// 其內部的資料都需要採用@註解來給出
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "qiuluo.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消費者接收到direct.queue1的訊息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "qiuluo.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消費者接收到direct.queue2的訊息:【" + msg + "】");
}
我們同樣來簡單介紹一下發布訂閱路由:
我們同樣採用SpringAMQP來給出一個釋出訂閱主題的案例:
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交換機名稱
String exchangeName = "qiuluo.topic";
// 訊息
String message = "喜報!勝!";
// 傳送訊息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
// 這裡僅僅對exchange的type型別進行更改,並且更改了key值
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "qiuluo.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消費者接收到topic.queue1的訊息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "qiuluo.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消費者接收到topic.queue2的訊息:【" + msg + "】");
}
最後我們介紹一個簡單的知識點:
由於我們的RabbitMQ在儲存資訊時會進行序列化處理,而預設的Spring序列化處理是JDK序列化處理
而JDK序列化處理存在有多種缺點:資料體積大,存在安全漏洞,可讀性差等
所以我們在正常使用時通常會去更換預設訊息轉換器,採用JSON訊息轉換器:
<!--在publisher和consumer兩個服務中都引入依賴-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
這篇文章中介紹了訊息佇列的內容並詳細介紹了RabbitMQ以及SpringAMQP,希望能為你帶來幫助
該文章屬於學習內容,具體參考B站黑馬程式設計師的微服務課程
這裡附上視訊連結:01-今日課程介紹4_嗶哩嗶哩_bilibili