一文詳解RocketMQ-Spring的原始碼解析與實戰

2023-04-24 15:00:42
摘要:這篇文章主要介紹 Spring Boot 專案使用 rocketmq-spring SDK 實現訊息收發的操作流程,同時筆者會從開發者的角度解讀 SDK 的設計邏輯。

本文分享自華為雲社群《RocketMQ-Spring : 實戰與原始碼解析一網打盡》,作者:勇哥java實戰分享。

RocketMQ 是大家耳熟能詳的訊息佇列,開源專案 rocketmq-spring 可以幫助開發者在 Spring Boot 專案中快速整合 RocketMQ。

這篇文章會介紹 Spring Boot 專案使用 rocketmq-spring SDK 實現訊息收發的操作流程,同時筆者會從開發者的角度解讀 SDK 的設計邏輯。

一 SDK 簡介

專案地址:https://github.com/apache/rocketmq-spring

rocketmq-spring 的本質是一個 Spring Boot starter 。

Spring Boot 基於「約定大於設定」(Convention over configuration)這一理念來快速地開發、測試、執行和部署 Spring 應用,並能通過簡單地與各種啟動器(如 spring-boot-web-starter)結合,讓應用直接以命令列的方式執行,不需再部署到獨立容器中。

Spring Boot starter 構造的啟動器使用起來非常方便,開發者只需要在 pom.xml 引入 starter 的依賴定義,在組態檔中編寫約定的設定即可。

下面我們看下 rocketmq-spring-boot-starter 的設定:

1、引入依賴

<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-spring-boot-starter</artifactId>
 <version>2.2.3</version>
</dependency>

2、約定設定

接下來,我們分別按照生產者和消費者的順序,詳細的講解訊息收發的操作過程。

二 生產者

首先我們新增依賴後,進行如下三個步驟:

1、組態檔中設定如下

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
      group: platform-sms-server-group
    # access-key: myaccesskey
    # secret-key: mysecretkey
  topic: sms-common-topic

生產者設定非常簡單,主要設定名字服務地址和生產者組。

2、需要傳送訊息的類中注入 RcoketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;
​
@Value("${rocketmq.topic}")
private String smsTopic;

3、傳送訊息,訊息體可以是自定義物件,也可以是 Message 物件

rocketMQTemplate 類包含多鍾傳送訊息的方法:

  1. 同步傳送 syncSend
  2. 非同步傳送 asyncSend
  3. 順序傳送 syncSendOrderly
  4. oneway傳送 sendOneWay

下面的程式碼展示如何同步傳送訊息。

String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult =
 rocketMQTemplate.syncSend(
            destination, 
 MessageBuilder.withPayload(messageContent).
 setHeader(MessageConst.PROPERTY_KEYS, uniqueId).
 build()
 );
if (sendResult != null) {
 if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
 // send message success ,do something 
 }
}

syncSend 方法的第一個引數是傳送的目標,格式是:topic + ":" + tags ,

第二個引數是:spring-message 規範的 message 物件 ,而 MessageBuilder 是一個工具類,方法鏈式呼叫建立訊息物件。

三 消費者

1、組態檔中設定如下

rocketmq:
  name-server: 127.0.0.1:9876
  consumer1:
    group: platform-sms-worker-common-group
    topic: sms-common-topic

2、實現訊息監聽器

@Component
@RocketMQMessageListener(
 consumerGroup = "${rocketmq.consumer1.group}", //消費組
    topic = "${rocketmq.consumer1.topic}" //主題
)
public class SmsMessageCommonConsumer implements RocketMQListener<String> {
 public void onMessage(String message) {
 System.out.println("普通簡訊:" + message);
 }
}

消費者實現類也可以實現 RocketMQListener<MessageExt>, 在 onMessage 方法裡通過 RocketMQ 原生訊息物件 MessageExt 獲取更詳細的訊息資料 。

public void onMessage(MessageExt message) {
 try {
        String body = new String(message.getBody(), "UTF-8");
        logger.info("普通簡訊:" + message);
 } catch (Exception e) {
 logger.error("common onMessage error:", e);
 }
}

四 原始碼概覽

最新原始碼中,我們可以看到原始碼中包含四個模組:

1、rocketmq-spring-boot-parent

該模組是父模組,定義專案所有依賴的 jar 包。

2、rocketmq-spring-boot

核心模組,實現了 starter 的核心邏輯。

3、rocketmq-spring-boot-starter

SDK 模組,簡單封裝,外部專案參照。

4、rocketmq-spring-boot-samples

範例程式碼模組。這個模組非常重要,當用戶使用 SDK 時,可以參考範例快速開發。

五 starter 實現

我們重點分析下 rocketmq-spring-boot 模組的核心原始碼:

spring-boot-starter 實現需要包含如下三個部分:

1、定義 Spring 自身的依賴包和 RocketMQ 的依賴包 ;

2、定義spring.factories 檔案

在 resources 包下建立 META-INF 目錄後,新建 spring.factories 檔案,並在檔案中定義自動載入類,檔案內容是:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

spring boot 會根據檔案中設定的自動化設定類來自動初始化相關的 Bean、Component 或 Service。

3、實現自動載入類

在 RocketMQAutoConfiguration 類的具體實現中,我們重點分析下生產者和消費者是如何分別啟動的。

▍生產者傳送模板類:RocketMQTemplate

RocketMQAutoConfiguration 類定義了兩個預設的 Bean :

首先SpringBoot專案中組態檔中的設定值會根據屬性條件繫結到 RocketMQProperties 物件 中,然後使用 RocketMQ 的原生 API 分別建立生產者 Bean 和拉取消費者 Bean , 分別將兩個 bean 設定到 RocketMQTemplate 物件中。

兩個重點需要強調:

  • 傳送訊息時,將 spring-message 規範下的訊息物件封裝成 RocketMQ 訊息物件
  • 預設拉取消費者 litePullConsumer 。拉取消費者一般用於巨量資料批次處理場景 。

RocketMQTemplate 類封裝了拉取消費者的receive方法,以方便開發者使用。

▍自定義消費者類

下圖是並行消費者的例子:

那麼 rocketmq-spring 是如何自動啟動消費者呢 ?

spring 容器首先註冊了訊息監聽器後置處理器,然後呼叫 ListenerContainerConfiguration 類的 registerContainer 方法 。

對比並行消費者的例子,我們可以看到: DefaultRocketMQListenerContainer 是對 DefaultMQPushConsumer 消費邏輯的封裝。

封裝消費訊息的邏輯,同時滿足 RocketMQListener 泛化介面支援不同引數,比如 String 、MessageExt 、自定義物件 。

首先DefaultRocketMQListenerContainer初始化之後, 獲取 onMessage 方法的引數型別 。

然後消費者呼叫 consumeMessage 處理訊息時,封裝了一個 handleMessage 方法 ,將原生 RocketMQ 訊息物件 MessageExt 轉換成 onMessage 方法定義的引數物件,然後呼叫 rocketMQListener 的 onMessage 方法。

上圖右側標紅的程式碼也就是該方法的精髓:

rocketMQListener.onMessage(doConvertMessage(messageExt));

六 寫到最後

開源專案 rocketmq-spring 有很多值得學習的地方 ,我們可以從如下四個層面逐層進階:

1、學會如何使用 :參考 rocketmq-spring-boot-samples 模組的範例程式碼,學會如何傳送和接收訊息,快速編碼;

2、模組設計:學習專案的模組分層 (父模組、SDK 模組、核心實現模組、範例程式碼模組);

3、starter 設計思路 :定義自動組態檔 spring.factories 、設計設定屬性類 、在 RocketMQ client 的基礎上實現優雅的封裝、深入理解 RocketMQ 原始碼等;

4、舉一反三:當我們理解了 rocketmq-spring 的原始碼,我們可以嘗試模仿該專案寫一個簡單的 spring boot starter。

 

點選關注,第一時間瞭解華為雲新鮮技術~