RocketMQ Streams 1.1.0: 輕量級流處理再出發

2023-02-07 12:01:09

本文作者:倪澤,Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer

01 背景

RocketMQ Streams是一款基於RocketMQ為基礎的輕量級流計算引擎,具有資源消耗少、部署簡單、功能全面的特點,目前已經在社群開源。RocketMQ Streams在阿里雲內部被使用在對資源比較敏感,同時又強烈需要流計算的場景,比如在自建機房的雲安全場景下。

自RocketMQ Streams開源以來,吸引了大量使用者調研和試用。但是也存在一些問題,在RocketMQ Streams 1.1.0中,主要針對以下問題做出了改進和優化。

1、面向使用者API不夠友好,不能使用泛型,不支援自定義序列化/反序列化;

2、程式碼冗餘,在RocketMQ Streams中存在將流處理拓撲序列化反序列化模組,RocketMQ Streams作為輕量級流處理SDK,構建好流處理節點之後應該可以直接處理資料,不存在將流處理拓撲圖本地儲存或者網路傳輸需求。

3、流處理過程不容易理解,含有大量快取、重新整理邏輯;

4、存在大量支援SQL的程式碼,這部分和SDK方式執行流處理任務的邏輯無關;

在RocketMQ Streams 1.1.0中,對上述問題做出了改進,期望能帶來更好的使用體驗。同時,重新設計了流處理拓撲構建過程、去掉冗餘程式碼,使得程式碼更容易被理解。

從今天起,將推出系列文章介紹RocketMQ Streams 1.1.0版本,本次文章主要介紹RocketMQ Streams 1.1.0的API如何使用,如何利用RocketMQ Streams快速構建流處理應用。

02 典型使用範例

本地執行下列範例的步驟:

1、部署RocketMQ 5.0;

2、使用mqAdmin建立topic;

3、構建範例工程,新增依賴,啟動範例。RocketMQ Streams 座標:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-streams</artifactId>
    <version>1.1.0</version>
</dependency>

4、向topic中寫入相應資料,並觀察結果。

更詳細檔案請參考:https://github.com/apache/rocketmq-streams

WordCount

public class WordCount {
    public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("wordCount");


        builder.source("sourceTopic", total -> {
                    String value = new String(total, StandardCharsets.UTF_8);
                    return new Pair<>(null, value);
                })
                .flatMap((ValueMapperAction<String, List<String>>) value -> {
                    String[] splits = value.toLowerCase().split("\W+");
                    return Arrays.asList(splits);
                })
                .keyBy(value -> value)
                .count()
                .toRStream()
                .print();


        TopologyBuilder topologyBuilder = builder.build();


        Properties properties = new Properties();
        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");


        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);


        final CountDownLatch latch = new CountDownLatch(1);


        Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
            @Override
            public void run() {
                rocketMQStream.stop();
                latch.countDown();
            }
        });


        try {
            rocketMQStream.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

WordCount範例要點:

1、JobId wordCount唯一標識流處理任務;

2、自定義的反序列化;

3、一對多轉化;

4、lambda形式從資料中指定Key;

5、支援有狀態計算;

視窗聚合

public class WindowCount {
    public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("windowCountUser");


        AggregateAction<String, User, Num> aggregateAction = (key, value, accumulator) -> new Num(value.getName(), 100);


        builder.source("user", source -> {
                    User user1 = JSON.parseObject(source, User.class);
                    return new Pair<>(null, user1);
                })
                .selectTimestamp(User::getTimestamp)
                .filter(value -> value.getAge() > 0)
                .keyBy(value -> "key")
                .window(WindowBuilder.tumblingWindow(Time.seconds(15)))
                .aggregate(aggregateAction)
                .toRStream()
                .print();


        TopologyBuilder topologyBuilder = builder.build();


        Properties properties = new Properties();
        properties.putIfAbsent(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
        properties.put(Constant.TIME_TYPE, TimeType.EVENT_TIME);
        properties.put(Constant.ALLOW_LATENESS_MILLISECOND, 2000);


        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);


        rocketMQStream.start();
    }
}

視窗聚合範例要點:

1、支援指定時間欄位;

2、支援滑動、捲動、對談多種型別window;

3、支援自定義UDAF型別聚合;

4、支援自定義時間型別和資料最大遲到時間;

雙流JOIN

public class JoinWindow {
    public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("joinWindow");


        //左流
        RStream<User> user = builder.source("user", total -> {
            User user1 = JSON.parseObject(total, User.class);
            return new Pair<>(null, user1);
        });


        //右流
        RStream<Num> num = builder.source("num", source -> {
            Num user12 = JSON.parseObject(source, Num.class);
            return new Pair<>(null, user12);
        });


        //自定義join後的運算
        ValueJoinAction<User, Num, Union> action = new ValueJoinAction<User, Num, Union>() {
            @Override
            public Union apply(User value1, Num value2) {
                ...
            }
        };


        user.join(num)
                .where(User::getName)
                .equalTo(Num::getName)
                .window(WindowBuilder.tumblingWindow(Time.seconds(30)))
                .apply(action)
                .print();


        TopologyBuilder topologyBuilder = builder.build();


        Properties properties = new Properties();
        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");


        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);


        rocketMQStream.start();
    }
}

雙流聚合範例要點:

1、支援window join和非window join,對於非window join,只需要在上述及連表示式中去掉window即可;

2、支援多種視窗型別的window join;

3、支援對join後資料自定義操作;

03 參與貢獻

RocketMQ Streams是Apache RocketMQ的子專案,已經在社群開源,參與RocketMQ Streams相關工作,請參考以下資源:

1、試用RocketMQ Streams,並閱讀相關檔案以瞭解更多資訊;

maven倉庫座標:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-streams</artifactId>
    <version>1.1.0</version>
</dependency>

RocketMQ Streams檔案:

https://rocketmq.apache.org/zh/docs/streams/30RocketMQ Streams Overview

2、參與貢獻:如果你有任何功能請求或錯誤報告,請隨時提交 Pull Request 來分享你的反饋和想法;

社群倉庫:

https://github.com/apache/rocketmq-streams

3、聯絡我們:可以在 GitHub上建立 Issue,向 RocketMQ 郵寄清單傳送電子郵件,或在 RocketMQ Streams SIG 交流群與專家共同探討,RocketMQ Streams SIG加入方式:新增「小火箭」微信,回覆RocketMQ Streams。

郵寄清單:

https://lists.apache.org/[email protected]