RabbitMQ是實現了高階訊息佇列協定(AMQP)的開源訊息代理軟體(亦稱訊息導向中介層)。RabbitMQ伺服器是用Erlang語言編寫的,而叢集和故障轉移是構建在開放電信平臺框架上的。所有主要的程式語言均有與代理介面通訊的用戶端庫。
下載離線安裝包檔案
上傳離線安裝包
rabbitmq-install
目錄上傳到 /root
切換到rabbitmq-install目錄
cd rabbitmq-install
安裝
rpm -ivh *.rpm
推薦使用工具:MobaXterm,免費版的功能就相當強大了,個人感覺可以代替xshell,winscp等
下載地址https://mobaxterm.mobatek.net/download-home-edition.html
以下內容來自 RabbitMQ 官方手冊 (選擇其中一個複製就可以,centos6/7)
命令列逐條複製執行
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
# centos7 用這個
cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOF
# centos6 用這個
cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/6/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOF
yum makecache
yum install socat
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.12/erlang-21.3.8.12-1.el7.x86_64.rpm
rpm -ivh erlang-21.3.8.12-1.el7.x86_64.rpm --force --nodeps
yum install rabbitmq-server
# 設定服務,開機自動啓動
systemctl enable rabbitmq-server
# 啓動服務
systemctl start rabbitmq-server
# 開啓管理介面外掛
rabbitmq-plugins enable rabbitmq_management
# 防火牆開啓 15672 管理埠
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
systemctl restart rabbitmq-server
存取伺服器的15672埠,例如:http://192.168.126.129:15672
# 新增使用者
rabbitmqctl add_user admin admin
# 新使用者設定使用者爲超級管理員
rabbitmqctl set_user_tags admin administrator
正確修改之後!!!
# 開啓用戶端連線埠
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
主要埠介紹
假設有這樣一個場景, 服務A產生數據, 而服務B,C,D需要這些數據, 那麼我們可以在A服務中直接呼叫B,C,D服務,把數據傳遞到下遊服務即可
但是,隨着我們的應用規模不斷擴大,會有更多的服務需要A的數據,如果有幾十甚至幾百個下遊服務,而且會不斷變更,再加上還要考慮下遊服務出錯的情況,那麼A服務中呼叫程式碼的維護會極爲困難
這是由於服務之間耦合度過於緊密
再來考慮用RabbitMQ解耦的情況
A服務只需要向訊息伺服器發送訊息,而不用考慮誰需要這些數據;下遊服務如果需要數據,自行從訊息伺服器訂閱訊息,不再需要數據時則取消訂閱即可
假設我們有一個應用,平時存取量是每秒300請求,我們用一臺伺服器即可輕鬆應對
而在高峯期,存取量瞬間翻了十倍,達到每秒3000次請求,那麼單台伺服器肯定無法應對,這時我們可以考慮增加到10台伺服器,來分散存取壓力
但如果這種瞬時高峯的情況每天只出現一次,每次只有半小時,那麼我們10台伺服器在多數時間都只分擔每秒幾十次請求,這樣就有點浪費資源了
這種情況,我們就可以使用RabbitMQ來進行流量削峯,高峯情況下,瞬間出現的大量請求數據,先發送到訊息佇列伺服器,排隊等待被處理,而我們的應用,可以慢慢的從訊息佇列接收請求數據進行處理,這樣把數據處理時間拉長,以減輕瞬時壓力
這是訊息佇列伺服器非常典型的應用場景
考慮定外賣支付成功的情況
支付後要發送支付成功的通知,再尋找外賣小哥來進行配送,而尋找外賣小哥的過程非常耗時,尤其是高峯期,可能要等待幾十秒甚至更長
這樣就造成整條呼叫鏈路響應非常緩慢
而如果我們引入RabbitMQ訊息佇列,訂單數據可以發送到訊息佇列伺服器,那麼呼叫鏈路也就可以到此結束,訂單系統則可以立即得到響應,整條鏈路的響應時間只有200毫秒左右
尋找外賣小哥的應用可以以非同步的方式從訊息佇列接收訂單訊息,再執行耗時的尋找操作
RabbitMQ是一種訊息中介軟體,用於處理來自用戶端的非同步訊息。伺服器端將要發送的訊息放入到佇列池中。接收端可以根據RabbitMQ設定的轉發機制 機製接收伺服器端發來的訊息。RabbitMQ依據指定的轉發規則進行訊息的轉發、緩衝和持久化操作,主要用在多伺服器間或單伺服器的子系統間進行通訊,是分佈式系統標準的設定。
接受生產者發送的訊息,並根據Binding規則將訊息路由給伺服器中的佇列。ExchangeType決定了Exchange路由訊息的行爲。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三種。
訊息佇列。我們發送給RabbitMQ的訊息最後都會到達各種queue,並且儲存在其中(如果路由找不到相應的queue則數據會丟失),等待消費者來取。
它表示的是Exchange與Message Queue是通過binding key進行聯繫的,這個關係是固定。
生產者在將訊息發送給Exchange的時候,一般會指定一個routing key,來指定這個訊息的路由規則。這個routing key需要與Exchange Type及binding key聯合使用才能 纔能生,我們的生產者只需要通過指定routing key來決定訊息流向哪裏。
RabbitMQ是一個訊息中介軟體,你可以想象它是一個郵局。當你把信件放到郵箱裏時,能夠確信郵遞員會正確地遞送你的信件。RabbitMq就是一個郵箱、一個郵局和一個郵遞員。
新增 slf4j 依賴, 和 rabbitmq amqp 依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.tedu</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
package m1_simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
//1.連線rabbitmq伺服器
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.126.129");//寫rabbitmq伺服器地址
f.setPort(5672);//5672是通訊埠,收發訊息是5672,15672是管理介面
f.setUsername("admin");
f.setPassword("admin");
// 用誰的伺服器,設定一個自己的虛擬主機,用虛擬機器的話就不必操作這行程式碼
//f.setVirtualHost("/lx");
/*
* 與rabbitmq伺服器建立連線,
* rabbitmq伺服器端使用的是nio,會複用tcp連線,
* 並開闢多個通道與用戶端通訊
* 以減輕伺服器端建立連線的開銷
*/
Connection con = f.newConnection();
//建立通道
Channel c = con.createChannel();
//2.定義佇列(會通知伺服器想使用一個「helloworld佇列」,伺服器會找到這個佇列,如果不存在,伺服器會新建佇列)
/*
* 宣告佇列,會在rabbitmq中建立一個佇列
* 如果已經建立過該佇列,就不能再使用其他參數來建立
*
* 參數含義:
* -queue: 佇列名稱
* -durable: 佇列持久化,true表示RabbitMQ重新啓動後佇列仍存在
* -exclusive: 排他,true表示限制僅當前連線可用
* -autoDelete: 當最後一個消費者斷開後,是否刪除佇列
* -arguments: 其他參數
*/
c.queueDeclare("helloworld111",false,false,false,null);
//3.發送訊息
/*
* 發佈訊息
* 這裏把訊息向預設交換機發送.
* 預設交換機隱含與所有佇列系結,routing key即爲佇列名稱
*
* 參數含義:
* -exchange: 交換機名稱,空串表示預設交換機"(AMQP default)",不能用 null
* -routingKey: 對於預設交換機,路由鍵就是目標佇列名稱
* -props: 其他參數,例如頭資訊
* -body: 訊息內容byte[]陣列
*/
c.basicPublish("","helloworld",null,"hello".getBytes());
System.out.println("訊息已發送!!");
//4.斷開鏈接
c.close();
con.close();
}
}
package m1_simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws Exception {
//連線
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.151"); // www.wht6.cn
// f.setPort(5672); //預設埠可以省略
f.setUsername("admin");
f.setPassword("admin");
// f.setVirtualHost("/wht"); // 如果用我的伺服器,要設定虛擬主機
Channel c = f.newConnection().createChannel();
//定義佇列
c.queueDeclare("helloworld", false, false, false, null);
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("收到: "+msg);
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//從 helloworld 佇列接收訊息,消費訊息
c.basicConsume("helloworld", true, deliverCallback, cancelCallback);
}
}