RocketMQ 系列(五)高可用與負載均衡

2023-09-14 06:01:04

RocketMQ 系列(五)高可用與負載均衡

RocketMQ 前面系列文章如下:

上一篇講了 RocketMQ 的訊息儲存,這一篇來講一下怎麼實現 RocketMQ 的高可用與負載均衡。

1、Broker 高可用

RocketMQ 的高可用主要是通過 BrokerMasterSlave相互配合來實現的。

4

Broker 部署相對複雜,Broker 分為 Master 與 Slave,一個 Master 可以對應多個 Slave,但是一個 Slave 只能對應一個 Master,Master 與 Slave 的對應關係通過指定相同的 BrokerName,不同的 BrokerId 來定義,BrokerId 為 0 表示 Master,非 0 表示 Slave。Master 也可以部署多個。每個 Broker 與 NameServer 叢集中的所有節點建立長連線,定時註冊 Topic 資訊到所有 NameServer。 注意:當前 RocketMQ 版本在部署架構上支援一 Master多 Slave,但只有BrokerId=1的從伺服器才會參與訊息的讀負載。

1.1、Broker 叢集部署方式

Broker的高可用通過叢集部署的方式實現,而部署方式主要分成了以下 4 種:

  1. 單 master方式(不推薦)

    • 優點:除了設定簡單沒什麼優點,適合個人學習使用。
    • 缺點:不可靠,該機器重啟或宕機,將導致整個服務不可用。無 HA,測試環境玩玩就行。
  2. 多 master 方式(不推薦):多個 master 節點組成叢集,單個 master 節點宕機或者重啟對應用沒有影響。

    • 優點:所有模式中效能最高
    • 缺點:單個 master 節點宕機期間,未被消費的訊息在節點恢復之前不可用,訊息的實時性就受到影響。

    注意:使用同步刷盤可以保證訊息不丟失,同時 Topic 相對應的 queue 應該分佈在叢集中各個節點,而不是隻在某各節點上,否則,該節點宕機會對訂閱該 topic 的應用造成影響。

  3. 多 master 多 slave 非同步複製方式:在多 master 模式的基礎上,每個 master 節點都有至少一個對應的 slave。master 節點可讀可寫,但是 slave 只能讀不能寫,類似於 mysql 的主備模式。

    • 優點:在 master 宕機時,消費者可以從 slave 讀取訊息,訊息的實時性不會受影響,效能幾乎和多 master 一樣。
    • 缺點:使用非同步複製的同步方式有可能會有訊息丟失的問題。
  4. 多 master 多 slave 同步雙寫模式:同多 master 多 slave 非同步複製模式類似,區別在於 master 和 slave 之間的資料同步方式。

    • 優點:同步雙寫的同步模式能保證資料不丟失。
    • 缺點:傳送單個訊息 RT 會略長,效能相比非同步複製低10%左右。
    • 刷盤策略:同步刷盤和非同步刷盤(指的是節點自身資料是同步還是非同步儲存)
    • 同步方式:同步雙寫和非同步複製(指的一組 master 和 slave 之間資料的同步)

    注意:要保證資料可靠,需採用同步刷盤和同步雙寫的方式,但效能會較其他方式低。

這裡提到了兩個概念,刷盤是什麼?複製又是什麼?讓我們帶著疑問往下看。

1.2、同步刷盤與非同步刷盤

RocketMQ 的訊息是儲存到磁碟上的,這樣既能保證斷電後恢復, 又可以讓儲存的訊息量超出記憶體的限制。RocketMQ 為了提高效能,會盡可能地保證磁碟的順序寫。訊息在通過 Producer 寫入 RocketMQ 的時候,有兩種寫磁碟方式,分別為同步刷盤和非同步刷盤。

e29a44ed868e4978a8bb3db3a7be0eca.png
  1. 同步刷盤:在返回寫成功狀態時,訊息已經被寫入磁碟。具體流程是,訊息寫入記憶體的 PAGECACHE 後,立刻通知刷盤執行緒刷盤,然後等待刷盤完成,刷盤執行緒執行完成後喚醒等待的執行緒,返回訊息寫成功的狀態。
    • 優點:效能高。
    • 缺點:Master 宕機,磁碟損壞的情況下,會丟失少量的訊息, 導致MQ的訊息狀態和生產者/消費者的訊息狀態不一致。
  2. 非同步刷盤:在返回寫成功狀態時,訊息可能只是被寫入了記憶體的 PAGECACHE,寫操作的返回快,吞吐量大;當記憶體裡的訊息量積累到一定程度時,統一觸發寫磁碟動作,快速寫入。
    • 優點:可以保持MQ的訊息狀態和生產者/消費者的訊息狀態一致
    • 缺點:效能比非同步的低

同步刷盤和非同步刷盤,都是通過 Broker 組態檔裡的 flushDiskType 引數設定的,把這個引數被設定成 SYNC_FLUSH(同步)、ASYNC_FLUSH (非同步)中的一個。

1.3、同步複製與非同步複製

如果一個 Broker 組有 Master和 Slave,訊息需要從 Master 複製到 Slave 上,有同步和非同步兩種複製方式。

  1. 同步複製:即同步雙寫,等 Master 和 Slave 均寫成功後才反饋給使用者端寫成功狀態。
    • 優點:如果 Master 出故障,Slave 上有全部的備份資料,容易恢復,消費者仍可以從 Slave 消費, 訊息不丟失。
    • 缺點:增巨量資料寫入延遲,降低系統吞吐量,效能比非同步複製模式略低,大約低10%左右,傳送單個 Master 的響應時間會略高。
  2. 非同步複製:只要 Master 寫成功即可反饋給使用者端寫成功狀態。
    • 優點:系統擁有較低的延遲和較高的吞吐量,Master 宕機之後,消費者仍可以從 Slave 消費,此過程對應用透明,不需要人工干預,效能同多個 Master模式幾乎一樣。
    • 缺點:如果 Master 出了故障,有些資料因為沒有被寫入 Slave,而丟失少量訊息。

同步複製和非同步複製是通過 Broker 組態檔裡的 brokerRole 引數進行設定的,這個引數可以被設定成 ASYNC_MASTER、SYNC_MASTER、SLAVE 三個值中的一個。三個值說明:

  • ASYNC_MASTER:非同步複製主節點。
  • SYNC_MASTER:同步複製主節點。
  • SLAVE:從節點。

1.4、小結

實際應用中要結合業務場景,合理設定刷盤方式主從複製方式, 尤其是 SYNC_FLUSH (同步刷盤)方式,由於頻繁地觸發磁碟寫動作,會明顯降低效能。

通常情況下,應該把 MasterSlave 設定成 ASYNC_FLUSH非同步刷盤)的刷盤方式,主從之間設定成 SYNC_MASTER同步複製)的複製方式,這樣即使有一臺機器出故障,仍然能保證資料不丟,是個不錯的選擇。

2、Producer 高可用

在建立 Topic 的時候,把 Topic 的多個 Message Queue 建立在多個 Broker 組上(相同 Broker 名稱,不同 brokerId 的機器組成一個 Broker 組),這樣當一個Broker 組的 Master不可用後,其他組的 Master 仍然可用,Producer 仍然可以傳送訊息。
如果Master掛掉了,那麼如何選取slave成為新的Master,參考官方檔案Controller部署和Broker部署方式,這裡就不詳講了。

大致流程如下:

1
  1. 首先Topic 在兩個Master節點的Broker上都有分別4個Message Queue。
  2. 預設使用輪詢的方式進行佇列和Broker的選擇。例如選中了 Broker A 的 Q4。
  3. 如果傳送成功,則正常返回,結束。
  4. 如果傳送失敗就會觸發重試機制(訊息最大重試次數是2次),並選擇使用哪一種重試策略。
  5. 重試策略有2種:開啟和不開啟故障延遲機制。(預設不開啟)
  6. 如果不開啟故障延遲機制,那麼重試傳送就會輪詢選擇剛才失敗的那個Broker的下一個佇列,例如Broker A的Q1。(這種方式的缺點是有可能重試會再一次失敗,因為如果第一次失敗了大部分情況是這個Broker有問題了,所以當第二次選擇這個Broker的其他佇列時,大概率也會失敗。)
  7. 如果開啟了故障延遲機制,那麼在訊息第一次傳送失敗後就會將該Broker置為不可用列表,轉而重新選擇Broker。(這種方式的缺點是,一旦所有的Broker都失敗了,那麼這個使用者端將無法傳送訊息。)

3、Consumer 高可用

Consumer 的高可用是依賴於 Master-Slave 設定的,由於 Master 能夠支援讀寫訊息,Slave 支援讀訊息,當 Master 不可用或繁忙時, Consumer 會被自動切換到從 Slave 讀取(自動切換,無需設定)。

故當 Master 的機器故障後,訊息仍可從 Slave 中被消費。

4、Producer 負載均衡

對於非順序消(普通訊息、定時/延時訊息、事務訊息)場景,預設且只能使用輪詢模式的負載均衡策略。

Producer 端,每個範例在發訊息的時候,預設會輪詢所有的 message queue 傳送,以達到讓訊息平均落在不同的 queue 上。而由於 queue 可以散落在不同的 broker,所以訊息就傳送到不同的 broker 下,如下圖:

生產者負載策略

如上圖所示,M1、M2表示生產者傳送的第一條訊息、第二條訊息,Queue1、Queue2、Queue3 表示主題中的三個佇列。

生產者按照輪詢方式分別將訊息依次傳送到這三個佇列中,M1 傳送至 Queue1 中、M2 傳送至 Queue2 中、M3 傳送至 Queue3 中,以此類推,第四條訊息 M4又傳送至 Queue1 中,迴圈往復。

注意輪詢模式只使用於非順序訊息(普通訊息、定時/延時訊息、事務訊息)場景,而對於順序訊息,相同的shading key只會對應一個Queue傳送訊息(5.0以上版本傳送順序訊息時可設定 MessageGroupHash 模式)。

5、Consumer 負載均衡

在講ConSumer負載均衡之前,有必要先了解 RocketMQ 的兩種消費模式

RocketMQ 支援兩種訊息模式:叢集消費( Clustering )和廣播消費( Broadcasting )。

5.1、消費模式

叢集消費同一 Topic 下的一條訊息只會被同一消費組中的一個消費者消費。也就是說,訊息被負載均衡到了同一個消費組的多個消費者範例上。

2

廣播消費:當使用廣播消費模式時,每條訊息推播給叢集內所有的消費者,保證訊息至少被每個消費者消費一次。

3

廣播模式其實不是負載均衡,由於每個消費者都能夠拿到所有訊息,故不能達到負載均衡的要求。

注意:對於 Topic、ConsumerGroup、Consumer 三者的關係需要滿足訂閱關係一致

5.2、Queue 分配策略

Consumer 的負載均衡是指將 Broker 端中多個 Queue 按照某種策略分配給同一個消費組中的不同消費者,因此只有叢集消費才能做到負載均衡

一個 Topic 中的 Queue 只能由 Consumer Group 中的一個 Consumer 進行消費,而一個 Consumer 可以同時消費多個 Queue 中的訊息。那麼 Queue 與Consumer 間的配對關係是如何確定的,即 Queue 要分配給哪個 Consumer 進行消費,也是有演演算法策略的,這些策略是通過在建立 Consumer 時的構造器傳進去的。

常見的有四種策略:平均分配、環形分配策略、一致性hash策略、同機房策略。

通過以下程式碼可設定策略:

 consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
5.2.1、平均分配策略(預設)

該演演算法是要根據 avg = QueueCount / ConsumerCount 的計算結果進行分配的。如果能夠整除,則按順序將 avg 個 Queue 逐個分配 Consumer;如果不能整除,則將多餘出的 Queue 按照 Consumer 順序逐個分配。如下圖:

4

上面有 5 個 Queue,3 個 Consumer,那麼每個 Consumer 可以分配到 1 個 Queue,但是還有 2 個 Queue 是多餘的,那麼這 2 個 Queue 將依次按順序分給 Consumer1,Consumer2。

5.2.2、環形分配策略

環形平均演演算法是指,根據消費者的順序,依次由 Queue 佇列組成的環形圖逐個分配,該方法不需要提前計算。如下圖:

5

同樣以 5 個 Queue,3 個 Consumer 為例,Queue1、Queue2、Queue3 按照順序分配給 3 個 Consumer, 剩下的Queue4、Queue5 繼續按照順序分配給 Consumer1,Consumer2。

5.2.3、一致性 hash 策略

該演演算法會將 Consumer 的 hash 值作為Node節點存放到 hash 環上,然後將 Queue 的 hash 值也放到 hash 環 上,通過順時針方向,距離 Queue 最近的那個Qonsumer 就是該 Queue 要分配的 Consumer。

6

順時針方向進行判斷,Queue1 分配給 Consumer1, Queue2、Queue3 分配給 Consumer2, Queue4、Queue5 分配給 Consumer3。

該演演算法存在的問題:分配不均,其可以有效減少由於消費者組擴容或縮容所帶來的大量的 Rebalance。

5.2.4、同機房策略

該演演算法會根據 Queue 的部署機房位置和 Consumer 的位置,過濾出當前 Consumer 相同機房的 Queue。然後按照平均分配策略或環形平均策略對同機房 Queue進行分配。如果沒有同機房 Queue,則按照平均分配策略或環形分配策略對所有 Queue 進行分配。如下圖:

7

Queue1、Queue2、Queue3 與 Consumer1、Consumer2 同個機房,3個 Queue 按照平均分配策略或環形分配策略指定 Consumer1、Consumer2。而Queue4、Queue5 則直接分配給 Consumer3。

5.2.5、負載均衡程式碼

下面展示了按照主題負載均衡的程式碼片段:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        //廣播消費    
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                             consumerGroup,
                             topic,
                             mqSet,
                             mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        //叢集消費    
        case CLUSTERING: {
            //從本地記憶體獲取該Topic主題下的訊息佇列結合
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            //根據Topic和Consumer Group獲取Broker端消費者id的rpc請求
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

            if (null == cidAll) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                Collections.sort(mqAll);
                Collections.sort(cidAll);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    //按照Queue策略分配佇列
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                              e);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }
				
                //在負載均衡中更新ProcessQueueTable
                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info(
                        "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                        strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                        allocateResultSet.size(), allocateResultSet);
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}

5.3、消費型別

消費者從 Broker 中獲取訊息的方式有兩種:pull 方式和 push 方式。

5.3.1、pull 消費

Consumer 主動從 Broker 中拉取訊息,主動權由 Consumer 控制。一旦獲取了批次訊息,就會啟動消費過程。不過,該方式的實時性較弱,即 Broker 中有了新的訊息時消費者並不能及時發現並消費。

拉取時間間隔是由使用者指定,所以在設定該間隔時需要注意:間隔太短,空請求比例會增加;間隔太長,訊息的實時性太差.

5.3.2、push 消費

該模式下 Broker 收到資料後會主動推播給 Consumer,該獲取方式一般實時性較高。

該獲取方式是典型的釋出-訂閱模式,即 Consumer 向其關聯的 Queue 註冊了監聽器,一旦發現有新的訊息到來就會觸發回撥,去 Queue 中拉取訊息。而這些都是基於 Consumer 與 Broker 間的長連線的,長連線的維護是需要消耗系統資源的。

5.3.3、對比
  • pull 需要應用去實現對關聯 Queue 的遍歷,實時性差;但便於應用控制訊息的拉取。
  • push:封裝了對關聯 Queue 的遍歷,實時性強,但會佔用較多的系統資源。

本篇主要以圖解的方式講解 Broker、Producer、Consumer 的高可用及負載均衡方式,Consumer 的負載均衡較為複雜便細化到其消費模式、Queue 分配策略、消費型別三個方面。高可用與負載均衡對於分散式系統是一個剛需,也是一道難關,道阻且長,小夥伴們加油吧!!!

參考資料: