三此君看了好幾本書,看了很多遍原始碼整理的 一張圖進階 RocketMQ 圖片連結,關於 RocketMQ 你只需要記住這張圖!如果你第一次看到這個系列,牆裂建議你開啟連結。覺得不錯的話,記得點贊關注哦。
視訊在 B 站同步更新(B站:三此君),歡迎圍觀 一張圖進階 ROcketMQ -整體架構 (視訊版),記得調整視訊清晰度哦~
本文是《一張圖進階 RocketMQ》系列的第 1 篇,今天的內容主要分為兩個部分:
什麼是訊息佇列?顧名思義,首先得有一個佇列,這個佇列用來儲存訊息。那有了訊息佇列就得有人往裡面放,有人往裡面取。有沒有似曾相識的感 jio,這莫非就是連小學生都知道的,經典的「生產者-消費者模式」?接下來我們就來看看它裡面穿了什麼?
別急,先來回顧一下 「生產者-消費者模式」 這個老朋友。簡單來說,這個模型是由兩類執行緒和一個佇列構成:
有了這個佇列,生產者就只需要關注生產,而不用管消費者的消費行為,更不用等待消費者執行緒執行完;消費者也只管消費,不用管生產者是怎麼生產的,更不用等著生產者生產。
這意味著什麼呢,生產者和消費者之間實現解藕和非同步。這就老厲害了,你未來工作學習中會不斷遇到這些概念,但是越看越看不懂。我們生活中很多都是非同步的。比如最近新冠疫情捲土重來,我點的外賣只能送到小區門口的外賣佇列裡面,而我只能去外賣佇列裡面取外賣,然後一頓狼吞虎嚥。
具體 「生產者-消費者模式」 怎麼實現,想必各位小學都學過了,我們來看看這個模式還有什麼問題吧。最大的問題就是我們小學學的 「生產者-消費者模式」 是個單機版的,只能自嗨。這就相當於,我就是外賣騎手,我點了個外賣放到外賣佇列,然後我再從外賣佇列裡面去取,一頓操作猛如虎呀!於是就有了進化版,我們把消費者,佇列,生產者放到不同的伺服器上,這就是傳說中的分散式訊息佇列了。
生產者生產的訊息通過網路傳遞給佇列儲存,消費者通過網路從佇列獲取訊息。但是還有問題,訊息可能有很多種,全都放在一起豈不是亂套了?我點的外賣和快遞全都放在一起,太難找了吧。於是我們就需要區分不同型別訊息,相同型別的訊息稱為一個 Topic。同時,騎手不可能只有一個,點外賣的也不會只有我一個人,於是就有了生產者組和消費者組。
但還是有問題呀,小區那麼大,一個佇列放不下。我住在小區南門,點個外賣還要跑去北門拿,那真的是 eggs hurt
。於是物業在東南西北門各設了一個外賣快遞放置點。也就是我們有多個佇列,組成 佇列叢集。
可是,問題又雙叒叕來了(還有完沒完),一個小區那麼多個外賣快遞佇列,騎手怎麼知道送到哪裡去,我又怎麼知道去哪裡取?很簡單,導航呀。我們把導航的資訊稱為路由資訊,這些資訊需要有一個管理的地方,它告訴生產者,某這個 Topic 的訊息可以發給哪些佇列,同時告訴消費者你需要的訊息可以從哪些佇列裡面取。RocketMQ 為這些路由資訊的設定了管理員 NameServer,當然 NameServer 也可以有很多個,組成 NameServer 叢集。
到這裡,你就應該知道 RocketMQ 裡面都穿了什麼啦。包括了生產者(Producer),消費者(Consumer),NameServer 以及佇列本身(Broker)。Broker 是代理的意思,負責佇列的存取等操作,我們可以把 Broker 理解為佇列本身。
NameServer:我們可以同時部署很多臺 NameServer 伺服器,並且這些伺服器是無狀態的,節點之間無任何資訊同步。
NameServer 起來後監聽 埠,等待 Broker、Producer、Consumer 連上來,NameServer 是叢集後設資料管理中心。
Broker:Broker 啟動,跟所有的 NameServer 保持長連線,每 30s 傳送一次傳送心跳包(像心跳一樣持續穩定的傳送請求)。心跳包中包含當前 Broker 資訊 ( IP+ 埠等)以及儲存所有 Topic 資訊。註冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的對映關係。
我們可以同時部署多個 Master Broker和多個 Slave Broker,一個 Master 可以對應多個 Slave,但是一個 Slave 只能對應一個 Master。Master 與 Slave 需要有相同的 BrokerName,不同的 BrokerId 。BrokerId 為 0 表示 Master,非 0 表示 Slave,但只有 BrokerId=1 的從伺服器才會參與訊息的讀負載。(可以暫時忽略 Broker 的主從角色)
Topic:收發訊息前,先建立 Topic,建立 Topic 時需要指定該 Topic 要儲存在哪些 Broker 上,也可以在傳送訊息時自動建立 Topic。
Producer:Producer 傳送訊息,啟動時先跟 NameServer 叢集中的其中一臺建立長連線,並從 NameServer 中獲取當前傳送的 Topic 存在哪些 Broker 上,採用輪詢策略從選擇一個佇列,然後與佇列所在的 Broker 建立長連線,並向 Broker 發訊息。
Consumer:Consumer 跟 Producer 類似,跟其中一臺 NameServer 建立長連線,獲取當前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連線通道,開始消費訊息。
我們剛剛提到騎手不止一個,取外賣快遞的也不止我一個,所以會有生產者組和消費者組的概念。這裡需要補充說明一下,訊息分為叢集訊息和廣播訊息:
叢集訊息:一個 Topic 的一條訊息,一個消費者組只能有一個消費者範例消費。例如,同樣是外賣 Topic,一份外賣,我們整個小區也只有一個人消費,就是叢集消費。
廣播訊息:一個 Topic 的一條訊息,一個消費者組所有消費者範例都會消費。例如,如果是因為疫情,政府發放食品,那我們小區每個人都會消費,就是廣播消費。
剛剛我們瞭解 RocketMQ 整體架構,那怎麼樣通過 RocketMQ 收發訊息呢?需要先通過 Docker 部署一套 RocketMQ:
如果你沒有安裝 Docker,可以根據菜鳥教學 MacOS Docker 安裝/Windows Docker 安裝 進行安裝。然後,通過 docker-compose 部署 RocketMQ:
./conf/broker.conf
中的brokerIP1
引數為本機 IP;docker-compose.yml
檔案所在路徑,執行docker-compose up
命令即可;注意:如果你現在不瞭解 Docker 不重要,只需要按照步驟部署好 RocketMQ 即可,並不會阻礙我們理解 RocketMQ 相關內容。
部署完成後我們就可以在 Docker Dashboard 中看到 RocketMQ 相關容器,包括 Broker、NameServer 及 Console(RocketMQ 控制檯),到這裡我們就可以使用部署的 RocketMQ 收發訊息了。
RocketMQ 已經部署好了,接下來先來看一個簡單的訊息收發範例,可以說是 RocketMQ 的 "Hello World"。
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 範例化訊息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設定NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer範例
producer.start();
// 建立訊息,並指定Topic,Tag和訊息體
Message msg = new Message("Topic1","Tag", "Key",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 傳送訊息到一個Broker
SendResult sendResult = producer.send(msg);
// 通過sendResult返回訊息是否成功送達
System.out.printf("%s%n", sendResult);
// 如果不再傳送訊息,關閉Producer範例。
producer.shutdown();
}
}
producer
,並告訴它 NameServer 的地址,這樣生產者才能從 NameServer 獲取路由資訊。producer
得做一些初始化(這是很關鍵的步驟),它要和 NameServer 通訊,要先建立通訊連線等。producer
已經準備好了,那得準備好要發的內容,把 "Hello World" 傳送到 Topic1。producer
就可以把訊息傳送出去了。producer
怎麼知道 Broker 地址呢?他就會去 NameServer 獲取路由資訊,得到 Broker 的地址是 localhost:10909,然後通過網路通訊將訊息傳送給 Broker。public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 範例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 設定NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的訊息
erbconsumerijun.subscribe("sancijun", "*");
// 註冊回撥實現類來處理從broker拉取回來的訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 標記該訊息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者範例
consumer.start();
}
}
consumer
,告訴它 NameServer 的地址,這樣消費者才能從 NameServer 獲取路由資訊。以上就是本文的全部內容,本文沒有堆砌太多無意義的概念,沒有講什麼削峰解耦,非同步通訊。這些內容網上也很多,看了和沒看沒什麼兩樣。
最後,看懂的點贊,沒看懂的收藏。來都來了,交個朋友,遇到什麼問題都可以和三此君分享呀。關注微信公眾號:三此君。回覆 mq,可以領取 RocketMQ 相關的所有資料。
丁威, 周繼鋒. RocketMQ技術內幕:RocketMQ架構設計與實現原理. 機械工業出版社, 2019-01.
李偉. RocketMQ分散式訊息中介軟體:核心原理與最佳實踐. 電子工業出版社, 2020-08.
楊開元. RocketMQ實戰與原理解析. 機械工業出版社, 2018-06.
轉載請註明出處