一張圖進階 RocketMQ

2022-06-15 12:02:02

前 言

三此君看了好幾本書,看了很多遍原始碼整理的 一張圖進階 RocketMQ 圖片連結,關於 RocketMQ 你只需要記住這張圖!如果你第一次看到這個系列,牆裂建議你開啟連結。覺得不錯的話,記得點贊關注哦。

視訊在 B 站同步更新(B站:三此君),歡迎圍觀 一張圖進階 ROcketMQ -整體架構 (視訊版),記得調整視訊清晰度哦~

本文是《一張圖進階 RocketMQ》系列的第 1 篇,今天的內容主要分為兩個部分:

  • 整體架構:會從大家熟悉的「生產者-消費者模式」逐步推出 RocketMQ 完整架構,只需要記住一張完整的架構圖即可。
  • 訊息收發範例:通過 Docker 部署 RocketMQ,並用簡單的範例串起 RocketMQ 訊息收發流程。

整體架構

什麼是訊息佇列?顧名思義,首先得有一個佇列,這個佇列用來儲存訊息。那有了訊息佇列就得有人往裡面放,有人往裡面取。有沒有似曾相識的感 jio,這莫非就是連小學生都知道的,經典的「生產者-消費者模式」?接下來我們就來看看它裡面穿了什麼?

別急,先來回顧一下 「生產者-消費者模式」 這個老朋友。簡單來說,這個模型是由兩類執行緒和一個佇列構成:

  • 生產者執行緒:生產產品,並把產品放到佇列裡。
  • 消費者執行緒:從佇列裡面獲取產品,並消費。

有了這個佇列,生產者就只需要關注生產,而不用管消費者的消費行為,更不用等待消費者執行緒執行完;消費者也只管消費,不用管生產者是怎麼生產的,更不用等著生產者生產。

這意味著什麼呢,生產者和消費者之間實現解藕非同步。這就老厲害了,你未來工作學習中會不斷遇到這些概念,但是越看越看不懂。我們生活中很多都是非同步的。比如最近新冠疫情捲土重來,我點的外賣只能送到小區門口的外賣佇列裡面,而我只能去外賣佇列裡面取外賣,然後一頓狼吞虎嚥。

具體 「生產者-消費者模式」 怎麼實現,想必各位小學都學過了,我們來看看這個模式還有什麼問題吧。最大的問題就是我們小學學的 「生產者-消費者模式」 是個單機版的,只能自嗨。這就相當於,我就是外賣騎手,我點了個外賣放到外賣佇列,然後我再從外賣佇列裡面去取,一頓操作猛如虎呀!於是就有了進化版,我們把消費者,佇列,生產者放到不同的伺服器上,這就是傳說中的分散式訊息佇列了。

生產者生產的訊息通過網路傳遞給佇列儲存,消費者通過網路從佇列獲取訊息。但是還有問題,訊息可能有很多種,全都放在一起豈不是亂套了?我點的外賣和快遞全都放在一起,太難找了吧。於是我們就需要區分不同型別訊息,相同型別的訊息稱為一個 Topic。同時,騎手不可能只有一個,點外賣的也不會只有我一個人,於是就有了生產者組消費者組

但還是有問題呀,小區那麼大,一個佇列放不下。我住在小區南門,點個外賣還要跑去北門拿,那真的是 eggs hurt。於是物業在東南西北門各設了一個外賣快遞放置點。也就是我們有多個佇列,組成 佇列叢集

可是,問題又雙叒叕來了(還有完沒完),一個小區那麼多個外賣快遞佇列,騎手怎麼知道送到哪裡去,我又怎麼知道去哪裡取?很簡單,導航呀。我們把導航的資訊稱為路由資訊,這些資訊需要有一個管理的地方,它告訴生產者,某這個 Topic 的訊息可以發給哪些佇列,同時告訴消費者你需要的訊息可以從哪些佇列裡面取。RocketMQ 為這些路由資訊的設定了管理員 NameServer,當然 NameServer 也可以有很多個,組成 NameServer 叢集。

到這裡,你就應該知道 RocketMQ 裡面都穿了什麼啦。包括了生產者(Producer),消費者(Consumer),NameServer 以及佇列本身(Broker)。Broker 是代理的意思,負責佇列的存取等操作,我們可以把 Broker 理解為佇列本身。

  • NameServer:我們可以同時部署很多臺 NameServer 伺服器,並且這些伺服器是無狀態的,節點之間無任何資訊同步。
    NameServer 起來後監聽 埠,等待 Broker、Producer、Consumer 連上來,NameServer 是叢集後設資料管理中心。

  • Broker:Broker 啟動,跟所有的 NameServer 保持長連線,每 30s 傳送一次傳送心跳包(像心跳一樣持續穩定的傳送請求)。心跳包中包含當前 Broker 資訊 ( IP+ 埠等)以及儲存所有 Topic 資訊。註冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的對映關係。

    我們可以同時部署多個 Master Broker和多個 Slave Broker,一個 Master 可以對應多個 Slave,但是一個 Slave 只能對應一個 Master。Master 與 Slave 需要有相同的 BrokerName,不同的 BrokerId 。BrokerId 為 0 表示 Master,非 0 表示 Slave,但只有 BrokerId=1 的從伺服器才會參與訊息的讀負載。(可以暫時忽略 Broker 的主從角色)

  • Topic:收發訊息前,先建立 Topic,建立 Topic 時需要指定該 Topic 要儲存在哪些 Broker 上,也可以在傳送訊息時自動建立 Topic。

  • Producer:Producer 傳送訊息,啟動時先跟 NameServer 叢集中的其中一臺建立長連線,並從 NameServer 中獲取當前傳送的 Topic 存在哪些 Broker 上,採用輪詢策略從選擇一個佇列,然後與佇列所在的 Broker 建立長連線,並向 Broker 發訊息。

  • Consumer:Consumer 跟 Producer 類似,跟其中一臺 NameServer 建立長連線,獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連線通道,開始消費訊息。

    我們剛剛提到騎手不止一個,取外賣快遞的也不止我一個,所以會有生產者組和消費者組的概念。這裡需要補充說明一下,訊息分為叢集訊息和廣播訊息:

    • 叢集訊息:一個 Topic 的一條訊息,一個消費者組只能有一個消費者範例消費。例如,同樣是外賣 Topic,一份外賣,我們整個小區也只有一個人消費,就是叢集消費。

    • 廣播訊息:一個 Topic 的一條訊息,一個消費者組所有消費者範例都會消費。例如,如果是因為疫情,政府發放食品,那我們小區每個人都會消費,就是廣播消費。

訊息收發範例

RocketMQ 部署

剛剛我們瞭解 RocketMQ 整體架構,那怎麼樣通過 RocketMQ 收發訊息呢?需要先通過 Docker 部署一套 RocketMQ:

如果你沒有安裝 Docker,可以根據菜鳥教學 MacOS Docker 安裝/Windows Docker 安裝 進行安裝。然後,通過 docker-compose 部署 RocketMQ:

  • 克隆 docker-middleware 倉庫,開啟 dockers 目錄;
  • 修改./conf/broker.conf中的brokerIP1 引數為本機 IP;
  • 進入docker-compose.yml檔案所在路徑,執行docker-compose up命令即可;

注意:如果你現在不瞭解 Docker 不重要,只需要按照步驟部署好 RocketMQ 即可,並不會阻礙我們理解 RocketMQ 相關內容。

部署完成後我們就可以在 Docker Dashboard 中看到 RocketMQ 相關容器,包括 Broker、NameServer 及 Console(RocketMQ 控制檯),到這裡我們就可以使用部署的 RocketMQ 收發訊息了。

RocketMQ 已經部署好了,接下來先來看一個簡單的訊息收發範例,可以說是 RocketMQ 的 "Hello World"。

訊息傳送

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 範例化訊息生產者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 設定NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 啟動Producer範例
        producer.start();
        // 建立訊息,並指定Topic,Tag和訊息體
        Message msg = new Message("Topic1","Tag", "Key",
                                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); 
        // 傳送訊息到一個Broker
        SendResult sendResult = producer.send(msg);
      	// 通過sendResult返回訊息是否成功送達
        System.out.printf("%s%n", sendResult);
        // 如果不再傳送訊息,關閉Producer範例。
        producer.shutdown();
    }
}
  • 首先,範例化一個生產者 producer,並告訴它 NameServer 的地址,這樣生產者才能從 NameServer 獲取路由資訊。
  • 然後 producer 得做一些初始化(這是很關鍵的步驟),它要和 NameServer 通訊,要先建立通訊連線等。
  • producer 已經準備好了,那得準備好要發的內容,把 "Hello World" 傳送到 Topic1。
  • 內容準備好,那 producer 就可以把訊息傳送出去了。producer 怎麼知道 Broker 地址呢?他就會去 NameServer 獲取路由資訊,得到 Broker 的地址是 localhost:10909,然後通過網路通訊將訊息傳送給 Broker。
  • 生產者傳送的訊息通過網路傳輸給 Broker,Broker 需要對訊息按照一定的結構進行儲存。儲存完成之後,把儲存結果告知生產者。

訊息接收

public class Consumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {
    	// 範例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    	// 設定NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
    	// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的訊息
        erbconsumerijun.subscribe("sancijun", "*");
    	// 註冊回撥實現類來處理從broker拉取回來的訊息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
              List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 標記該訊息已經被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者範例
        consumer.start();
	}
}
  • 首先,範例化一個消費者 consumer,告訴它 NameServer 的地址,這樣消費者才能從 NameServer 獲取路由資訊。
  • 然後這個消費者需要知道自己可以消費哪些 Topic 的訊息,也就是每個消費者需要訂閱一個或多個 Topic。
  • 消費者也需要做一些初始化,業務本身並沒有理會怎麼從 Broker 拉取訊息,這些都是消費者默默無聞的奉獻。所以,我們需要啟動消費者,消費者會從 NameServer 拉取路由資訊,並不斷從 Broker 拉取訊息。拉取回來的訊息提供給業務定義的 MessageListener。
  • 訊息拉取回來後,消費者需要怎麼處理呢?每個消費者都不一樣(業務本身決定),由我們業務定義的 MessageListener 處理。處理完之後,消費者也需要確認收貨,就是告訴 Broker 消費成功了。

以上就是本文的全部內容,本文沒有堆砌太多無意義的概念,沒有講什麼削峰解耦,非同步通訊。這些內容網上也很多,看了和沒看沒什麼兩樣。

最後,看懂的點贊,沒看懂的收藏。來都來了,交個朋友,遇到什麼問題都可以和三此君分享呀。關注微信公眾號:三此君。回覆 mq,可以領取 RocketMQ 相關的所有資料。

參考文獻

  • RocketMQ 官方檔案

  • RocketMQ 原始碼

  • 丁威, 周繼鋒. RocketMQ技術內幕:RocketMQ架構設計與實現原理. 機械工業出版社, 2019-01.

  • 李偉. RocketMQ分散式訊息中介軟體:核心原理與最佳實踐. 電子工業出版社, 2020-08.

  • 楊開元. RocketMQ實戰與原理解析. 機械工業出版社, 2018-06.

轉載請註明出處