兩個系統或兩個使用者端之間進行訊息傳送,利用高效可靠的訊息傳遞機制進行平臺無關的資料交流,並基於資料通訊來進行分散式系統的整合。通過提供訊息傳遞和訊息排隊模型,它可以在分散式環境下擴充套件程序間的通訊。
訊息中介軟體,總結起來作用有三個:非同步化提升效能、降低耦合度、流量削峰。
系統A傳送訊息給中介軟體後,自己的工作已經完成了,不用再去管系統B什麼時候完成操作。而系統B拉去訊息後,執行自己的操作也不用告訴系統A執行結果,所以整個的通訊過程是非同步呼叫的。
有些業務不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許使用者把一個訊息放入佇列,但並不立即處理它。想向佇列中放入多少訊息就放多少,然後在需要的時候再去處理它們。
在任何重要的系統中,都會有需要不同的處理時間的元素。訊息佇列通過一個緩衝層來幫助任務最高效率的執行,該緩衝有助於控制和優化資料流經過系統的速度。以調節系統響應時間。
降低工程間的強依賴程度,針對異構系統進行適配。在專案啟動之初來預測將來專案會碰到什麼需求,是極其困難的。通過訊息系統在處理過程中間插入了一個隱含的、基於資料的介面層,兩邊的處理過程都要實現這一介面,當應用發生變化時,可以獨立的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。
有些情況下,處理資料的過程會失敗。除非資料被持久化,否則將造成丟失。訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險。許多訊息佇列所採用的」插入-獲取-刪除」正規化中,在把一個訊息從佇列中刪除之前,需要你的處理系統明確的指出該訊息已經被處理完畢,從而確保你的資料被安全的儲存直到你使用完畢。
因為訊息佇列解耦了你的處理過程,所以增大訊息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變程式碼、不需要調節引數。便於分散式擴容。
系統的一部分元件失效時,不會影響到整個系統。訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理。
在大多使用場景下,資料處理的順序都很重要。大部分訊息佇列本來就是排序的,並且能保證資料會按照特定的順序來處理。
在存取量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量無法提取預知;如果以為了能處理這類瞬間峰值存取為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住突發的存取壓力,而不會因為突發的超負荷的請求而完全崩潰。
分散式系統產生的海量資料流,如:業務紀錄檔、監控資料、使用者行為等,針對這些資料流進行實時或批次採集彙總,然後進行巨量資料分析是當前網際網路的必備技術,通過訊息佇列完成此類資料收集是最好的選擇。
特性MQ | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
生產者消費者模式 | 支援 | 支援 | 支援 | 支援 |
釋出訂閱模式 | 支援 | 支援 | 支援 | 支援 |
請求迴應模式 | 支援 | 支援 | 不支援 | 不支援 |
Api完備性 | 高 | 高 | 高 | 高 |
多語言支援 | 支援 | 支援 | java | 支援 |
單機吞吐量 | 萬級 | 萬級 | 萬級 | 十萬級 |
訊息延遲 | 無 | 微秒級 | 毫秒級 | 毫秒級 |
可用性 | 高(主從) | 高(主從) | 非常高(分散式) | 非常高(分散式) |
訊息丟失 | 低 | 低 | 理論上不會丟失 | 理論上不會丟失 |
檔案的完備性 | 高 | 高 | 高 | 高 |
提供快速入門 | 有 | 有 | 有 | 有 |
社群活躍度 | 高 | 高 | 有 | 高 |
商業支援 | 無 | 無 | 商業雲 | 商業雲 |
Queue: 佇列儲存,常用與對等訊息模型 ,預設只能由唯一的一個消費者處理。一旦處理訊息刪除。
Topic: 主題儲存,用於訂閱/釋出訊息模型,主題中的訊息,會傳送給所有的消費者同時處理。只有在訊息可以重複處 理的業務場景中可使用,Queue/Topic都是 Destination 的子介面
ConnectionFactory: 連線工廠,客戶用來建立連線的物件,例如ActiveMQ提供的ActiveMQConnectionFactory
Connection: JMS Connection封裝了客戶與JMS提供者之間的一個虛擬的連線。
Destination: 訊息的目的地,目的地是客戶用來指定它生產的訊息的目標和它消費的訊息的來源的物件。JMS1.0.2規範中定義了兩種訊息傳遞域:對等(PTP)訊息傳遞域和釋出/訂閱訊息傳遞域。
對等訊息傳遞域的特點如下:
- 每個訊息只能有一個消費者。
- 訊息的生產者和消費者之間沒有時間上的相關性。無論消費者在生產者傳送訊息的時候是否處於執行狀態,它都可以提取訊息。
釋出/訂閱訊息傳遞域的特點如下:
- 每個訊息可以有多個消費者。
- 生產者和消費者之間有時間上的相關性。
- 訂閱一個主題的消費者只能消費自它訂閱之後釋出的訊息。JMS規範允許客戶建立持久訂閱,這在一定程度上放鬆了時間上的相關性要求 。持久訂閱允許消費者消費它在未處於啟用狀態時傳送的訊息。
在對等訊息傳遞域中,目的地被成為佇列(queue);在釋出/訂閱訊息傳遞域中,目的地被成為主題(topic)。
JMS訊息由以下三部分組成的:
訊息頭:
每個訊息頭欄位都有相應的getter和setter方法。
訊息屬性:
如果需要除訊息頭欄位以外的值,那麼可以使用訊息屬性。
訊息體:
JMS定義的訊息型別有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。
訊息型別:
屬性 | 型別 |
---|---|
TextMessage | 文字訊息 |
MapMessage | k/v |
BytesMessage | 位元組流 |
StreamMessage | java原始的資料流 |
ObjectMessage | 序列化的java物件 |
只有在被確認之後,才認為已經被成功地消費了,訊息的成功消費通常包含三個階段 :客戶接收訊息、客戶處理訊息和訊息被確認。在事務性對談中
,當一個事務被提交的時候,確認自動發生。在非事務性對談中
,訊息何時被確認取決於建立對談時的應答模式(acknowledgement mode)。該引數有以下三個可選值:
Session.AUTO_ACKNOWLEDGE:
當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,對談自動確認客戶收到的訊息。Session.CLIENT_ACKNOWLEDGE:
客戶通過訊息的acknowledge方法確認訊息。需要注意的是,在這種模式中,確認是在對談層上進行:確認一個被消費的訊息將自動確認所有已被對談消費的訊息。例如,如果一個訊息消費者消費了10個訊息,然後確認第5個訊息,那麼所有10個訊息都被確認。Session.DUPS_ACKNOWLEDGE:
該選擇只是對談遲鈍的確認訊息的提交。如果JMS Provider失敗,那麼可能會導致一些重複的訊息。如果是重複的訊息,那麼JMS Provider必須把訊息頭的JMSRedelivered欄位設定為true。可以使用訊息優先順序來指示JMS Provider首先提交緊急的訊息。優先順序分10個級別,從0(最低)到9(最高)。如果不指定優先順序,預設級別是4。需要注意的是,JMS Provider並不一定保證按照優先順序的順序提交訊息。
可以設定訊息在一定時間後過期,預設是永不過期。
可以通過對談上的createTemporaryQueue方法和createTemporaryTopic方法來建立臨時目的地。它們的存在時間只限於建立它們的連線所保持的時間。只有建立該臨時目的地的連線上的訊息消費者才能夠從臨時目的地中提取訊息。
ActiveMQ是一種開源的基於JMS(Java Message Servie)規範的一種訊息中介軟體的實現,ActiveMQ的設計目標是提供標準的,訊息導向的,能夠跨越多語言和多系統的應用整合訊息通訊中介軟體。
官網地址:http://activemq.apache.org/
1. KahaDB儲存: KahaDB是預設的持久化策略,所有訊息順序新增到一個紀錄檔檔案中,同時另外有一個索引檔案記錄指向這些紀錄檔的儲存地址,還有一個事務紀錄檔用於訊息回覆操作。是一個專門針對訊息持久化的解決方案,它對典型的訊息使用模式進行了優化
特性:
1、紀錄檔形式儲存訊息;
2、訊息索引以 B-Tree 結構儲存,可以快速更新;
3、 完全支援 JMS 事務;
4、支援多種恢復機制kahadb 可以限制每個資料檔案的大小。不代表總計資料容量。
2. AMQ 方式: 只適用於 5.3 版本之前。 AMQ 也是一個檔案型資料庫,訊息資訊最終是儲存在檔案中。記憶體中也會有快取資料。
3. JDBC儲存 : 使用JDBC持久化方式,資料庫預設會建立3個表,每個表的作用如下:
activemq_msgs:queue和topic的訊息都存在這個表中
activemq_acks:儲存持久訂閱的資訊和最後一個持久訂閱接收的訊息ID
activemq_lock:跟kahadb的lock檔案類似,確保資料庫在某一時刻只有一個broker在存取
4. LevelDB儲存 : LevelDB持久化效能高於KahaDB,但是在ActiveMQ官網對LevelDB的表述:LevelDB官方建議使用以及不再支援,推薦使用的是KahaDB
5.Memory 訊息儲存: 顧名思義,基於記憶體的訊息儲存,就是訊息儲存在記憶體中。persistent=」false」,表示不設定持 久化儲存,直接儲存到記憶體中,在broker標籤處設定。
協定官網API:http://activemq.apache.org/configuring-version-5-transports.html
Transmission Control Protocol (TCP):
(1)TCP協定傳輸可靠性高,穩定性強
(2)高效性:位元組流方式傳遞,效率很高
(3)有效性、可用性:應用廣泛,支援任何平臺
New I/O API Protocol(NIO)
NIO協定和TCP協定類似,但NIO更側重於底層的存取操作。它允許開發人員對同一資源可有更多的client呼叫和伺服器端有更多的負載。
適合使用NIO協定的場景:
(1)可能有大量的Client去連結到Broker上一般情況下,大量的Client去連結Broker是被作業系統的執行緒數所限制的。因此,NIO的實現比TCP需要更少的執行緒去執行,所以建議使用NIO協定
(2)可能對於Broker有一個很遲鈍的網路傳輸NIO比TCP提供更好的效能
NIO連線的URI形式:nio://hostname:port?key=value
Transport Connector設定範例:
<transportConnectors>
<transportConnector
name="tcp"
uri="tcp://localhost:61616?trace=true" />
<transportConnector
name="nio"
uri="nio://localhost:61618?trace=true" />
</transportConnectors>
<transportConnectors>
<transportConnector
name="udp"
uri="udp://localhost:61618?trace=true" />
</transportConnectors>
<transportConnectors>
<transportConnector name="ssl" uri="ssl://localhost:61617?trace=true"/>
</transportConnectors>
這裡以windows為案例演示
下載地址:http://activemq.apache.org/components/classic/download/
解壓後直接執行
bin/win64/activemq.bat
http://localhost:8161/
賬號密碼:admin/admin
修改 ActiveMQ 組態檔 activemq/conf/jetty.xml
jettyport節點: 組態檔修改完畢,儲存並重新啟動 ActiveMQ 服務
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="127.0.0.1"/>
<property name="port" value="8161"/>
</bean>
1. jar引入:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2. Sender :
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @program: activemq_01
* @ClassName Sender
* @description: 訊息傳送
* @author: muxiaonong
* @create: 2020-10-02 13:01
* @Version 1.0
**/
public class Sender {
public static void main(String[] args) throws Exception{
// 1. 獲取連線工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
// 2. 獲取一個向activeMq的連線
Connection connection = factory.createConnection();
// 3. 獲取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.找目的地,獲取destination,消費端,也會從這個目的地取訊息
Queue queue = session.createQueue("user");
// 5.1 訊息建立者
MessageProducer producer = session.createProducer(queue);
// consumer --> 消費者
// producer --> 建立者
// 5.2. 建立訊息
for (int i = 0; i < 100; i++) {
TextMessage textMessage = session.createTextMessage("hi:"+i);
// 5.3 向目的地寫入訊息
producer.send(textMessage);
Thread.sleep(1000);
}
// 6.關閉連線
connection.close();
System.out.println("結束。。。。。");
}
}
3. Receiver :
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @program: activemq_01
* @ClassName Receiver
* @description: 訊息接收
* @author: muxiaonong
* @create: 2020-10-02 13:01
* @Version 1.0
**/
public class Receiver {
public static void main(String[] args) throws Exception{
// 1. 獲取連線工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
// 2. 獲取一個向activeMq的連線
Connection connection = factory.createConnection();
connection.start();
// 3. 獲取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.找目的地,獲取destination,消費端,也會從這個目的地取訊息
Destination queue = session.createQueue("user");
// 5 獲取訊息
MessageConsumer consumer = session.createConsumer(queue);
while(true){
TextMessage message = (TextMessage)consumer.receive();
System.out.println("message:"+message.getText());
}
}
}
測試結果:
message:hi:38
message:hi:39
message:hi:40
message:hi:41
message:hi:42
message:hi:43
message:hi:44
message:hi:45
web後臺顯示有一個消費者處於連線狀態,且已消費了68個message,而該條佇列已沒有message待消費了
今天的MQ入門教學系列就這裡了,感興趣的小夥伴可以試試,遇到了什麼問題,或者有疑問的,都可以在下方留言,小農看見了會第一時間回覆大家,MQ作為一個訊息中介軟體,不管是面試還是工作中都會經常用到,所以是很有必要去了解和學習的一個技術點,今天的分享就到這裡了,謝謝各位小夥伴的觀看,我們下篇文章見,大家加油!