衡量訊息中介軟體的指標:服務效能,數據儲存,叢集架構
RabbitMQ是一個開源的訊息代理和佇列伺服器,用來通過普通協定在完全不同的應用之間共用數據,
RabbitMQ是使用Erlang語言來編寫的,並且RabbitMQ是基於AMQP協定的。
滴滴、美團、頭條、去哪兒…
AMQP:Advanced Message Queuing Protocol(高階訊息佇列協定)
AMQP定義:是具有現代特徵的二進制協定。是一個提供統一訊息服務的應用層標準高階訊息佇列協定,是應用層協定的一個開放標準,爲訊息導向中介層設計。
核心概念
兩種方式
需要用到的軟體包
注意:erlang 和 RabbitMQ 版本要對應
rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
啓動 / 關閉 RabbitMQ
centos 7
systemctl start rabbitmq-server
lsof -i:5672
systemctl stop rabbitmq-server
預設使用者
guest / guest,只能本地存取
開啓 guest 遠端存取
將 rabbitmq.conf 上傳到 /etc/rabbitmq 目錄下,並修改設定
loopback_users.guest = false
埠號
如果安裝完成後,啓動特別慢,或者Java連線mq非常慢(導致Socket連線異常)
vim /etc/hosts
192.168.229.116 Cloud01
常用命令
# 關閉應用
rabbitmqctl stop_app
# 啓動應用
rabbitmqctl start_app
# 檢視節點狀態
rabbitmqctl status
# 新增使用者
rabbitmqctl add_user username password
# 設定使用者爲管理員
rabbitmqctl set_user_tags username administrator
# 設定使用者許可權
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
# 檢視所有使用者
rabbitmqctl list_users
# 刪除使用者
rabbitmqctl delete_user username
# 清除使用者許可權
rabbitmqctl clear_permissions -p vhostpath username
# 建立虛擬主機
rabbitmqctl add_vhost vhostpath
# 檢視所有的虛擬主機
rabbitmqctl list_vhosts
# 檢視虛擬主機上所有許可權
rabbitmqctl list_permissions -p vhostpath
# 刪除虛擬主機
rabbitmqctl delete_vhost vhostpath
# 檢視所有佇列
rabbitmqctl list_queues
# 清除佇列裡的訊息
rabbitmqctl -p vhostpath purge_queue blue
#================== 高階操作 ====================
# 移除所有數據,要在rabbitmqctl stop_app 之後使用
rabbitmqctl reset
# 組成叢集命令, [--ram] 指定節點的儲存形式
rabbitmqctl join_cluster <clusternode> [--ram]
# 檢視叢集狀態
rabbitmqctl cluster_status
# 修改叢集節點的儲存形式
rabbitmqctl change_cluster_node_type disc | ram
# 忘記節點(摘除節點):叢集失敗或者故障轉移
rabbitmqctl forget_cluster_node [--offline]
# 修改節點名稱
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2] ...
管控台
rabbitmq-plugins enable rabbitmq_management
Exchange:接收訊息,並根據路由鍵轉發訊息所系結的佇列
所有發送到 Direct Exchange 的訊息被轉發到 RoutingKey 中指定的 Queue。
Exchange將RoutingKey和某個Topic進行模糊匹配,此時佇列需要系結一個Topic。
注意:可以使用萬用字元進行模糊匹配
「#」 匹配一個或多個詞
「" 匹配一個詞
「log.#」 能夠匹配到 「log.info.oa」
"log.」 能夠匹配到 「log.erro」
扇型交換機(funout exchange)將訊息路由給系結到它身上的所有佇列,而不理會系結的路由鍵。如果N個佇列系結到某個扇型交換機上,當有訊息發送給此扇型交換機時,交換機會將訊息的拷貝分別發送給這所有的N個佇列。扇型用來交換機處理訊息的廣播路由(broadcast routing)。
預設交換機(defaultexchange)實際上是一個由訊息代理預先宣告好的沒有名字(名字爲空字串)的直連交換機(direct exchange)。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每個新建佇列(queue)都會自動系結到預設交換機上,系結的路由鍵(routing key)名稱與佇列名稱相同。
Confirm機制 機製流程
實現Confirm機制 機製
Return機制 機製流程
實現Return機制 機製
比如發送了100條訊息,最原始的消費方式是通過一個回圈,挨個的獲取訊息
自定義Consumer實現
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
/**
*
* @param consumerTag 內部生成的消費者標籤
* @param envelope 包含屬性:deliveryTag(標籤), redeliver, exchange, routingKey
* redeliver是一個標記,如果設爲true,表示訊息之前可能已經投遞過了,現在是重新投遞訊息到監聽佇列的消費者
* @param properties 訊息屬性
* @param body 訊息內容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("==========Consume Message===========");
// 輸出訊息內容
System.out.println("message: " + new String(body, "UTF-8"));
System.out.println("consumerTag: " + consumerTag);
System.out.println("envelope: " + envelope.getDeliveryTag()
+ "," + envelope.getExchange()
+ "," + envelope.getRoutingKey()
+ "," + envelope.isRedeliver());
System.out.println("properties: " + properties);
}
}
說明:與Spring整合之後,就不需要使用了,而只需要一個 監聽的註解 就可以直接搞定!!!
訊息過載場景
消費端限流機制 機製
RabbitMQ提供了一種Qos (服務品質保證)功能,即在非自動確認訊息的前提下,如果一定數目的訊息 (通過基於consume或者channel設定Qos的值) 未被確認前,不消費新的訊息!
不能設定自動簽收功能(autoAck = false)
如果訊息未被確認,就不會到達消費端,目的就是給消費端減壓
限流設定API
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize和global這兩項,RabbitMQ沒有實現,暫且不研究
prefetchCount在autoAck=false
的情況下生效,即在自動應答的情況下該值無效
手工ACK
void basicAck(Integer deliveryTag,boolean multiple)
呼叫這個方法就會主動回送給Broker一個應答,表示這條訊息我處理完了,你可以給我下一條了。參數multiple表示是否批次簽收,由於我們是一次處理一條訊息,所以設定爲 false。
實現消費端限流
訊息堆積?(生產中的解決方案)
首先是先加大消費端機器,將訊息逐漸的消費完成
是什麼原因導致訊息堆積?
生產端過快
消費端消費過慢
程式上來優化,JVM優化…
重複消費?
冪等消費設計
當我們設定autoACK=false
時,就可以使用手工ACK方式了,其實手工方式包括了手工ACK與NACK
當我們手工 ACK 時,會發送給Broker一個應答,代表訊息處理成功,Broker就可回送響應給生產端。
NACK 則表示訊息處理失敗,如果設定了重回佇列,Broker端就會將沒有成功處理的訊息重新發送。
使用方式
消費端消費時,如果由於業務異常,我們可以手工 NACK 記錄日誌,然後進行補償
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
如果由於伺服器宕機等嚴重問題,我們就需要手工 ACK 保障消費端消費成功
void basicAck(long deliveryTag, boolean multiple)
重回佇列實現
DLX - 死信佇列(Dead-Letter-Exchange)
利用DLX,當訊息在一個佇列中變成死信 (dead message) 之後,它能被重新Publish到另一個Exchange中,這個Exchange就是DLX。
immediate
參數的功能arguments.put("x-dead-letter-exchange","dlx.exchange");
這樣訊息在過期、requeue、 佇列在達到最大長度時,訊息就可以直接路由到死信佇列!
// 業務方法
public void consume () {
boolean ack = true;
try {
// 消費訊息
// 若出現異常
} catch (Exception e) {
ack = false;
}
if (!ack) {
// 訊息變爲死信
// 這條訊息會被自動發佈到 DLX 上
channel.basicNack("", false, false);
} else {
channel.basicAck();
}
}
在實際生產中,很難保障前三點的完全可靠,比如在極端的環境中,生產者發送訊息失敗了,發送端在接受確認應答時突然發生網路閃斷等等情況,很難保障可靠性投遞,所以就需要有第四點完善的訊息補償機制 機製。
使用者對於同一操作發起的一次請求或者多次請求的結果是一致的
在海量訂單產生的業務高峯期,如何避免訊息的重複消費問題?
在業務高峯期最容易產生訊息重複消費問題,當Consumer消費完訊息時,在給Producer返回ack時由於網路中斷,導致Producer未收到確認資訊,該條訊息就會重新發送並被Consumer消費,但實際上該消費者已成功消費了該條訊息,這就造成了重複消費。
消費端實現冪等性,就意味着我們的訊息永遠不會被多次消費,即使我們收到了很多一樣的訊息。
解決訊息重複消費問題
業界主流的解決方案
設定類
@Configuration
public class RabbitConfig {
@Bean
public ConnectionFactory connectionFactory () {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses("192.168.229.111:5672");
cachingConnectionFactory.setUsername("guest");
cachingConnectionFactory.setPassword("guest");
cachingConnectionFactory.setVirtualHost("/");
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin (ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// autoStartup必須要設定爲true,否則Spring容器不會載入RabbitAdmin類
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
@SpringBootTest
class RabbitmqSpringApplicationTests {
@Autowired
private RabbitAdmin rabbitAdmin;
/**
* 交換機名稱
*/
private final String DIRECT_EXCHANGE = "test_direct_exchange";
private final String TOPIC_EXCHANGE = "test_topic_exchange";
private final String FANOUT_EXCHANGE = "test_fanout_exchange";
/**
* 佇列名稱
*/
private final String DIRECT_EXCHANGE_QUEUE = "test_direct_queue";
private final String TOPIC_EXCHANGE_QUEUE = "test_topic_queue";
private final String FANOUT_EXCHANGE_QUEUE = "test_fanout_queue";
@Test
public void testRabbitAdmin () {
// 宣告三個交換機
rabbitAdmin.declareExchange(new DirectExchange(DIRECT_EXCHANGE, false, false));
rabbitAdmin.declareExchange(new TopicExchange(TOPIC_EXCHANGE, false, false));
rabbitAdmin.declareExchange(new FanoutExchange(FANOUT_EXCHANGE, false, false));
// 宣告三個佇列
rabbitAdmin.declareQueue(new Queue(DIRECT_EXCHANGE_QUEUE, false));
rabbitAdmin.declareQueue(new Queue(TOPIC_EXCHANGE_QUEUE, false));
rabbitAdmin.declareQueue(new Queue(FANOUT_EXCHANGE_QUEUE, false));
// 系結佇列和交換機
rabbitAdmin.declareBinding(new Binding(DIRECT_EXCHANGE_QUEUE, Binding.DestinationType.QUEUE,
DIRECT_EXCHANGE, "direct.#", null));
// 直接建立佇列和交換機,並系結關係
rabbitAdmin.declareBinding(BindingBuilder
// 直接建立佇列
.bind(new Queue(TOPIC_EXCHANGE_QUEUE, false))
// 直接建立交換機
.to(new TopicExchange(TOPIC_EXCHANGE, false,false))
// 指定路由key
.with("topic.#"));
rabbitAdmin.declareBinding(BindingBuilder
// 直接建立佇列
.bind(new Queue(FANOUT_EXCHANGE_QUEUE, false))
// 直接建立交換機
.to(new FanoutExchange(FANOUT_EXCHANGE, false,false)));
}
}
/**
* 宣告一個 Topic Exchange
* @return
*/
@Bean
public TopicExchange topicExchange () {
return new TopicExchange(TOPIC_EXCHANGE, true, false);
}
/**
* 宣告一個 QueueA
* @return
*/
@Bean
public Queue queueA () {
return new Queue(TOPIC_EXCHANGE_QUEUE_A, true);
}
/**
* 將佇列A系結到交換機
* @return
*/
@Bean
public Binding bindingA () {
return BindingBuilder.bind(queueA()).to(topicExchange()).with(TOPIC_EXCHANGE_QUEUE_A_ROUTING_KEY);
}
RabbitTemplate:訊息模板
RabbitAdmin類可以很好的操作RabbitMQ,在Spring中直接進行注入即可
@Bean
public RabbitTemplate rabbitTemplate (ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發送訊息
*/
@Test
public void testSendMessage () {
// 建立訊息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message msg = new Message("Hello World".getBytes(), messageProperties);
// 發送訊息
rabbitTemplate.send("spring.amqp.topic.exchange", "c.msg", msg);
}
/**
* 簡單訊息發送
*/
@Test
public void testSendMessage2 () {
String msg = "Hello yuan deng ta...";
rabbitTemplate.convertAndSend("spring.amqp.topic.exchange", "b.msg", msg);
}
/**
* 帶屬性訊息發送
*/
@Test
public void testSendMessage3 () {
// 建立訊息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "訊息描述...");
messageProperties.getHeaders().put("type", "自定義訊息型別");
Message msg = new Message("Hello yuan deng ta".getBytes(), messageProperties);
// 發送訊息
rabbitTemplate.convertAndSend("spring.amqp.topic.exchange", "a.msg", msg, message -> {
System.out.println("------新增額外的設定------");
message.getMessageProperties().getHeaders().put("desc", "訊息詳細描述...");
message.getMessageProperties().getHeaders().put("attr", "新增屬性");
return message;
});
}
簡單訊息監聽器
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer (
ConnectionFactory connectionFactory,
Queue queueA, Queue queueB, Queue queueC) {
SimpleMessageListenerContainer simpleMessageListenerContainer =
new SimpleMessageListenerContainer(connectionFactory);
// 監聽多個佇列
simpleMessageListenerContainer.setQueues(queueA, queueB, queueC);
// 當前消費者數量
simpleMessageListenerContainer.setConcurrentConsumers(1);
// 最大消費者數量
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
// 是否重回佇列
simpleMessageListenerContainer.setDefaultRequeueRejected(false);
// 簽收機制 機製
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 消費者標籤
simpleMessageListenerContainer.setConsumerTagStrategy(queue -> {
return queue + "_" + UUID.randomUUID().toString().substring(0,6);
});
// 監聽方式處理訊息
simpleMessageListenerContainer.setMessageListener(message -> {
System.out.println("消費者:" + new String(message.getBody()));
});
return simpleMessageListenerContainer;
}
除了使用監聽方式處理訊息,還可以使用訊息監聽適配器方式來處理訊息
自定義訊息處理委派類
/**
* 自定義訊息委派類 {@link MessageDelegate}
* @author Kevin
*/
public class MessageDelegate {
/**
* handleMessage 預設方法名
* body
* @param body
*/
public void handleMessage (byte[] body) {
System.out.println("預設方法,訊息內容:" + new String(body));
}
/**
* 自定義方法名
* @param body
*/
public void consumeMessage (byte[] body) {
System.out.println("自定義方法,訊息內容:" + new String(body));
}
public void consumeMessage (String message) {
System.out.println("自定義方法,訊息內容:" + message);
}
/**
* 佇列系結的方法名
* @param message
*/
public void queueAMethod (String message) {
System.out.println("queueAMethod,訊息內容:" + message);
}
public void queueBMethod (String message) {
System.out.println("queueBMethod,訊息內容:" + message);
}
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer (
ConnectionFactory connectionFactory,
Queue queueA, Queue queueB, Queue queueC) {
SimpleMessageListenerContainer simpleMessageListenerContainer =
new SimpleMessageListenerContainer(connectionFactory);
// 監聽多個佇列
simpleMessageListenerContainer.setQueues(queueA, queueB, queueC);
// 當前消費者數量
simpleMessageListenerContainer.setConcurrentConsumers(1);
// 最大消費者數量
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
// 是否重回佇列
simpleMessageListenerContainer.setDefaultRequeueRejected(false);
// 簽收機制 機製
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 消費者標籤
simpleMessageListenerContainer.setConsumerTagStrategy(queue -> {
return queue + "_" + UUID.randomUUID().toString().substring(0,6);
});
// 適配方式,這裏委派類使用預設的方法來處理訊息 handleMessage
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
// 指定訊息轉換器
messageListenerAdapter.setMessageConverter(new TextMessageConverter());
// 1. 指定自定義方法名
messageListenerAdapter.setDefaultListenerMethod("consumeMessage");
// 2. 佇列名 和 方法名 匹配系結
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put(TOPIC_EXCHANGE_QUEUE_A, "queueAMethod");
queueOrTagToMethodName.put(TOPIC_EXCHANGE_QUEUE_B, "queueBMethod");
messageListenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);
return simpleMessageListenerContainer;
}
我們在進行發送訊息的時候,正常情況下訊息體爲二進制的數據方式進行傳輸,如果希望內部幫我們進行轉換,或者指定自定義的轉換器,就需要用到MessageConverter
引入Starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生產端
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* Confirm 確認訊息
*/
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("CorrelationData: " + correlationData);
System.out.println("ack: " + ack);
if (!ack) {
System.err.println("例外處理...");
}
}
};
/**
* return 返回訊息
*/
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
String replyText, String exchange, String routingKey) {
System.out.println("return exchange: " + exchange + ", routingKey: " + routingKey
+ ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
/**
* 發送訊息
* @param msg 訊息主體
* @param properties 訊息屬性
*/
public void send (Object msg, Map<String, Object> properties) {
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message message = MessageBuilder.createMessage(msg, messageHeaders);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData();
// 全域性唯一 id + 時間戳
// message id
correlationData.setId(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("spring.boot.topic.exchange", "boot.msg", message, correlationData);
}
/**
* 發送 Order 訊息
* @param order 物件
*/
public void sendOrder (Order order) {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("spring.boot.order.exchange", "order.*", order, correlationData);
}
}
消費端 @RabbitListener 監聽
不需要再使用 @Bean 方式來建立 Exchange、Queue、Binding
監聽並且建立佇列、交換機、routingkey
// 通過 ${} 方式讀取 Spring boot 組態檔中的參數
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
durable = "${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
type = "${spring.rabbitmq.listener.order.exchange.type}",
durable = "${spring.rabbitmq.listener.order.exchange.durable}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.key}"
)
)
@RabbitHandler
public void onMessage (@Payload Order order,
Channel channel,
@Headers Map<String, Object> headers) throws IOException {
System.err.println("Order 訊息內容:" + order);
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
// 手工ACK
channel.basicAck(deliveryTag, false);
}
參數設定化
spring:
rabbitmq:
addresses: 192.168.229.111:5672
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10
# 自定義設定
order:
exchange:
name: spring.boot.order.exchange
durable: true
type: topic
ignoreDeclarationExceptions: true
queue:
name: spring.boot.order.queue
durable: true
key: order.*
@StreamListener