MQ——RocketMQ

2020-08-12 22:57:40

爲什麼要用 MQ?

MQ的應用場景:

  • 應用解耦。以電商爲例,使用者建立訂單後,如果耦合調用庫存系統、物流系統、支付系統,任何一個子系統出現故障或者因爲升級等原因暫時不可用都會造成下單操作異常,影響使用者使用體驗。使用 MQ 後,比如物流系統發生故障,需要幾分鐘才能 纔能修復,在這段時間,物流系統要處理的數據被快取到 MQ 中,使用者的下單操作正常完成,當物流系統恢復後,補充處理存在 MQ 中的訂單訊息即可,終端系統感知不到物流系統發生過幾分鐘故障;
  • 流量削峯。將大量請求快取起來,分散到很長一段時間處理,這樣可以大大提高系統的穩定性和使用者體驗;
  • 數據分發。數據的生產者不需要關心誰來消費數據,只需要將數據發送到 MQ ,數據消費者直接在 MQ 中獲取數據即可。

缺點:

  • 系統可用性降低。系統引入的外部依賴越多,系統穩定性越差,一旦 MQ 宕機,就會對業務造成影響;如何保證 MQ 的高可用?
  • 系統複雜性提高。如何保證訊息沒有被重複消費?怎麼處理訊息丟失情況?怎麼保證訊息傳遞的順序性?
  • 一致性問題。A 系統處理完業務,通過 MQ 給 B、C、D 三個系統發訊息數據,如果 B、C 系統處理成功,D 系統處理失敗,就會造成一致性問題。如何保證訊息數據處理的一致性?

各類 MQ 產品的比較:

- RabbitMQ RocketMQ Kafka
單機吞吐量 萬級 10萬級 10萬級
時效性 us級 ms級 ms級以內
可用性 高 (主從架構) 非常高 (分佈式架構) 非常高 (分佈式架構)
功能特性 基於 erlang 開發,所以併發能力很強,效能極其好,延時很低,管理介面較豐富 MQ 功能比較完備,擴充套件性佳 只支援主要的 MQ 功能,像一些訊息查詢、訊息回溯等功能沒有提供,畢竟是爲大數據準備的,在大數據領域應用廣

RocketMQ 各角色:

角色 說明
Producer 訊息的生產者,類似 「發件人」 與 NameServer 叢集中的其中一個節點 (隨機選擇) 建立長連線,定期從 NameServer 取 Topic 路由資訊,並向提供 Topic 服務的 Master 建立長連線,定時向 Master 發送心跳,Producer 完全無狀態,可叢集部署。
Consumer 訊息的消費者,類似 「收件人」 與 NameServer 叢集中的其中一個節點 (隨機選擇) 建立長連線,定期從 NameServer 取 Topic 路由資訊,並向提供 Topic 服務的 Master、Slave 建立長連線,定時向 Master、Slave 發送心跳,Consumer 既可以從 Master 訂閱訊息,也可以從 Slave 訂閱訊息,訂閱規則由 Broker 設定決定。
Broker 暫存和傳輸訊息,核心角色,類似 「郵局」 Master 主要處理寫操作,Slave 主要處理讀操作,Master 和 Slave 的對應關係通過指定相同的 BrokerName,不同的 BrokerId 來定義,BrokerId 爲 0 表示 Master,非 0 表示 Slave。每個 Broker 與 NameServer 叢集中的所有節點建立長連線,定時註冊 Topic 資訊到所有 NameServer。
NameServer 管理 Broker,核心角色,類似 「郵局的管理機構」 NameServer 是無狀態節點,節點之間無任何資訊同步,因爲 Broker 啓動後,會給每一個 NameServer 節點上報資訊。
Topic 區分訊息的種類 一個生產者可以發送訊息給一個或者多個 Topic;一個消費者可以訂閱一個或者多個 Topic 訊息。

在这里插入图片描述
叢集工作流程:

  1. 啓動 NameServer,NameServer 起來後監聽埠,等待 Broker、Producer、Consumer 連線,相當於一個路由控制中心;
  2. Broker 啓動,跟所有的 NameServer 保持長連線,定時發送心跳包,心跳包中包含當前 Broker 資訊(IP + 埠等)以及儲存所有 Topic 資訊,註冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的對映關係;
  3. 收發訊息前,先建立 Topic,建立 Topic 時需指定該 Topic 要儲存在哪些 Broker 上,也可以在發送訊息時自動建立 Topic;
  4. Producer 發送訊息,啓動時先跟 NameServer 叢集中的其中一臺建立長連線,並從 NameServer 中獲取當前發送的 Topic 存在哪些 Broker 上,輪詢從佇列表中選擇一個佇列,然後與佇列所在的 Broker 建立長連線從而向 Broker 發送訊息;
  5. Consumer 跟 Producer 類似,跟其中一臺 NameServer 建立長連線,獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連線通道,開始消費訊息。

RocketMQ的使用

使用 RocketMQ 需要新增 Maven 依賴:

<!-- rocketmq -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

基本用法

1、生產者生產訊息

發送同步訊息:

這種可靠性同步地發送方式使用的比較廣泛,比如重要的訊息通知、簡訊通知。

// 範例化訊息生產者
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設定NameServer的地址
producer.setNamesrvAddr("localhost:9876;localhost:9877");
producer.start();
// 建立訊息, 指定Topic、Tag和訊息體
Message msg = new Message("TopicTest", "Tag1", "hello rocketmq".getBytes("UTF-8"));
// 發送訊息到一個Broker
SendResult sendResult = producer.send(msg);

// 發送狀態
SendStatus sendStatus = sendResult.getSendStatus();
// 訊息ID
String msgId = sendResult.getMsgId();
// 訊息接收佇列ID
int queueId = sendResult.getMessageQueue().getQueueId();

// 如果不再發送訊息, 關閉訊息生產者
// producer.shutdown();

發送非同步訊息:

通常用在對響應時間敏感的業務場景,即生產者不能容忍長時間地等待 Broker 的響應。

只需要呼叫 DefaultMQProducer 的非同步實現即可:

// 發送訊息到一個Broker
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
    }
    @Override
    public void onException(Throwable e) {
    }
});

發送單向訊息:

這種方式主要用在生產者不關心發送結果的場景,例如發送日誌。

只需要呼叫 DefaultMQProducer 的 sendOneway() 方法即可:

// 發送單向訊息, 沒有任何返回結果
producer.sendOneway(msg);

2、消費者消費訊息

負載均衡模式(預設模式):
在这里插入图片描述

// 範例化訊息消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 設定NameServer的地址
consumer.setNamesrvAddr("localhost:9876;localhost:9877");
// 訂閱Topic、Tag
consumer.subscribe("TopicTest", "Tag1"); // 多個Tag可以用||隔開, 例如"Tag1||Tag2", 消費所有Tag使用"*"
// 設定負載均衡模式, 預設MessageModel.CLUSTERING
// consumer.setMessageModel(MessageModel.CLUSTERING);
// 設定回撥函數, 處理訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
                                                    ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 訊息
            String message = new String(msg.getBody());
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

消費者廣播模式:

在这里插入图片描述
只需要消費者範例呼叫 setMessageModel() 方法設定廣播模式即可:

// 設定廣播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

順序訊息

順序訊息是指可以按照訊息的發送順序來消費,RocketMQ 可以嚴格保證訊息有序,分爲分割區有序或者全域性有序。

順序訊息的原理?

在預設的情況下訊息發送會採取 Round Robin 輪詢方式把訊息發送到不同的 queue (分割區佇列);而消費訊息的時候從多個 queue 上拉取訊息,這種情況發送和消費是不能保證順序的。

但是如果控制發送的順序訊息只依次發送到同一個 queue 中,消費的時候只從這個 queue 上依次拉取,則就保證了順序,當發送和消費參與 queue 只有一個,則是全域性有序;如果多個 queue 參與,則是分割區有序,即相對每個 queue,訊息都是有序的。

比如以訂單的順序流程爲例,建立、付款、推播、完成。訂單號相同的訊息會被先後 先後發送到同一個佇列中,消費時,同一個訂單號獲取到的肯定是同一個佇列。

1、生產者生產訊息

對訂單 id 取模選擇佇列:

// 訂單流程
String[] msgs = new String[] {"建立訊息", "付款訊息", "推播訊息", "完成訊息"};
for (int i = 0; i < msgs.length; i++) {
    // 建立訊息, 指定Topic、Tag、key和訊息體
    Message msg = new Message("OrderTopic", "Order", "i" + i, msgs[i].getBytes("UTF-8"));
    // 發送訊息到一個Broker, 參數二: 訊息佇列的選擇器, 參數三: 選擇的業務標識
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            long orderId = (long) arg;
            long index = orderId % mqs.size(); // 取模
            return mqs.get((int) index);
        }
    }, orderId); // 動態傳入訂單id
}

2、消費者消費訊息

只是設定 Consumer 回撥函數不同:

// 訂閱Topic、Tag
consumer.subscribe("OrderTopic", "*");
// 設定回撥函數, 處理訊息
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            // 訊息
            String message = new String(msg.getBody());
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

延時訊息

比如電商場景,提交了一個訂單就可以發送一個延時訊息,1h 後去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。

只需要爲 Message 物件設定延時等級即可:

// 建立訊息, 指定Topic、Tag和訊息體
Message msg = new Message("TopicTest", "Tag1", "hello rocketmq".getBytes("UTF-8"));
// 設定延時等級3, 這個訊息將在10s後發送
msg.setDelayTimeLevel(3);
// 發送訊息到一個Broker
producer.send(msg);

使用限制:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

現在 RocketMQ 不支援任意時間的延時,需要設定幾個固定的延時等級,從 1s 到 2h 分別對應着等級 1 到 8。

2、消費者消費訊息

只是設定 Consumer 回撥函數不同:

// 訂閱Topic、Tag
consumer.subscribe("TopicTest", "*");
// 設定回撥函數, 處理訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
            ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 訊息
            String message = new String(msg.getBody());
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

批次訊息

批次發送訊息能顯著提高傳遞小訊息的效能,限制是這些批次訊息應該有相同的 Topic,相同的 waitStoreMsgOK,而且不能是延時訊息,此外,這一批訊息的總大小不應超過 4MB。

List<Message> messages = new ArrayList<>();
messages.add(new Message("TopicTest", "Tag1", "OrderID001", "hello 0".getBytes("UTF-8")));
messages.add(new Message("TopicTest", "Tag1", "OrderID002", "hello 1".getBytes("UTF-8")));
messages.add(new Message("TopicTest", "Tag1", "OrderID003", "hello 2".getBytes("UTF-8")));
// 發送訊息到一個Broker
producer.send(messages);

如果訊息的總長度可能大於 4MB 時,最好把訊息進行分割:

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }
    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // 增加日誌的開銷20位元組
            if (tmpSize > SIZE_LIMIT) { // 單個訊息超過了最大的限制
            	 // 假如下一個子列表沒有元素, 則新增這個子列表然後退出回圈, 否則只是退出回圈
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) { break; }
            else { totalSize += tmpSize; }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

// 把大的訊息分裂成若幹個小的訊息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
    try {
        List<Message> listItem = splitter.next();
        producer.send(listItem);
    } catch (Exception e) {
    }
}

事務訊息

Apache RocketMQ在4.3.0版中已經支援分佈式事務訊息。

採用了2PC的思想來實現了提交事務訊息,同時增加一個補償邏輯來處理二階段超時或者失敗的訊息。

  • 待續

整體流程:

在这里插入图片描述

常見問題:

1、RocketMQ 訊息怎麼保證高可用?

Broker(訊息伺服器)幾種部署方式對比:

方式 優點 缺點
單 Master 1.一旦Broker重新啓動或者宕機時,會導致整個服務不可用
2.不建議線上環境使用
多 Master 設定簡單 1.單臺機器宕機期間,該機器上未被消費的訊息在機器恢復之前不可訂閱,訊息實時性會受影響
多 Master 多 Slave(同步雙寫) 數據與服務都無單點,Master宕機情況下,訊息無延遲,服務可用性與數據可用性都非常高 效能比非同步複製模式略低,大約低 10%左右,發送單個訊息的 RT會略高。目前主宕機後,備機不能自動切換爲主機
多 Master多 Slave(非同步複製) 1.訊息實時性不會受影響
2.主流生產環境部署叢集採用方案
Master 宕機,磁碟損壞情況,會丟失少量訊息

2、RocketMQ 訊息種類以及怎麼保證訊息有序?