ActiveMQ叢集模式
關注於效能,對數據可靠性要求不高
Kafka叢集模式
RocketMQ叢集模式
RabbitMQ叢集模式
AMQP定義:是具有現代特徵的二進制協定。是應用層協定的一個開放標準,爲訊息導向中介層設計。
生產者投遞到Exchange上,消費者從Message Queue中取數據
Server
:又稱broker,接受用戶端的連線,實現AMQP實體服務Channel
:網路通道,幾乎所有的操作都在Channel中進行。Channel是進行訊息讀寫的通道,用戶端可建立多個Channel,每個Channel代表一個對談任務。類似與數據庫中的SessionMessage
:伺服器和應用程式之間傳送的數據。由Properties和Bidy組成。
Virtual Host
:虛擬地址,用於進行邏輯隔離,最上層的訊息路由。一個Virtual Host裏面可以有若幹個Exchange和Queue,同一個Virtual Host裏面不能有相同名稱的Exchange或Queue。Vhost之於RabbitMQ就像虛擬機器之於物理伺服器一樣:它既能將同一個RabbitMQ的衆多客戶區分開來,又可以避免佇列和交換器的命名衝突Exchange
:交換機,接收訊息,根據路由鍵轉發訊息到系結的佇列Binding
:Exchange和Queue之間的虛擬鏈接,binding中可以包含routing keyRouting Key
:路由規則,虛擬機器可用它來確定如何路由一個特定訊息Queue
:訊息佇列,儲存訊息並將它們轉發給消費者rabbitmq-server start &
rabbitmqctl stop_app
rabbitmq-plugins enable rabbitmq_management
5672是java端進行通訊的埠號;15672是管控台的埠號;25672是叢集通訊的埠號。
凡是可以在管理台執行的操作都可以通過命令列來操作。
rabbitmqctl reset
:移除所有數據,要在rabbitmqctl stop_app
之後使用rabbitmqctl change_cluster_node_type disc | ram
:修改叢集節點的儲存形式rabbitmqctl forget_cluster_node [--offline]
:忘記節點(摘除節點)需要先執行消費者,因爲在消費者中宣告瞭佇列
如圖所示,有一個問題:
爲什麼生產者系結的routingKey和消費者監聽的queue名字一樣時(「test001」),訊息也可以路由過去?
RabbitMQ的生產者在發送訊息時,必須要指定一個exchange,如果exchange爲空的話(""),會走第一個exchange(
AMQP default
),這個exchange的路由規則就是按照routingKey的名字來尋找同名的queue,如果有,就把訊息路由過去;如果沒有,就把訊息刪除。
Exchange:接收訊息,並根據路由鍵轉發訊息到所系結的佇列
所有發送到Direct Exchange的訊息被轉發到RoutingKey中指定的Queue
// 3. 通過connection建立一個Channel
Channel channel = connection.createChannel();
// 4. 宣告
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
// 5. 發送
String msg = "Direct Exchange Message";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
// 3. 通過connection建立一個Channel
Channel channel = connection.createChannel();
// 4. 宣告
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String routingKey = "test.direct";
String queueName = "test_direct_queue";
// 宣告一個交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 宣告一個佇列
channel.queueDeclare(queueName, false, false, false, null);
// 建立一個系結關係
channel.queueBind(queueName, exchangeName, routingKey);
// 5. 建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
所有發送到Topic Exchange的訊息被轉發到訂閱了RoutingKey中指定Topic的所有的Queue上
Exchange將RoutingKey和某Topic進行模糊匹配,此時佇列需要系結一個Topic
模糊匹配可以使用萬用字元
log.info.oa
log.erro
// 4. 宣告
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.delete.abc";
// 5. 發送
String msg = "Direct Exchange Message";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
// 4. 宣告
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String routingKey = "user.#";
String queueName = "test_topic_queue";
// 宣告一個交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 宣告一個佇列
channel.queueDeclare(queueName, false, false, false, null);
// 建立一個系結關係
channel.queueBind(queueName, exchangeName, routingKey);
Consumer可以收到兩條訊息。
注意,如果要修改Consumer中的系結關係(例如修改成:
channel.queueBind(queueName, exchangeName, "user.*);
,需要先將原來的系結關係解綁,否則test_topic_queue和test_topic_exchange會有兩個系結關係( 「user.#」 和 「user.*」 )
// 4. 宣告
String exchangeName = "test_topic_exchange";
// 5. 發送
String msg = "Direct Exchange Message";
channel.basicPublish(exchangeName, "", null, msg.getBytes());
// 4. 宣告
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String routingKey = ""; // 系結關係爲空串
String queueName = "test_fanout_queue";
// 宣告一個交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 宣告一個佇列
channel.queueDeclare(queueName, false, false, false, null);
// 建立一個系結關係
channel.queueBind(queueName, exchangeName, routingKey);
這裏Producer和Consumer中的routingKey可以隨便設定,不會有任何影響。
總結來說,對於不同的交換機型別,Producer的程式碼幾乎相同,只是Consumer端的exchangeType不同
系結:Exchange和Exchange、Queue之間的連線關係。系結中可以包含routingKey或者參數
佇列:實際儲存訊息數據
訊息:伺服器和應用程式之間傳送的數據。由Properties和Payload組成
常用屬性:delivery mode、headers(自定義屬性)
其它屬性:
content_type、content_encoding、priority
correlation_id、reply_to、expiration、message_id
correlation_id 通常和 reply_to 一起使用:用於RPC服務
replyTo
:通常用來設定一個回撥佇列
correlationId
:用來關聯請求和RPC呼叫後的恢復
timestamp、type、user_id、app_id、cluster_id
Map<String, Object> headers = new HashMap<>();
headers.put("蓉兒", "郭靖");
headers.put("盈盈", "令狐沖");
headers.put("趙敏", "張無忌");
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化訊息
.contentEncoding("UTF-8") // 字元集
.expiration("10000") // 10s 後如果這個訊息沒有被消費,則被自動移除
.headers(headers) // 自定義屬性
.build();
// 4. 通過Channel發送數據
for (int i = 0; i < 5; i++) {
String msg = "Leihou RabbitMq";
channel.basicPublish("", "test001", props, msg.getBytes());
}
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
Map<String, Object> headers = delivery.getProperties().getHeaders();
System.out.println(msg + "------" + headers);
// Envelope envelope = delivery.getEnvelope();
}
虛擬主機:虛擬地址,用於進行邏輯隔離,最上層的訊息路由
什麼是生產端的可靠性投遞?
網際網路大廠的解決方案:
方案一在高併發的場景下不太合適,因爲需要做兩次數據庫的同步操作。
借鑑數據庫的樂觀鎖機制 機製:比如執行一個更新庫存的SQL語句
UPDATE T_REPS SET count=count-1, version=version+1 where version=1
先查出來version在數據庫中的當前值,更新庫存時也更新version值,這樣如果有併發的更新請求過來,仍舊用的old version,就查不到數據了。
對一件事情進行操作,操作一次的結果和操作多次的結果是一樣的。
在海量訂單產生的業務高峯期,如何避免訊息的重複消費問題
消費端實現冪等性,就意味着,我們的訊息永遠不會消費多次,即使我們收到了多條一樣的訊息
業界主流的冪等性操作:
SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指紋碼
好處:實現簡單
壞處:高併發下有數據庫寫入的效能瓶頸
解決方案:跟進ID進行分庫分表進行演算法路由
利用Redis進行冪等,需要考慮的問題:
RabbitMQ的持久化分爲三個部分:
durable
參數設定爲 true
durable
參數設定爲 true
BasicProperties
中的 deliveryMode
屬性)設定爲2
單單隻設定佇列持久化,重新啓動之後訊息會丟失;單單隻設定訊息的持久化,重新啓動之後佇列消失,繼而訊息也
丟失。單單設定訊息持久化而不設定佇列的持久化顯得毫無意義。
將交換器、佇列、訊息都設定了持久化之後就能百分之百保證數據不丟失了嗎 ? 答案是否定的。
從消費者來說,如果在訂閱消費佇列時將autoAck 參數設定爲true ,那麼當消費者接收到相關訊息之後,還沒來得及處理就看機了,這樣也算數據丟失。
autoAck
參數設定爲false , 並進行手動確認。在持久化的訊息正確存入RabbitMQ 之後,還需要有一段時間(雖然很短,但是不可忽視)才能 纔能存入磁碟之中。RabbitMQ 並不會爲每條訊息都進行同步存檔(呼叫內核的fsyncl方法)的處理,可能僅僅儲存到操作系統快取之中而不是物理磁碟之中。如果在這段時間內RabbitMQ 服務節點發生了異常情況,訊息儲存還沒來得及落盤,那麼這些訊息將會丟失。
fsync 在Linux 中的意義在於同步數據到儲存裝置上。大多數塊裝置的數據都是通過快取進行的,將數據寫到檔案上通常將該數據由內核複製到快取中,如果快取尚未寫滿,則不將其排入輸出佇列上,而是等待其寫滿或者當內核需要重用該快取時,再將該快取排入輸出佇列,進而同步到裝置上。內核提供了fsync介面,使用者可以根據自己的需要通過此介面更新數據到儲存裝置上。
在發送端引入事務機制 機製或者發送方確認機制 機製保證訊息已經正確地發送並儲存到RabbitMQ中。
預設情況下生產者是不知道訊息有沒有正確地到達伺服器。如果在訊息到達伺服器之前己經丟失,持久化操作也解決不了這個問題。RabbitMQ 針對這個問題,提供了兩種解決方式:
try {
channel.txSelect(); // 將當前通道設定爲事務模式
for(int i = 0; i < LOOP_TIMES; i++) { // 如果發送多條訊息,把發送和提交事務包進回圈中
channel.basicPublish("exchange",
"routingKey",
MessageProperties.PERSISTENT_TEXT_PLAIN,
("msg:" + i).getBytes());
channel.txCommit(); // 提交事務
}
}catch(Exception e) {
e.printStackTrace();
channel.txRollback(); // 事務回滾
}
使用事務機制 機製會極大影響RabbitMQ的效能,降低訊息吞吐量。
事務機制 機製在一條訊息發送之後會 使發送端阻塞,以等待RabbitMQ 的迴應,之後才能 纔能繼續發送下一條訊息。相比之下, 發送方確認機制 機製最大的好處在於它是非同步的,一旦發佈一條訊息,生產者應用程式就可以在等通道返回確認的同時繼續發送下一條訊息,當訊息最終得到確認之後,生產者應用程式便可以通過回撥方法來處理該確認訊息,如果RabbitMQ 因爲自身內部錯誤導致訊息丟失,就會發送一條nack CBas i c . Nack) 命令,生產者應用程式同樣可以在回撥方法中處理該nack 命令。
如果佇列和訊息是持久化的,那麼確認訊息會在訊息寫入磁碟之後發出。
如何實現Confirm確認訊息
channel.confirmSelect()
addConfirmListener
,監聽成功和失敗的返回結果,根據具體的結果對訊息進行重新發送、或記錄日誌等後續處理// 3. 通過Connection建立一個新的Channel
Channel channel = connection.createChannel();
// 4. 指定我們的訊息投遞模式:訊息的確認模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
// 5. 發送一條訊息
String msg = "hello, RabbitMQ send confirm message";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
// 6. 新增一個確認監聽
channel.addConfirmListener(new ConfirmListener() {
/**
*
* @param deliveryTag 訊息的唯一標籤
* @param multiple
* @throws IOException
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("----------------ack----------------");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("----------------no ack----------------");
}
});
Consumer沒有變化
Return訊息機制 機製:
之前的消費端使用while回圈
// 5. 建立消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("消費端接收" + new String(delivery.getBody()));
}
我們可以用一種更優雅的方式來實現消費者
// 5. 建立消費者
channel.basicConsume(queueName, true, new MyConsumer(channel));
class MyConsumer extends DefaultConsumer {
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----------------------consume message");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
什麼是消費端限流?
假設我們的RabbitMQ伺服器有上萬條未處理的訊息,我們隨便開啓一個消費者用戶端,巨量的訊息瞬間全部推播過來,但是我們單個用戶端無法同時處理這麼多數據,可能導致伺服器崩潰。
RabbitMQ提供了一種qos(服務品質保證)功能,即在非自動確認訊息的前提下,如果一定數目的訊息(通過基於consume或者channel設定Qos的值)未被確認前,不進行消費新的訊息。
所以autoack一定要設定爲false
/*
* prefetchSize: 0,
* prefetchCount: 1,一次最多推播給consumer 1 條訊息,收到回覆 回復後再推播下1條
* global: false,這個限流作用在queue上,true表示作用再channel上
*/
channel.basicQos(0, 1, false);
// autoAck設定爲false
channel.basicConsume(queueName, false, new MyConsumer(channel));
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("----------------------consume message");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
// 注意:這裏一定要ack,否則消費者會被block住
// false表示不支援 批次簽收
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
消費端重回佇列是爲了對沒有處理成功的訊息,把訊息重新回遞給Broker。在實際應用中,一半都會***關閉重回佇列***。
for (int i = 0; i < 5; i++) {
String msg = "hello, RabbitMQ send ack message " + i;
Map<String, Object> headers = new HashMap<>();
headers.put("num", i);
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
channel.basicPublish(exchangeName, routingKey2, false, props, msg.getBytes());
}
channel.close();
connection.close();
// autoAck設定爲false
channel.basicConsume(queueName, false, new MyConsumer(channel));
class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("----------------------consume message----------------------");
System.err.println("body: " + new String(body));
if((Integer)properties.getHeaders().get("num") == 1) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
/*
* 最後一個參數:requeue=true:消費失敗的這個訊息重回佇列,並且在佇列的尾部
*/
channel.basicNack(envelope.getDeliveryTag(), false, true);
}else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
TTL:Time To Live的縮寫,也就是生存時間。RabbitMQ支援訊息的過期時間,在訊息發送時可以進行指定。也支援佇列的過期時間,從訊息入隊開始,只要超過了佇列的超時時間設定,那麼訊息會自動清除
如下程式碼是作用於訊息上的
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.expiration("10000") // 10s 後如果這個訊息沒有被消費,則被自動移除
.build();
如下程式碼是作用在佇列上,佇列上的所有訊息10s未被消費,都會被自動移除
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 10000);
channel.queueDeclare(queueName, true, false, false, arguments);
利用DLX,當訊息在一個佇列中變成死信(dead message)之後,它能被重新publish到另一個Exchange(DLX)上。
訊息變成死信有以下幾種情況:
DLX和一般的Exchange沒有任何區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。當這個佇列中有死信時,RabbitMQ會自動地將這個死信訊息重新發布到設定的Exchange上去,進而路由到另一個佇列。可以監聽這個佇列中訊息做相應的處理,這個特性可以彌補RabbitMQ 3.0之前支援的immediate參數的功能。
String msg = "hello, RabbitMQ send dlx message";
for (int i = 0; i < 1; i++) {
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化訊息
.contentEncoding("UTF-8") // 字元集
.expiration("10000") // 10s 後如果這個訊息沒有被消費,則被自動移除
.build();
channel.basicPublish(exchangeName, routingKey2, false, props, msg.getBytes());
}
channel.close();
connection.close();
RabbitMQ 支援 「最多一次」 和 「最少一次」。
「最少一次投遞需要考慮以下這幾個方面」:
mandatory
參數 來確保訊息能夠從交換器路由到佇列中RabbitMQ 沒有去重機制 機製保證 「恰好一次」,去重處理一般是在業務用戶端實現,比如引入 Global Unique Identifier。