RocketMQ 系列(三) 整合 SpringBoot

2023-09-06 12:00:35

RocketMQ 系列(三) 整合 SpringBoot

前兩篇文章介紹了 RocketMQ 基本概念與搭建,現在以它與 SpringBoot 的結合來介紹其基本的用法。

1、建立生產者

1.1、引入依賴

    <!-- RocketMQ -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>

注意rocketmq-spring-boot-starter要與RocketMQ的版本一致。

1.2、yaml 設定

application.yaml檔案設定如下:

server:
  port: 9007
spring:
  application:
    name: rockmq-producer
rocketmq:
  # NameServer地址
  name-server: 192.168.0.17:9876
  producer:
    # 生產者組
    group: producer-group
    # 傳送同步訊息失敗時,重試次數,預設是 2
    retry-times-when-send-failed: 2
    # 傳送非同步訊息失敗時,重試次數,預設是 2
    retry-times-when-send-async-failed: 2
    # 傳送訊息超時時間,預設是 3s
    send-message-timeout: 3000

1.3、編寫傳送訊息介面

下面介面傳送的為同步訊息,即必須收到 RocketMQ 服務響應後才能進行下一步,否則一直阻塞。

@RequestMapping("/rocketmq")
@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 傳送同步訊息
     */
    @RequestMapping("/syncSend")
    public void syncSend() {
        // 第一個引數指定Topic與Tag,格式: `topicName:tags`
        // 第二個引數,訊息內容
        SendResult sendResult = rocketMQTemplate.syncSend("topicClean:tagTest", "syncMessage");
        System.out.println("傳送同步訊息結果:" + sendResult.toString());
    }
}

2、建立消費者

消費者的依賴同上面的生產者一樣,同樣是寫下 yaml 檔案設定。

2.1、yaml 設定

server:
  port: 9008
spring:
  application:
    name: rockmq-consumer
rocketmq:
  # NameServer地址
  name-server: 192.168.0.17:9876

2.2、編寫消費者監聽器

生產者傳送訊息到 broker 後,消費者通過監聽的方式獲取broker傳送過來的訊息。實現監聽需要實現 RocketMQListener介面:

/**
 * 消費者監聽器
 */
@Component
@RocketMQMessageListener(
        consumerGroup = "consumer-group",  //消費者組
        topic = "topicClean", //topic
        selectorExpression = "tagTest || tagB" //tag,可以有多個
)
public class ConsumerListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("接收訊息:" + message);
    }
}

分析一下引數內容

  • topic 這個是必須指定的,否則沒有訊息來源。

  • consumerGroup 是消費者組,這個必須制定。一條訊息只能被同一個消費者組裡的一個消費者消費。

  • selectorExpression

    是用於訊息過濾的,我們在生產的時候定義了tag內容,消費者可以指定消費某些tag的訊息,具體策略如下:

    • 預設為 「*」,表示不過濾,消費此 topic 下所有訊息
    • 設定為 「tagA」,表示只消費此 topic 下 TAG = tagA 的訊息
    • 設定為 「tagTest || tagB」,表示消費此 topic 下 TAG = tagTest 或 TAG = tagB 的訊息,以此類推

上面的@RocketMQMessageListener 註解的常用設定引數:

引數 型別 預設值 說明
consumerGroup String 消費者組
topic String Topic
selectorType SelectorType SelectorType.TAG 使用TAG 或者SQL92選擇訊息,預設tag
selectorExpression String "*" 控制哪些訊息可以選擇
consumeMode ConsumeMode ConsumeMode.CONCURRENTLY 消費模式,並行接收還是順序接收,預設並行模式
messageModel MessageModel MessageModel.CLUSTERING 消費模式,廣播模式還是叢集模式,預設叢集模式
consumeThreadMax int 64 最大消費執行緒數
consumeTimeout long 15L 消費超時時間(一條訊息可能會阻塞使用執行緒的最長時間(以分鐘為單位))
nameServer String 組態檔中讀取:$ nameServer地址
accessKey String 組態檔中讀取:$ AK
secretKey String 組態檔中讀取:$ SK
accessChannel String $
maxReconsumeTimes int -1 最大訊息重試次數

3、測試

首先第一步啟動剛剛編寫好的生產者及消費者服務。

呼叫生產者傳送訊息的介面/rocketmq/syncSend後,控制檯返回結果 sendStatus=SEND_OK,表示訊息成功傳送到 broker:

傳送同步訊息結果:SendResult [sendStatus=SEND_OK, msgId=7F000001178C18B4AAC288364E780000, offsetMsgId=7C471A0C00002A9F0000000000031086, messageQueue=MessageQueue [topic=topicClean, brokerName=broker-a, queueId=2], queueOffset=4]

檢視 RocketMQ 控制檯訊息介面,也可以查詢到剛剛發出來的訊息:

那麼消費者是否成功的消費到訊息了呢?這個我們暫時不清楚。

檢視消費者控制檯,很完美,消費者接收到了生產者的訊息:

接收訊息:syncMessage

同樣,也可以檢視 RocketMQ控制檯消費者介面,上面我們確定的消費者組是consumer-group,點選檢視消費詳情,是能夠看到成功地消費到了訊息:

生產者傳送的訊息成功被消費者消費,說明了基本的訊息流程是沒問題的。

上面我們傳送的是同步訊息,那這麼說除了同步訊息,還有其他哪幾種訊息阿?不瞭解,那我們就繼續往下看。

4、訊息型別

4.1、普通訊息

上面傳送的同步訊息屬於普通訊息,普通訊息就是 RocketMQ 中無特性的訊息,包含了同步訊息、非同步訊息、單步傳送訊息 3 種。

4.1.1、同步訊息

同步訊息是指訊息傳送方發出一條訊息後,會在收到伺服器端返回響應之後才發下一條訊息的通訊方式。

流程如下:

事務訊息傳送步驟如下:

  1. 生產者將半事務訊息傳送至 RocketMQ Broker
  2. RocketMQ Broker 將訊息持久化成功之後,向生產者返回 Ack 確認訊息已經傳送成功,此時訊息暫不能投遞,為半事務訊息。
  3. 生產者開始執行本地事務邏輯。
  4. 生產者根據本地事務執行結果向伺服器端提交二次確認結果(Commit或是Rollback),伺服器端收到確認結果後處理邏輯如下:
    • 二次確認結果為 Commit:伺服器端將半事務訊息標記為可投遞,並投遞給消費者。
    • 二次確認結果為 Rollback:伺服器端將回滾事務,不會將半事務訊息投遞給消費者。
  5. 在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經過固定時間後 MQ Server 將對該訊息發起訊息回查。
  6. 傳送方收到訊息回查後,需要檢查對應訊息的本地事務執行的最終結果。
  7. 傳送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟4對半訊息進行操作。

應用場景:如使用者發起轉賬後,交易狀態短暫掛起,傳送指令給銀行,如果發起失敗則不傳送指令,傳送成功後等待結果更新交易狀態。

範例程式碼如下:

生產者

private Logger logger = LoggerFactory.getLogger(getClass());

/**
     * 檢查本地事務的狀態
     */
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
    logger.info("start check Local rocketMQ transaction");

    RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;

    try {
        String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
        logger.info("check trans msg content:{}", jsonStr);
    } catch (Exception e) {
        //異常就回滾
        resultState = RocketMQLocalTransactionState.ROLLBACK;
    }
    return resultState;
}

說明:傳送事務訊息採用的是 sendMessageInTransaction 方法,返回結果為 TransactionSendResult 物件,該物件中包含了事務傳送的狀態、本地事務執行的狀態等。

生產者監聽器

傳送事務訊息除了生產者和消費者以外,我們還需要建立生產者的訊息監聽器,來監聽本地事務執行的狀態和檢查本地事務狀態。

/**
 * 事務訊息監聽器
 */
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener {
    private Logger logger = LoggerFactory.getLogger(getClass());


    /**
     * 執行本地事務
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
                                                                 Object arg) {
        logger.info("start invoke local rocketMQ transaction");
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;

        try {
            //處理業務
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("invoke msg content:{}", jsonStr);
            String UUID = (String) arg;
            logger.info("UUID:" + UUID);
        } catch (Exception e) {
            logger.error("invoke local mq trans error", e);
            resultState = RocketMQLocalTransactionState.UNKNOWN;
        }

        return resultState;
    }

    /**
     * 檢查本地事務的狀態
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        logger.info("start check Local rocketMQ transaction");

        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;

        try {
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("check trans msg content:{}", jsonStr);
        } catch (Exception e) {
            //異常就回滾
            resultState = RocketMQLocalTransactionState.ROLLBACK;
        }
        return resultState;
    }
}

executeLocalTransaction 是半事務訊息傳送成功後,執行本地事務的方法,具體執行完本地事務後,可以在該方法中返回以下三種狀態:

  • LocalTransactionState.COMMIT_MESSAGE:提交事務,允許消費者消費該訊息
  • LocalTransactionState.ROLLBACK_MESSAGE:回滾事務,訊息將被丟棄不允許消費。
  • LocalTransactionState.UNKNOW:暫時無法判斷狀態,等待固定時間以後 Broker 端根據回查規則向生產者進行訊息回查。

checkLocalTransaction是由於二次確認訊息沒有收到,Broker 端回查事務狀態的方法。回查規則:本地事務執行完成後,若 Broker 端收到的本地事務返回狀態為 LocalTransactionState.UNKNOW,或生產者應用退出導致本地事務未提交任何狀態。則 Broker 端會向訊息生產者發起事務回查,第一次回查後仍未獲取到事務狀態,則之後每隔一段時間會再次回查。

消費者

/**
 * 消費者監聽器
 */
@Component
@RocketMQMessageListener(
        consumerGroup = "consumer-group",  //消費者組
        topic = "topicClean", //topic
        selectorExpression = "tagTest || tagB" //tag
)
public class ConsumerListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("接收訊息:" + message);
    }
}

說明:事務訊息的消費者與普通的消費者沒有區別。

測試

呼叫事務訊息介面,控制檯列印紀錄檔如下:

l.p.listen.TransactionMsgListener        : start invoke local rocketMQ transaction
l.p.listen.TransactionMsgListener        : invoke msg content:this is transactionMessage
l.p.listen.TransactionMsgListener        : UUID:39030439-551f-407a-970b-a85f0671bfac
l.p.controller.ProducerController        : sendStatus:SEND_OK,localTransactionState:COMMIT_MESSAGE

通過紀錄檔我們可以看出,執行的流程與上述的一致,執行成功後,訊息執行成功返回的結果為 SEND_OK,本地事務執行的狀態為 COMMIT_MESSAGE。

異常測試

這裡將修改executeLocalTransaction方法內容,當處理業務出現異常時,直接設定本地事務狀態為ROLLBACK

/**
     * 執行本地事務
     */
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
                                                             Object arg) {
    logger.info("start invoke local rocketMQ transaction");
    RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;

    try {
        //處理業務
        String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
        logger.info("invoke msg content:{}", jsonStr);
        //丟擲異常
        int i = 1/0;
    } catch (Exception e) {
        logger.error("invoke local mq trans error", e);
        //設定事務狀態為回滾
        resultState = RocketMQLocalTransactionState.ROLLBACK;
    }

    return resultState;
}

注意:

  • executeLocalTransaction 返回本地事務狀態為UNKNOWN,Broker 端會進行事務回查,而事務回查執行的就是checkLocalTransaction方法。
  • 而如果executeLocalTransaction 返回本地事務狀態為ROLLBACK,則直接丟棄準備發給消費者的訊息,結束訊息傳送流程。

檢視控制檯,紀錄檔列印結果如下:

l.p.listen.TransactionMsgListener        : start invoke local rocketMQ transaction
l.p.listen.TransactionMsgListener        : invoke msg content:this is transactionMessage
l.p.listen.TransactionMsgListener        : invoke local mq trans error
l.p.controller.ProducerController        : sendStatus:SEND_OK,localTransactionState:ROLLBACK_MESSAGE

通過紀錄檔可以看出訊息執行成功返回的結果為 SEND_OK,本地事務執行的狀態為 ROLLBACK_MESSAGE。

本文演示了 Springboot 專案下 RocketMQ 訊息的傳送及消費流程,由最基本的同步訊息舉例講解延伸到順序訊息、延時訊息及事務訊息這幾種不同的訊息型別。

想了解有關 RocketMQ 的更多知識點,且聽下回(肝有點疼)。

參考資料: