【寧泊雲】訊息中介軟體-RocketMQ

2020-08-09 08:57:33

訊息中介軟體-RocketMQ

簡介

阿裡巴巴雙十一官方指定訊息產品,支撐阿裡巴巴集團所有的訊息服務,歷經十餘年高可用與高可靠的嚴苛考驗,是阿裡巴巴交易鏈路的核心產品;
服務可用性 99.95%,Region 化、多可用區、分佈式叢集化部署,確保服務高可用,即便整個機房不可用仍可正常提供訊息服務;
數據可靠性 99.99999999%,同步雙寫、超三副本數據冗餘與快速切換技術確保數據可靠;

使用場景

非同步解耦

向上面的假設每次發送請求的處理時間都要50ms,在沒有任何優化的情況就要150ms。如果不用訊息中間鍵的話我們可以用多執行緒的技術,啓用兩個執行緒分別執行發送簡訊和發送郵件的請求,這樣我處理的總處理時間就是100ms。

使用訊息中間鍵:

高可用松耦合架構設計

通過上、下遊業務系統的松耦合設計,即便下遊子系統出現不可用甚至宕機,都不會影響到核心交易系統的正常運轉;

削峯填谷

商城專案,往往有這種秒殺活動,限時搶購活動,在某一時刻數據量會劇增。

  • 超高流量脈衝處理能力

    MQ 超高效能的訊息處理能力可以承接流量脈衝而不被擊垮,在確保系統可用性同時,因快速有效的請求響應而提升使用者的體驗;

  • 海量訊息堆積能力

    確保下遊業務在安全水位內平滑穩定的執行,避免超高流量的衝擊;

  • 合理的成本控制

    通過削弱填谷可控制下遊業務系統的叢集規模,從而降低投入成本;

分佈式快取同步/訊息分發

假設使用者發送請求到a系統,a系統分發到b、c、d系統。比如處理a系統裏面的業務,把處理完的數據分發到b、c、d中,如果這時候又有一個e系統該怎麼處理?

用訊息中間鍵,把a系統處理完的數據封裝到裏面去,其他的系統監控訊息中間鍵裏面的數據。

  • 實時數據更新

    通過訊息實時推播的方式,讓數據實時得以更新;

  • 降低頁面響應時間

    大量併發存取商品數據庫,減少頁面響應時間

  • 滿足大規模存取需求

    大促衆多分會場,多快取的架構設計,滿足對商品變更的大量存取需求;

常見的訊息中間鍵

ActiveMQ

ActiveMQ是Apache出品,比較老的一個開源的訊息中介軟體, 是一個完全支援JMS規範的訊息中介軟體.
API豐富,以前在中小企業應用廣泛

MQ衡量的指標:服務效能,數據儲存,叢集架構

KafKa

Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分佈式發佈訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制 機製來統一線上和離線的訊息處理,也是爲了通過叢集來提供實時的訊息。

RabbitMQ

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。
AMQP :Advanced Message Queue,高階訊息佇列協定。它是應用層協定的一個開放標準,爲訊息導向中介層設計,基於此協定的用戶端與訊息中介軟體可傳遞訊息,並不受產品、開發語言等條件的限制。
RabbitMQ 最初起源於金融系統,用於在分佈式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。

對數據的一致性,穩定性和可靠性要求比較高的場景

RocketMQ

RocketMQ 是阿裡巴巴在 2012 年開源的分佈式訊息中介軟體,目前已經捐贈給 Apache 軟體基金會,並於 2017 年 9 月 25 日成爲 Apache 的頂級專案。作爲經歷過多次阿裡巴巴雙十一這種「超級工程」的洗禮並有穩定出色表現的國產中介軟體,以其高效能、低延時和高可靠等特性近年來已經也被越來越多的國內企業使用。

對比

RocketMQ的核心概念

生產者Producer

負責生產訊息,一般由業務系統負責生產訊息。一個訊息生產者會把業務應用系統裡產生的訊息發送到broker伺服器。RocketMQ提供多種發送方式,同步發送、非同步發送、順序發送、單向發送。同步和非同步方式均需要Broker返回確認資訊,單向發送不需要。

消費者Consumer

負責消費訊息,一般是後臺系統負責非同步消費。一個訊息消費者會從Broker伺服器拉取訊息、並將其提供給應用程式。從使用者應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。

假如有訂單服務和商品服務,訂單服務會把訊息發送到訊息中間裡,所以訂單服務就是生產者,反之商品服務就就是消費者。

名字服務Name Server

名稱服務充當路由訊息的提供者。生產者或消費者能夠通過名字服務查詢各主題相應的Broker IP列表。多個Namesrv範例組成叢集,但相互獨立,沒有資訊交換。

代理伺服器Broker Server

訊息中轉角色,負責儲存訊息、轉發訊息。代理伺服器在RocketMQ系統中負責接收從生產者發送來的訊息並儲存、同時爲消費者的拉取請求作準備。代理伺服器也儲存訊息相關的元數據,包括消費者組、消費進度偏移和主題和佇列訊息等。

訊息內容Message

訊息系統所傳輸資訊的物理載體,生產和消費數據的最小單位,每條訊息必須屬於一個主題。RocketMQ中每個訊息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢訊息的功能。

訊息主題Topic

表示一類訊息的集合,每個主題包含若幹條訊息,每條訊息只能屬於一個主題,是RocketMQ進行訊息訂閱的基本單位。

標籤Tag

爲訊息設定的標誌,用於同一主題下區分不同類型的訊息。來自同一業務單元的訊息,可以根據不同業務目的在同一主題下設定不同標籤。標籤能夠有效地保持程式碼的清晰度和連貫性,並優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴充套件性。

訊息佇列MessageQueue

對於每個Topic都可以設定一定數量的訊息佇列用來進行數據的讀取

RocketMQ環境搭建

下載RocketMQ

http://rocketmq.apache.org/
https://github.com/apache/rocketmq

window的安裝設定

  1. 使用rocketmq-4.5.1.zip 解壓到指定目錄

  2. 需要設定環境變數ROCKETMQ_HOME

  3. 修改broker的組態檔 broker.conf

    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = SYNC_MASTER
    flushDiskType = SYNC_FLUSH
    enablePropertyFilter=true
    namesrvAddr=127.0.0.1:9876
    
  4. 啓動nameserver

    執行bin目錄下的mqnameserver.cmd

  5. 啓動broker

    在bin目錄下執行cmd,輸入命令 mqbroker.cmd -c ../conf/broker.conf

  6. 啓動管理控制檯

    在所在目錄下建立組態檔 application.properties

    rocketmq.config.namesrvAddr=127.0.0.1:9876
    server.port=9999
    

    執行 rocketmq-console-ng-1.0.1.jar jar包,瀏覽器存取 localhost:9999/ jar 包已上傳:https://download.csdn.net/download/fine_Ning/12698310

Linux單機環境搭建

1 上傳rocketmq-all-4.4.0-bin-release.zip 到/usr/local
2 使用解壓命令進行解壓
	unzip /usr/local/rocketmq-all-4.4.0-bin-release.zip
3 軟體重新命名
  mv  /usr/local/rocketmq-all-4.4.0-bin-release/  /usr/local/rocketmq-4.4/
4 修改啓動參數設定
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g"
vi  /usr/local/rocketmq-4.4/bin/runbroker.sh
vi  /usr/local/rocketmq-4.4/bin/runserver.sh
5 啓動名字服務和代理服務
nohup sh /usr/local/rocketmq-4.4/bin/mqnamesrv &
# -n localhost:9876 指定名稱服務的地址, 類似於zk的地址
nohup sh /usr/local/rocketmq-4.4/bin/mqbroker -n localhost:9876 &  
6 檢驗是否啓動正常
	使用java的內建命令: jps  可以看到BrokerStartup和NamesrvStartup進程
   使用Linux命令: netstat-ntlp  可以看到9876的埠和10911的埠
   使用ps-ef |grep java
   檢視啓動日誌:
   tail -100f ~/logs/rocketmqlogs/namesrv.log
   tail -100f ~/logs/rocketmqlogs/broker.log
7 發送訊息測試 
	# 1.設定環境變數
	export NAMESRV_ADDR=localhost:9876
	# 2.使用安裝包的Demo發送訊息
	sh /usr/local/rocketmq-4.4/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
8 消費訊息測試
  	# 1.設定環境變數
	export NAMESRV_ADDR=localhost:9876
	# 2.使用安裝包的Demo發送訊息
	sh /usr/local/rocketmq-4.4/bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
9 關閉RocketMQ
# 1.關閉NameServer
sh /usr/local/rocketmq-4.4/bin/mqshutdown namesrv
# 2.關閉Broker
sh /usr/local/rocketmq-4.4/bin/mqshutdown broker

核心基礎使用

基本入門程式

匯入依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>

發送訊息

public class Producer {
    public static void main(String[] args) throws Exception{
        //1 建立一個生產者物件, 並且指定一個生產者組
        DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer");
        //2 設定名字服務的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3 啓動生產者
        producer.start();
        //4 建立一個訊息
        Message message = new Message("01-hello", "tagA", "hello,rocketmq".getBytes("utf-8"));
        System.out.println(message);
        //5 發送訊息
        producer.send(message);
        //6 關閉連線
        producer.shutdown();
    }
}

消費訊息

public class Consumer {
    public static void main(String[] args) throws Exception{
        //建立一個拉取訊息的消費者物件
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wolfcode-consumer");
        //設定名字地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
  
        //系結訊息的主題
        consumer.subscribe("01-hello", "*");
        //消費者監聽處理訊息方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消費執行緒:"+Thread.currentThread().getName()+",訊息ID:"+msg.getMsgId()+",訊息內容:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //啓動消費者
        consumer.start();
    }
}

發送訊息方式

  • 同步發送訊息

    生產者往訊息中間鍵發送訊息,需要等訊息中間鍵把訊息存到磁碟中,返回儲存成功以後纔可以繼續往下執行。

  • 非同步訊息

    生產者往訊息中間鍵扔訊息,不需要等他儲存成功以後才能 纔能執行後面的操作

  • 一次訊息

    訊息發送到訊息中間鍵當中不關心他的返回結果

消費模式

  • 叢集模式

    MessageModel.CLUSTERING 多個消費者分擔一個消費者的壓力, 一個訊息只會給一個消費者消費

  • 廣播模式

    MessageModel.BROADCASTING 需要對同一個訊息進行不同處理的時候, 比如對同一個訊息, 需要同時發送簡訊和發送郵件, 一個訊息會發送給所有的消費者

消費方式

  • 推播消費 DefaultMQPushConsumer
  • 拉取消費 DefaultMQPullConsumer

延時訊息

在商城的專案當中,使用者下完單以後如果使用者不去支付那麼訂單保留半個小時

延時訊息的使用限制
現在RocketMq並不支援任意時間的延時,需要設定幾個固定的延時等級,從1s到2h分別對應着等級1到18
private String messageDelayLevel = 「1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h」;

訊息消費失敗會進入延時訊息佇列,訊息發送時間與設定的延時等級和重試次數有關,詳見程式碼

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // 範例化一個生產者來產生延時訊息
        DefaultMQProducer producer = new DefaultMQProducer("wolfcode-producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 啓動生產者
        producer.start();
        Message message = new Message("06-delay", ("delay message").getBytes());
        // 設定延時等級3,這個訊息將在10s之後發送(現在只支援固定的幾個時間,詳看delayTimeLevel)
        message.setDelayTimeLevel(3);
        // 發送訊息
        producer.send(message);
        // 關閉生產者
        producer.shutdown();
    }
}

訊息過濾

  • Tag標籤過濾

    在大多數情況下,TAG是一個簡單而有用的設計,其可以來選擇您想要的訊息。例如:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wolfcode-consumer");
    consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
    
  • SQL92過濾

    消費者將接收包含TAGA或TAGB或TAGC的訊息。但是限制是一個訊息只能有一個標籤,這對於複雜的場景可能不起作用。在這種情況下,可以使用SQL表達式篩選訊息。SQL特性可以通過發送訊息時的屬性來進行計算。在RocketMQ定義的語法下,可以實現一些簡單的邏輯。

    RocketMQ只定義了一些基本語法來支援這個特性。你也可以很容易地擴充套件它。

    • 數值比較,比如:>,>=,<,<=,BETWEEN,=;
    • 字元比較,比如:=,<>,IN;
    • IS NULL 或者 IS NOT NULL;
    • 邏輯符號 AND,OR,NOT;

    常數支援型別爲:

    • 數值,比如:123,3.1415;
    • 字元,比如:‘abc’,必須用單引號包裹起來;
    • NULL,特殊的常數
    • 布爾值,TRUEFALSE

    只有使用push模式的消費者才能 纔能用使用SQL92標準的sql語句

    注意: 在使用SQL過濾的時候, 需要在 broker.conf 組態檔中設定參數enablePropertyFilter=true

SpringBoot整合

匯入依賴

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>

生產者

設定資訊

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
server.port=8888

實現程式碼

@RestController
public class HelloController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @RequestMapping("sendMsg")
    public String sendMsg(String message,String age) throws Exception{
   		SendResult sendResult = rocketMQTemplate.syncSend("01-boot-hello", message);
        System.out.println(sendResult.getMsgId());
        System.out.println(sendResult.getSendStatus());
        return "success";
    }
}

消費者

設定資訊

rocketmq.name-server=127.0.0.1:9876
server.port=8887

實現程式碼

@Component
@RocketMQMessageListener(
        topic = "01-boot-hello",
        consumerGroup = "wolfcode-consumer"
)
public class HelloConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
         System.out.println("消費訊息"+messageExt);
    }
}

生產訊息型別

  • 同步訊息

    rocketMQTemplate.syncSend("01-boot-hello", message);
    
  • 非同步訊息

    rocketMQTemplate.asyncSend("01-boot-hello", message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("訊息消費成功");
        }
    
        @Override
        public void onException(Throwable e) {
            System.out.println("訊息消費失敗");
        }
    });
    
  • 一次性訊息

    rocketMQTemplate.sendOneWay("01-boot-hello", message);
    

消費模式

@RocketMQMessageListener 註解的設定項設定

  • 叢集 messageModel = MessageModel.CLUSTERING
  • 廣播 messageModel = MessageModel.BROADCASTING

延時訊息

  • 使用原生的Producer物件

    DefaultMQProducer producer = rocketMQTemplate.getProducer();
    
  • 使用API

     rocketMQTemplate.syncSend("01-boot-hello", MessageBuilder.withPayload(message).build(), 3000, 3);
    

訊息過濾

設定訊息標籤

在發送的訊息Topic:Tag 中間使用冒號隔開

rocketMQTemplate.convertAndSend("01-boot-hello:TagB",message,map);

自定義屬性設定

Map<String,Object> map=new HashMap<>();
//使用者自定義屬性
map.put("age", age);
map.put("name", "hesj");
//也可以設定系統屬性
map.put(MessageConst.PROPERTY_KEYS,age);
rocketMQTemplate.convertAndSend("01-boot-hello:TagB",message,map);

訊息過濾

在RocketMQMessageListener新增註 加注解

selectorType = SelectorType.TAG,
selectorExpression = "age > 16"