Reading and writing Redis with Flink SQL

2022-10-11 06:00:59

0. Preface

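  I recently needed to read from and write to Redis with Flink SQL. Flink does not ship an official Redis connector. I searched for a long time, but none of the open-source connectors I found met my requirements, so I implemented one myself. It has been adapted to every Flink version from 1.12 to 1.15.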

  Here is a brief overview of the features:

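  • As a streaming source table: supports the BLPOP, BRPOP, LPOP, RPOP and SPOP commands. Batch popping is wrapped in a Lua script to improve consumption throughput.
  • As a dimension (lookup) table: supports the GET and HGET commands, with an optional lookup cache.
  • As a sink table: supports the LPUSH, RPUSH, SADD, SET and HSET commands, and a TTL can be set on the written keys.
  • Supports Flink's common serialization/deserialization formats, such as json and csv. See the official Flink documentation: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/overview/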
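The connector's source is not shown in this post, so the following is only a sketch of how a Lua-wrapped batch pop could work. It assumes the script reads up to N head elements with LRANGE and removes them with LTRIM inside a single EVAL. Redis runs a script atomically, so two consumers never receive the same element. The script text and the class name BatchPopSketch are hypothetical. The Java method simulates the script's effect on an in-memory list, so it runs without a Redis server.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

class BatchPopSketch {
    // Hypothetical Lua script (not the connector's actual script): take up to ARGV[1]
    // elements from the head of list KEYS[1], then trim them off, all in one atomic call.
    static final String BATCH_LPOP_LUA =
            "local items = redis.call('LRANGE', KEYS[1], 0, tonumber(ARGV[1]) - 1)\n"
          + "if #items > 0 then\n"
          + "  redis.call('LTRIM', KEYS[1], #items, -1)\n"
          + "end\n"
          + "return items";

    // In-memory equivalent of the script: remove and return up to batchSize head elements.
    static List<String> batchLpop(LinkedList<String> list, int batchSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !list.isEmpty()) {
            batch.add(list.removeFirst());
        }
        return batch;
    }

    public static void main(String[] args) {
        LinkedList<String> students = new LinkedList<>(List.of("s0", "s1", "s2", "s3", "s4"));
        System.out.println(batchLpop(students, 3)); // [s0, s1, s2]
        System.out.println(batchLpop(students, 3)); // [s3, s4]
        System.out.println(batchLpop(students, 3)); // []
    }
}
```

One round trip thus returns a whole batch instead of one element. On Redis 6.2 and later, LPOP and RPOP also accept a count argument, which gives a similar batch pop without a script.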

1. Redis as a streaming source table

1.1 Data preparation

  

    @Before
    public void init() {
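        /**
         * Enable test mode: in this mode the job stops once the source table's data has been
         * fully consumed, which makes testing convenient. Test mode defaults to false.
         */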
        RedisOptions.IS_TEST = true;
        RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
        List<String> lists = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lists.add("{\n" +
                    "  \"number\": " + i + ",\n" +
                    "  \"name\": \"學生" + i + "\",\n" +
                    "  \"school\": \"學校" + ((i % 3) + 1) +"\",\n" +
                    "  \"class_id\": " + ((i % 10) + 1) +"\n" +
                    "}");
        }
        /**
         * Initialize the student data
         */
        for (int i = 0; i < 1; i++) {
            redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
        }
        /**
         * Initialize the class data (key = class_id, value = class name)
         */
        for(int i = 0;i < 10;i++) {
            redisOperator.set(String.valueOf(i + 1),"銀河" + (i + 1) + "班");
        }

        /**
         * Initialize the per-school class data (hash key = school, field = class_id, value = class name)
         */
        for(int j = 1;j < 4;j++) {
            for (int i = 1; i < 11; i++) {
                redisOperator.hset("學校" + j, String.valueOf(i), "銀河" + i + "班");
            }
        }
    }
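The following standalone sketch makes the expected join results easier to check. The class name StudentDataSketch is hypothetical. It rebuilds the JSON records that init() pushes to Redis, without needing Redis, and counts students per school. Because school is derived from i % 3 and class_id from i % 10, the 1,000 students split 334/333/333 across 學校1..學校3 and 100 per class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class StudentDataSketch {
    // Same fields as init(): school cycles through 學校1..學校3, class_id through 1..10.
    static List<String> buildStudents(int n) {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            list.add("{\"number\": " + i
                    + ", \"name\": \"學生" + i + "\""
                    + ", \"school\": \"學校" + ((i % 3) + 1) + "\""
                    + ", \"class_id\": " + ((i % 10) + 1) + "}");
        }
        return list;
    }

    // Counts students per school using the same formula as the generator.
    static Map<String, Integer> countBySchool(int n) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < n; i++) {
            counts.merge("學校" + ((i % 3) + 1), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(buildStudents(1).get(0));
        System.out.println(countBySchool(1000)); // {學校1=334, 學校2=333, 學校3=333}
    }
}
```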

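1.2 Consuming the list or set under a given key with BLPOP, BRPOP, LPOP, RPOP or SPOP

The example uses BLPOP. The other commands are selected through the 'command' option.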

    @Test
    public void testBlpopSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";



        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='print'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students select * from students";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }
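The post describes RedisOptions.IS_TEST only in one comment, so the following is a hypothetical sketch of what test mode implies for the source loop. Each round pops up to 'batch-fetch-rows' elements, and the loop stops at the first empty round. A production source would instead keep waiting, as BLPOP does, and that case is not modeled here. PollingLoopSketch and drainInTestMode are illustrative names.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

class PollingLoopSketch {
    // Test-mode loop: pop up to batchFetchRows elements per round, emit them, and stop
    // at the first empty round. Returns the number of non-empty batches that were read.
    static int drainInTestMode(LinkedList<String> list, int batchFetchRows, Consumer<String> emit) {
        int batches = 0;
        while (true) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < batchFetchRows && !list.isEmpty()) {
                batch.add(list.removeFirst()); // same effect as one batched LPOP
            }
            if (batch.isEmpty()) {
                return batches; // nothing left: in test mode the source finishes here
            }
            batch.forEach(emit);
            batches++;
        }
    }

    public static void main(String[] args) {
        LinkedList<String> students = new LinkedList<>();
        for (int i = 0; i < 1000; i++) {
            students.add("student" + i);
        }
        List<String> emitted = new ArrayList<>();
        int batches = drainInTestMode(students, 300, emitted::add);
        System.out.println(batches + " batches, " + emitted.size() + " rows"); // 4 batches, 1000 rows
    }
}
```

With 'batch-fetch-rows'='1000' as in the test above, the 1,000 students are read in a single batch.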

2. Redis as a dimension table (without format)

2.1 Data preparation

    @Before
    public void init() {
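        /**
         * Enable test mode: in this mode the job stops once the source table's data has been
         * fully consumed, which makes testing convenient. Test mode defaults to false.
         */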
        RedisOptions.IS_TEST = true;
        RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
        List<String> lists = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lists.add("{\n" +
                    "  \"number\": " + i + ",\n" +
                    "  \"name\": \"學生" + i + "\",\n" +
                    "  \"school\": \"學校" + ((i % 3) + 1) +"\",\n" +
                    "  \"class_id\": " + ((i % 10) + 1) +"\n" +
                    "}");
        }
        /**
         * Initialize the student data
         */
        for (int i = 0; i < 1; i++) {
            redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
        }
        /**
         * Initialize the class data (key = class_id, value = class name)
         */
        for(int i = 0;i < 10;i++) {
            redisOperator.set(String.valueOf(i + 1),"銀河" + (i + 1) + "班");
        }

        /**
         * Initialize the per-school class data (hash key = school, field = class_id, value = class name)
         */
        for(int j = 1;j < 4;j++) {
            for (int i = 1; i < 11; i++) {
                redisOperator.hset("學校" + j, String.valueOf(i), "銀河" + i + "班");
            }
        }
    }

2.2 Using GET as the dimension-table lookup command

    @Test
    public void testGetSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

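        /**
         * Note: because the GET command is used and no format option is set, the dimension table
         * can only have two columns (the key and the value). Any extra columns are not recognized.
         * See the comments in the connector source code for details.
         */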
        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='GET'\n" +
                        " )";


        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='print'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);
        /**
         * The join column here must be the column used as the GET key.
         */
        String sql =
                " insert into sink_students "
                + " select s.number,s.name,s.school,s.class_id,d.class_name  from students s"
                + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }
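The two-column rule for GET without format can be read as follows: column 1 (class_id) becomes the Redis key and column 2 (class_name) is the raw string value. Below is a hypothetical sketch of that lookup with a cache bounded by 'lookup.cache.max-rows' and expiring after 'lookup.cache.ttl'. The TTL unit (assumed to be seconds), the LRU eviction policy, and caching of misses are all assumptions, not taken from the connector's source. 'lookup.cache.load-all' is not modeled. The Function passed to the constructor stands in for the Redis GET call.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

class GetLookupSketch {
    // Cached GET result; value may be null so that misses are cached too (an assumption).
    private static final class CachedValue {
        final String value;
        final long expiresAtMillis;

        CachedValue(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Function<String, String> redisGet; // stand-in for the Redis GET command
    private final long ttlMillis;
    private final LongSupplier clockMillis;
    private final LinkedHashMap<String, CachedValue> cache;

    GetLookupSketch(Function<String, String> redisGet, int maxRows, long ttlSeconds, LongSupplier clockMillis) {
        this.redisGet = redisGet;
        this.ttlMillis = ttlSeconds * 1000L;
        this.clockMillis = clockMillis;
        // accessOrder=true plus removeEldestEntry: an LRU map holding at most maxRows entries
        this.cache = new LinkedHashMap<String, CachedValue>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, CachedValue> eldest) {
                return size() > maxRows;
            }
        };
    }

    // Returns the dimension row {class_id, class_name}, or null if the key does not exist.
    Object[] lookup(long classId) {
        String key = String.valueOf(classId); // column 1 is used as the GET key
        long now = clockMillis.getAsLong();
        CachedValue cached = cache.get(key);
        if (cached == null || cached.expiresAtMillis <= now) {
            cached = new CachedValue(redisGet.apply(key), now + ttlMillis); // one GET per miss or expiry
            cache.put(key, cached);
        }
        // column 2 is the raw string value; null lets the left join emit nulls
        return cached.value == null ? null : new Object[] {classId, cached.value};
    }
}
```

Repeated lookups for the same class_id within the TTL are answered from memory, so each of the ten classes costs roughly one GET per TTL window.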

2.3 Using HGET as the dimension-table lookup command

    @Test
    public void testHGetSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";
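        /**
         * Note: because the HGET command is used and no format option is set, the dimension table
         * can only have three columns (key, field and value). Any extra columns are not recognized.
         * See the comments in the connector source code for details.
         */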
        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";


        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='print'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);
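        /**
         * Note: with HGET, the order of the two join conditions does not matter. Which column is
         * used as the HGET key and which as the field depends only on the column order in the
         * dimension table definition.
         */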
        String sql =
                " insert into sink_students "
                        + " select s.number,s.name,s.school,s.class_id,d.class_name  from students s"
                        + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }
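The comment above says that for HGET the declaration order decides which column is the key and which is the field. Here is a hypothetical sketch of that rule. The first declared column (school) is the hash key, the second (class_id) is the field, and the third (class_name) receives the value, whatever order the ON clause lists the equality conditions in. The Redis hashes are modeled as nested Maps, and HGetLookupSketch is an illustrative name.

```java
import java.util.List;
import java.util.Map;

class HGetLookupSketch {
    // Declared column order of the classes dimension table in section 2.3
    static final List<String> COLUMNS = List.of("school", "class_id", "class_name");

    /**
     * joinKeys holds the ON-clause values by column name. Its iteration order is irrelevant
     * because key and field are picked by declared position, not by join-condition order.
     */
    static String lookup(Map<String, Map<String, String>> redisHashes, Map<String, Object> joinKeys) {
        String key = String.valueOf(joinKeys.get(COLUMNS.get(0)));   // 1st column -> HGET key
        String field = String.valueOf(joinKeys.get(COLUMNS.get(1))); // 2nd column -> HGET field
        Map<String, String> hash = redisHashes.get(key);             // models HGET key field
        return hash == null ? null : hash.get(field);                // 3rd column <- HGET value
    }
}
```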

3. Redis as a dimension table (with format)

3.1 Data preparation

    @Before
    public void init() {
        RedisOptions.IS_TEST = true;
        RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
        List<String> lists = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lists.add("{\n" +
                    "  \"number\": " + i + ",\n" +
                    "  \"name\": \"學生" + i + "\",\n" +
                    "  \"school\": \"學校" + ((i % 3) + 1) +"\",\n" +
                    "  \"class_id\": " + ((i % 10) + 1) +"\n" +
                    "}");
        }
        /**
         * Initialize the student data
         */
        for (int i = 0; i < 1; i++) {
            redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
        }
        /**
         * Initialize the class data as JSON values (key = class_id)
         */
        for(int i = 0;i < 10;i++) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("class_id",String.valueOf(i + 1));
            jsonObject.put("class_name","銀河" + (i + 1) + "班");
            jsonObject.put("remark","remark" + i);
            redisOperator.set(String.valueOf(i + 1),jsonObject.toString());
        }

        /**
         * Initialize the per-school class data as JSON values (hash key = school, field = class_id)
         */
        for(int j = 1;j < 4;j++) {
            for (int i = 1; i < 11; i++) {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("class_id",String.valueOf(i));
                jsonObject.put("class_name","銀河" + i + "班");
                jsonObject.put("remark","remark" + i);
                jsonObject.put("school","學校" + j);
                redisOperator.hset("學校" + j, String.valueOf(i), jsonObject.toString());
            }
        }
    }

3.2 Using GET as the dimension-table lookup command

    @Test
    public void testGetSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

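        /**
         * The key point of this test is that the dimension table sets format=json. With a format set,
         * the number of columns is not limited, but note two things. First, the column used as the
         * GET key must be declared first. Second, the GET value, once decoded by the format (JSON here),
         * must contain the key column used in the join's ON clause; otherwise a NullPointerException
         * is thrown.
         */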
        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string ,\n   " +
                        "    remark  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'format'='json', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='GET'\n" +
                        " )";


        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string, \n" +
                        "    remark  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='print'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                        + " select s.number,s.name,s.school,s.class_id,d.class_name,d.remark  from students s"
                        + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }
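One plausible reading of why the key column must also appear inside the JSON value is that, with format set, the whole row, including the key column, is rebuilt from the decoded value. This reading is an assumption, not confirmed by the connector's source. Under it, a value without class_id yields a row whose key is null, which matches the NullPointerException described above. This hypothetical sketch models the decoded JSON as a Map and checks for the key column before building the row, so the failure carries a clear message instead. FormattedGetSketch is an illustrative name.

```java
import java.util.List;
import java.util.Map;

class FormattedGetSketch {
    // Declared column order of the classes dimension table in section 3.2; column 0 is the GET key
    static final List<String> COLUMNS = List.of("class_id", "class_name", "remark");

    // decoded models the GET value after JSON decoding, as column name -> value
    static Object[] toRow(String redisKey, Map<String, Object> decoded) {
        String keyColumn = COLUMNS.get(0);
        if (decoded.get(keyColumn) == null) {
            // The original note reports a NullPointerException in this situation; fail early instead
            throw new IllegalStateException(
                    "value stored under key '" + redisKey + "' has no '" + keyColumn + "' field");
        }
        Object[] row = new Object[COLUMNS.size()];
        for (int i = 0; i < COLUMNS.size(); i++) {
            row[i] = decoded.get(COLUMNS.get(i)); // every column, key included, comes from the value
        }
        return row;
    }
}
```

The same reasoning would extend to HGET with format in section 3.3, where both the key column and the field column must be present in the decoded value.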

3.3 Using HGET as the dimension-table lookup command

    @Test
    public void testHGetSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

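        /**
         * The key point of this test is that the dimension table sets format=json. With a format set,
         * the number of columns is not limited, but note two things. First, the column used as the
         * HGET key must be declared first and the field column second. Second, the HGET value, once
         * decoded by the format (JSON here), must contain both the key and the field columns used in
         * the join's ON clause; otherwise a NullPointerException is thrown.
         */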
        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string,   " +
                        "    remark  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'format'='json', \n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";


        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string, \n" +
                        "    remark  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='print'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                        + " select s.number,s.name,s.school,s.class_id,d.class_name,d.remark  from students s"
                        + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }

4. Redis as a sink table

4.1 Data preparation

    @Before
    public void init() {
        RedisOptions.IS_TEST = true;
        RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);
        List<String> lists = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lists.add("{\n" +
                    "  \"number\": " + i + ",\n" +
                    "  \"name\": \"學生" + i + "\",\n" +
                    "  \"school\": \"學校" + ((i % 3) + 1) +"\",\n" +
                    "  \"class_id\": " + ((i % 10) + 1) +"\n" +
                    "}");
        }
        /**
         * Initialize the student data
         */
        for (int i = 0; i < 1; i++) {
            redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
        }
        /**
         * Initialize the class data (key = class_id, value = class name)
         */
        for(int i = 0;i < 10;i++) {
            redisOperator.set(String.valueOf(i + 1),"銀河" + (i + 1) + "班");
        }

        /**
         * Initialize the per-school class data (hash key = school, field = class_id, value = class name)
         */
        for(int j = 1;j < 4;j++) {
            for (int i = 1; i < 11; i++) {
                redisOperator.hset("學校" + j, String.valueOf(i), "銀河" + i + "班");
            }
        }
    }

4.2 Using LPUSH, RPUSH or SADD as the sink write command

    @Test
    public void testLPushSQL() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";

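        /**
         *  1. Because the command is LPUSH, no "primary key(number) not enforced" clause is needed:
         *     this command only supports INSERT semantics.
         *  2. The sink.parallelism option is not set, so the default parallelism applies
         *     (the number of CPU cores in this local run).
         */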
        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='sink_students_list',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='LPUSH'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                + " select s.number,s.name,s.school,s.class_id,d.class_name  from students s"
                + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
    }
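Two properties of the LPUSH sink are worth making concrete. First, a list has no key to update or delete by, so the sink can only accept insert rows. The sketch rejects every other change kind. Its ChangeKind enum mirrors the four kinds of Flink's RowKind but is defined locally to keep the example self-contained. Second, LPUSH adds at the head, so sink_students_list ends up in reverse arrival order. RPUSH keeps arrival order. The Redis list is modeled with an ArrayDeque, and LPushSinkSketch is an illustrative name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class LPushSinkSketch {
    // Mirrors the change kinds of Flink's RowKind; defined locally, not the Flink class.
    enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private final Deque<String> list = new ArrayDeque<>(); // stands in for the Redis list

    void write(ChangeKind kind, String json) {
        if (kind != ChangeKind.INSERT) {
            // A list cannot locate an earlier element to update or delete, so only INSERT is allowed.
            throw new UnsupportedOperationException("LPUSH sink supports INSERT only, got " + kind);
        }
        list.addFirst(json); // LPUSH: the new element becomes index 0
    }

    List<String> lrangeAll() {
        return new ArrayList<>(list); // head-to-tail order, like LRANGE key 0 -1
    }

    public static void main(String[] args) {
        LPushSinkSketch sink = new LPushSinkSketch();
        for (String row : List.of("row1", "row2", "row3")) {
            sink.write(ChangeKind.INSERT, row);
        }
        System.out.println(sink.lrangeAll()); // [row3, row2, row1]
    }
}
```

The query above is a lookup join on an append-only stream, so it only produces insert rows and fits an insert-only sink like this.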

4.3 Using SET as the sink write command

    @Test
    public void testSet() throws Exception {
        long start = System.currentTimeMillis();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";

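        /**
         *  1. Because the command is SET, a key is required. The primary key is used as the Redis key;
         *     when there are several primary-key columns, their values are joined with underscores.
         *  2. The sink.parallelism option is not set, so the default parallelism applies
         *     (the number of CPU cores in this local run).
         */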
        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string, \n" +
                        "    primary key(school,number) not enforced" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='SET'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                        + " select s.school,s.number,s.name,s.class_id,d.class_name  from students s"
                        + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
        long end = System.currentTimeMillis();
        System.out.println("Elapsed: " + (end - start) + " ms");
    }
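The comment in this test says the SET key is built from the primary key, with multiple key columns joined by underscores. Here is a hypothetical sketch of that key derivation for primary key(school, number). The class name SetKeySketch is illustrative, and the value layout the connector writes is not modeled. Note that if a key value itself contains "_", two different rows can map to the same Redis key, for example ("a_b", "c") and ("a", "b_c"). The later row would then silently overwrite the earlier one.

```java
import java.util.List;
import java.util.stream.Collectors;

class SetKeySketch {
    // Joins primary-key column values, in declaration order, with '_' to form the SET key.
    static String redisKey(List<?> primaryKeyValues) {
        return primaryKeyValues.stream()
                .map(v -> String.valueOf(v))
                .collect(Collectors.joining("_"));
    }

    public static void main(String[] args) {
        System.out.println(redisKey(List.of("學校1", 5L))); // 學校1_5
    }
}
```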

4.4 Using HSET as the sink write command (without specifying a key)

    @Test
    public void testHSet() throws Exception {
        long start = System.currentTimeMillis();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";

        /**
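         *  1. Because the command is HSET, both a key and a field are needed. Following the declared column
         *     order, the first column is used as the key and the second as the field. Because rows are updated,
         *     a primary key is also required; it is best to declare the first two columns together as the primary key.
         *  2. As a sink, the sink.key.ttl option sets the TTL of the key in Redis, in seconds; the default -1 means the key never expires.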
         */
        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string, \n" +
                        "    primary key(school,number) not enforced" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'sink.parallelism' = '16',\n" +
                        "  'sink.key.ttl' = '300',\n" +
                        "  'command'='HSET'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                        + " select s.school,s.number,s.name,s.class_id,d.class_name  from students s"
                        + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
        long end = System.currentTimeMillis();
        System.out.println("Elapsed: " + (end - start) + " ms");
    }
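
The comment in this example describes the mapping when no `key` option is set. The first declared column becomes the hash key, the second becomes the field, and `sink.key.ttl` sets an expiry. The rough sketch below renders the resulting commands as strings for a single row. It rests on two assumptions: the value is the whole row serialized with the configured `format`, and a positive TTL is applied with `EXPIRE` after each write. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of the commands an HSET sink without a 'key' option
 * might issue for one row. This is not the connector's actual code.
 */
public class HsetNoKeySketch {

    public static List<String> commandsFor(List<?> row, String serializedValue, long ttlSeconds) {
        if (row.size() < 2) {
            throw new IllegalArgumentException("HSET without 'key' needs at least two columns");
        }
        String key = String.valueOf(row.get(0));   // 1st declared column -> hash key
        String field = String.valueOf(row.get(1)); // 2nd declared column -> hash field
        List<String> commands = new ArrayList<>();
        commands.add("HSET " + key + " " + field + " " + serializedValue);
        // sink.key.ttl: only a positive value adds EXPIRE; -1 (the default) keeps the key forever
        if (ttlSeconds > 0) {
            commands.add("EXPIRE " + key + " " + ttlSeconds);
        }
        return commands;
    }

    public static void main(String[] args) {
        // Column order follows sink_students: school, number, name, class_id, class_name
        List<?> row = Arrays.asList("school1", 3L, "student3", 4L, "class4");
        commandsFor(row, "{\"number\":3}", 300).forEach(System.out::println);
    }
}
```

Redis expires whole keys. Under this sketch's approach, the TTL therefore covers the entire per-school hash rather than individual students, and re-issuing `EXPIRE` on every write keeps pushing that hash's expiry back.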

4.4 Using the HSET command for the sink table (with a key specified)

    @Test
    public void testHSetWithKey() throws Exception {
        long start = System.currentTimeMillis();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String source =
                "CREATE TABLE students\n" +
                        "(\n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    school   string, \n" +
                        "    class_id   BIGINT, \n" +
                        "    proctime as PROCTIME() \n" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'key'='students',\n" +
                        "  'format'='json',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'command'='BLPOP'\n" +
                        " )";

        String daeamon =
                "CREATE TABLE classes\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    class_id  BIGINT   ,\n" +
                        "    class_name  string   " +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'lookup.cache.max-rows'='1000',\n" +
                        "  'lookup.cache.ttl'='3600',\n" +
                        "  'lookup.cache.load-all'='true',\n" +
                        "  'database'='0',\n" +
                        "  'command'='HGET'\n" +
                        " )";

        /**
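         *  1. Because the command is HSET, both a key and a field are needed. Since the key option is set here,
         *     it is used as the hash key, and the joined primary-key values become the field written with HSET.
         *  2. As a sink, the sink.key.ttl option sets the TTL of the key in Redis, in seconds; the default -1 means the key never expires.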
         */
        String sink =
                "CREATE TABLE sink_students\n" +
                        "(\n" +
                        "    school   string, \n" +
                        "    number  BIGINT ,\n" +
                        "    name  string,\n" +
                        "    class_id   BIGINT, \n" +
                        "    class_name   string, \n" +
                        "    primary key(number) not enforced" +
                        ") \n" +
                        "WITH (\n" +
                        "  'connector'='redis',\n" +
                        "  'host'='10.201.0.33', \n" +
                        "  'port'='6379',\n" +
                        "  'redis-mode'='single', \n" +
                        "  'password'='123456',\n" +
                        "  'database'='0',\n" +
                        "  'format'='json',\n" +
                        "  'key'='sink_students_hset',\n" +
                        "  'batch-fetch-rows'='1000',\n" +
                        "  'json.fail-on-missing-field' = 'false',\n" +
                        "  'json.ignore-parse-errors' = 'true',\n" +
                        "  'sink.parallelism' = '16',\n" +
                        "  'sink.key.ttl' = '300',\n" +
                        "  'command'='HSET'\n" +
                        " )";

        tEnv.executeSql(source);
        tEnv.executeSql(daeamon);
        tEnv.executeSql(sink);

        String sql =
                " insert into sink_students "
                        + " select s.school,s.number,s.name,s.class_id,d.class_name  from students s"
                        + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get().getJobExecutionResult().get();
        long end = System.currentTimeMillis();
        System.out.println("Elapsed: " + (end - start) + " ms");
    }
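
With the `key` option set, the mapping changes. The configured key (`sink_students_hset` here) is the hash key, and the joined primary-key values become the field. The sketch below shows the arguments of a single `HSET` under that reading. It assumes `_` joining for composite keys, as in the `SET` case, and a value equal to the serialized row. The names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

/**
 * Hypothetical sketch: HSET arguments when the 'key' option is configured.
 * This is not the connector's actual code.
 */
public class HsetWithKeySketch {

    /** Returns {key, field, value} for one row. */
    public static String[] hsetArgs(String configuredKey, List<?> primaryKeyValues, String serializedValue) {
        StringJoiner field = new StringJoiner("_"); // joined primary key -> hash field
        for (Object v : primaryKeyValues) {
            field.add(String.valueOf(v));
        }
        return new String[] {configuredKey, field.toString(), serializedValue};
    }

    public static void main(String[] args) {
        // primary key(number), key option = 'sink_students_hset'
        String[] a = hsetArgs("sink_students_hset", Arrays.asList(3L), "{\"number\":3}");
        System.out.println("HSET " + String.join(" ", a));
    }
}
```

Every row lands in one hash here. A `sink.key.ttl` of 300 seconds therefore expires all students together, and in cluster mode that single key, with all its fields, lives on one hash slot.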

5 Configuration options

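| Option | Description |
| --- | --- |
| host | Redis host |
| port | Redis port |
| password | Redis password |
| cluster-nodes | Redis cluster nodes; host and port are separated by a colon, multiple host:port pairs by commas |
| master.name | Name of the master node in Redis sentinel mode |
| sentinels.info | Sentinel info for Redis sentinel mode |
| sentinels.password | Password for Redis sentinel mode |
| database | Redis database index, usually 0 to 15 |
| command | Redis command. As a source (stream) table: BLPOP, BRPOP, LPOP, RPOP, SPOP; as a dimension table: GET, HGET; as a sink table: LPUSH, RPUSH, SADD, SET, HSET |
| redis-mode | Redis deployment mode: single, cluster, or sentinel |
| key | The Redis key to access, e.g. when the data is stored under a fixed key whose value is a list; also the key used by sink commands such as LPUSH, RPUSH, SADD, and HSET |
| timeout | Redis connection timeout, in milliseconds |
| max-total | Maximum number of connections in the Redis connection pool |
| max-idle | Maximum number of idle connections in the pool |
| min-idle | Minimum number of idle connections in the pool |
| format | Data format, e.g. json or csv |
| batch-fetch-rows | Number of records fetched from Redis per call for commands such as LPOP, BLPOP, RPOP, and BRPOP |
| lookup.cache.max-rows | Dimension table (lookup mode): maximum number of rows cached in memory |
| lookup.cache.ttl | Dimension table (lookup mode): TTL of cached rows, in seconds |
| lookup.max-retries | Dimension table (lookup mode): number of retries when a query fails |
| lookup.cache.load-all | Dimension table (lookup mode): whether to load everything, mainly for HGET (HGET KEY_NAME FIELD_NAME), i.e. whether to fetch and cache the values of all fields under the key; decide based on the actual size of the hash |
| sink.max-retries | Sink table: maximum number of retries |
| sink.parallelism | Sink table: sink parallelism; defaults to the number of CPU cores |
| sink.key.ttl | Sink table: TTL of the data written to Redis, in seconds; the default -1 means it never expires |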
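
The `lookup.cache.*` options describe a bounded, time-limited in-memory cache in front of `HGET`. The sketch below shows one way such a cache could behave. The table only states a maximum row count, so the LRU eviction policy is an assumption. An in-memory map stands in for Redis, and `LookupCacheSketch` and its members are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of a lookup cache bounded by lookup.cache.max-rows and
 * lookup.cache.ttl, with an optional load-all mode for HGET.
 * An in-memory map stands in for Redis; this is not the connector's code.
 */
public class LookupCacheSketch {

    private static final class CachedValue {
        final String value;
        final long expiresAtMillis;

        CachedValue(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final boolean loadAll;
    private final Map<String, Map<String, String>> redisHashes; // stand-in for Redis hashes
    private final LinkedHashMap<String, CachedValue> cache;
    public int backendCalls = 0; // number of simulated round trips to Redis

    public LookupCacheSketch(int maxRows, long ttlSeconds, boolean loadAll,
                             Map<String, Map<String, String>> redisHashes) {
        this.ttlMillis = ttlSeconds * 1000L;
        this.loadAll = loadAll;
        this.redisHashes = redisHashes;
        // accessOrder = true keeps entries in LRU order; drop the eldest beyond maxRows
        this.cache = new LinkedHashMap<String, CachedValue>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, CachedValue> eldest) {
                return size() > maxRows;
            }
        };
    }

    private static String cacheKey(String key, String field) {
        return key + '\u0000' + field; // NUL separator, unlikely to appear in keys
    }

    /** Simulates HGET key field, serving from the cache while the entry is fresh. */
    public String hget(String key, String field) {
        long now = System.currentTimeMillis();
        CachedValue hit = cache.get(cacheKey(key, field));
        if (hit != null && hit.expiresAtMillis > now) {
            return hit.value;
        }
        backendCalls++;
        Map<String, String> hash = redisHashes.getOrDefault(key, Collections.emptyMap());
        if (loadAll) {
            // load-all: one HGETALL-style round trip caches every field of the hash
            for (Map.Entry<String, String> e : hash.entrySet()) {
                cache.put(cacheKey(key, e.getKey()), new CachedValue(e.getValue(), now + ttlMillis));
            }
        } else {
            String v = hash.get(field);
            if (v != null) {
                cache.put(cacheKey(key, field), new CachedValue(v, now + ttlMillis));
            }
        }
        return hash.get(field);
    }

    public static void main(String[] args) {
        Map<String, String> school1 = new HashMap<>();
        for (int i = 1; i <= 10; i++) {
            school1.put(String.valueOf(i), "class" + i);
        }
        Map<String, Map<String, String>> redis = new HashMap<>();
        redis.put("school1", school1);

        LookupCacheSketch cache = new LookupCacheSketch(1000, 3600, true, redis);
        cache.hget("school1", "1");
        cache.hget("school1", "7");
        System.out.println("round trips: " + cache.backendCalls);
    }
}
```

With load-all enabled, a single round trip caches every field of a hash. That pays off for small hashes such as the 10-class school hashes used in this article. A hash larger than `lookup.cache.max-rows` is a different story: the bulk load would immediately evict part of itself, which is why the option should be chosen according to the hash size.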

Source code: https://gitee.com/rongdi/flinksql-connector-redis/