體驗連結:https://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6
本教學將Demo演示使用java使用者端傳送訊息和消費的應用場景
並行訊息,也叫普通訊息,是相對順序訊息而言的,普通訊息的效率最高。本教學將簡單演示如何使用純java client傳送和消費訊息。
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
再執行命令, 可以看到正常生產和消費輸出
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 執行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime
3. Demo程式碼說明
Demo程式碼可以檢視github。並行訊息,意思是生產者可以並行的向topic中傳送訊息, 消費端不區分順序的訊息,這種模式效率最好。生產者demo程式碼如下:
最後留一個思考題給大家: 生產者範例和消費者範例, 都是執行緒安全的嗎?
順序訊息分為分割區有序和全域性有序。生產消費程式碼都是一樣的, 區別在於分割區有序的topic中queue個數可以是任意有效值,全域性有序的topic要求queue的個數為1。順序訊息的實現非常簡單易懂,但犧牲了可用性,單節點故障會直接影響順序訊息。
什麼是分割區有序訊息,什麼場景應該使用呢,又該如何傳送分割區有序訊息?分割區有序表示在一個queue中的訊息是有序的,傳送訊息時設定設定了相同key的訊息會被傳送到同一個queue中。
本教學將簡單演示如何使用純java client傳送和消費順序訊息。
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
再執行命令, 可以看到正常生產和消費輸出。 消費輸出注意看相同queue id的訊息輸出內容中的數位,按照從小到大就是正確的。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 執行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime
3. Demo程式碼說明
Demo程式碼可以檢視github。
生產者會根據設定的keys做hash,相同hash值的訊息會傳送到相同的queue中。所以相同hash值的訊息需要保證在同一個執行緒中順序的傳送。
消費者使用相對比較簡單, 訊息監聽類實現org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly介面即可。相同queue的訊息需要序列處理,這樣救保證消費的順序性
延遲訊息,對於一些特殊場景比如訂票後30分鐘不支付自動取消等類似場景比較有用。本教學將簡單演示如何使用純java client傳送和消費延遲訊息。
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
執行命令, 可以看到正常生產和消費輸出。 目前RocketMQ支援多種延遲級別, 不過每種延遲級別都是基於RocketMQ自身,實際延遲時間會加上Broker-Client端的網路情況不同而略有差異。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 執行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime
3. Demo程式碼說明
Demo程式碼可以檢視github。
生產者在傳送訊息的時候需要設定延遲級別,RocketMQ支援多種延遲級別。如果把延遲時間算作一個以空格分割的陣列,延遲級別就是延遲時間陣列的下標index+1。RocketMQ如何解析延遲級別和延遲時間對映關係。
事務訊息,是RocketMQ解決分散式事務的一種實現,極其簡單好用。一個事物訊息大致的生命週期如下圖
概括為如下幾個重要點:
生產者傳送half訊息(事物訊息)
Broker儲存half訊息
生產者處理本地事物,處理成功後commit事物
消費者消費到事物訊息
本教學將簡單演示如何使用純java client傳送和消費事物訊息。
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
執行命令, 可以看到事物訊息的全部過程。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 執行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime
3. Demo程式碼說明
Demo程式碼可以檢視github。在事物訊息中,消費程式碼和普通訊息的消費一樣,主要程式碼在生產者端。
生產者端的主要程式碼包含3個步驟:
這裡注意事物訊息的生產者類是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生產者類。
事物監聽類需要實現2個方法,這裡的邏輯都是mock的,實際使用的時候需要根據實際修改。
request-reply模式,可以滿足目前類似RPC同步呼叫的場景,本教學將簡單演示如何使用該模式。
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
執行命令, 可以看到正常生產和消費輸出。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 執行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime
通過程式碼結果和程式碼比較, 我們得知request-reply類似RPC同步呼叫的效果。
個人覺得:需要同步呼叫就用RPC, 不要走MQ,畢竟兩者是完全不同的目標的產品,專業的事情交給專業的產品。
3. Demo程式碼說明
Demo程式碼可以檢視github。
request-reply模式,在生產者和消費者兩端都和一般的生產消費有區別,下面分別介紹下demo程式碼。
生產者demo主要程式碼, 主要區別在於呼叫request(),而不是send()方法。
消費者demo主要程式碼: 消費程式碼主要增加了「回覆」邏輯。回覆是利用訊息傳送直接向生產者傳送一條訊息。 有點類似事物訊息中broker回查生產者。
一個小問題:事物訊息和request-reply訊息時,生產者的生產者組名有什麼要求嘛?
有時候我們只想消費部分訊息, 當然全部消費,在程式碼中過濾。 假如訊息海量時, 會有很多資源浪費,比如浪費不必要的頻寬。我們可以通過tag,sql92表示式來選擇性的消費。
cd /usr/local/services/5-rocketmq/broker-01
vim conf/broker.conf
設定項值:
// 是否支援重試訊息也過濾
filterSupportRetry=true
// 支援屬性過濾
enablePropertyFilter=true
修改後:
./restart.sh
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行tag過濾程式碼demo
執行命令, 可以看到正常生產和消費輸出。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 執行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime
3. 執行sql過濾程式碼demo
執行命令, 可以看到正常生產和消費輸出。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 執行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime
4. Demo程式碼說明
Demo程式碼可以檢視github。以下分別介紹生產者和消費者主要demo程式碼。
在生產tag訊息的時候, 訊息中需要加上傳送tag;sql92過濾的時候,加上自定義k-v。
tag過濾消費時,在訂閱topic時, 也新增上tag訂閱
SQL92過濾時,新增上SQL過濾訂閱。至於SQL92除了等號,還是支援什麼,大家可以自行自行檢視或者到群裡問。
ACL,全稱是Access Control List,是RocketMQ設計來做存取和許可權控制的。更多檔案參見github wiki:https://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL
0. 啟動一個叢集
cd /usr/local/services/5-rocketmq/broker-01
vim conf/broker.conf
設定項值:
aclEnable=true
修改後:
./restart.sh
1. 下載java程式碼demo(已下載則忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,執行程式碼demo
執行命令, 可以看到正常生產和消費輸出。 demo程式碼使用的admin許可權傳送和消費,實際使用需要對於每個topic,消費者組授權,才能正常生產消費。
// 進入demo程式碼目錄
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 執行程式碼
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime
3. Demo程式碼說明
Demo程式碼可以檢視github。帶ACL的生產者和消費者在初始化的時候,都必須給一個hook範例,構建方法如下:
static RPCHook getAclRPCHook(String accessKey, String secretKey) {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
在broker端secret key用來校驗資訊的完整性, access key用來校驗使用者許可權。二者缺一不可。