18-基於CentOS7搭建RabbitMQ3.10.7叢集映象佇列+HaProxy+Keepalived高可用架構

2022-10-05 06:00:32

叢集架構

虛擬機器器規劃

IP

hostname

節點說明

控制檯地址

192.168.247.150

rabbitmq.master

rabbitmq master

5672

http://192.168.247.150:15672

192.168.247.151

rabbitmq.s.o

rabbitmq slave

5672

http://192.168.247.151:15672

192.168.247.152

rabbitmq.s.t

rabbitmq slave

5672

http://192.168.247.152:15672

192.168.247.153

haproxy.k.m

haproxy+keepalived

8100

http://192.168.247.153:8100/rabbitmq-stats

192.168.247.154

haproxy.k.s

haproxy+keepalived

8100

http://192.168.247.154:8100/rabbitmq-stats

映象模式

  • 映象模式: 叢集模式非常經典的就是Mirror映象模式, 保證100%資料不丟失, 在實際工作中也是用的最多的, 並且實現叢集非常的簡單, 一般網際網路大廠都會構建這種映象叢集模式
  • Mirror映象佇列, 目的是為了保證RabbitMQ資料的高可用性解決方案, 主要就是實現資料的同步, 一般來講是2-3個節點實現資料同步[一般都是3節點+(奇數個節點)](對於100%資料可靠性解決方案一般都是3節點)叢集架構如下

伺服器規劃

  • 架構: RabbitMQ Cluster + Queue HA + Haproxy + Keepalived
  • 解釋: 3臺rabbitMQ伺服器構建broker叢集,允許任意2臺伺服器故障而服務不受影響,在此基礎上,通過Queue HA (queue mirror)實現佇列的高可用,在本例中映象到所有伺服器節點(即1個master,2個slave);為保證使用者端存取入口地址的唯一性,通過haproxy做4層代理來提供MQ服務,並通過簡單的輪詢方式來進行負載均衡,設定健康檢查來遮蔽故障節點對使用者端的影響;使用2臺haproxy並且通過keepalived實現使用者端存取入口的高可用機制。

服務架構設計

參考資料

官方檔案手冊:

叢集設定檔案:Clustering Guide — RabbitMQ

映象佇列檔案:Classic Queue Mirroring — RabbitMQ

叢集操作檔案:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

中文版AMQP使用者手冊:

Spring AMQP檔案:http://www.blogjava.net/qbna350816/category/55056.html?Show=All

事務檔案:http://www.blogjava.net/qbna350816/archive/2016/08/13/431567.html

叢集架構搭建

設定HOST[5臺]

5臺伺服器設定Host, 參考虛擬機器器規劃

vi /etc/hostname
150改為rabbitmqmaster
151改為rabbitmqso
152改為rabbitmqst
153改為haproxykm
154改為haproxyks

vi /etc/hosts
192.168.247.150 rabbitmqmaster
192.168.247.151 rabbitmqso
192.168.247.152 rabbitmqst
192.168.247.153 haproxykm
192.168.247.154 haproxyks

都修改完成後重啟: 一定要重啟, 我在這裡就碰到了個大坑, 應為沒有重啟, 所以導致hostname沒有生效 新增叢集節點一致報錯,我丟 重啟後關閉防火牆

安裝依賴環境[5臺]

yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

安裝RabbitMQ[三臺]

150,151,152

# 我用的直接就是3.10.7版本, 我沒有找到老版本的, 而且現在應該也都是新版本的了
1: 安裝參考安裝與啟動, 建議三臺同時進行
2: 關閉5臺電腦的防火牆

官方的部署檔案, 應為是英文的, 看起來優點蒙圈, 哎, 探索新版本總是困難的, 但是也總要有人前行

存取控制檯[三臺]

在虛擬機器器規劃中有控制檯地址

不知道為什麼, 150,151,可以存取, 152又提示不是私密連線, 我就又建立了一個賬戶就可以了

150 toor 123456

151 toor 123456

152 toor 123456

重啟所有的RabbitMQ[三臺]

# 停止
rabbitmqctl stop
# 檢測埠
lsof -i:5672
lsof -i:15672
# 啟動
rabbitmq-server -detached

檔案同步步驟[150->151,152]

因為我是最新版本安裝的所以.erlang.cookie並沒有在var/lib/rabbitmq下面, 而是在root下

scp /root/.erlang.cookie 192.168.247.151:/root/
scp /root/.erlang.cookie 192.168.247.152:/root/

選擇150,151,152任意一個節點為Master, 我選擇150, 也就是說需要把150的cookie檔案同步到151,152節點上, 進入/root目錄, ll -a才可以看到隱藏檔案, 使用上面的命令傳送就可以

注意: 需要在三臺伺服器RabbitMQ都啟動的情況下, 去同步

節點加入叢集[151,152]

150直接啟動即可

# 啟動
rabbitmq-server -detached

151,152, 將自身加入叢集

# 停止app
rabbitmqctl stop_app
# 將節點加入叢集 --ram是採用記憶體的方式 151,採用記憶體, 152不加, 預設使用磁碟
rabbitmqctl join_cluster --ram rabbit@rabbitmqmaster
# 啟動app
rabbitmqctl start_app

控制檯檢視叢集

檢視叢集狀態

# 任意節點
rabbitmqctl cluster_status

設定映象佇列

設定映象佇列策略

# 任意節點
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

將所有佇列設定為映象佇列,即佇列會被複制到各個節點,各個節點狀態一致,RabbitMQ高可用叢集就已經搭建好了,我們可以重啟服務,檢視其佇列是否在從節點同步

移除叢集節點[擴充套件]

// 在要移除的節點上執行
rabbitmqctl forget_cluster_node rabbit@bhz71

修改叢集名稱(預設為第一個node名稱)[擴充套件]

# 任意節點
rabbitmqctl set_cluster_name rabbitmq_cluster1

RabbitMQ設定位置

我一般不修改, 正式環境有運維, 開發的話, 能跑就行

/usr/local/rabbitmq/plugins/rabbit-3.10.7/ebin/rabbit.app

測試叢集

測試佇列同步

在150上新建佇列

佇列開始同步

同步映象佇列兩個

151,152都已經同步

安裝Ha-Proxy[153,154]

簡介

  • HAProxy是一款提供高可用性、負載均衡以及基於TCP和HTTP應用的代理軟體,HAProxy是完全免費的、藉助HAProxy可以快速並且可靠的提供基於TCP和HTTP應用的代理解決方案。
  • HAProxy適用於那些負載較大的web站點,這些站點通常又需要對談保持或七層處理。
  • HAProxy可以支援數以萬計的並行連線,並且HAProxy的執行模式使得它可以很簡單安全的整合進架構中,同時可以保護web伺服器不被暴露到網路上。

安裝

# 下載
wget http://www.haproxy.org/download/1.6/src/haproxy-1.6.6.tar.gz
# 解壓
tar -zxvf haproxy-1.6.6.tar.gz
# 進入資料夾
cd haproxy-1.6.6/
# 編譯
make target=linux31 prefix=/usr/local/haproxy
# 安裝
make install prefix=/usr/local/haproxy
# 建立資料夾
mkdir /etc/haproxy
# 許可權
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
# 建立組態檔
touch /etc/haproxy/haproxy.cfg
# haproxy 組態檔haproxy.cfg詳解
# 編輯組態檔
vi /etc/haproxy/haproxy.cfg

組態檔

#logging options
global
    log 127.0.0.1 local0 info
    maxconn 5120
    chroot /usr/local/haproxy
    uid 99
    gid 99
    daemon
    quiet
    nbproc 20
    pidfile /var/run/haproxy.pid

defaults
    log global
    #使用4層代理模式,」mode http」為7層代理模式
    mode tcp
    #if you set mode to tcp,then you nust change tcplog into httplog
    option tcplog
    option dontlognull
    retries 3
    option redispatch
    maxconn 2000
    contimeout 10s
     ##使用者端空閒超時時間為 60秒 則HA 發起重連機制
     clitimeout 10s
     ##伺服器端連結超時時間為 15秒 則HA 發起重連機制
     srvtimeout 10s    
#front-end IP for consumers and producters

listen rabbitmq_cluster
    bind 0.0.0.0:5672
    #設定TCP模式
    mode tcp
    #balance url_param userid
    #balance url_param session_id check_post 64
    #balance hdr(User-Agent)
    #balance hdr(host)
    #balance hdr(Host) use_domain_only
    #balance rdp-cookie
    #balance leastconn
    #balance source //ip
    #簡單的輪詢
    balance roundrobin
    #rabbitmq叢集節點設定 #inter 每隔五秒對mq叢集做健康檢查, 2次正確證明伺服器可用,2次失敗證明伺服器不可用,並且設定主備機制
        server rabbitmqmaster 192.168.247.150:5672 check inter 5000 rise 2 fall 2
        server rabbitmqso 192.168.247.151:5672 check inter 5000 rise 2 fall 2
        server rabbitmqst 192.168.247.152:5672 check inter 5000 rise 2 fall 2
#設定haproxy web監控,檢視統計資訊
listen stats
    # 154改為154
    bind 192.168.247.153:8100
    mode http
    option httplog
    stats enable
    #設定haproxy監控地址為http://localhost:8100/rabbitmq-stats
    stats uri /rabbitmq-stats
    stats refresh 5s

啟動

/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg

檢視服務

http://192.168.247.153:8100/rabbitmq-stats
http://192.168.247.154:8100/rabbitmq-stats

停止[擴充套件]

killall haproxy
ps -ef | grep haproxy
netstat -tunpl | grep haproxy
ps -ef |grep haproxy |awk '{print $2}'|xargs kill -9

安裝Keepalived[153,154]

簡介

  • 之前在Nginx已經用過Keepalived了
  • Keepalived,它是一個高效能的伺服器高可用或熱備解決方案,Keepalived主要來防止伺服器單點故障的發生問題,可以通過其與Nginx、Haproxy等反向代理的負載均衡伺服器配合實現web伺服器端的高可用。Keepalived以VRRP協定為實現基礎,用VRRP協定來實現高可用性(HA).VRRP(Virtual Router Redundancy Protocol)協定是用於實現路由器冗餘的協定,VRRP協定將兩臺或多臺路由器裝置虛擬成一個裝置,對外提供虛擬路由器IP(一個或多個)。

安裝

我直接採用2.0.18版本, 上傳到linux

tar -zxvf keepalived-2.0.18.tar.gz

cd keepalived-2.0.18/

yum install libnl libnl-devel

./configure \
--prefix=/usr/local/keepalived \
--sysconf=/etc

make && make install

cd /etc/keepalived/

vi keepalived.conf

153設定

! Configuration File for keepalived

global_defs {
   router_id haproxykm  ##標識節點的字串,通常為hostname

}

vrrp_script chk_haproxy {
    script "/etc/keepalived/haproxy_check.sh"  ##執行指令碼位置
    interval 2  ##檢測時間間隔
    weight -20  ##如果條件成立則權重減20
}

vrrp_instance VI_1 {
    state MASTER  ## 主節點為MASTER,備份節點為BACKUP
    interface ens33 ## 繫結虛擬IP的網路介面(網路卡),與本機IP地址所在的網路介面相同(我這裡是eth0)
    virtual_router_id 74  ## 虛擬路由ID號(主備節點一定要相同)
    mcast_src_ip 192.168.247.153 ## 本機ip地址
    priority 100  ##優先順序設定(0-254的值)
    nopreempt
    advert_int 1  ## 組播資訊傳送間隔,倆個節點必須設定一致,預設1s
        authentication {  ## 認證匹配
        auth_type PASS
        auth_pass bhz
    }

    track_script {
        chk_haproxy
    }

    virtual_ipaddress {
        192.168.247.160  ## 虛擬ip,可以指定多個
    }
}

154設定

! Configuration File for keepalived

global_defs {
   router_id haproxyks  ##標識節點的字串,通常為hostname

}

vrrp_script chk_haproxy {
    script "/etc/keepalived/haproxy_check.sh"  ##執行指令碼位置
    interval 2  ##檢測時間間隔
    weight -20  ##如果條件成立則權重減20
}

vrrp_instance VI_1 {
    state BACKUP  ## 主節點為MASTER,備份節點為BACKUP
    interface ens33 ## 繫結虛擬IP的網路介面(網路卡),與本機IP地址所在的網路介面相同(我這裡是eth0)
    virtual_router_id 74  ## 虛擬路由ID號(主備節點一定要相同)
    mcast_src_ip 192.168.247.154 ## 本機ip地址
    priority 80  ##優先順序設定(0-254的值)
    nopreempt
    advert_int 1  ## 組播資訊傳送間隔,倆個節點必須設定一致,預設1s
        authentication {  ## 認證匹配
        auth_type PASS
        auth_pass bhz
    }

    track_script {
        chk_haproxy
    }

    virtual_ipaddress {
        192.168.247.160  ## 虛擬ip,可以指定多個
    }
}

編寫自動檢測指令碼

touch /etc/keepalived/haproxy_check.sh

chmod +x /etc/keepalived/haproxy_check.sh

vi /etc/keepalived/haproxy_check.sh

指令碼內容

#!/bin/bash
COUNT=`ps -C haproxy --no-header |wc -l`
if [ $COUNT -eq 0 ];then
    /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
    sleep 2
    if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then
        killall keepalived
    fi
fi

啟動

# 記得先啟動haproxy, 再啟動Keepalived
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg

# 檢視程序
ps -ef | grep haproxy

# 將Keepalived註冊為系統服務
cd /home/software/keepalived-2.0.18/keepalived/etc/
cp init.d/keepalived /etc/init.d/
cp sysconfig/keepalived /etc/sysconfig/
systemctl daemon-reload

# 啟動Keepalived
systemctl [start | stop | status | restart] keepalived
systemctl start keepalived.service
systemctl stop keepalived.service

# 檢視服務
ps -ef|grep keepalived

高可用測試

160預設在153主Keepalived上

節點宕機測試

停止153上的keepalived

檢視153 ip

160的vip已經不再了, 檢視154

已經自動繫結154了, 重啟153

重啟後160重新漂移回153上, 高可用測試ok

佇列功能程式碼測試

直接使用最簡單的hello world程式測試, IP使用虛擬的VIP 160

消費者

package com.dance.redis.mq.rabbit.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Receiver {
    private static final String QUEUE_NAME = "queue-test";
    private static final String IP_ADDRESS = "192.168.247.160";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {
        Address[] address = new Address[]{
                new Address(IP_ADDRESS, PORT)
        };

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("toor");
        factory.setPassword("123456");
        // 這裡的連線方式與生產者的demo略有不同,注意區別。
        Connection connection = factory.newConnection(address); //建立連線
        final Channel channel = connection.createChannel();//建立通道
        channel.basicQos(64);//設定使用者端最多接收未被ack的訊息個數
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                System.out.println("recvive message:" + new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, consumer);
        //等待回撥函數執行完畢之後,關閉資源。
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        connection.close();
    }
}

生產者

package com.dance.redis.mq.rabbit.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
 
import java.io.IOException;
import java.util.concurrent.TimeoutException;
 
public class RabbitProducer {
    private static final String EXCHANGE_NAME = "exchange-test";
    private static final String ROUTING_KEY = "text.*";
    private static final String QUEUE_NAME = "queue-test";
    private static final String IP_ADDRESS = "192.168.247.160";
    private static final int PORT = 5672; //RabbitMQ服務預設埠為5672
 
    public static void main(String[] args) throws IOException,
            TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setVirtualHost("/");
        factory.setUsername("toor");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();//建立連線
        Channel channel = connection.createChannel();//建立通道
        //建立一個type="topic"、持久化的、非自動刪除的交換器。
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
        //建立一個持久化、非排他的、非自動刪除的佇列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //將交換機與佇列通過路由鍵繫結
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        //傳送一條持久化訊息:Hello World!
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
        //關閉資源
        channel.close();
        connection.close();
    }
    
}

我的賬戶和密碼是toor和123456, 請改為自己的

啟動消費者

啟動生產者

檢視消費者

消費成功, ip也是160

檢視控制檯 exchange

多了exchange-test 功能是ha-all

佇列也是, 到此映象佇列+高可用已經實現

也可以在HA的控制檯上檢視統計

叢集架構回顧

我們實現的是RabbitMQ Cluster + Mirror Queue + Haproxy + Keepalived

RabbitMQ Cluster 3臺

Mirror Queue RabbitMQ叢集方式

Haproxy 反向代理

Keepalived Haproxy叢集檢測, 虛擬VIP, 實現統一IP對外提供

架構圖手繪

emm, 就是這樣一個架構, 我應該李姐的挺到位的