前兩篇文章介紹了 RocketMQ 基本概念與搭建,現在以它與 SpringBoot 的結合來介紹其基本的用法。
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
注意:rocketmq-spring-boot-starter
要與RocketMQ
的版本一致。
application.yaml
檔案設定如下:
server:
port: 9007
spring:
application:
name: rockmq-producer
rocketmq:
# NameServer地址
name-server: 192.168.0.17:9876
producer:
# 生產者組
group: producer-group
# 傳送同步訊息失敗時,重試次數,預設是 2
retry-times-when-send-failed: 2
# 傳送非同步訊息失敗時,重試次數,預設是 2
retry-times-when-send-async-failed: 2
# 傳送訊息超時時間,預設是 3s
send-message-timeout: 3000
下面介面傳送的為同步訊息,即必須收到 RocketMQ 服務響應後才能進行下一步,否則一直阻塞。
@RequestMapping("/rocketmq")
@RestController
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 傳送同步訊息
*/
@RequestMapping("/syncSend")
public void syncSend() {
// 第一個引數指定Topic與Tag,格式: `topicName:tags`
// 第二個引數,訊息內容
SendResult sendResult = rocketMQTemplate.syncSend("topicClean:tagTest", "syncMessage");
System.out.println("傳送同步訊息結果:" + sendResult.toString());
}
}
消費者的依賴同上面的生產者一樣,同樣是寫下 yaml 檔案設定。
server:
port: 9008
spring:
application:
name: rockmq-consumer
rocketmq:
# NameServer地址
name-server: 192.168.0.17:9876
生產者傳送訊息到 broker
後,消費者通過監聽的方式獲取broker
傳送過來的訊息。實現監聽需要實現 RocketMQListener
介面:
/**
* 消費者監聽器
*/
@Component
@RocketMQMessageListener(
consumerGroup = "consumer-group", //消費者組
topic = "topicClean", //topic
selectorExpression = "tagTest || tagB" //tag,可以有多個
)
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("接收訊息:" + message);
}
}
分析一下引數內容
topic 這個是必須指定的,否則沒有訊息來源。
consumerGroup 是消費者組,這個必須制定。一條訊息只能被同一個消費者組裡的一個消費者消費。
selectorExpression
是用於訊息過濾的,我們在生產的時候定義了tag內容,消費者可以指定消費某些tag的訊息,具體策略如下:
上面的@RocketMQMessageListener
註解的常用設定引數:
引數 | 型別 | 預設值 | 說明 |
---|---|---|---|
consumerGroup | String | 消費者組 | |
topic | String | Topic | |
selectorType | SelectorType | SelectorType.TAG | 使用TAG 或者SQL92選擇訊息,預設tag |
selectorExpression | String | "*" | 控制哪些訊息可以選擇 |
consumeMode | ConsumeMode | ConsumeMode.CONCURRENTLY | 消費模式,並行接收還是順序接收,預設並行模式 |
messageModel | MessageModel | MessageModel.CLUSTERING | 消費模式,廣播模式還是叢集模式,預設叢集模式 |
consumeThreadMax | int | 64 | 最大消費執行緒數 |
consumeTimeout | long | 15L | 消費超時時間(一條訊息可能會阻塞使用執行緒的最長時間(以分鐘為單位)) |
nameServer | String | 組態檔中讀取:$ | nameServer地址 |
accessKey | String | 組態檔中讀取:$ | AK |
secretKey | String | 組態檔中讀取:$ | SK |
accessChannel | String | $ | |
maxReconsumeTimes | int | -1 | 最大訊息重試次數 |
首先第一步啟動剛剛編寫好的生產者及消費者服務。
呼叫生產者傳送訊息的介面/rocketmq/syncSend
後,控制檯返回結果 sendStatus=SEND_OK,表示訊息成功傳送到 broker
:
傳送同步訊息結果:SendResult [sendStatus=SEND_OK, msgId=7F000001178C18B4AAC288364E780000, offsetMsgId=7C471A0C00002A9F0000000000031086, messageQueue=MessageQueue [topic=topicClean, brokerName=broker-a, queueId=2], queueOffset=4]
檢視 RocketMQ 控制檯訊息介面,也可以查詢到剛剛發出來的訊息:
那麼消費者是否成功的消費到訊息了呢?這個我們暫時不清楚。
檢視消費者控制檯,很完美,消費者接收到了生產者的訊息:
接收訊息:syncMessage
同樣,也可以檢視 RocketMQ
控制檯消費者介面,上面我們確定的消費者組是consumer-group
,點選檢視消費詳情,是能夠看到成功地消費到了訊息:
生產者傳送的訊息成功被消費者消費,說明了基本的訊息流程是沒問題的。
上面我們傳送的是同步訊息,那這麼說除了同步訊息,還有其他哪幾種訊息阿?不瞭解,那我們就繼續往下看。
上面傳送的同步訊息屬於普通訊息,普通訊息就是 RocketMQ 中無特性的訊息,包含了同步訊息、非同步訊息、單步傳送訊息 3 種。
同步訊息是指訊息傳送方發出一條訊息後,會在收到伺服器端返回響應之後才發下一條訊息的通訊方式。
流程如下:
事務訊息傳送步驟如下:
RocketMQ Broker
。RocketMQ Broker
將訊息持久化成功之後,向生產者返回 Ack 確認訊息已經傳送成功,此時訊息暫不能投遞,為半事務訊息。應用場景:如使用者發起轉賬後,交易狀態短暫掛起,傳送指令給銀行,如果發起失敗則不傳送指令,傳送成功後等待結果更新交易狀態。
範例程式碼如下:
生產者:
private Logger logger = LoggerFactory.getLogger(getClass());
/**
* 檢查本地事務的狀態
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
logger.info("start check Local rocketMQ transaction");
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try {
String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
logger.info("check trans msg content:{}", jsonStr);
} catch (Exception e) {
//異常就回滾
resultState = RocketMQLocalTransactionState.ROLLBACK;
}
return resultState;
}
說明:傳送事務訊息採用的是 sendMessageInTransaction 方法,返回結果為 TransactionSendResult 物件,該物件中包含了事務傳送的狀態、本地事務執行的狀態等。
生產者監聽器
傳送事務訊息除了生產者和消費者以外,我們還需要建立生產者的訊息監聽器,來監聽本地事務執行的狀態和檢查本地事務狀態。
/**
* 事務訊息監聽器
*/
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener {
private Logger logger = LoggerFactory.getLogger(getClass());
/**
* 執行本地事務
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
logger.info("start invoke local rocketMQ transaction");
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try {
//處理業務
String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
logger.info("invoke msg content:{}", jsonStr);
String UUID = (String) arg;
logger.info("UUID:" + UUID);
} catch (Exception e) {
logger.error("invoke local mq trans error", e);
resultState = RocketMQLocalTransactionState.UNKNOWN;
}
return resultState;
}
/**
* 檢查本地事務的狀態
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
logger.info("start check Local rocketMQ transaction");
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try {
String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
logger.info("check trans msg content:{}", jsonStr);
} catch (Exception e) {
//異常就回滾
resultState = RocketMQLocalTransactionState.ROLLBACK;
}
return resultState;
}
}
executeLocalTransaction
是半事務訊息傳送成功後,執行本地事務的方法,具體執行完本地事務後,可以在該方法中返回以下三種狀態:
LocalTransactionState.COMMIT_MESSAGE
:提交事務,允許消費者消費該訊息LocalTransactionState.ROLLBACK_MESSAGE
:回滾事務,訊息將被丟棄不允許消費。LocalTransactionState.UNKNOW
:暫時無法判斷狀態,等待固定時間以後 Broker 端根據回查規則向生產者進行訊息回查。checkLocalTransaction
是由於二次確認訊息沒有收到,Broker 端回查事務狀態的方法。回查規則:本地事務執行完成後,若 Broker 端收到的本地事務返回狀態為 LocalTransactionState.UNKNOW,或生產者應用退出導致本地事務未提交任何狀態。則 Broker 端會向訊息生產者發起事務回查,第一次回查後仍未獲取到事務狀態,則之後每隔一段時間會再次回查。
消費者
/**
* 消費者監聽器
*/
@Component
@RocketMQMessageListener(
consumerGroup = "consumer-group", //消費者組
topic = "topicClean", //topic
selectorExpression = "tagTest || tagB" //tag
)
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("接收訊息:" + message);
}
}
說明:事務訊息的消費者與普通的消費者沒有區別。
測試
呼叫事務訊息介面,控制檯列印紀錄檔如下:
l.p.listen.TransactionMsgListener : start invoke local rocketMQ transaction
l.p.listen.TransactionMsgListener : invoke msg content:this is transactionMessage
l.p.listen.TransactionMsgListener : UUID:39030439-551f-407a-970b-a85f0671bfac
l.p.controller.ProducerController : sendStatus:SEND_OK,localTransactionState:COMMIT_MESSAGE
通過紀錄檔我們可以看出,執行的流程與上述的一致,執行成功後,訊息執行成功返回的結果為 SEND_OK,本地事務執行的狀態為 COMMIT_MESSAGE。
異常測試
這裡將修改executeLocalTransaction
方法內容,當處理業務出現異常時,直接設定本地事務狀態為ROLLBACK
。
/**
* 執行本地事務
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
logger.info("start invoke local rocketMQ transaction");
RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
try {
//處理業務
String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
logger.info("invoke msg content:{}", jsonStr);
//丟擲異常
int i = 1/0;
} catch (Exception e) {
logger.error("invoke local mq trans error", e);
//設定事務狀態為回滾
resultState = RocketMQLocalTransactionState.ROLLBACK;
}
return resultState;
}
注意:
executeLocalTransaction
返回本地事務狀態為UNKNOWN
,Broker 端會進行事務回查,而事務回查執行的就是checkLocalTransaction
方法。executeLocalTransaction
返回本地事務狀態為ROLLBACK
,則直接丟棄準備發給消費者的訊息,結束訊息傳送流程。檢視控制檯,紀錄檔列印結果如下:
l.p.listen.TransactionMsgListener : start invoke local rocketMQ transaction
l.p.listen.TransactionMsgListener : invoke msg content:this is transactionMessage
l.p.listen.TransactionMsgListener : invoke local mq trans error
l.p.controller.ProducerController : sendStatus:SEND_OK,localTransactionState:ROLLBACK_MESSAGE
通過紀錄檔可以看出訊息執行成功返回的結果為 SEND_OK,本地事務執行的狀態為 ROLLBACK_MESSAGE。
本文演示了 Springboot 專案下 RocketMQ 訊息的傳送及消費流程,由最基本的同步訊息舉例講解延伸到順序訊息、延時訊息及事務訊息這幾種不同的訊息型別。
想了解有關 RocketMQ 的更多知識點,且聽下回(肝有點疼)。
參考資料: