MQ(Message Queue):訊息佇列又叫訊息中介軟體。業務系統裏面可通過典型的生產者和消費者模型實現系統A和系統B之間的解耦。因爲訊息的生產和消費都是非同步的,而且雙方只關心訊息的發送和接收,中間沒有業務邏輯的侵入,可以輕鬆實現系統間的解耦。
① ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。它是一個完全支援JMS規範的的訊息中介軟體。豐富的API,多種叢集架構模式讓ActiveMQ在業界成爲老牌的訊息中介軟體,在中小型企業頗受歡迎!但是數據量一旦達到某個量級,效能就會很差。
② Kafka
Kafka主要特點是基於Pull的模式來處理訊息消費,追求高吞吐量,一開始的目的就是用於日誌收集和傳輸。0.8版本開始支援複製,不支援事務,對訊息的重複、丟失、錯誤沒有嚴格要求,適合產生大量數據的網際網路服務的數據收集業務(進行大數據的分析)。
③ RocketMQ
RocketMQ是阿裡開源的訊息中介軟體,它是純Java開發,具有高吞吐量、高可用性、適合大規模分佈式系統應用的特點。RocketMQ思路起源於Kafka,但並不是Kafka的一個Copy,它對訊息的可靠傳輸及事務性做了優化,目前在阿裡集團被廣泛應用於交易、充值、流計算、訊息推播、日誌流式處理、binglog分發等場景。
④ RabbitMQ
RabbitMQ是使用Erlang語言開發的開源訊息佇列系統,基於AMQP協定來實現。AMQP的主要特徵是訊息導向、佇列、路由(包括對等和發佈/訂閱)、可靠性、安全。AMQP協定更多用在企業系統內對數據一致性、穩定性和可靠性要求很高的場景,對效能和吞吐量的要求還在其次。
先執行下面 下麪的指令爲erlang設定好倉庫。
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
# 安裝erlang
yum install erlang
出現了上面的錯誤,不急,先使用下面 下麪的指令安裝完deltarpm之後,再執行yum install erlang
。
yum provides '*/applydeltarpm' #檢視依賴包的位置
yum -y install deltarpm #安裝命令
如下圖則erlang安裝成功。
可以使用下面 下麪指令檢視erlang是否成功安裝。
erl
使用下面 下麪指令安裝socat。
yum install socat
安裝RabbitMQ。
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm
如下圖則表示RabbitMQ安裝成功。
注意:高版本的RabbitMQ不帶組態檔,需要去github上面拷貝並放到/etc/rabbitmq
目錄下面 下麪。
rabbitmq-server-3.8.5-1.el7.noarch.rpm包下載鏈接,提取碼:o9z3
① 啓動rabbitmq中的外掛管理。
rabbitmq-plugins enable rabbitmq_management
出現下圖則表示啓動成功。
② RabbitMQ服務的啓動、重新啓動與停止。
# 啓動RabbitMQ服務
systemctl start rabbitmq-server
# 重新啓動RabbitMQ服務
systemctl restart rabbitmq-server
# 停止RabbitMQ服務
systemctl stop rabbitmq-server
③ 檢視RabbitMQ服務的狀態。
systemctl status rabbitmq-server
下圖表示RbbitMQ處於執行狀態。
啓用下面 下麪設定可以允許所有使用者存取,而不僅僅是本地存取。
開啓rabbitmq服務。
service rabbitmq-server start
開啓web管理介面。
rabbitmq-plugins enable rabbitmq_management
使用IP:15672
。
RabbitMQ中生產者和消費者機制 機製是這樣的:生產者負責生產訊息並將該訊息發送到交換機(需指定路由key),消費者根據路由key從交換機上找到對應的訊息佇列,從這個佇列中取訊息。
注意:生產端和消費端都可以申明交換機或者佇列,只要所申明的屬性不變,RabbitMQ自己回去判斷是否一存在過,從而保證交換機或者佇列只建立一次。
這個訊息模型不需要用到交換機,所以生產者在投遞訊息的時候,將路由key直接指定爲消費者要消費的佇列的名字即可。
生產者程式碼:
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
String queueName = "hello world_queue";
//宣告通道對應的訊息佇列(表示建立佇列, 不管是消費者那宣告還是生產者那宣告都可以)
//第一個參數表示佇列名,
//第二個參數表示佇列是否開啓持久化
//第三個參數表示佇列是否開啓獨佔模式(獨佔表示一個佇列只能有一個channel監聽)
//第四個參數表示是否自動刪除佇列
//第五個參數表示額外附加參數
channel.queueDeclare(queueName, false, false, false, null);
String msg = "Hello, I am rabbitMQ!";
//第一個參數表示交換機名字
//第二個參數表示路由key(當第一個參數爲空字串即交換機爲default時, 這個參數直接指向佇列名稱)
//第三個參數表示傳遞訊息的額外設定
//第四個參數表示訊息的具體內容
channel.basicPublish("", queueName, null, msg.getBytes());
channel.close();
connection.close();
}
}
消費者程式碼:
public class Consumer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
String queueName = "hello world_queue";
//宣告通道對應的訊息佇列(表示建立佇列, 不管是消費者那宣告還是生產者那宣告都可以)
//第一個參數表示佇列名,
//第二個參數表示佇列是否開啓持久化
//第三個參數表示佇列是否開啓獨佔模式(獨佔表示一個佇列只能有一個channel監聽)
//第四個參數表示是否自動刪除佇列
//第五個參數表示額外附加參數
channel.queueDeclare(queueName, false, false, false, null);
//建立消費者
//第一個參數表示佇列名
//第二個參數表示是否開啓自動簽收訊息
//第三個訊息是消費時候的回撥介面
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者消費了一條訊息: " + new String(body));
}
});
}
}
這個模型和Hello World訊息模型本質是一樣的,只不過是佇列上面多了幾個消費者,這些消費者共同消費佇列中的這些訊息。
生產者程式碼:
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
String queueName = "work_queues";
//宣告通道對應的訊息佇列(表示建立佇列, 不管是消費者那宣告還是生產者那宣告都可以)
//第一個參數表示佇列名,
//第二個參數表示佇列是否開啓持久化
//第三個參數表示佇列是否開啓獨佔模式(獨佔表示一個佇列只能有一個channel監聽)
//第四個參數表示是否自動刪除佇列
//第五個參數表示額外附加參數
channel.queueDeclare(queueName, false, false, false, null);
String msg = "Hello, I am rabbitMQ!";
//第一個參數表示交換機名字
//第二個參數表示路由key(當第一個參數爲空字串即交換機爲default時, 這個參數直接指向佇列名稱)
//第三個參數表示傳遞訊息的額外設定
//第四個參數表示訊息的具體內容
for (int i = 0; i < 20; i++) {
channel.basicPublish("", queueName, null, msg.getBytes());
}
channel.close();
connection.close();
}
}
消費者1程式碼:
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
String queueName = "work_queues";
//宣告通道對應的訊息佇列(表示建立佇列, 不管是消費者那宣告還是生產者那宣告都可以)
//第一個參數表示佇列名,
//第二個參數表示佇列是否開啓持久化
//第三個參數表示佇列是否開啓獨佔模式(獨佔表示一個佇列只能有一個channel監聽)
//第四個參數表示是否自動刪除佇列
//第五個參數表示額外附加參數
channel.queueDeclare(queueName, false, false, false, null);
//建立消費者
//第一個參數表示佇列名
//第二個參數表示是否開啓自動簽收訊息
//第三個訊息是消費時候的回撥介面
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1消費了一條訊息: " + new String(body));
}
});
}
}
消費者2程式碼:
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
String queueName = "work_queues";
//宣告通道對應的訊息佇列(表示建立佇列, 不管是消費者那宣告還是生產者那宣告都可以)
//第一個參數表示佇列名,
//第二個參數表示佇列是否開啓持久化
//第三個參數表示佇列是否開啓獨佔模式(獨佔表示一個佇列只能有一個channel監聽)
//第四個參數表示是否自動刪除佇列
//第五個參數表示額外附加參數
channel.queueDeclare(queueName, false, false, false, null);
//消費端限流, 一次只接收一條未確認的訊息
channel.basicQos(1);
//建立消費者
//第一個參數表示佇列名
//第二個參數表示是否開啓自動簽收訊息
//第三個訊息是消費時候的回撥介面
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//第一個參數是訊息的唯一識別符號, 第二份參數表示是否開啓批次簽收
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("消費者2消費了一條訊息: " + new String(body));
}
});
}
}
這個訊息模型類似廣播模式,生產者將訊息投遞給交換機,交換機不需要指定路由key,每個消費者有自己的佇列,交換機會把訊息發送給所有系結該交換機的佇列,只要消費者自己的訊息佇列系結的是同一個交換機,就可以實現一條訊息被多個消費者消費。交換機型別需要指定爲fanout
。
生產者程式碼:
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("publish subscribe_exchange", "fanout");
String msg = "Hello, I am rabbitMQ!";
//第一個參數表示交換機名字
//第二個參數表示路由key(當第一個參數爲空字串即交換機爲default時, 這個參數直接指向佇列名稱)
//第三個參數表示傳遞訊息的額外設定
//第四個參數表示訊息的具體內容
channel.basicPublish("publish subscribe_exchange", "", null, msg.getBytes());
channel.close();
connection.close();
}
}
消費者1程式碼:
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//建立臨時佇列, 每個消費者有自己的佇列
String queueName = channel.queueDeclare().getQueue();
//宣告交換機(表示建立交換機, 不管是消費者那宣告還是生產者那宣告都可以)
//第一個參數表示交換機的名字
//第二個參數表示交換機的型別
channel.exchangeDeclare("publish subscribe_exchange", "fanout");
//系結佇列和交換機, fanout即發佈訂閱模式不需要路由key
channel.queueBind(queueName, "publish subscribe_exchange", "");
//建立消費者
//第一個參數表示佇列名
//第二個參數表示是否開啓自動簽收訊息
//第三個訊息是消費時候的回撥介面
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1消費了一條訊息: " + new String(body));
}
});
}
}
消費者2程式碼:
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//建立臨時佇列, 每個消費者有自己的佇列
String queueName = channel.queueDeclare().getQueue();
//宣告交換機(表示建立交換機, 不管是消費者那宣告還是生產者那宣告都可以)
//第一個參數表示交換機的名字
//第二個參數表示交換機的型別
channel.exchangeDeclare("publish subscribe_exchange", "fanout");
//fanout即發佈訂閱模式不需要路由key
channel.queueBind(queueName, "publish subscribe_exchange", "");
//建立消費者
//第一個參數表示佇列名
//第二個參數表示是否開啓自動簽收訊息
//第三個訊息是消費時候的回撥介面
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2消費了一條訊息: " + new String(body));
}
});
}
}
生產者將訊息投遞到交換機同時指定路由key,消費者根據路由key從對應的訊息佇列中獲取訊息進行消費,這個模型必須當雙方的路由key相同時才能 纔能匹配到。
生產者程式碼:
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare("routing_exchange", "direct");
String msg = "Hello, I am rabbitMQ!";
//第一個參數表示交換機名字
//第二個參數表示路由key(當第一個參數爲空字串即交換機爲default時, 這個參數直接指向佇列名稱)
//第三個參數表示傳遞訊息的額外設定
//第四個參數表示訊息的具體內容
channel.basicPublish("routing_exchange", "hello routing", null, msg.getBytes());
channel.close();
connection.close();
}
}
消費者程式碼:
public class Consumer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//建立臨時佇列
String queueName = channel.queueDeclare().getQueue();
//系結佇列和交換機
channel.queueBind(queueName, "routing_exchange", "hello routing");
//建立消費者
//第一個參數表示佇列名
//第二個參數表示是否開啓自動簽收訊息
//第三個訊息是消費時候的回撥介面
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者消費了一條訊息: " + new String(body));
}
});
}
}
這個訊息模型其實就是Routing訊息模型的升級版,消費者的路由key可以使用#表示匹配大於等於零個詞
,*表示匹配一個詞
,其他都和第四種訊息模型一樣。
生產者程式碼:
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//宣告交換機
channel.exchangeDeclare("topics_exchange", "topic");
String msg = "Hello, I am rabbitMQ!";
//第一個參數表示交換機名字
//第二個參數表示路由key(當第一個參數爲空字串即交換機爲default時, 這個參數直接指向佇列名稱)
//第三個參數表示傳遞訊息的額外設定
//第四個參數表示訊息的具體內容
channel.basicPublish("topics_exchange", "user", null, msg.getBytes());
channel.close();
connection.close();
}
}
消費者程式碼:
public class Consumer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//建立臨時佇列
String queueName = channel.queueDeclare().getQueue();
//系結佇列和交換機, #表示匹配零個、一個或多個詞, *表示只匹配一個詞
channel.queueBind(queueName, "topics_exchange", "user.#");
//建立消費者
//第一個參數表示佇列名
//第二個參數表示是否開啓自動簽收訊息
//第三個訊息是消費時候的回撥介面
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者消費了一條訊息: " + new String(body));
}
});
}
}