rabbitmq安裝部署和常用命令

2023-06-25 15:00:45

 python操作rabbitmq

rabbitmq實現可以使用java或者springboot的封裝方法,自己建立實現,也可以使用中介軟體實現,相對於自建,使用rabbitmq應用場景及使用更系統安全。本文具體介紹rabbitmq中介軟體部署。

訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題實現高效能,高可用,可伸縮和最終一致性[架構] 使用較多的訊息佇列有 ActiveMQ(安全),RabbitMQ,ZeroMQ,Kafka(巨量資料),MetaMQ,RocketMQ
在實際應用中常用的使用場景:非同步處理,應用解耦,流量削鋒和訊息通訊四個場景

以下本人linux部署步驟:

安裝erlang(rabbitmq需要該語言執行)

設定相關依賴環境

##yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget
yum install -y make gcc gcc-c++ m4 openssl openssl-devel ncurses-devel unixODBC unixODBC-devel java java-devel socat

下載安裝包(本人rabbitmq要求的版本是22以上)

cd  /home/erlang
 wget http://www.erlang.org/download/otp_src_23.3.tar.gz

解壓並安裝

tar -xzvf otp_src_23.3.tar.gz
cd otp_src_23.3
./configure --prefix=/usr/local/erlang
make && make install

設定環境

vim /etc/profile

###為檔案新增值start:
ERL_PATH=/home/erlang/otp_src_23.3/bin
PATH=$ERL_PATH:$PATH
###end

source /etc/profile  使設定生效,在shell中使用

檢視安裝情況

erl -version 檢視版本號
#啟動erlang命令
./bin/erl

 

下載rabbitmq安裝包

rabbitmq官網下載地址:https://www.rabbitmq.com/ 下載壓縮包(注意下載linux版的,類似:rabbitmq-server-3.9.6-1.el7.noarch.rpm
)本人下載的 centos7相應版本:在window機中下載相關包上傳到linux機中

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.16/rabbitmq-server-3.8.16-1.el7.noarch.rpm

rpm包安裝後沒具象檔案位置集,相關檔案應該是融入主檔案裡其他位置中,使用模糊查詢,檢視安裝位置,如: rpm -aq rabbitmq*

安裝rabbitmq

##安裝命令
rpm -Uvh rabbitmq-server-3.9.6-1.el7.noarch.rpm 
##若報錯:erlang >= 23.2 被 rabbitmq-server-3.9.6-1.el7.noarch 需要;可使用如下命令:(nodeps是忽視依賴關係,可不加)
rpm -ivh --nodeps rabbitmq-server-3.8.16-1.el7.noarch --force --nodeps

啟動rabbitmq

 service rabbitmq-server start
# 設定使用者及WEBUI外掛
##啟用外掛:
rabbitmq-plugins enable rabbitmq_management
##新增使用者:
 rabbitmqctl add_user admin admin
##設定使用者tag:
 rabbitmqctl set_user_tags admin administrator
##賦予使用者預設vhost的全部操作許可權:
 rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

##開機自動啟動
chkconfig rabbitmq-server on

 找不到。啟動程式找不到erl命令,那麼新增erlang安裝的bin目錄,我這裡是編譯安裝的,目錄如下。

vim /usr/lib/rabbitmq/bin/rabbitmq-server

在set -e 下面加兩行

#erlang
export PATH=$PATH:/opt/otp_src_23.3/bin

無法存取頁面,檢視message紀錄檔,發現命令沒有發現,rabbitmq失敗 

檢視位置 

 在環境檔案前面也加上他的環境變數

 這下重啟沒有報錯了

 

新增admin使用者

http://localhost:15672 登入,無法直接使用遠端ip登入存取,需要進行設定,在設定之前先建立一個admin賬號,並進行授權:

檢視使用者 :rabbitmqctl list_users

建立一個admin使用者:rabbitmqctl add_user admin admin

使用者授權 :rabbitmqctl set_user_tags admin administrator

            #  rabbitmqctl set_permissions -p / asdf ".*" ".*" ".*"  #這條命令好像不用執行就可以

建立使用者之後需要進行設定,若使用rpm安裝,則修改設定/etc/rabbitmq/rabbitmq.config,設定內容大致如下:

[
{rabbit,
[%%
%% Network Connectivity
%% ====================
%%
%% By default, RabbitMQ will listen on all interfaces, using
%% the standard (reserved) AMQP port.
%%
{tcp_listeners, [5672]},
{loopback_users, ["admin"]}
]}
].

最後面那個點必填的

設定之後,如果rabbitmq無法正常啟動,出現錯誤:

init terminating in do_boot ()

Crash dump is being written to: erl_crash.dump...done

此時有可能是組態檔設定有問題,確保組態檔沒問題之後,重啟即可。

 

 可以看到,沒有那個組態檔,建立並寫上面的設定,然後重啟rabbitmq,可以看到rabbitmq服務狀態中,顯示使用這個新建的設定

 再次檢視使用者,可以看到多了個admin使用者

 

開啟管理頁面

需要開啟15672 web管理頁面埠,需要執行命令 ,不然頁面無法存取,這個埠沒有監聽

rabbitmq-plugins enable rabbitmq_management

 

 

存取Rabbitmq http://xxx.xxx.xxx.xxx:15672/
預設賬號密碼:guest guest 只能在本地存取
使用新添使用者:admin admin
存取即可,建立成功:

 
admin admin 進入的

 

 

 

 

 

 

 
 
 

 

其他命令

1、可以檢視服務狀態:
[root@localhost ~]# rabbitmqctl status

[root@mcw14 opt]# rabbitmqctl status
Status of node rabbit@mcw14 ...
Runtime

OS PID: 114466
OS: Linux
Uptime (seconds): 1512
Is under maintenance?: false
RabbitMQ version: 3.8.16
Node name: rabbit@mcw14
Erlang configuration: Erlang/OTP 23 [erts-11.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
Erlang processes: 371 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60

Plugins

Enabled plugin file: /etc/rabbitmq/enabled_plugins
Enabled plugins:

 * rabbitmq_management
 * amqp_client
 * rabbitmq_web_dispatch
 * cowboy
 * cowlib
 * rabbitmq_management_agent

Data directory

Node data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14
Raft data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14/quorum/rabbit@mcw14

Config files

 * /etc/rabbitmq/rabbitmq.config

Log file(s)

 * /var/log/rabbitmq/[email protected]
 * /var/log/rabbitmq/rabbit@mcw14_upgrade.log

Alarms

(none)

Memory

Total memory used: 0.0906 gb
Calculation strategy: rss
Memory high watermark setting: 0.4 of available memory, computed to: 1.4708 gb

code: 0.0282 gb (28.15 %)
other_proc: 0.0273 gb (27.19 %)
allocated_unused: 0.0202 gb (20.11 %)
other_system: 0.0124 gb (12.36 %)
plugins: 0.006 gb (5.97 %)
other_ets: 0.0033 gb (3.26 %)
atom: 0.0015 gb (1.45 %)
binary: 0.0011 gb (1.1 %)
mgmt_db: 0.0002 gb (0.21 %)
mnesia: 0.0001 gb (0.09 %)
metrics: 0.0001 gb (0.07 %)
msg_index: 0.0 gb (0.03 %)
quorum_ets: 0.0 gb (0.01 %)
connection_other: 0.0 gb (0.0 %)
connection_channels: 0.0 gb (0.0 %)
connection_readers: 0.0 gb (0.0 %)
connection_writers: 0.0 gb (0.0 %)
queue_procs: 0.0 gb (0.0 %)
queue_slave_procs: 0.0 gb (0.0 %)
quorum_queue_procs: 0.0 gb (0.0 %)
reserved_unallocated: 0.0 gb (0.0 %)

File Descriptors

Total: 2, limit: 32671
Sockets: 0, limit: 29401

Free Disk Space

Low free disk space watermark: 0.05 gb
Free disk space: 16.7942 gb

Totals

Connection count: 0
Queue count: 0
Virtual host count: 1

Listeners

Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Interface: [::], port: 15672, protocol: http, purpose: HTTP API
[root@mcw14 opt]# 
命令執行結果


2、重啟服務:
[root@localhost ~]# service rabbitmq-server restart
Restarting rabbitmq-server (via systemctl):
[ 確定 ]

 


3、關閉服務
[root@localhost ~]# service rabbitmq-server stop


4、重置服務
[root@localhost ~]# service rabbitmq-server reset
force_reset
強制RabbitMQ node還原到最初狀態.
不同於reset , force_reset 命令會無條件地重設node,不論當前管理資料庫的狀態和叢集設定是什麼。它只能在資料庫或叢集設定已損壞的情況下才可使用。
執行reset和force_reset之前,必須停止RabbitMQ application
將RabbitMQ node還原到最初狀態.包括從所在群集中刪除此node,從管理資料庫中刪除所有設定資料,如已設定的使用者和虛擬主機,以及刪除所有持久化訊息.

角色及許可權

1.RabbitMQ的使用者角色分類:
none、management、policymaker、monitoring、administrator
2.RabbitMQ各類角色描述:
none
不能存取 management plugin
management
使用者可以通過AMQP做的任何事外加:
列出自己可以通過AMQP登入的virtual hosts
檢視自己的virtual hosts中的queues, exchanges 和 bindings
檢視和關閉自己的channels 和 connections
檢視有關自己的virtual hosts的「全域性」的統計資訊,包含其他使用者在這些virtual hosts中的活動。
policymaker
management可以做的任何事外加:
檢視、建立和刪除自己的virtual hosts所屬的policies和parameters
monitoring
management可以做的任何事外加:
列出所有virtual hosts,包括他們不能登入的virtual hosts
檢視其他使用者的connections和channels
檢視節點級別的資料如clustering和memory使用情況
檢視真正的關於所有virtual hosts的全域性的統計資訊
administrator
policymaker和monitoring可以做的任何事外加:
建立和刪除virtual hosts
檢視、建立和刪除users
檢視建立和刪除permissions
關閉其他使用者的connections
3.建立使用者並設定角色
可以建立管理員使用者,負責整個MQ的運維;可以建立RabbitMQ監控使用者,負責整個MQ的監控;可以建立某個專案的專用使用者,只能存取專案自己的virtual hosts

 

常用命令

服務管理

啟動: service rabbitmq-server start 或 rabbitmq-service start
關閉: service rabbitmq-server stop 或 rabbitmq-service stop
重啟: service rabbitmq-server restart
狀態: rabbitmqctl status

 

[root@mcw14 ~]# rabbitmq-service stop
-bash: rabbitmq-service: command not found
[root@mcw14 ~]# service rabbitmq-server restart
Redirecting to /bin/systemctl restart rabbitmq-server.service
[root@mcw14 ~]# rabbitmqctl status
Status of node rabbit@mcw14 ...
Runtime

OS PID: 81996
OS: Linux
Uptime (seconds): 28
Is under maintenance?: false
RabbitMQ version: 3.8.16
Node name: rabbit@mcw14
Erlang configuration: Erlang/OTP 23 [erts-11.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe]
Erlang processes: 367 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60

Plugins

Enabled plugin file: /etc/rabbitmq/enabled_plugins
Enabled plugins:

 * rabbitmq_management
 * amqp_client
 * rabbitmq_web_dispatch
 * cowboy
 * cowlib
 * rabbitmq_management_agent

Data directory

Node data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14
Raft data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14/quorum/rabbit@mcw14

Config files

 * /etc/rabbitmq/rabbitmq.config

Log file(s)

 * /var/log/rabbitmq/[email protected]
 * /var/log/rabbitmq/rabbit@mcw14_upgrade.log

Alarms

(none)

Memory

Total memory used: 0.1046 gb
Calculation strategy: rss
Memory high watermark setting: 0.4 of available memory, computed to: 1.4708 gb

other_proc: 0.0357 gb (34.15 %)
code: 0.0282 gb (27.01 %)
other_system: 0.0124 gb (11.84 %)
allocated_unused: 0.0105 gb (10.05 %)
reserved_unallocated: 0.0069 gb (6.62 %)
plugins: 0.0055 gb (5.27 %)
other_ets: 0.0033 gb (3.12 %)
atom: 0.0015 gb (1.4 %)
binary: 0.0002 gb (0.22 %)
mgmt_db: 0.0002 gb (0.15 %)
mnesia: 0.0001 gb (0.09 %)
metrics: 0.0001 gb (0.06 %)
msg_index: 0.0 gb (0.03 %)
quorum_ets: 0.0 gb (0.01 %)
connection_other: 0.0 gb (0.0 %)
connection_channels: 0.0 gb (0.0 %)
connection_readers: 0.0 gb (0.0 %)
connection_writers: 0.0 gb (0.0 %)
queue_procs: 0.0 gb (0.0 %)
queue_slave_procs: 0.0 gb (0.0 %)
quorum_queue_procs: 0.0 gb (0.0 %)

File Descriptors

Total: 2, limit: 32671
Sockets: 0, limit: 29401

Free Disk Space

Low free disk space watermark: 0.05 gb
Free disk space: 16.5811 gb

Totals

Connection count: 0
Queue count: 0
Virtual host count: 1

Listeners

Interface: [::], port: 15672, protocol: http, purpose: HTTP API
Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
[root@mcw14 ~]# 
rabbitmqctl status

或者systemctl管理

 

使用者管理

新增賬號: rabbitmqctl add_user username password
刪除使用者: rabbitmqctl delete_user username
所有使用者: rabbitmqctl list_users
修改密碼: rabbitmqctl change_password username newpassword
清除密碼: rabbitmqctl clear_password {userName}

admin登入

建立使用者和檢視使用者:

 rabbitmqctl add_user machangwei  123456

 頁面上面也可以檢視到這個使用者

 剛剛建立的使用者沒有授權,不是管理使用者,上面兩個賬號是管理使用者,所以可以登入,好像是授權管理使用者才能登入管理頁面

 根據下面的角色管理,給賬號賦予administrator角色後,就i你登入到管理頁面了

修改machangwei密碼:

rabbitmqctl change_password machangwei 12

 再看已經登入的頁面,提示登入失敗

 重新整理頁面就這樣了

 使用新密碼登入

 接下來演示清除密碼

 清除密碼之後

 頁面又退出登入了

 admin使用者登入檢視,machangwei賬號是沒有密碼了的

重新加上密碼,直接修改密碼就可以

 清除了密碼,跟你檢視使用者沒有關係,照樣是查到的

 清除密碼必須接賬號

角色管理

使用者角色分為5中型別:
  none:無任何角色。新建立的使用者的角色預設為 none。
  management:可以存取web管理頁面。
  policymaker: 包含managerment所有許可權,並且可以管理策略(Policy)和引數(Parameter)
  monitoring: 包含management所有許可權,並且可以看到所有連結、通道及節點相關的資訊
  administartor:包含monitoring所有許可權,並且可以管理使用者、虛擬機器器、許可權、策略、引數等。(最高許可權)
設定使用者角色: rabbitmqctl set_user_tags zhaojigang administrator
設定多個角色: rabbitmqctl set_user_tags hncscwc monitoring policymaker
檢視使用者角色: rabbitmqctl list_users

 

設定上面建立的machangwei為管理使用者

頁面上檢視machangwei賬號擁有的角色。

 繼續執行命令,設定machangwei賬號有多個角色

 頁面上檢視,可以看到,它這個是設定標籤,每次都是重新設定,會將之前的角色清空,換成本次設定的角色,這裡不是新增,這點需要注意

 我們再加上管理角色,

 這下machangwei賬號有登入頁面的許可權了,檢視頁面,顯示是machangwei登入。

命令列檢視使用者,也可以看到使用者有哪些角色

 Vhost管理

所有虛擬主機: rabbitmqctl list_vhosts
新增虛擬主機: rabbitmqctl add_vhost vhostname
刪除虛擬主機: rabbitmqctl delete_vhost vhostname

 

檢視虛擬主機

 頁面上面檢視下虛擬機器器主機

 新增一個虛擬主機

 可以看到剛剛建立的虛擬主機跟賬號沒有繫結,名字也能看到頁面和命令列看到的一樣

 

 

許可權管理

命令格式如下:rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read}
查詢所有許可權:rabbitmqctl list_permissions  [-p  VHostPath]
檢視使用者許可權:rabbitmqctl list_user_permissions username
清除使用者許可權:rabbitmqctl clear_permissions [-p VHostPath] username

 

 rabbitmqctl set_permissions   -p mcw_vhost1  machangwei   /etc/rabbitmq/rabbitmq.config  read     [-p vhost] {user} {conf} {write} {read}

rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read}
# 表示設定使用者許可權。 {vhost} 表示待授權使用者存取的vhost名稱,預設為 "/"; {user} 表示待授權反問特定vhost的使用者名稱稱; {conf}表示待授權使用者的設定許可權,是一個匹配資源名稱的正規表示式; {write} 表示待授權使用者的寫許可權,是一個匹配資源名稱的正規表示式; {read}表示待授權使用者的讀許可權,是一個資源名稱的正規表示式。
# rabbitmqctl set_permissions -p /  admin "^mip-.*" ".*" ".*"
# 例如上面例子,表示授權給使用者 "admin" 具有所有資源名稱以 "mip-" 開頭的 設定許可權;所有資源的寫許可權和讀許可權。

 

 
設定相關 vhost 許可權案例
rabbitmqctl add_vhost /myhost # 新增 vhost
rabbitmqctl add_user me me123 # 設定使用者和密碼
rabbitmqctl set_permissions -p /myhost me ".*" ".*" ".*" # vhost 設定許可權

下面檢視所有使用者許可權,檢視使用者許可權:

 檢視,me這個沒有新增角色,列出許可權的時候沒有看到它,指定vhost的列出許可權,就看到me了。也就是/myhost目前只有這個使用者。而許可權,是分為對vhost的設定許可權,寫許可權,讀許可權。設定許可權那裡,可以用以什麼開頭的許可權,這種來通配,就像之前在redis作為celery的bakend一樣,寫入的資料是以指定字串開頭,這樣來區分訊息,這裡,不清楚是不是這樣的作用。

 頁面顯示如下

 清除使用者的vhost許可權

 沒有了

 

其它幾個頁面

 

 

 這裡有vhost

 也可以頁面上新增使用者

 也可以頁面上新增vhost

 

檢視外掛

rabbitmq-plugins list

如下,之前開啟了管理頁面的外掛,才能通過頁面存取rabbitmq,現在可以看到,前面是有標記的,也就是沒有標記的應該就是沒有開通,而且標記不同,大寫的標記,應該是外掛的伺服器端程式吧。


[root@mcw14 ~]# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@mcw14
 |/
[  ] rabbitmq_amqp1_0                  3.8.16
[  ] rabbitmq_auth_backend_cache       3.8.16
[  ] rabbitmq_auth_backend_http        3.8.16
[  ] rabbitmq_auth_backend_ldap        3.8.16
[  ] rabbitmq_auth_backend_oauth2      3.8.16
[  ] rabbitmq_auth_mechanism_ssl       3.8.16
[  ] rabbitmq_consistent_hash_exchange 3.8.16
[  ] rabbitmq_event_exchange           3.8.16
[  ] rabbitmq_federation               3.8.16
[  ] rabbitmq_federation_management    3.8.16
[  ] rabbitmq_jms_topic_exchange       3.8.16
[E*] rabbitmq_management               3.8.16
[e*] rabbitmq_management_agent         3.8.16
[  ] rabbitmq_mqtt                     3.8.16
[  ] rabbitmq_peer_discovery_aws       3.8.16
[  ] rabbitmq_peer_discovery_common    3.8.16
[  ] rabbitmq_peer_discovery_consul    3.8.16
[  ] rabbitmq_peer_discovery_etcd      3.8.16
[  ] rabbitmq_peer_discovery_k8s       3.8.16
[  ] rabbitmq_prometheus               3.8.16
[  ] rabbitmq_random_exchange          3.8.16
[  ] rabbitmq_recent_history_exchange  3.8.16
[  ] rabbitmq_sharding                 3.8.16
[  ] rabbitmq_shovel                   3.8.16
[  ] rabbitmq_shovel_management        3.8.16
[  ] rabbitmq_stomp                    3.8.16
[  ] rabbitmq_top                      3.8.16
[  ] rabbitmq_tracing                  3.8.16
[  ] rabbitmq_trust_store              3.8.16
[e*] rabbitmq_web_dispatch             3.8.16
[  ] rabbitmq_web_mqtt                 3.8.16
[  ] rabbitmq_web_mqtt_examples        3.8.16
[  ] rabbitmq_web_stomp                3.8.16
[  ] rabbitmq_web_stomp_examples       3.8.16
[root@mcw14 ~]# 

 

 監控管理器

這個就是前面開啟和關閉rabbitmq的管理頁面的命令

rabbitmq-plugins enable rabbitmq_management #啟動
rabbitmq-plugins disable rabbitmq_management #關閉

 

應用管理

慎用,好像是開啟和停止節點用的

 

關閉應用:rabbitmqctl stop_app
啟動應用:rabbitmqctl start_app

 

stop之後,就不能存取頁面了,根據命令返回列印資訊,好像是停止的這個節點

 stop之後,程式程序還是在的,再次start一下,又能正常存取頁面了。我這裡只有一個node,如果有多個node,停止一個,應該是可以在頁面存取看到情況的

 

佇列管理

檢視所有佇列:rabbitmqctl list_queues
清除所有佇列:rabbitmqctl reset #需要先執行rabbitmqctl stop_app
強制清除佇列:rabbitmqctl force_reset

刪除單個佇列:rabbitmqctl delete_queue queue_name 

 

目前沒有佇列

 新增之後,存取拒絕

 參考:https://blog.csdn.net/hefeng_aspnet/article/details/125865990

 檢視許可權

 有正則匹配的設定

 點選設定許可權,把以mip-開頭的設定,給重新設定了,為有所有許可權

 再次新增佇列

 成功新增佇列,點選進入佇列頁面

 這個訊息下面,我們看到了之前新增的三個鍵值對。

 再次查詢訊息佇列,可以看到我們剛剛建立的佇列,但是訊息,顯示的是0,不知道是為什麼

 這裡有個push訊息,我們試一試

 

 訊息被推播了

 之前新增的引數,不是訊息,我們上面點選釋出訊息,才是真的往這個佇列裡面新增了訊息,可以看到多了條訊息,persistent應該是是否持久化的意思吧,而memory是指記憶體裡面有條訊息,但是沒有持久化儲存呢吧,持久化儲存為0,總共一條訊息

 此時再去Linux上檢視,可以看到訊息佇列mcwcountage已經多了一條訊息了。

 再次釋出一條訊息

 可以看到,兩條訊息了

 再次檢視,兩條訊息了

 點選get訊息,可以看到我們剛剛推播的訊息

 兩條訊息都get一下。如果填的訊息資料,大於實際 有的訊息數量,那麼只顯示已存在的訊息

 點選之後,訊息被清空了

 再次push兩個訊息

 移除訊息,沒有安裝外掛,那麼安裝一下

 顯示開啟了兩個外掛在這個節點

 可以看到,這兩個外掛已經開啟了

 剛剛的操作沒有截圖,我們將多出來的佇列1刪除掉

 看上面,可以知道此時只有一個佇列。我們進入之後,點選移動,這時候會新建mcwqueue2佇列,並把mcwcountage下的所有訊息移動到新的佇列,有點像是備份似的,不清楚,如果下面移動時,填寫的是已有的佇列名稱,是不是隻是新增到已有的佇列裡面,大概率是這樣子吧。

 點選移動訊息,跳轉到展示所有訊息佇列的頁面,可以看到新增的佇列,並且兩條 訊息已經從舊的mcwcountage佇列移走,移動到新的佇列mcwqueue2裡面去了。

 我們再測試一下刪除,這裡的刪除好像是刪除佇列

 

 可以看到,進入佇列的詳情頁面,點選刪除,的確把該佇列刪除掉了,如下,已經看不到mcwqueue2佇列了

測試重啟訊息佇列服務,對沒有持久化訊息的影響

 我們push兩個訊息進入訊息佇列

 我們重啟之後,可以看到訊息丟失。那麼重啟之後是否需要將訊息持久化呢,怎麼做持久化呢,持久化後是否上圖中persistent就是2了呢

 訊息如何持久化呢?

下面看下有兩條訊息

 

[root@controller ~]# rabbitmqctl  list_queues
Listing queues ...
q-agent-notifier-security_group-update_fanout_0134e1dc992d4f3291571f4f1150f033    0
q-l3-plugin_fanout_67ad4da1e291429b9a561a0b03734552    0
q-plugin    0
conductor_fanout_e2d9de48537d4ec09edb946f2fdb5008    0
q-agent-notifier-l2population-update_fanout_48739ab147b04dc895fe2fd340aa7eae    0
q-agent-notifier-network-delete_fanout_ed0e5abc181c4fe0bb4a802ea928a2e8    0
reply_20ac7e9ae5a349aea2ea680a5aec7d79    0
consoleauth.controller    0
q-agent-notifier-network-delete.controller    0
q-plugin.controller    0
compute_fanout_9f8cf7456e73451e8ce7d65460849817    0
reply_2455b8e5c5dc40da8650f56ae4e45174    0
q-agent-notifier-l2population-update.controller    0
cinder-volume_fanout_8dc268f1cf7d46d8bbfe4a521da8b202    0
cinder-volume.controller@lvm_fanout_b606cbf8a67c4600879b2a64a5320db4    0
q-agent-notifier-network-update    0
q-agent-notifier-port-update.controller    0
conductor_fanout_2b33be46a0d448868f6dc32678a21500    0
q-agent-notifier-port-update_fanout_2b5d1a5c039e4e2caddea9ed34185fe8    0
q-agent-notifier-port-update_fanout_ebc99b8c158c48529e653e9ceb2f7760    0
consoleauth_fanout_4964cf77ca7944f8a8d85fe555703a0c    0
q-server-resource-versions_fanout_753834a40abd430998a0ba30955f1122    0
cinder-volume.compute2@nfs_fanout_2868e267441e4e528f4d300f33282d19    0
conductor.controller    0
q-agent-notifier-l2population-update_fanout_9cae8c6abdc84202bc35ebbee373b422    0
reply_c9910c029ba1432c98ae4c3576ef7df6    0
conductor    0
q-agent-notifier-network-update.controller    0
scheduler_fanout_5eea12b3d0114d58ac0d10fc3799f0d1    0
q-agent-notifier-security_group-update_fanout_1452a407fcd54a5ab47285254b86a8d3    0
q-agent-notifier-security_group-update.compute1    0
reply_52a5090b02f5436c9e7aa541e732ef52    0
cinder-volume    0
q-agent-notifier-network-update_fanout_a4a9e02adfc748fd9a8835996e802bfa    0
cinder-volume_fanout_75d1c7f92a394bd088b7f65e121d6f9a    0
q-server-resource-versions    0
neutron-vo-Trunk-1.1_fanout_138759917974485faad26891827e7447    0
compute.compute2    0
neutron-vo-Trunk-1.1_fanout_676122d00eb34a59be324d0604c9111c    0
scheduler.controller    0
neutron-vo-SubPort-1.0_fanout_e49ddc715b1943389e7e0ddd6dfb5032    0
cinder[email protected]    0
dhcp_agent.controller    0
neutron-vo-Trunk-1.1.controller    0
q-agent-notifier-port-update.compute1    0
neutron-vo-SubPort-1.0.compute2    0
dhcp_agent_fanout_54b2d45e5d0d490f8d62283a9abcb0b8    0
cinder-volume.compute2@nfs    0
q-reports-plugin.controller    0
neutron-vo-SubPort-1.0_fanout_496e5f4cee6445af8a1edcb5c312abe6    0
q-reports-plugin_fanout_d4e460c745f445c8a22c39cbc15b9f92    0
q-agent-notifier-network-update.compute1    0
cinder-scheduler    0
consoleauth    0
q-reports-plugin_fanout_89d519d055464103a1ec5029a36390a8    0
dhcp_agent    0
q-agent-notifier-security_group-update.compute2    0
q-agent-notifier-l2population-update.compute1    0
q-agent-notifier-network-delete_fanout_d518f4f111774991971c7cdc5302fb0d    0
q-reports-plugin    0
q-agent-notifier-network-delete_fanout_4368adffe18e4742a12e2c4559b07985    0
neutron-vo-SubPort-1.0_fanout_2a542add26d8456396efe5ef3cb4c23f    0
cinder[email protected]    0
q-agent-notifier-l2population-update_fanout_5e726a5d09f942e1afb8b02d89cdf9fe    0
q-agent-notifier-network-update.compute2    0
reply_f3414e08cc93430a914663877436085d    0
cinder-volume.controller@lvm    0
scheduler    0
neutron-vo-SubPort-1.0    0
q-l3-plugin.controller    0
neutron-vo-Trunk-1.1.compute1    0
q-agent-notifier-l2population-update.compute2    0
neutron-vo-SubPort-1.0.controller    0
q-agent-notifier-port-update.compute2    0
reply_4961a61a307a428fb0b03043c7654f7d    0
q-agent-notifier-security_group-update.controller    0
q-agent-notifier-security_group-update    0
compute_fanout_b980ee35b47c47688d4fda1a1eb9fe34    0
cinder-scheduler_fanout_3b662fb1565b4a9f8c1224380cf8c8da    0
neutron-vo-Trunk-1.1    0
compute    0
q-agent-notifier-network-delete.compute1    0
cinder-scheduler.controller    0
compute.compute1    0
q-l3-plugin_fanout_31c332c953be4c9194085590c844f934    0
neutron-vo-Trunk-1.1_fanout_a8dca45b75d8436a9ba44f8832d04d58    0
q-agent-notifier-l2population-update    0
q-agent-notifier-network-update_fanout_84a4aa333c7e435482fc64b04d120051    0
q-server-resource-versions.controller    0
neutron-vo-SubPort-1.0.compute1    0
q-agent-notifier-port-update    0
q-agent-notifier-security_group-update_fanout_49d673aeec024d8fa36a49523cf335fd    0
q-agent-notifier-network-delete.compute2    0
q-agent-notifier-port-update_fanout_e3f806abc57744b290540a21aba6626f    0
q-l3-plugin    0
reply_80e9590155d64a2480314f86613ea532    0
neutron-vo-Trunk-1.1.compute2    0
q-agent-notifier-network-delete    0
q-agent-notifier-network-update_fanout_c60c9bcfa3eb40c7b5c025980bece3f5    0
q-plugin_fanout_a07db20790814d52ba25d914e01fe82f    0
[root@controller ~]# 
檢視openstack中有的訊息佇列

 

叢集管理

檢視叢集狀態: rabbitmqctl cluster_status
摘除節點: rabbitmqctl forget_cluster_node [--offline]
組成叢集命令: rabbitmqctl join_cluster <clusternode> [--ram]
修改節點儲存形式: rabbitmqctl change_cluster_node_type disc | ram
修改節點名稱: rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2 newnode2] [oldnode3 newnode3...]

 

[root@mcw14 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@mcw14 ...
Basics

Cluster name: rabbit@mcw14

Disk Nodes

rabbit@mcw14

Running Nodes

rabbit@mcw14

Versions

rabbit@mcw14: RabbitMQ 3.8.16 on Erlang 23

Maintenance status

Node: rabbit@mcw14, status: not under maintenance

Alarms

(none)

Network Partitions

(none)

Listeners

Node: rabbit@mcw14, interface: [::], port: 15672, protocol: http, purpose: HTTP API
Node: rabbit@mcw14, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@mcw14, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0

Feature flags

Flag: drop_unroutable_metric, state: disabled
Flag: empty_basic_get_metric, state: disabled
Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled
[root@mcw14 ~]# 
rabbitmqctl cluster_status

看下我們部署的openstack,它的叢集狀態是什麼樣的,也就一個節點,跟上面命令存取結果不一樣,那麼上面可能是沒有形成叢集,只是一個單節點。

 加入叢集失敗

 

 

 

 

資訊檢視

rabbitmqadmin list connections #檢視所有連線
rabbitmqadmin show overview #概覽 Overview
rabbitmqadmin list nodes #檢視所有節點 Node
rabbitmqadmin list channels #檢視所有通道 Channel
rabbitmqadmin list consumers #檢視所有消費者 Consumer
rabbitmqadmin list exchanges #檢視所有路由 Exchange
rabbitmqadmin list bindings #檢視所有路由與佇列的關係繫結 Binding

 
找到這個命令,然後加個軟連線或者複製一份。軟連線的話,也是要將原始檔新增執行許可權的,不然無法執行,沒有許可權。

 這樣就可以了,之前節點被關閉了。開啟節點後,然後檢視有哪些節點,可以正常看到

 檢視所有連線

 概覽

 檢視所有節點

 檢視所有通道 

 檢視所有消費者

 檢視所有路由

 檢視所有路由與佇列的關係繫結

 

python操作rabbitmq

RabbitMq生產者消費者模型

生產者(producter) 佇列訊息的產生者,複製生產訊息,並將訊息傳入佇列
生產者程式碼:

import pika
import json

credentials = pika.PlainCredentials('admin','admin')#mq使用者名稱和密碼,用於認證
#虛擬佇列需要指定引數virtual_host,如果是預設的可以不填
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.24',port=5672,virtual_host='/',credentials=credentials))
channel = connection.channel()# 建立一個AMQP通道

#宣告佇列,並設定durable為True,為了避免rabbitMq-server掛掉資料丟失,將durable設為True
channel.queue_declare(queue='1',durable=True)
for i in range(10):   # 建立10個q
    message = json.dumps({'OrderId':"1000%s"%i})
    # exchange表示交換器,可以精確的指定訊息應該發到哪個佇列中,route_key設定佇列的名稱,body表示傳送的內容
    channel.basic_publish(exchange='',routing_key='1',body=message)
    print(message)
connection.close()
操作前

通過pika生命一個認證用的憑證,然後用pika建立rabbitmq的塊連線,再用上面的連線建立一個AMQP通道 。建立訊息佇列的連線時,需要指定ip,斷開,虛擬主機,憑證。

然後根據上面的通道,宣告一個佇列,

我們可以看到,下面通道點佇列宣告裡的queue引數值就佇列的名字。這裡是遍歷0到9,然後列印了下訊息,這裡的生成的訊息,是json序列化後的資料。然後將資料作為i,通道點基礎釋出的body引數的值。上面通道點佇列宣告是建立一個佇列,佇列名字是’1‘,下面我們用通道點基本釋出,是將我們建立的訊息體傳送到佇列中,路由_key就是指定佇列名稱,指定釋出訊息到哪個佇列,訊息是作為body的引數,

最後,需要將這個訊息佇列的連線關閉。

 我們通過頁面可以看到,已經建立好了這個佇列,佇列名字為1,並且已經通過遍歷生成的10個訊息,呼叫十次通道點基礎釋出方法,將這十個產生的訊息釋出到訊息佇列中

 

 我們可以再看下,可以看到我們建立的訊息的具體內容。

 消費者(consumer):佇列訊息的接收者,扶著接收並處理訊息佇列中的訊息

import pika
credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.0.0.24',
    port=5672,
    virtual_host='/',
    credentials=credentials
))
channel = connection.channel()
#宣告訊息佇列,訊息在這個佇列中傳遞,如果不存在,則建立佇列
channel.queue_declare(queue='1',durable=True)
# 定義一個回撥函數來處理訊息佇列中訊息,這裡是列印出來
def callback(ch,method,properties,body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())
#告訴rabbitmq,用callback來接收訊息
channel.basic_consume('1',callback)
#開始接收資訊,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理
channel.start_consuming()

獲取訊息,建立憑證,連線,通道,然後什麼一下佇列。指定我們要獲取哪個佇列中的訊息,如果沒有這個佇列,就會建立這個佇列,存在,那麼後面使用這個通道,就會從這個佇列中獲取資料。通道是通過rabbitmq的連線物件來生成的,連線物件中放了連線用的憑證。所以,通道點基礎消費方法,指定是哪個訊息佇列,那麼就會從這個佇列中獲取訊息。然後傳參回撥函數。而回撥函數中,

 我們可以看到,基礎消費方法裡面有訊息回撥,就是上面我們自定義的回撥函數

 這個方法定義了回撥函數的寫法。第一個引數是通道

 第二個引數是方法,第三個引數是屬性,第四個是body,這些不用管,只需要按如下格式,就可以從body,做個解碼,就將通道點基礎消費中指定的佇列中的訊息,取出來了,我們是用回撥函數來接收訊息,當需要獲取訊息的時候,就需要執行通道點開始消費的方法。這裡好像是遍歷佇列一個一個的將訊息獲取出來。那麼怎樣實現,實時監聽訊息,實時消費呢

 

RabbitMq持久化

 

RabbitMq持久化
MQ預設建立的臨時的queue和exchange,如果不宣告持久化,一旦rabbitmq掛掉,queue,exchange將會全部丟失,所以我們一般在建立queue或者exchange的時候會宣告持久化
1.queue宣告持久化

# 宣告訊息佇列,訊息將在這個佇列傳遞,如不存在,則建立。durable = True 代表訊息佇列持久化儲存,False 非持久化儲存
result = channel.queue_declare(queue = 'python-test',durable = True)

使用True

 重啟訊息佇列服務

訊息佇列還在,但是訊息被清空了

 當我改為false的時候,因為佇列1已經存在,並且是Tue宣告的,所以這裡就報錯了

 我們設定為false,然後宣告一個不存在的佇列2

 建立好了佇列,並且10個訊息

 重啟一下訊息佇列服務

 剛剛上面建立的佇列2已經不存在,這已經不是訊息被清空了,而是佇列直接被清除了

 也就是這個Ture,是保留佇列用的,持久化佇列的。

channel.queue_declare(queue='2',durable=True)

2、exchange宣告持久化

# 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立.durable = True 代表exchange持久化儲存,False 非持久化儲存
channel.exchange_declare(exchange = 'python-test', durable = True)

注意:如果已存在一個非持久化的queue或exchange,執行上述程式碼會報錯,因為當前狀態不能更該queue 或 exchange儲存屬性,需要刪除重建,如果queue和exchange中一個宣告了持久化,另一個沒有宣告持久化,則不允許繫結

 

我們在1處改了,但是在2處沒有修改。結果有問題。

 佇列2不存在,所以沒有將訊息放進去

 而exchange這裡,沒有寫將訊息推播到宣告的python-test裡面,所以裡面也沒有訊息

 這次是宣告的exchange,並且將訊息推播到python-test裡面

 還是沒有看到有東西呀

 我們這裡釋出個訊息,可以看到,是需要路由的

 加上路由,再次執行程式

 由於佇列2 不存在,好像還是不行

 我在這裡給它bind一個路由

 感覺還是沒有弄明白,先放棄了

原來是如下方式呀。

首先,在python-test2裡面,

 給exchange繫結佇列1和2

 1和2目前的訊息數量

 我往路由1裡面push一個訊息

 push成功

 然後再看佇列1裡面,可以看到多了一條剛剛push的訊息

 接下來用程式實現,宣告exchange,然後釋出方法不變,釋出到exchage中,因為已經繫結了兩個路由了,這裡指定路由key,根據路由key,可以將訊息push到對應的佇列中去

 我們可以看到,之前是頁面點選push了一條,上面程式push了十條到exchange,現在這個佇列就有11條資料。可是這個exchange和佇列的繫結,是我自己在頁面上繫結的,這個應該不合理。以後有時間看下,怎麼用程式繫結。

 我們可以看到,應該是程式中缺少使用這個繫結方法吧

 

3、訊息持久化
雖然exchange和queue都宣告了持久化,但如果訊息只存在記憶體裡,rabbitmq重啟後,記憶體裡的東西還是會丟失,所以必須宣告訊息也是持久化,從記憶體轉存到到硬碟

# 向佇列插入數值 routing_key是佇列名。delivery_mode = 2 宣告訊息在佇列中持久化,delivery_mod = 1 訊息非持久化
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))

4、acknowledgement訊息不丟失
消費者(consume)呼叫callback函數時,會存在處理訊息失敗的風險,如果處理失敗,則訊息會丟失,但是也可以選擇消費者處理失敗時,將訊息回退給rabbitmq,重新再被消費者消費,這個時候需要設定確認標識。

channel.basic_consume(callback,queue = 'python-test',
# no_ack 設定成 False,在呼叫callback函數時,未收到確認標識,訊息會重回佇列。True,無論呼叫callback成功與否,訊息都被消費掉
             no_ack = False)

 



 

 
 
 
 
 
 
 


參考連結:https://www.jianshu.com/p/cc322adf060c
https://blog.csdn.net/weixin_45144837/article/details/104335115

 https://blog.csdn.net/weixin_45144837/article/details/104335115