RabbitMQ

2020-08-09 10:02:18

RabbitMQ是實現了高階訊息佇列協定(AMQP)的開源訊息代理軟體(亦稱訊息導向中介層)。RabbitMQ伺服器是用Erlang語言編寫的,而叢集和故障轉移是構建在開放電信平臺框架上的。所有主要的程式語言均有與代理介面通訊的用戶端庫。

在这里插入图片描述


在这里插入图片描述

一、RabbitMQ安裝

1.1 離線安裝

下載離線安裝包檔案

上傳離線安裝包
rabbitmq-install 目錄上傳到 /root

切換到rabbitmq-install目錄

cd rabbitmq-install

安裝

rpm -ivh *.rpm

推薦使用工具:MobaXterm,免費版的功能就相當強大了,個人感覺可以代替xshell,winscp等

下載地址https://mobaxterm.mobatek.net/download-home-edition.html
在这里插入图片描述

1.2 Yum線上安裝

以下內容來自 RabbitMQ 官方手冊 (選擇其中一個複製就可以,centos6/7)

命令列逐條複製執行

rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc


# centos7 用這個
cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOF


# centos6 用這個
cat <<EOF > /etc/yum.repos.d/rabbitmq.repo
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/6/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOF


yum makecache

yum install socat

wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.12/erlang-21.3.8.12-1.el7.x86_64.rpm
rpm -ivh erlang-21.3.8.12-1.el7.x86_64.rpm --force --nodeps

yum install rabbitmq-server

1.3 啓動rabbitmq伺服器

# 設定服務,開機自動啓動
systemctl enable rabbitmq-server

# 啓動服務
systemctl start rabbitmq-server

1.4 rabbitmq管理介面

1.4.1 啓用管理介面

# 開啓管理介面外掛
rabbitmq-plugins enable rabbitmq_management

# 防火牆開啓 15672 管理埠
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload

1.4.2 重新啓動RabbitMQ服務

systemctl restart rabbitmq-server

1.5 存取

存取伺服器的15672埠,例如:http://192.168.126.129:15672

1.6 新增使用者

1.6.1 新增使用者

# 新增使用者
rabbitmqctl add_user admin admin

# 新使用者設定使用者爲超級管理員
rabbitmqctl set_user_tags admin administrator

1.6.2 設定存取許可權

在这里插入图片描述
在这里插入图片描述
正確修改之後!!!
在这里插入图片描述

使用者管理參考:https://www.cnblogs.com/AloneSword/p/4200051.html

1.7 開放用戶端連線埠

# 開啓用戶端連線埠
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload

主要埠介紹

  • 4369 – erlang發現口
  • 5672 – client端通訊口
  • 15672 – 管理介面ui埠
  • 25672 – server間內部通訊口

二、RabbitMQ 使用場景

2.1 服務解耦

假設有這樣一個場景, 服務A產生數據, 而服務B,C,D需要這些數據, 那麼我們可以在A服務中直接呼叫B,C,D服務,把數據傳遞到下遊服務即可

但是,隨着我們的應用規模不斷擴大,會有更多的服務需要A的數據,如果有幾十甚至幾百個下遊服務,而且會不斷變更,再加上還要考慮下遊服務出錯的情況,那麼A服務中呼叫程式碼的維護會極爲困難

這是由於服務之間耦合度過於緊密
在这里插入图片描述
再來考慮用RabbitMQ解耦的情況

A服務只需要向訊息伺服器發送訊息,而不用考慮誰需要這些數據;下遊服務如果需要數據,自行從訊息伺服器訂閱訊息,不再需要數據時則取消訂閱即可
解耦

2.2 流量削峯

假設我們有一個應用,平時存取量是每秒300請求,我們用一臺伺服器即可輕鬆應對
低流量
而在高峯期,存取量瞬間翻了十倍,達到每秒3000次請求,那麼單台伺服器肯定無法應對,這時我們可以考慮增加到10台伺服器,來分散存取壓力

但如果這種瞬時高峯的情況每天只出現一次,每次只有半小時,那麼我們10台伺服器在多數時間都只分擔每秒幾十次請求,這樣就有點浪費資源了
在这里插入图片描述
這種情況,我們就可以使用RabbitMQ來進行流量削峯,高峯情況下,瞬間出現的大量請求數據,先發送到訊息佇列伺服器,排隊等待被處理,而我們的應用,可以慢慢的從訊息佇列接收請求數據進行處理,這樣把數據處理時間拉長,以減輕瞬時壓力

這是訊息佇列伺服器非常典型的應用場景
在这里插入图片描述

2.3 非同步呼叫

考慮定外賣支付成功的情況

支付後要發送支付成功的通知,再尋找外賣小哥來進行配送,而尋找外賣小哥的過程非常耗時,尤其是高峯期,可能要等待幾十秒甚至更長

這樣就造成整條呼叫鏈路響應非常緩慢
在这里插入图片描述
而如果我們引入RabbitMQ訊息佇列,訂單數據可以發送到訊息佇列伺服器,那麼呼叫鏈路也就可以到此結束,訂單系統則可以立即得到響應,整條鏈路的響應時間只有200毫秒左右

尋找外賣小哥的應用可以以非同步的方式從訊息佇列接收訂單訊息,再執行耗時的尋找操作
在这里插入图片描述

三、rabbitmq 基本概念

RabbitMQ是一種訊息中介軟體,用於處理來自用戶端的非同步訊息。伺服器端將要發送的訊息放入到佇列池中。接收端可以根據RabbitMQ設定的轉發機制 機製接收伺服器端發來的訊息。RabbitMQ依據指定的轉發規則進行訊息的轉發、緩衝和持久化操作,主要用在多伺服器間或單伺服器的子系統間進行通訊,是分佈式系統標準的設定。
在这里插入图片描述

3.1 Exchange

接受生產者發送的訊息,並根據Binding規則將訊息路由給伺服器中的佇列。ExchangeType決定了Exchange路由訊息的行爲。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三種。
在这里插入图片描述

3.2 Message Queue

訊息佇列。我們發送給RabbitMQ的訊息最後都會到達各種queue,並且儲存在其中(如果路由找不到相應的queue則數據會丟失),等待消費者來取。

3.3 Binding Key

它表示的是Exchange與Message Queue是通過binding key進行聯繫的,這個關係是固定。

3.4 Routing Key

生產者在將訊息發送給Exchange的時候,一般會指定一個routing key,來指定這個訊息的路由規則。這個routing key需要與Exchange Type及binding key聯合使用才能 纔能生,我們的生產者只需要通過指定routing key來決定訊息流向哪裏。

四、rabbitmq六種工作模式

4.1 簡單模式

在这里插入图片描述
RabbitMQ是一個訊息中介軟體,你可以想象它是一個郵局。當你把信件放到郵箱裏時,能夠確信郵遞員會正確地遞送你的信件。RabbitMq就是一個郵箱、一個郵局和一個郵遞員。

  • 發送訊息的程式是生產者
  • 佇列就代表一個郵箱。雖然訊息會流經RbbitMQ和你的應用程式,但訊息只能被儲存在佇列裡。佇列儲存空間只受伺服器記憶體和磁碟限制,它本質上是一個大的訊息緩衝區。多個生產者可以向同一個佇列發送訊息,多個消費者也可以從同一個佇列接收訊息.
  • 消費者等待從佇列接收訊息

在这里插入图片描述

4.1.1 建立Maven專案

在这里插入图片描述

4.1.2 pom.xml

新增 slf4j 依賴, 和 rabbitmq amqp 依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.tedu</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.8.0-alpha2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.8.0-alpha2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

4.1.3 生產者發送訊息

  1. 連線rabbitmq伺服器
  2. 定義佇列
  3. 發送訊息
  4. 關閉連線
package m1_simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        //1.連線rabbitmq伺服器
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.126.129");//寫rabbitmq伺服器地址
        f.setPort(5672);//5672是通訊埠,收發訊息是5672,15672是管理介面
        f.setUsername("admin");
        f.setPassword("admin");

        // 用誰的伺服器,設定一個自己的虛擬主機,用虛擬機器的話就不必操作這行程式碼
        //f.setVirtualHost("/lx");
        /*
         * 與rabbitmq伺服器建立連線,
         * rabbitmq伺服器端使用的是nio,會複用tcp連線,
         * 並開闢多個通道與用戶端通訊
         * 以減輕伺服器端建立連線的開銷
         */
        Connection con = f.newConnection();
        //建立通道
        Channel c = con.createChannel();

        //2.定義佇列(會通知伺服器想使用一個「helloworld佇列」,伺服器會找到這個佇列,如果不存在,伺服器會新建佇列)
        /*
         * 宣告佇列,會在rabbitmq中建立一個佇列
         * 如果已經建立過該佇列,就不能再使用其他參數來建立
         *
         * 參數含義:
         *   -queue: 佇列名稱
         *   -durable: 佇列持久化,true表示RabbitMQ重新啓動後佇列仍存在
         *   -exclusive: 排他,true表示限制僅當前連線可用
         *   -autoDelete: 當最後一個消費者斷開後,是否刪除佇列
         *   -arguments: 其他參數
         */
        c.queueDeclare("helloworld111",false,false,false,null);

        //3.發送訊息
        /*
         * 發佈訊息
         * 這裏把訊息向預設交換機發送.
         * 預設交換機隱含與所有佇列系結,routing key即爲佇列名稱
         *
         * 參數含義:
         *  -exchange: 交換機名稱,空串表示預設交換機"(AMQP default)",不能用 null
         * 	-routingKey: 對於預設交換機,路由鍵就是目標佇列名稱
         * 	-props: 其他參數,例如頭資訊
         * 	-body: 訊息內容byte[]陣列
         */
        c.basicPublish("","helloworld",null,"hello".getBytes());
        System.out.println("訊息已發送!!");
        //4.斷開鏈接
        c.close();
        con.close();
    }
}

4.1.4 消費者接收訊息

package m1_simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws Exception {
        //連線
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.151"); // www.wht6.cn
        // f.setPort(5672); //預設埠可以省略
        f.setUsername("admin");
        f.setPassword("admin");
        // f.setVirtualHost("/wht"); // 如果用我的伺服器,要設定虛擬主機

        Channel c = f.newConnection().createChannel();

        //定義佇列
        c.queueDeclare("helloworld", false, false, false, null);

        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                byte[] body = message.getBody();
                String msg = new String(body);
                System.out.println("收到: "+msg);
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
            }
        };

        //從 helloworld 佇列接收訊息,消費訊息
        c.basicConsume("helloworld", true, deliverCallback, cancelCallback);
    }
}

未完待續。。。