爲什麼要用 MQ?
MQ的應用場景:
缺點:
各類 MQ 產品的比較:
- | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|
單機吞吐量 | 萬級 | 10萬級 | 10萬級 |
時效性 | us級 | ms級 | ms級以內 |
可用性 | 高 (主從架構) | 非常高 (分佈式架構) | 非常高 (分佈式架構) |
功能特性 | 基於 erlang 開發,所以併發能力很強,效能極其好,延時很低,管理介面較豐富 | MQ 功能比較完備,擴充套件性佳 | 只支援主要的 MQ 功能,像一些訊息查詢、訊息回溯等功能沒有提供,畢竟是爲大數據準備的,在大數據領域應用廣 |
RocketMQ 各角色:
角色 | 說明 | |
---|---|---|
Producer | 訊息的生產者,類似 「發件人」 | 與 NameServer 叢集中的其中一個節點 (隨機選擇) 建立長連線,定期從 NameServer 取 Topic 路由資訊,並向提供 Topic 服務的 Master 建立長連線,定時向 Master 發送心跳,Producer 完全無狀態,可叢集部署。 |
Consumer | 訊息的消費者,類似 「收件人」 | 與 NameServer 叢集中的其中一個節點 (隨機選擇) 建立長連線,定期從 NameServer 取 Topic 路由資訊,並向提供 Topic 服務的 Master、Slave 建立長連線,定時向 Master、Slave 發送心跳,Consumer 既可以從 Master 訂閱訊息,也可以從 Slave 訂閱訊息,訂閱規則由 Broker 設定決定。 |
Broker | 暫存和傳輸訊息,核心角色,類似 「郵局」 | Master 主要處理寫操作,Slave 主要處理讀操作,Master 和 Slave 的對應關係通過指定相同的 BrokerName,不同的 BrokerId 來定義,BrokerId 爲 0 表示 Master,非 0 表示 Slave。每個 Broker 與 NameServer 叢集中的所有節點建立長連線,定時註冊 Topic 資訊到所有 NameServer。 |
NameServer | 管理 Broker,核心角色,類似 「郵局的管理機構」 | NameServer 是無狀態節點,節點之間無任何資訊同步,因爲 Broker 啓動後,會給每一個 NameServer 節點上報資訊。 |
Topic | 區分訊息的種類 | 一個生產者可以發送訊息給一個或者多個 Topic;一個消費者可以訂閱一個或者多個 Topic 訊息。 |
叢集工作流程:
使用 RocketMQ 需要新增 Maven 依賴:
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
1、生產者生產訊息
發送同步訊息:
這種可靠性同步地發送方式使用的比較廣泛,比如重要的訊息通知、簡訊通知。
// 範例化訊息生產者
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設定NameServer的地址
producer.setNamesrvAddr("localhost:9876;localhost:9877");
producer.start();
// 建立訊息, 指定Topic、Tag和訊息體
Message msg = new Message("TopicTest", "Tag1", "hello rocketmq".getBytes("UTF-8"));
// 發送訊息到一個Broker
SendResult sendResult = producer.send(msg);
// 發送狀態
SendStatus sendStatus = sendResult.getSendStatus();
// 訊息ID
String msgId = sendResult.getMsgId();
// 訊息接收佇列ID
int queueId = sendResult.getMessageQueue().getQueueId();
// 如果不再發送訊息, 關閉訊息生產者
// producer.shutdown();
發送非同步訊息:
通常用在對響應時間敏感的業務場景,即生產者不能容忍長時間地等待 Broker 的響應。
只需要呼叫 DefaultMQProducer 的非同步實現即可:
// 發送訊息到一個Broker
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
}
});
發送單向訊息:
這種方式主要用在生產者不關心發送結果的場景,例如發送日誌。
只需要呼叫 DefaultMQProducer 的 sendOneway() 方法即可:
// 發送單向訊息, 沒有任何返回結果
producer.sendOneway(msg);
2、消費者消費訊息
負載均衡模式(預設模式):
// 範例化訊息消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 設定NameServer的地址
consumer.setNamesrvAddr("localhost:9876;localhost:9877");
// 訂閱Topic、Tag
consumer.subscribe("TopicTest", "Tag1"); // 多個Tag可以用||隔開, 例如"Tag1||Tag2", 消費所有Tag使用"*"
// 設定負載均衡模式, 預設MessageModel.CLUSTERING
// consumer.setMessageModel(MessageModel.CLUSTERING);
// 設定回撥函數, 處理訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 訊息
String message = new String(msg.getBody());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
消費者廣播模式:
只需要消費者範例呼叫 setMessageModel() 方法設定廣播模式即可:
// 設定廣播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
順序訊息是指可以按照訊息的發送順序來消費,RocketMQ 可以嚴格保證訊息有序,分爲分割區有序或者全域性有序。
順序訊息的原理?
在預設的情況下訊息發送會採取 Round Robin 輪詢方式把訊息發送到不同的 queue (分割區佇列);而消費訊息的時候從多個 queue 上拉取訊息,這種情況發送和消費是不能保證順序的。
但是如果控制發送的順序訊息只依次發送到同一個 queue 中,消費的時候只從這個 queue 上依次拉取,則就保證了順序,當發送和消費參與 queue 只有一個,則是全域性有序;如果多個 queue 參與,則是分割區有序,即相對每個 queue,訊息都是有序的。
比如以訂單的順序流程爲例,建立、付款、推播、完成。訂單號相同的訊息會被先後 先後發送到同一個佇列中,消費時,同一個訂單號獲取到的肯定是同一個佇列。
1、生產者生產訊息
對訂單 id 取模選擇佇列:
// 訂單流程
String[] msgs = new String[] {"建立訊息", "付款訊息", "推播訊息", "完成訊息"};
for (int i = 0; i < msgs.length; i++) {
// 建立訊息, 指定Topic、Tag、key和訊息體
Message msg = new Message("OrderTopic", "Order", "i" + i, msgs[i].getBytes("UTF-8"));
// 發送訊息到一個Broker, 參數二: 訊息佇列的選擇器, 參數三: 選擇的業務標識
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long orderId = (long) arg;
long index = orderId % mqs.size(); // 取模
return mqs.get((int) index);
}
}, orderId); // 動態傳入訂單id
}
2、消費者消費訊息
只是設定 Consumer 回撥函數不同:
// 訂閱Topic、Tag
consumer.subscribe("OrderTopic", "*");
// 設定回撥函數, 處理訊息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
// 訊息
String message = new String(msg.getBody());
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
比如電商場景,提交了一個訂單就可以發送一個延時訊息,1h 後去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
只需要爲 Message 物件設定延時等級即可:
// 建立訊息, 指定Topic、Tag和訊息體
Message msg = new Message("TopicTest", "Tag1", "hello rocketmq".getBytes("UTF-8"));
// 設定延時等級3, 這個訊息將在10s後發送
msg.setDelayTimeLevel(3);
// 發送訊息到一個Broker
producer.send(msg);
使用限制:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
現在 RocketMQ 不支援任意時間的延時,需要設定幾個固定的延時等級,從 1s 到 2h 分別對應着等級 1 到 8。
2、消費者消費訊息
只是設定 Consumer 回撥函數不同:
// 訂閱Topic、Tag
consumer.subscribe("TopicTest", "*");
// 設定回撥函數, 處理訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 訊息
String message = new String(msg.getBody());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
批次發送訊息能顯著提高傳遞小訊息的效能,限制是這些批次訊息應該有相同的 Topic,相同的 waitStoreMsgOK,而且不能是延時訊息,此外,這一批訊息的總大小不應超過 4MB。
List<Message> messages = new ArrayList<>();
messages.add(new Message("TopicTest", "Tag1", "OrderID001", "hello 0".getBytes("UTF-8")));
messages.add(new Message("TopicTest", "Tag1", "OrderID002", "hello 1".getBytes("UTF-8")));
messages.add(new Message("TopicTest", "Tag1", "OrderID003", "hello 2".getBytes("UTF-8")));
// 發送訊息到一個Broker
producer.send(messages);
如果訊息的總長度可能大於 4MB 時,最好把訊息進行分割:
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日誌的開銷20位元組
if (tmpSize > SIZE_LIMIT) { // 單個訊息超過了最大的限制
// 假如下一個子列表沒有元素, 則新增這個子列表然後退出回圈, 否則只是退出回圈
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) { break; }
else { totalSize += tmpSize; }
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
// 把大的訊息分裂成若幹個小的訊息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
}
}
Apache RocketMQ在4.3.0版中已經支援分佈式事務訊息。
採用了2PC的思想來實現了提交事務訊息,同時增加一個補償邏輯來處理二階段超時或者失敗的訊息。
整體流程:
常見問題:
1、RocketMQ 訊息怎麼保證高可用?
Broker(訊息伺服器)幾種部署方式對比:
方式 | 優點 | 缺點 |
---|---|---|
單 Master | 1.一旦Broker重新啓動或者宕機時,會導致整個服務不可用 2.不建議線上環境使用 |
|
多 Master | 設定簡單 | 1.單臺機器宕機期間,該機器上未被消費的訊息在機器恢復之前不可訂閱,訊息實時性會受影響 |
多 Master 多 Slave(同步雙寫) | 數據與服務都無單點,Master宕機情況下,訊息無延遲,服務可用性與數據可用性都非常高 | 效能比非同步複製模式略低,大約低 10%左右,發送單個訊息的 RT會略高。目前主宕機後,備機不能自動切換爲主機 |
多 Master多 Slave(非同步複製) | 1.訊息實時性不會受影響 2.主流生產環境部署叢集採用方案 |
Master 宕機,磁碟損壞情況,會丟失少量訊息 |
2、RocketMQ 訊息種類以及怎麼保證訊息有序?