MQ全稱Message Queue,訊息佇列的意思,是訊息導向中介層的一種(message-oriented middleware),它底層使用Queue儲存訊息,遵循先進先出的原則。系統與系統之間進行資訊交流時,可以把訊息發送到MQ中,然後由MQ完成訊息的推播,訊息佇列可以在分佈式環境下提供應用解耦,彈性伸縮,冗餘儲存、流量削峯,非同步通訊,數據同步等功能。
大致的過程是這樣的:
發送者把訊息發送給訊息伺服器,訊息伺服器將訊息存放在若幹佇列/主題topic中,在合適的時候,訊息伺服器回將訊息轉發給接受者。在這個過程中,發送和接收是非同步的,也就是發送無需等待,而且發送者和接受者的生命週期也沒有必然的關係;尤其在發佈pub/訂閱sub模式下,也可以完成一對多的通訊,即讓一個訊息有多個接受者。
以電商系統舉個例子:
在沒有MQ情況下,當使用者完成下訂單成功之後,訂單系統需要呼叫支付系統完成支付,還需要呼叫倉儲系統來安排物流,一個下訂單的操作可能需要級聯的呼叫許多個微服務,當某一時刻訂單系統的併發量急劇增大,比如秒殺活動,此時因爲後臺要呼叫的服務較多,造成訂單系統的整體負荷急劇增加,最終可能會導致訂單系統的崩潰。
那麼,我們可不可以把倉儲系統及之後的微服務剝離開來,因爲使用者訂單下發成功之後是不用馬上就安排物流配送的,整個物流鏈路可以與訂單系統非同步執行,只需要訂單系統發送過來一個確認訊息即可。
這種情況下就可以引入MQ,當使用者下訂單成功之後,訂單系統向MQ發送一個訊息,通知倉儲系統安排物流配送,MQ不必等待倉儲系統鏈路的完成,只需要接着處理支付系統鏈路的業務即可,這樣可以很大程度的減輕訂單系統乃至整個應用的壓力。
那麼,我們可不可以繞過MQ,直接把訊息發送給倉儲系統呢?
原理上是行得通的,但是這是就要求倉儲系統和訂單系統必須同時線上,否則訊息是無法發送成功的。並且這樣一來,上下遊的耦合度較高,如果後續增加了一個XX系統,也需要接收訂單系統的訊息,此時就需要修改訂單系統發送訊息的程式碼,不符合開閉原則。
而加入了訊息中介軟體之後,上下遊之間的資訊交流就不用直接耦合了,訊息生產者把訊息發送給MQ,MQ在合適的時機把訊息推播給訊息消費者,消費者和生產者之間是非同步進行的,並且當大流量來襲時,消費者可以根據自身的能力消費MQ中的資訊,不會被大流量沖垮 衝垮,達到流量削峯的目的。
總的來講,在分佈式系統中引入MQ有以下幾點好處:
系統解耦:
當新的模組接進來時,可以做到程式碼改動最小;因爲訊息生產者是把訊息發送到一條虛擬的通道(主題或者佇列)上的,當有新的模組接入時,只需要在新的模組中設定要監聽/訂閱的通道即可,原來的上遊模組是不需要進行改動的。
執行非同步:
MQ採用非同步處理模式,通過梳理服務之間的強弱依賴關係,將非關鍵呼叫鏈路的操作非同步化,提升整體系統的吞吐能力;訊息發送者可以發送一個訊息而無須等待響應。訊息發送者將訊息發送到一條虛擬的通道(主題或者佇列)上;訊息接收者則訂閱或者監聽該愛通道。一條訊息可能最終轉發給一個或者多個訊息接收者,這些訊息接收者都無需對訊息發送者做出同步迴應。整個過程都是非同步的。
案例:
也就是說,一個系統跟另一個系統之間進行通訊的時候,假如系統A希望發送一個訊息給系統B,讓他去處理。但是系統A不關注系統B到底怎麼處理或者有沒有處理好,所以系統A把訊息發送給MQ,然後就不管這條訊息的「死活了」,接着系統B從MQ裏面消費出來處理即可。至於怎麼處理,是否處理完畢,什麼時候處理,都是系統B的事兒,與系統A無關。
流量削峯:
在MQ中,可以設定流量緩衝池,可以讓後端系統按照自身吞吐能力進行消費,不被沖垮 衝垮;
但是,引入MQ後,也會產生一些問題:
1)系統更復雜,多了一個MQ元件
2)訊息傳遞路徑更長,延時會增加
3)訊息可靠性和重複性互爲矛盾,訊息不丟不重難以同時保證
4)上遊無法知道下遊的執行結果,這一點是很致命的。所以請牢記, 呼叫方實時依賴執行結果的業務場景,請直接呼叫,不能使用MQ。
權衡利弊之後,可以得到適用MQ的場景:
上遊不關注下遊的處理結果,並且同步呼叫耗時較長;
不適用的場景:
上遊實時關注下遊的處理結果。
MQ指的是訊息佇列,放置在系統之間的訊息中介軟體,是一種理念,常用的落地實現有一下幾種:
從整體上來講,ActiveMQ屬於Apache公司,是比較老牌的MQ產品,並且底層使用java編寫;RabbitMQ使用的也比較多,但是它底層採用的是Erlang語言;Kafka主要用在大數據領域,RocketMQ是ailibaba參考Kafka推出的一個MQ產品,使用的比較多。
本次僅介紹ActiveMQ的使用,其他產品的使用大致思路相同,因爲它們都是基於MQ理唸的產品,只不過一些落地的實現有所區別。
第一步:進入官網下載Linux系統安裝包,官網下載地址:
http://activemq.apache.org/download-archives
選擇版本後,進入選擇操作系統頁面:
第二步:下載完成之後,將tar包上傳到Linux操作系統/opt目錄下,並解壓
tar -zxvf apache-activemq-5.15.9-bin.tar.gz
第三步:安裝成功,進入 apache-activemq-5.15.9/bin目錄下,執行activemq命令:
./activemq start ---- 啓動activemq
./activemq stop ---- 停止activemq
./activemq restart ---- 重新啓動activemq
#將啓動、停止、重新啓動日誌重定向至日誌檔案(檔名自定義)
./activemq start >> run_activemq.log
./activemq stop >> run_activemq.log
./activemq restart >> run_activemq.log
第四步:啓動之後,可以通過ps命令檢視是否啓動成功:
ps -ef | grep activemq
第五步:ActiveMQ啓動之後,預設佔用兩個埠號,一個提供JMS服務(61616),一個提供管理控制檯服務(8161),如果需要存取這兩個埠,請設定linux防火牆,開放這兩個埠。
第六步:開啓防火牆之後,存取http://192.168.234.133:8161(Linux主機ip+port),即可進入ActiveMQ的管理控制檯。得到此頁面也意味着ActiveMQ安裝和啓動成功。
在ActiveMQ中,訊息通道有兩種,一種是Queue,一種是Topic,二者的比較如下:
Queue是一種一對一的方式,生產者將訊息發送到MQ中的一個Queue中,MQ再將訊息推播給監聽此Queue的消費者。注意,這種方式的訊息只會被推播一次,也就是說,消費者應當只有一個。當消費者是一個叢集時,預設採用輪詢的方式進行訊息推播,所有服務範例均分訊息。
Topic是一種一對多的方式,訊息發佈者將訊息發送到MQ中的一個Topic中,MQ再將訊息推播給訂閱此Topic的訂閱者。注意,當Topic的訂閱者不止一個時,這種方式的訊息會被重複推播,總推播次數=訂閱者個數*訊息數。
更詳細的比較如下:
比較專案 | Queue佇列模式 | Topic佇列模式 |
---|---|---|
工作模式 | "負載均衡"模式,如果當前沒有消費者,訊息也不會丟棄;如果有多個消費者,那麼一條訊息也只會發送給其中一個消費者,並且要求消費者ack資訊 | "訂閱-發佈"模式,如果當前沒有訂閱者,訊息將會被丟棄,如果有多個訂閱者,那麼這些訂閱者都會收到訊息 |
有無狀態 | Queue數據預設會在mq伺服器上已檔案形式儲存,比如Active MQ一般儲存在$AMQ_HOME\data\kr-store\data下面 下麪,也可以設定成DB儲存 | 無狀態 |
傳遞完整性 | 訊息不會被丟棄 | 如果沒有訂閱者,訊息會被丟棄 |
處理效率 | 由於一條訊息只發送給一個消費者,所以就算消費者再多,效能也不會有明顯降低。當然不同訊息協定的具體效能也是有差異的 | 由於訊息要按照訂閱者的數量進行復制,所以處理效能會隨着訂閱者的增加而明顯降低,並且還要結合不同訊息協定自身的效能差異 |
下面 下麪我們分Queue和Topic兩種模式來書寫入門程式,但是二者的整體書寫思路是一致的:
JMS(java message service)開發的基本步驟:
1:建立一個connection factory
2:通過connection factory來建立JMS connection
3:啓動JMS connection
4:通過JMS connection建立JMS session
5:建立JMS destination(目的地 佇列/主題)
6:建立JMS producer或者建立JMS consume並設定destination
7:建立JMS consumer或者註冊一個JMS message listener
8:發送(send)或者接收(receive)JMS message
9:關閉所有JMS資源
第一步:環境搭建
建立Maven工程,引入ActiveMQ依賴:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
第二步:書寫生產者主程式
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
private static final String ACTIVEMQ_URL = "tcp://192.168.234.133:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連線工廠,獲得connection並啓動存取
Connection connection = activeMQConnectionFactory.createConnection();
//3.建立對談session
//兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是佇列queue還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.建立訊息的生產者
MessageProducer messageProducer = session.createProducer(queue);
//6、啓動連線
connection.start();
//7.通過使用訊息生產者,生產三條訊息,發送到MQ的佇列裏面
for (int i = 0; i < 3; i++) {
//8.建立訊息
TextMessage textMessage = session.createTextMessage("msg---hello" + i);//理解爲一個字串
//9.通過messageProducer發送給MQ佇列
messageProducer.send(textMessage);
}
//10.關閉資源
messageProducer.close();
session.close();
System.out.println("****訊息發佈到MQ佇列完成");
}
}
第三步:書寫消費者主程式
兩種書寫方式,一種是阻塞式,一種是監聽式。
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 阻塞式訊息消費者
*/
public class JmsConsumer {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連線工廠,獲得connection並啓動存取
Connection connection = activeMQConnectionFactory.createConnection();
//3.建立對談session
//兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是佇列queue還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.建立訊息的消費者,指定消費哪一個佇列裏面的訊息
MessageConsumer messageConsumer = session.createConsumer(queue);
//6、啓動連線
connection.start();
//回圈獲取
while (true) {
//7.通過消費者呼叫方法獲取佇列裏面的訊息(發送的訊息是什麼型別,接收的時候就強轉成什麼型別)
//receive()方法爲阻塞方法,如果沒有接受到訊息,就會一致阻塞等待
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if (textMessage != null) {
System.out.println("****消費者接收到的訊息: " + textMessage.getText());
}else {
break;
}
}
//8.關閉資源
messageConsumer.close();
session.close();
connection.close();
}
}
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* 監聽模式下的消費者
*/
public class JmsConsumer2 {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
//1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連線工廠,獲得connection並啓動存取
Connection connection = activeMQConnectionFactory.createConnection();
//3.建立對談session
//兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是佇列queue還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.建立訊息的消費者,指定消費哪一個佇列裏面的訊息
MessageConsumer messageConsumer = session.createConsumer(queue);
//6、啓動連線
connection.start();
//7.通過監聽的方式消費訊息
/*
非同步非阻塞式方式監聽器(onMessage)
訂閱者或消費者通過建立的消費者物件,給消費者註冊訊息監聽器setMessageListener,
當訊息有訊息的時候,系統會自動呼叫MessageListener類的onMessage方法
我們只需要在onMessage方法內判斷訊息型別即可獲取訊息
*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage) {
//8.把message轉換成訊息發送前的型別並獲取訊息內容
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("****消費者接收到的訊息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.out.println("執行了39行");
//保證控制檯不關閉,阻止程式關閉
System.in.read();
//關閉資源
messageConsumer.close();
session.close();
connection.close();
}
}
第四步:啓動生產者,觀察ActiveMQ管理控制檯,得到如下:
Number Of Pending Messages=等待消費的訊息,這個是未出佇列的數量,=總接收數-總出佇列數。
Number Of Consumers=消費者數量,消費者端的消費者數量。
Messages Enqueued=進隊訊息數,進佇列的總訊息量,包括出佇列的。這個數只增不減。
Messages Dequeued=出隊訊息數,可以理解爲是消費者消費掉的數量。
第五步:啓動消費者JmsConsumer,觀察控制檯
可以看到當前的消費者數量爲1,待消費爲0,總進佇列數爲3,已經消費爲3.
如果我們先啓動JmsConsumer和JmsConsumer2,再啓動生產者,此時生產者生產的訊息將通過負載均衡的方式分發到JmsConsumer和JmsConsumer2中。
第一步:環境搭建
建立Maven工程,引入ActiveMQ依賴:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
第二步:書寫生產者主程式
package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProducer_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
//1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連線工廠,獲得connection並啓動存取
Connection connection = activeMQConnectionFactory.createConnection();
//3.建立對談session
//兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是佇列queue還是主題topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.建立訊息的生產者
MessageProducer messageProducer = session.createProducer(topic);
//6、啓動連線
connection.start();
//7.通過使用訊息生產者,生產三條訊息,發送到MQ的佇列裏面
for (int i = 0; i < 3; i++) {
//8.通過session建立訊息
TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i);
//9.使用指定好目的地的訊息生產者發送訊息
messageProducer.send(textMessage);
}
//10.關閉資源
messageProducer.close();
session.close();
connection.close();
System.out.println("****TOPIC_NAME訊息發佈到MQ完成");
}
}
第三步:書寫消費者主程式
兩種書寫方式,一種是阻塞式,一種是監聽式。
package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 阻塞式訊息消費者
*/
public class JmsConsumer_Topic {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String TOPIC_NAME = "topic-ly";
public static void main(String[] args) throws JMSException {
//1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連線工廠,獲得connection並啓動存取
Connection connection = activeMQConnectionFactory.createConnection();
//3.建立對談session
//兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是佇列queue還是主題topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.建立訊息的消費者,指定消費哪一個佇列裏面的訊息
MessageConsumer messageConsumer = session.createConsumer(topic);
//6、啓動連線
connection.start();
//回圈獲取
while (true) {
//7.通過消費者呼叫方法獲取佇列裏面的訊息(發送的訊息是什麼型別,接收的時候就強轉成什麼型別)
//receive()方法爲阻塞方法,如果沒有接受到訊息,就會一致阻塞等待
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if (textMessage != null) {
System.out.println("****消費者接收到的訊息: " + textMessage.getText());
}else {
break;
}
}
//8.關閉資源
messageConsumer.close();
session.close();
connection.close();
}
}
package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer2_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
public static final String TOPIC_NAME = "topic-ly";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("我是2號消費者");
//1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通過連線工廠,獲得connection並啓動存取
Connection connection = activeMQConnectionFactory.createConnection();
//3.建立對談session
//兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是佇列queue還是主題topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.建立訊息的消費者
MessageConsumer messageConsumer = session.createConsumer(topic);
//6、啓動連線
connection.start();
//7.建立訊息的消費者,指定消費哪一個佇列裏面的訊息
messageConsumer.setMessageListener(message -> {
if (message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
}
}
第四步:先啓動消費者JmsConsumer_Topic和JmsConsumer2_Topic,此時觀察ActiveMQ管理控制檯,得到如下:
第五步:啓動生產者,觀察控制檯
可以看到當前的消費者數量爲2,總進佇列數爲3,已經消費爲6(2*3)。
因爲Topic是發佈/訂閱模式,生產者發佈一次,所有的訂閱此主題的消費者都能夠接收全部訊息,類似於微信公衆號。
但是要注意的是,在Topic模式下,消費者只能消費它存在之後生產者發佈的訊息,所以必須要先啓動消費者,再啓動生產者,如果生產者發佈的訊息沒有消費者,則這些訊息會成爲廢訊息,永遠不會有消費者接收。
而Queue模式下就不存在這種問題,因爲ActiveMQ會儲存Queue訊息,等有消費者時再進行推播。