IP |
hostname |
節點說明 |
埠 |
控制檯地址 |
192.168.247.150 |
rabbitmq.master |
rabbitmq master |
5672 |
http://192.168.247.150:15672 |
192.168.247.151 |
rabbitmq.s.o |
rabbitmq slave |
5672 |
http://192.168.247.151:15672 |
192.168.247.152 |
rabbitmq.s.t |
rabbitmq slave |
5672 |
http://192.168.247.152:15672 |
192.168.247.153 |
haproxy.k.m |
haproxy+keepalived |
8100 |
http://192.168.247.153:8100/rabbitmq-stats |
192.168.247.154 |
haproxy.k.s |
haproxy+keepalived |
8100 |
http://192.168.247.154:8100/rabbitmq-stats |
官方檔案手冊:
叢集設定檔案:Clustering Guide — RabbitMQ
映象佇列檔案:Classic Queue Mirroring — RabbitMQ
叢集操作檔案:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
中文版AMQP使用者手冊:
Spring AMQP檔案:http://www.blogjava.net/qbna350816/category/55056.html?Show=All
事務檔案:http://www.blogjava.net/qbna350816/archive/2016/08/13/431567.html
5臺伺服器設定Host, 參考虛擬機器器規劃
vi /etc/hostname 150改為rabbitmqmaster 151改為rabbitmqso 152改為rabbitmqst 153改為haproxykm 154改為haproxyks vi /etc/hosts 192.168.247.150 rabbitmqmaster 192.168.247.151 rabbitmqso 192.168.247.152 rabbitmqst 192.168.247.153 haproxykm 192.168.247.154 haproxyks
都修改完成後重啟: 一定要重啟, 我在這裡就碰到了個大坑, 應為沒有重啟, 所以導致hostname沒有生效 新增叢集節點一致報錯,我丟 重啟後關閉防火牆
yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
150,151,152
# 我用的直接就是3.10.7版本, 我沒有找到老版本的, 而且現在應該也都是新版本的了 1: 安裝參考安裝與啟動, 建議三臺同時進行 2: 關閉5臺電腦的防火牆
官方的部署檔案, 應為是英文的, 看起來優點蒙圈, 哎, 探索新版本總是困難的, 但是也總要有人前行
在虛擬機器器規劃中有控制檯地址
不知道為什麼, 150,151,可以存取, 152又提示不是私密連線, 我就又建立了一個賬戶就可以了
150 toor 123456 151 toor 123456 152 toor 123456
# 停止 rabbitmqctl stop # 檢測埠 lsof -i:5672 lsof -i:15672 # 啟動 rabbitmq-server -detached
因為我是最新版本安裝的所以.erlang.cookie並沒有在var/lib/rabbitmq下面, 而是在root下
scp /root/.erlang.cookie 192.168.247.151:/root/
scp /root/.erlang.cookie 192.168.247.152:/root/
選擇150,151,152任意一個節點為Master, 我選擇150, 也就是說需要把150的cookie檔案同步到151,152節點上, 進入/root目錄, ll -a才可以看到隱藏檔案, 使用上面的命令傳送就可以
注意: 需要在三臺伺服器RabbitMQ都啟動的情況下, 去同步
150直接啟動即可
# 啟動
rabbitmq-server -detached
151,152, 將自身加入叢集
# 停止app rabbitmqctl stop_app # 將節點加入叢集 --ram是採用記憶體的方式 151,採用記憶體, 152不加, 預設使用磁碟 rabbitmqctl join_cluster --ram rabbit@rabbitmqmaster # 啟動app rabbitmqctl start_app
# 任意節點
rabbitmqctl cluster_status
設定映象佇列策略
# 任意節點
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
將所有佇列設定為映象佇列,即佇列會被複制到各個節點,各個節點狀態一致,RabbitMQ高可用叢集就已經搭建好了,我們可以重啟服務,檢視其佇列是否在從節點同步
// 在要移除的節點上執行 rabbitmqctl forget_cluster_node rabbit@bhz71
# 任意節點
rabbitmqctl set_cluster_name rabbitmq_cluster1
我一般不修改, 正式環境有運維, 開發的話, 能跑就行
/usr/local/rabbitmq/plugins/rabbit-3.10.7/ebin/rabbit.app
測試佇列同步
在150上新建佇列
佇列開始同步
同步映象佇列兩個
151,152都已經同步
# 下載 wget http://www.haproxy.org/download/1.6/src/haproxy-1.6.6.tar.gz # 解壓 tar -zxvf haproxy-1.6.6.tar.gz # 進入資料夾 cd haproxy-1.6.6/ # 編譯 make target=linux31 prefix=/usr/local/haproxy # 安裝 make install prefix=/usr/local/haproxy # 建立資料夾 mkdir /etc/haproxy # 許可權 groupadd -r -g 149 haproxy useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy # 建立組態檔 touch /etc/haproxy/haproxy.cfg # haproxy 組態檔haproxy.cfg詳解 # 編輯組態檔 vi /etc/haproxy/haproxy.cfg
組態檔
#logging options global log 127.0.0.1 local0 info maxconn 5120 chroot /usr/local/haproxy uid 99 gid 99 daemon quiet nbproc 20 pidfile /var/run/haproxy.pid defaults log global #使用4層代理模式,」mode http」為7層代理模式 mode tcp #if you set mode to tcp,then you nust change tcplog into httplog option tcplog option dontlognull retries 3 option redispatch maxconn 2000 contimeout 10s ##使用者端空閒超時時間為 60秒 則HA 發起重連機制 clitimeout 10s ##伺服器端連結超時時間為 15秒 則HA 發起重連機制 srvtimeout 10s #front-end IP for consumers and producters listen rabbitmq_cluster bind 0.0.0.0:5672 #設定TCP模式 mode tcp #balance url_param userid #balance url_param session_id check_post 64 #balance hdr(User-Agent) #balance hdr(host) #balance hdr(Host) use_domain_only #balance rdp-cookie #balance leastconn #balance source //ip #簡單的輪詢 balance roundrobin #rabbitmq叢集節點設定 #inter 每隔五秒對mq叢集做健康檢查, 2次正確證明伺服器可用,2次失敗證明伺服器不可用,並且設定主備機制 server rabbitmqmaster 192.168.247.150:5672 check inter 5000 rise 2 fall 2 server rabbitmqso 192.168.247.151:5672 check inter 5000 rise 2 fall 2 server rabbitmqst 192.168.247.152:5672 check inter 5000 rise 2 fall 2 #設定haproxy web監控,檢視統計資訊 listen stats # 154改為154 bind 192.168.247.153:8100 mode http option httplog stats enable #設定haproxy監控地址為http://localhost:8100/rabbitmq-stats stats uri /rabbitmq-stats stats refresh 5s
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
http://192.168.247.153:8100/rabbitmq-stats http://192.168.247.154:8100/rabbitmq-stats
killall haproxy ps -ef | grep haproxy netstat -tunpl | grep haproxy ps -ef |grep haproxy |awk '{print $2}'|xargs kill -9
我直接採用2.0.18版本, 上傳到linux
tar -zxvf keepalived-2.0.18.tar.gz cd keepalived-2.0.18/ yum install libnl libnl-devel ./configure \ --prefix=/usr/local/keepalived \ --sysconf=/etc make && make install cd /etc/keepalived/ vi keepalived.conf
153設定
! Configuration File for keepalived global_defs { router_id haproxykm ##標識節點的字串,通常為hostname } vrrp_script chk_haproxy { script "/etc/keepalived/haproxy_check.sh" ##執行指令碼位置 interval 2 ##檢測時間間隔 weight -20 ##如果條件成立則權重減20 } vrrp_instance VI_1 { state MASTER ## 主節點為MASTER,備份節點為BACKUP interface ens33 ## 繫結虛擬IP的網路介面(網路卡),與本機IP地址所在的網路介面相同(我這裡是eth0) virtual_router_id 74 ## 虛擬路由ID號(主備節點一定要相同) mcast_src_ip 192.168.247.153 ## 本機ip地址 priority 100 ##優先順序設定(0-254的值) nopreempt advert_int 1 ## 組播資訊傳送間隔,倆個節點必須設定一致,預設1s authentication { ## 認證匹配 auth_type PASS auth_pass bhz } track_script { chk_haproxy } virtual_ipaddress { 192.168.247.160 ## 虛擬ip,可以指定多個 } }
154設定
! Configuration File for keepalived global_defs { router_id haproxyks ##標識節點的字串,通常為hostname } vrrp_script chk_haproxy { script "/etc/keepalived/haproxy_check.sh" ##執行指令碼位置 interval 2 ##檢測時間間隔 weight -20 ##如果條件成立則權重減20 } vrrp_instance VI_1 { state BACKUP ## 主節點為MASTER,備份節點為BACKUP interface ens33 ## 繫結虛擬IP的網路介面(網路卡),與本機IP地址所在的網路介面相同(我這裡是eth0) virtual_router_id 74 ## 虛擬路由ID號(主備節點一定要相同) mcast_src_ip 192.168.247.154 ## 本機ip地址 priority 80 ##優先順序設定(0-254的值) nopreempt advert_int 1 ## 組播資訊傳送間隔,倆個節點必須設定一致,預設1s authentication { ## 認證匹配 auth_type PASS auth_pass bhz } track_script { chk_haproxy } virtual_ipaddress { 192.168.247.160 ## 虛擬ip,可以指定多個 } }
touch /etc/keepalived/haproxy_check.sh chmod +x /etc/keepalived/haproxy_check.sh vi /etc/keepalived/haproxy_check.sh
指令碼內容
#!/bin/bash COUNT=`ps -C haproxy --no-header |wc -l` if [ $COUNT -eq 0 ];then /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg sleep 2 if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then killall keepalived fi fi
# 記得先啟動haproxy, 再啟動Keepalived /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg # 檢視程序 ps -ef | grep haproxy # 將Keepalived註冊為系統服務 cd /home/software/keepalived-2.0.18/keepalived/etc/ cp init.d/keepalived /etc/init.d/ cp sysconfig/keepalived /etc/sysconfig/ systemctl daemon-reload # 啟動Keepalived systemctl [start | stop | status | restart] keepalived systemctl start keepalived.service systemctl stop keepalived.service # 檢視服務 ps -ef|grep keepalived
160預設在153主Keepalived上
停止153上的keepalived
檢視153 ip
160的vip已經不再了, 檢視154
已經自動繫結154了, 重啟153
重啟後160重新漂移回153上, 高可用測試ok
直接使用最簡單的hello world程式測試, IP使用虛擬的VIP 160
消費者
package com.dance.redis.mq.rabbit.helloworld; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class Receiver { private static final String QUEUE_NAME = "queue-test"; private static final String IP_ADDRESS = "192.168.247.160"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Address[] address = new Address[]{ new Address(IP_ADDRESS, PORT) }; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("toor"); factory.setPassword("123456"); // 這裡的連線方式與生產者的demo略有不同,注意區別。 Connection connection = factory.newConnection(address); //建立連線 final Channel channel = connection.createChannel();//建立通道 channel.basicQos(64);//設定使用者端最多接收未被ack的訊息個數 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("recvive message:" + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); //等待回撥函數執行完畢之後,關閉資源。 TimeUnit.SECONDS.sleep(50); channel.close(); connection.close(); } }
生產者
package com.dance.redis.mq.rabbit.helloworld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange-test"; private static final String ROUTING_KEY = "text.*"; private static final String QUEUE_NAME = "queue-test"; private static final String IP_ADDRESS = "192.168.247.160"; private static final int PORT = 5672; //RabbitMQ服務預設埠為5672 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setVirtualHost("/"); factory.setUsername("toor"); factory.setPassword("123456"); Connection connection = factory.newConnection();//建立連線 Channel channel = connection.createChannel();//建立通道 //建立一個type="topic"、持久化的、非自動刪除的交換器。 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null); //建立一個持久化、非排他的、非自動刪除的佇列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //將交換機與佇列通過路由鍵繫結 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); //傳送一條持久化訊息:Hello World! String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); //關閉資源 channel.close(); connection.close(); } }
我的賬戶和密碼是toor和123456, 請改為自己的
啟動消費者
啟動生產者
檢視消費者
消費成功, ip也是160
檢視控制檯 exchange
多了exchange-test 功能是ha-all
佇列也是, 到此映象佇列+高可用已經實現
也可以在HA的控制檯上檢視統計
我們實現的是RabbitMQ Cluster + Mirror Queue + Haproxy + Keepalived
RabbitMQ Cluster 3臺
Mirror Queue RabbitMQ叢集方式
Haproxy 反向代理
Keepalived Haproxy叢集檢測, 虛擬VIP, 實現統一IP對外提供
架構圖手繪
emm, 就是這樣一個架構, 我應該李姐的挺到位的