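I recently needed to read and write Redis from Flink SQL. Flink does not ship an official Redis connector, and the few open-source connectors I found after a long search did not meet my requirements, so I implemented one myself. It has been adapted to Flink versions 1.12 through 1.15.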
Here is a brief introduction to its features, by way of the test cases:
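Test data initialization (`@Before`): 1,000 student records are pushed as JSON strings onto the `students` list, class names are written with SET, and per-school class names are written with HSET.

```java
@Before
public void init() {
    /** Enable test mode: once the streaming table's data has been consumed, the job stops, which makes testing easier. Defaults to false. */
    RedisOptions.IS_TEST = true;
    RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33", 6379, "123456", 0);
    List<String> lists = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
        lists.add("{\n" +
                "  \"number\": " + i + ",\n" +
                "  \"name\": \"student" + i + "\",\n" +
                "  \"school\": \"school" + ((i % 3) + 1) + "\",\n" +
                "  \"class_id\": " + ((i % 10) + 1) + "\n" +
                "}");
    }
    /** Initialize student data */
    for (int i = 0; i < 1; i++) {
        redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
    }
    /** Initialize class data: key = class id, value = class name */
    for (int i = 0; i < 10; i++) {
        redisOperator.set(String.valueOf(i + 1), "Galaxy Class " + (i + 1));
    }
    /** Initialize per-school class data: hash key = school, field = class id, value = class name */
    for (int j = 1; j < 4; j++) {
        for (int i = 1; i < 11; i++) {
            redisOperator.hset("school" + j, String.valueOf(i), "Galaxy Class " + i);
        }
    }
}
```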
Streaming source: read the `students` list with BLPOP and print each row.

```java
@Test
public void testBlpopSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
    String source = "CREATE TABLE students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'key'='students',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'command'='BLPOP'\n" +
            ")";
    String sink = "CREATE TABLE sink_students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='print'\n" +
            ")";
    tEnv.executeSql(source);
    tEnv.executeSql(sink);
    String sql = "insert into sink_students select * from students";
    TableResult tableResult = tEnv.executeSql(sql);
    // Wait for the job to finish
    tableResult.getJobClient().get().getJobExecutionResult().get();
}
```
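The `batch-fetch-rows` option controls how many list elements the source pulls per round. The following is a minimal plain-Java model of that batching, assuming an in-memory `ArrayDeque` stands in for the Redis list; `ListPopSourceModel` and `fetchBatch` are hypothetical names, not the connector's actual classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of a list-popping source; an ArrayDeque stands in for the Redis list. */
class ListPopSourceModel {
    private final Deque<String> redisList;
    private final int batchFetchRows;

    ListPopSourceModel(Deque<String> redisList, int batchFetchRows) {
        if (batchFetchRows <= 0) throw new IllegalArgumentException("batchFetchRows must be positive");
        this.redisList = redisList;
        this.batchFetchRows = batchFetchRows;
    }

    /**
     * Pops up to batchFetchRows elements from the head of the list (LPOP order).
     * A real BLPOP blocks while the list is empty; this model just returns an empty batch.
     */
    List<String> fetchBatch() {
        List<String> batch = new ArrayList<>(batchFetchRows);
        while (batch.size() < batchFetchRows && !redisList.isEmpty()) {
            batch.add(redisList.pollFirst());
        }
        return batch;
    }
}
```

With 2,500 queued elements and `batch-fetch-rows` = 1000, the model returns batches of 1000, 1000 and 500 elements, then empty batches.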
The lookup (dimension table) tests below use the same `init()` method as above, so class names are stored as plain strings for both GET and HGET.
Lookup with GET: `classes` is a dimension table whose key is `class_id`.

```java
@Test
public void testGetSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
    String source = "CREATE TABLE students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  proctime as PROCTIME()\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'key'='students',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'command'='BLPOP'\n" +
            ")";
    /**
     * Note: because this table uses the GET command and has no 'format' option, the dimension
     * table can only have two columns; extra columns are not recognized. See the comments in the
     * source code for details.
     */
    String dimTable = "CREATE TABLE classes\n" +
            "(\n" +
            "  class_id BIGINT,\n" +
            "  class_name string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'lookup.cache.max-rows'='1000',\n" +
            "  'lookup.cache.ttl'='3600',\n" +
            "  'lookup.cache.load-all'='true',\n" +
            "  'database'='0',\n" +
            "  'command'='GET'\n" +
            ")";
    String sink = "CREATE TABLE sink_students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='print'\n" +
            ")";
    tEnv.executeSql(source);
    tEnv.executeSql(dimTable);
    tEnv.executeSql(sink);
    /** The join column must be the key used by the GET command */
    String sql = "insert into sink_students " +
            "select s.number, s.name, s.school, s.class_id, d.class_name from students s " +
            "left join classes for system_time as of s.proctime as d on d.class_id = s.class_id";
    TableResult tableResult = tEnv.executeSql(sql);
    tableResult.getJobClient().get().getJobExecutionResult().get();
}
```
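Why a GET lookup table without `format` has exactly two columns: GET returns a single string, so the only row the connector can build is (key, value). The sketch below models that under stated assumptions. A `HashMap` stands in for Redis. `GetLookupModel` is a hypothetical name. Stringifying the BIGINT join key before the GET is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a GET lookup without 'format': the produced row is exactly (key, value). */
class GetLookupModel {
    private final Map<String, String> redis; // stands in for the Redis string keyspace

    GetLookupModel(Map<String, String> redis) {
        this.redis = new HashMap<>(redis);
    }

    /** Returns {key, value}, or null when the key is absent (a LEFT JOIN then yields NULL columns). */
    Object[] lookup(long classId) {
        String value = redis.get(String.valueOf(classId)); // assumed: the BIGINT key is stringified for GET
        return value == null ? null : new Object[] {classId, value};
    }
}
```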
Lookup with HGET: `classes` is keyed by (`school`, `class_id`).

```java
@Test
public void testHGetSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
    String source = "CREATE TABLE students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  proctime as PROCTIME()\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'key'='students',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'command'='BLPOP'\n" +
            ")";
    /**
     * Note: because this table uses the HGET command and has no 'format' option, the dimension
     * table can only have three columns; extra columns are not recognized. See the comments in
     * the source code for details.
     */
    String dimTable = "CREATE TABLE classes\n" +
            "(\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'lookup.cache.max-rows'='1000',\n" +
            "  'lookup.cache.ttl'='3600',\n" +
            "  'lookup.cache.load-all'='true',\n" +
            "  'database'='0',\n" +
            "  'command'='HGET'\n" +
            ")";
    String sink = "CREATE TABLE sink_students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='print'\n" +
            ")";
    tEnv.executeSql(source);
    tEnv.executeSql(dimTable);
    tEnv.executeSql(sink);
    /**
     * Because HGET is used, the order of the two join conditions does not matter. Which column
     * becomes the HGET key and which becomes the field depends only on the column order in the
     * dimension table definition.
     */
    String sql = "insert into sink_students " +
            "select s.number, s.name, s.school, s.class_id, d.class_name from students s " +
            "left join classes for system_time as of s.proctime as d " +
            "on d.class_id = s.class_id and d.school = s.school";
    TableResult tableResult = tEnv.executeSql(sql);
    tableResult.getJobClient().get().getJobExecutionResult().get();
}
```
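The rule that the HGET key and field follow the dimension table's declared column order, not the order of the `ON` conditions, can be modeled as follows. This is a sketch under assumptions: `HGetArgsResolver` is a hypothetical helper, and the join values arrive as a map from column name to value.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical resolver: HGET key/field are taken from the dimension table's declared column order. */
class HGetArgsResolver {
    /**
     * Returns {key, field} for HGET. declaredColumns is the CREATE TABLE order; joinValues maps
     * each equality-joined column to its value, so the order of the ON conditions is irrelevant.
     */
    static String[] resolve(List<String> declaredColumns, Map<String, Object> joinValues) {
        if (declaredColumns.size() < 2) {
            throw new IllegalArgumentException("HGET needs at least two declared columns");
        }
        Object key = joinValues.get(declaredColumns.get(0));   // first declared column -> hash key
        Object field = joinValues.get(declaredColumns.get(1)); // second declared column -> field
        if (key == null || field == null) {
            throw new IllegalArgumentException("the join must cover the first two declared columns");
        }
        return new String[] {String.valueOf(key), String.valueOf(field)};
    }
}
```

Swapping `d.class_id = s.class_id` and `d.school = s.school` in the query leaves the resolved `{key, field}` pair unchanged.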
Lookup with `'format'='json'`: this test class's `init()` stores the class information as JSON strings, so the dimension tables can expose more than two or three columns.

```java
@Before
public void init() {
    RedisOptions.IS_TEST = true;
    RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33", 6379, "123456", 0);
    List<String> lists = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
        lists.add("{\n" +
                "  \"number\": " + i + ",\n" +
                "  \"name\": \"student" + i + "\",\n" +
                "  \"school\": \"school" + ((i % 3) + 1) + "\",\n" +
                "  \"class_id\": " + ((i % 10) + 1) + "\n" +
                "}");
    }
    /** Initialize student data */
    for (int i = 0; i < 1; i++) {
        redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
    }
    /** Initialize class data: each value is a JSON object */
    for (int i = 0; i < 10; i++) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("class_id", String.valueOf(i + 1));
        jsonObject.put("class_name", "Galaxy Class " + (i + 1));
        jsonObject.put("remark", "remark" + i);
        redisOperator.set(String.valueOf(i + 1), jsonObject.toString());
    }
    /** Initialize per-school class data: each hash value is a JSON object */
    for (int j = 1; j < 4; j++) {
        for (int i = 1; i < 11; i++) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("class_id", String.valueOf(i));
            jsonObject.put("class_name", "Galaxy Class " + i);
            jsonObject.put("remark", "remark" + i);
            jsonObject.put("school", "school" + j);
            redisOperator.hset("school" + j, String.valueOf(i), jsonObject.toString());
        }
    }
}
```
```java
@Test
public void testGetSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
    String source = "CREATE TABLE students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  proctime as PROCTIME()\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'key'='students',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'command'='BLPOP'\n" +
            ")";
    /**
     * The point of this test is that the dimension table sets 'format'='json'. With a format,
     * the number of columns is not limited. However, the column used as the GET key must be
     * declared first, and after the GET value is decoded with the format (JSON here), the JSON
     * must contain the key column used in the JOIN ... ON condition; otherwise a
     * NullPointerException is thrown.
     */
    String dimTable = "CREATE TABLE classes\n" +
            "(\n" +
            "  class_id BIGINT,\n" +
            "  class_name string,\n" +
            "  remark string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'format'='json',\n" +
            "  'password'='123456',\n" +
            "  'lookup.cache.max-rows'='1000',\n" +
            "  'lookup.cache.ttl'='3600',\n" +
            "  'lookup.cache.load-all'='true',\n" +
            "  'database'='0',\n" +
            "  'command'='GET'\n" +
            ")";
    String sink = "CREATE TABLE sink_students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string,\n" +
            "  remark string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='print'\n" +
            ")";
    tEnv.executeSql(source);
    tEnv.executeSql(dimTable);
    tEnv.executeSql(sink);
    String sql = "insert into sink_students " +
            "select s.number, s.name, s.school, s.class_id, d.class_name, d.remark from students s " +
            "left join classes for system_time as of s.proctime as d on d.class_id = s.class_id";
    TableResult tableResult = tEnv.executeSql(sql);
    tableResult.getJobClient().get().getJobExecutionResult().get();
}
```
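The null-pointer caveat above becomes clearer with a model of how a decoded JSON value could be projected onto the declared columns. This sketch assumes the decoded JSON object is represented as a `Map` (to avoid a JSON library dependency) and that missing fields become `null`. `JsonGetProjection` is hypothetical, and the fail-fast check in `projectChecked` is a suggested alternative to an NPE further downstream, not what the connector does.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical model: with 'format'='json', a GET value is decoded and projected onto the declared columns. */
class JsonGetProjection {
    /** Projects a decoded JSON object onto the declared columns; a missing field becomes null. */
    static Object[] project(List<String> declaredColumns, Map<String, Object> decodedJson) {
        Object[] row = new Object[declaredColumns.size()];
        for (int i = 0; i < row.length; i++) {
            row[i] = decodedJson.get(declaredColumns.get(i));
        }
        return row;
    }

    /** The first declared column is the GET key; report a clear error instead of a later NPE. */
    static Object[] projectChecked(List<String> declaredColumns, Map<String, Object> decodedJson) {
        Object[] row = project(declaredColumns, decodedJson);
        if (row[0] == null) {
            throw new IllegalStateException("JSON value lacks key column '" + declaredColumns.get(0) + "'");
        }
        return row;
    }
}
```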
```java
@Test
public void testHGetSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
    String source = "CREATE TABLE students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  proctime as PROCTIME()\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'key'='students',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'command'='BLPOP'\n" +
            ")";
    /**
     * The point of this test is that the dimension table sets 'format'='json'. With a format,
     * the number of columns is not limited. However, the HGET key column must be declared first
     * and the field column second, and after the HGET value is decoded with the format (JSON
     * here), the JSON must contain both the key and the field columns used in the JOIN ... ON
     * condition; otherwise a NullPointerException is thrown.
     */
    String dimTable = "CREATE TABLE classes\n" +
            "(\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string,\n" +
            "  remark string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'format'='json',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'lookup.cache.max-rows'='1000',\n" +
            "  'lookup.cache.ttl'='3600',\n" +
            "  'lookup.cache.load-all'='true',\n" +
            "  'database'='0',\n" +
            "  'command'='HGET'\n" +
            ")";
    String sink = "CREATE TABLE sink_students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string,\n" +
            "  remark string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='print'\n" +
            ")";
    tEnv.executeSql(source);
    tEnv.executeSql(dimTable);
    tEnv.executeSql(sink);
    String sql = "insert into sink_students " +
            "select s.number, s.name, s.school, s.class_id, d.class_name, d.remark from students s " +
            "left join classes for system_time as of s.proctime as d " +
            "on d.class_id = s.class_id and d.school = s.school";
    TableResult tableResult = tEnv.executeSql(sql);
    tableResult.getJobClient().get().getJobExecutionResult().get();
}
```
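All the lookup tables above set `lookup.cache.max-rows` and `lookup.cache.ttl`. One common way to honor both is an LRU map with a per-entry write timestamp. The sketch below is one such design under stated assumptions, not the connector's cache. It uses `LinkedHashMap` in access order for LRU eviction, takes the TTL in seconds, and injects a millisecond clock (`LongSupplier`) so expiry is testable. `LookupCacheModel` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical lookup cache: bounded by max rows (LRU eviction) with a per-entry TTL in seconds. */
class LookupCacheModel<K, V> {
    private static final class CachedValue<V> {
        final V value;
        final long writtenAtMillis;
        CachedValue(V value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final long ttlMillis;
    private final LongSupplier clockMillis;
    private final LinkedHashMap<K, CachedValue<V>> map;

    LookupCacheModel(int maxRows, long ttlSeconds, LongSupplier clockMillis) {
        this.ttlMillis = ttlSeconds * 1000L;
        this.clockMillis = clockMillis;
        // accessOrder=true: iteration runs from least to most recently accessed, so the eldest entry is the LRU one
        this.map = new LinkedHashMap<K, CachedValue<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedValue<V>> eldest) {
                return size() > maxRows; // evict once the row limit is exceeded
            }
        };
    }

    void put(K key, V value) {
        map.put(key, new CachedValue<>(value, clockMillis.getAsLong()));
    }

    /** Returns the cached value, or null if it is absent or older than the TTL (expired entries are dropped). */
    V get(K key) {
        CachedValue<V> cached = map.get(key);
        if (cached == null) return null;
        if (clockMillis.getAsLong() - cached.writtenAtMillis >= ttlMillis) {
            map.remove(key);
            return null;
        }
        return cached.value;
    }
}
```

A cache miss (null) would then trigger a real GET or HGET, whose result is `put` back into the cache.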
The sink tests below use the same `init()` method as the first test class (plain-string class names), not the JSON version above.
Sink with LPUSH: the joined rows are pushed as JSON onto the list `sink_students_list`.

```java
@Test
public void testLPushSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
    String source = "CREATE TABLE students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  proctime as PROCTIME()\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'key'='students',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'command'='BLPOP'\n" +
            ")";
    String dimTable = "CREATE TABLE classes\n" +
            "(\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'lookup.cache.max-rows'='1000',\n" +
            "  'lookup.cache.ttl'='3600',\n" +
            "  'lookup.cache.load-all'='true',\n" +
            "  'database'='0',\n" +
            "  'command'='HGET'\n" +
            ")";
    /**
     * 1. Because the command is LPUSH, no "primary key(number) not enforced" is needed:
     *    this command only supports INSERT semantics.
     * 2. sink.parallelism is not set, so the sink parallelism defaults to the number of CPU cores.
     */
    String sink = "CREATE TABLE sink_students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'key'='sink_students_list',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'command'='LPUSH'\n" +
            ")";
    tEnv.executeSql(source);
    tEnv.executeSql(dimTable);
    tEnv.executeSql(sink);
    String sql = "insert into sink_students " +
            "select s.number, s.name, s.school, s.class_id, d.class_name from students s " +
            "left join classes for system_time as of s.proctime as d " +
            "on d.class_id = s.class_id and d.school = s.school";
    TableResult tableResult = tEnv.executeSql(sql);
    tableResult.getJobClient().get().getJobExecutionResult().get();
}
```
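Two consequences of a list sink are worth seeing concretely. First, LPUSH prepends, so the Redis list ends up in reverse arrival order, whereas RPUSH preserves it. Second, the sink is insert-only. The model below is a sketch. It assumes an `ArrayDeque` stands in for the list, and its `Change` enum is a hypothetical stand-in mirroring Flink's `RowKind` values. Rejecting non-insert changes by throwing is also an assumption, not confirmed connector behavior.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of a list sink: LPUSH prepends, RPUSH appends, and only INSERT changes are accepted. */
class ListSinkModel {
    /** Stand-in mirroring the values of Flink's RowKind. */
    enum Change { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static Deque<String> write(List<String> serializedRows, List<Change> kinds, boolean lpush) {
        Deque<String> redisList = new ArrayDeque<>();
        for (int i = 0; i < serializedRows.size(); i++) {
            if (kinds.get(i) != Change.INSERT) {
                // A list has no notion of "the old version of this row", so updates/deletes cannot be applied
                throw new UnsupportedOperationException("list sinks only support INSERT, got " + kinds.get(i));
            }
            if (lpush) {
                redisList.addFirst(serializedRows.get(i)); // LPUSH: newest element at the head
            } else {
                redisList.addLast(serializedRows.get(i));  // RPUSH: arrival order preserved
            }
        }
        return redisList;
    }
}
```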
Sink with SET: each row is stored as JSON under a key derived from the primary key.

```java
@Test
public void testSet() throws Exception {
    long start = System.currentTimeMillis();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
    String source = "CREATE TABLE students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  proctime as PROCTIME()\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'key'='students',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'command'='BLPOP'\n" +
            ")";
    String dimTable = "CREATE TABLE classes\n" +
            "(\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'lookup.cache.max-rows'='1000',\n" +
            "  'lookup.cache.ttl'='3600',\n" +
            "  'lookup.cache.load-all'='true',\n" +
            "  'database'='0',\n" +
            "  'command'='HGET'\n" +
            ")";
    /**
     * 1. Because the command is SET, a key is needed. The primary key is used as the key;
     *    multiple primary key columns are joined with underscores.
     * 2. sink.parallelism is not set, so the sink parallelism defaults to the number of CPU cores.
     */
    String sink = "CREATE TABLE sink_students\n" +
            "(\n" +
            "  school string,\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string,\n" +
            "  primary key(school, number) not enforced\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'command'='SET'\n" +
            ")";
    tEnv.executeSql(source);
    tEnv.executeSql(dimTable);
    tEnv.executeSql(sink);
    String sql = "insert into sink_students " +
            "select s.school, s.number, s.name, s.class_id, d.class_name from students s " +
            "left join classes for system_time as of s.proctime as d " +
            "on d.class_id = s.class_id and d.school = s.school";
    TableResult tableResult = tEnv.executeSql(sql);
    tableResult.getJobClient().get().getJobExecutionResult().get();
    long end = System.currentTimeMillis();
    System.out.println("Elapsed: " + (end - start) + "ms");
}
```
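The SET key rule (primary-key values joined with underscores) is simple to state in code. This is a minimal sketch, assuming the values are joined in the order listed in the `PRIMARY KEY` clause; `SetKeyBuilder` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: the SET key is the primary-key values joined with '_'. */
class SetKeyBuilder {
    /** primaryKey lists the PRIMARY KEY columns in declared order; row maps column name to value. */
    static String keyFor(List<String> primaryKey, Map<String, Object> row) {
        List<String> parts = new ArrayList<>();
        for (String column : primaryKey) {
            Object value = row.get(column);
            if (value == null) throw new IllegalArgumentException("primary key column is null: " + column);
            parts.add(String.valueOf(value));
        }
        return String.join("_", parts);
    }
}
```

For `primary key(school, number)`, a row with school `school1` and number `5` maps to the key `school1_5`. Because the separator is a plain underscore, values that themselves contain `_` could produce colliding keys, which is worth keeping in mind when choosing primary key columns.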
Sink with HSET, without a `key` option: the first column is the hash key and the second column is the field.

```java
@Test
public void testHSet() throws Exception {
    long start = System.currentTimeMillis();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
    String source = "CREATE TABLE students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  proctime as PROCTIME()\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'key'='students',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'command'='BLPOP'\n" +
            ")";
    String dimTable = "CREATE TABLE classes\n" +
            "(\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'lookup.cache.max-rows'='1000',\n" +
            "  'lookup.cache.ttl'='3600',\n" +
            "  'lookup.cache.load-all'='true',\n" +
            "  'database'='0',\n" +
            "  'command'='HGET'\n" +
            ")";
    /**
     * 1. Because the command is HSET, both a key and a field are needed. Following the declared
     *    column order, the first column is the key and the second is the field. Because rows can
     *    be updated, a primary key is also needed; it is best to make the first two columns the
     *    primary key together.
     * 2. As a sink, sink.key.ttl sets the time to live of the key in Redis, in seconds;
     *    the default -1 means the key is kept forever.
     */
    String sink = "CREATE TABLE sink_students\n" +
            "(\n" +
            "  school string,\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string,\n" +
            "  primary key(school, number) not enforced\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'sink.parallelism'='16',\n" +
            "  'sink.key.ttl'='300',\n" +
            "  'command'='HSET'\n" +
            ")";
    tEnv.executeSql(source);
    tEnv.executeSql(dimTable);
    tEnv.executeSql(sink);
    String sql = "insert into sink_students " +
            "select s.school, s.number, s.name, s.class_id, d.class_name from students s " +
            "left join classes for system_time as of s.proctime as d " +
            "on d.class_id = s.class_id and d.school = s.school";
    TableResult tableResult = tEnv.executeSql(sql);
    tableResult.getJobClient().get().getJobExecutionResult().get();
    long end = System.currentTimeMillis();
    System.out.println("Elapsed: " + (end - start) + "ms");
}
```
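The HSET layout and the `sink.key.ttl` rule can be modeled in memory. This is a sketch under assumptions. Nested `HashMap`s stand in for Redis hashes, and a separate map records the TTL that an EXPIRE would set. Storing the serialized row as the field value is inferred from `'format'='json'`. `HSetSinkModel` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of an HSET sink without a 'key' option, plus sink.key.ttl handling. */
class HSetSinkModel {
    final Map<String, Map<String, String>> hashes = new HashMap<>(); // stands in for Redis hashes
    final Map<String, Long> ttlSeconds = new HashMap<>();            // stands in for EXPIRE state per key

    /** Column 0 becomes the hash key, column 1 the field; the value is the serialized (e.g. JSON) row. */
    void write(List<Object> rowInDeclaredOrder, String serializedRow, long keyTtlSeconds) {
        String key = String.valueOf(rowInDeclaredOrder.get(0));
        String field = String.valueOf(rowInDeclaredOrder.get(1));
        hashes.computeIfAbsent(key, k -> new HashMap<>()).put(field, serializedRow);
        // -1 (the documented default) means keep forever, so only a positive TTL results in an EXPIRE
        if (keyTtlSeconds > 0) {
            ttlSeconds.put(key, keyTtlSeconds);
        }
    }
}
```

Note that in Redis, EXPIRE applies to a whole key, so `sink.key.ttl` governs the entire hash (for example, all students of one school here), not individual fields.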
Sink with HSET and a fixed `key` option: every row goes into the hash `sink_students_hset`.

```java
@Test
public void testHSetWithKey() throws Exception {
    long start = System.currentTimeMillis();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
    String source = "CREATE TABLE students\n" +
            "(\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  proctime as PROCTIME()\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'key'='students',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'command'='BLPOP'\n" +
            ")";
    String dimTable = "CREATE TABLE classes\n" +
            "(\n" +
            "  school string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'lookup.cache.max-rows'='1000',\n" +
            "  'lookup.cache.ttl'='3600',\n" +
            "  'lookup.cache.load-all'='true',\n" +
            "  'database'='0',\n" +
            "  'command'='HGET'\n" +
            ")";
    /**
     * 1. Because the command is HSET, a key and a field are needed. Here the 'key' option fixes
     *    the key, so the joined primary key values are used as the field when writing with HSET.
     * 2. As a sink, sink.key.ttl sets the time to live of the key in Redis, in seconds;
     *    the default -1 means the key is kept forever.
     */
    String sink = "CREATE TABLE sink_students\n" +
            "(\n" +
            "  school string,\n" +
            "  number BIGINT,\n" +
            "  name string,\n" +
            "  class_id BIGINT,\n" +
            "  class_name string,\n" +
            "  primary key(number) not enforced\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'format'='json',\n" +
            "  'key'='sink_students_hset',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field'='false',\n" +
            "  'json.ignore-parse-errors'='true',\n" +
            "  'sink.parallelism'='16',\n" +
            "  'sink.key.ttl'='300',\n" +
            "  'command'='HSET'\n" +
            ")";
    tEnv.executeSql(source);
    tEnv.executeSql(dimTable);
    tEnv.executeSql(sink);
    String sql = "insert into sink_students " +
            "select s.school, s.number, s.name, s.class_id, d.class_name from students s " +
            "left join classes for system_time as of s.proctime as d " +
            "on d.class_id = s.class_id and d.school = s.school";
    TableResult tableResult = tEnv.executeSql(sql);
    tableResult.getJobClient().get().getJobExecutionResult().get();
    long end = System.currentTimeMillis();
    System.out.println("Elapsed: " + (end - start) + "ms");
}
```
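When `key` is set, the HSET arguments come from two places. The key is the configured value, and the field is the primary key joined the same way as for SET. A minimal sketch, assuming underscore joining in `PRIMARY KEY` order; `FixedKeyHSetArgs` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: with 'key' set, HSET uses that fixed key and the joined primary key as the field. */
class FixedKeyHSetArgs {
    /** Returns {key, field} for one row. */
    static String[] argsFor(String configuredKey, List<String> primaryKey, Map<String, Object> row) {
        List<String> parts = new ArrayList<>();
        for (String column : primaryKey) {
            parts.add(String.valueOf(row.get(column)));
        }
        return new String[] {configuredKey, String.join("_", parts)};
    }
}
```

With `primary key(number)`, all rows land in one hash, one field per student number. A single `sink.key.ttl` therefore expires the whole `sink_students_hset` hash at once.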
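Option | Description |
---|---|
host | Redis host |
port | Redis port |
password | Redis password |
cluster-nodes | Redis cluster nodes: host and port are separated by a colon, multiple nodes by commas |
master.name | Name of the master node in Redis Sentinel mode |
sentinels.info | Sentinel node information in Redis Sentinel mode |
sentinels.password | Password in Redis Sentinel mode |
database | Redis database index, usually 0 to 15 |
command | Redis command. As a source (streaming) table: BLPOP, BRPOP, LPOP, RPOP, SPOP; as a lookup (dimension) table: GET, HGET; as a sink table: LPUSH, RPUSH, SADD, SET, HSET |
redis-mode | Redis deployment mode: single, cluster, or sentinel |
key | The Redis key to access, e.g. when the data is stored under a fixed key whose value is a list; also the key used by sink commands such as LPUSH, RPUSH, SADD, and HSET |
timeout | Connection timeout, in milliseconds |
max-total | Maximum number of connections in the connection pool |
max-idle | Maximum number of idle connections in the connection pool |
min-idle | Minimum number of idle connections in the connection pool |
format | Data format, such as json or csv |
batch-fetch-rows | Number of elements fetched from Redis per call by commands such as LPOP, BLPOP, RPOP, and BRPOP |
lookup.cache.max-rows | Lookup table: maximum number of rows cached in memory |
lookup.cache.ttl | Lookup table: TTL of rows cached in memory, in seconds |
lookup.max-retries | Lookup table: number of retries when a query fails |
lookup.cache.load-all | Lookup table: whether to load all data. This mainly applies to HGET (`HGET KEY_NAME FIELD_NAME`): whether to fetch and cache the values of all fields of the key rather than a single field. Decide based on the actual size of the hash |
sink.max-retries | Sink: maximum number of retries |
sink.parallelism | Sink: sink parallelism; defaults to the number of CPU cores |
sink.key.ttl | Sink: TTL of the data written to Redis, in seconds; the default -1 means it is kept forever |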
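The `cluster-nodes` format described in the table (host and port separated by a colon, nodes separated by commas) can be parsed as sketched below. `ClusterNodesParser` and `HostPort` are hypothetical names. Tolerating surrounding whitespace is an assumption, and the sketch only handles hostnames and IPv4 addresses.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for a 'cluster-nodes' value such as "10.0.0.1:7000,10.0.0.2:7001". */
class ClusterNodesParser {
    static final class HostPort {
        final String host;
        final int port;
        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Splits on commas, then on the last colon of each entry; assumes hostnames or IPv4 addresses. */
    static List<HostPort> parse(String clusterNodes) {
        List<HostPort> nodes = new ArrayList<>();
        for (String item : clusterNodes.split(",")) {
            String trimmed = item.trim();
            if (trimmed.isEmpty()) continue;
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected host:port, got '" + trimmed + "'");
            }
            int port = Integer.parseInt(trimmed.substring(colon + 1)); // NumberFormatException is an IllegalArgumentException
            nodes.add(new HostPort(trimmed.substring(0, colon), port));
        }
        return nodes;
    }
}
```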
Source code: https://gitee.com/rongdi/flinksql-connector-redis/