本次需求場景主要為實現將flinksql中collect()函數輸出的Mutiset(VARCHAR<100>)多行結果轉換為字串。
Flink SQL 的自定義函數是使用者可以自行編寫的一種函數,用於擴充套件 Flink SQL 的功能。自定義函數可以在 SQL 查詢中被呼叫,以完成使用者自定義的資料處理邏輯。 在 Flink SQL 中,自定義函數分為標量函數、表函數和聚合函數三種型別。
標量函數接受一行輸入,返回一行輸出。常見的標量函數有字串函數、數學函數等。使用者可以通過繼承 ScalarFunction 類或實現 ScalarFunction 介面的方式來實現自定義的標量函數。
表函數接受一行輸入,返回多行輸出。在 Flink SQL 中,表函數可以使用 LATERAL TABLE 語法進行呼叫。使用者可以通過繼承 TableFunction 類或實現 TableFunction 介面的方式來實現自定義的表函數。
聚合函數接受多行輸入,返回一行輸出。在 Flink SQL 中,聚合函數可以使用 GROUP BY 語法進行呼叫。使用者可以通過繼承 AggregateFunction 類或實現 AggregateFunction 介面的方式來實現自定義的聚合函數。 在使用自定義函數時,需要將對應的 Jar 包提交到 Flink 叢集中,並在執行任務時將其加入到 Classpath 中。Flink SQL 還提供了 CREATE FUNCTION 語句來註冊使用者自定義的函數,以便在 SQL 查詢中進行呼叫。 總的來說,自定義函數是 Flink SQL 中非常重要的一個功能,可以幫助使用者擴充套件 Flink SQL 的功能,提高資料處理的靈活性和效率。
上面的圖片展示了一個聚合的例子。假設你有一個關於飲料的表。表裡面有三個欄位,分別是 id、name、price,表裡有 5 行資料。假設你需要找到所有飲料裡最貴的飲料的價格,即執行一個 max() 聚合。你需要遍歷所有 5 行資料,而結果就只有一個數值。
自定義聚合函數是通過擴充套件 AggregateFunction 來實現的。AggregateFunction 的工作過程如下。首先,它需要一個 accumulator,它是一個資料結構,儲存了聚合的中間結果。通過呼叫 AggregateFunction 的 createAccumulator() 方法建立一個空的 accumulator。接下來,對於每一行資料,會呼叫 accumulate() 方法來更新 accumulator。當所有的資料都處理完了之後,通過呼叫 getValue 方法來計算和返回最終的結果。
下面幾個方法是每個 AggregateFunction 必須要實現的:
自定義表值聚合函數(UDTAGG)可以把一個表(一行或者多行,每行有一列或者多列)聚合成另一張表,結果中可以有多行多列。
上圖展示了一個表值聚合函數的例子。假設你有一個飲料的表,這個表有 3 列,分別是 id、name 和 price,一共有 5 行。假設你需要找到價格最高的兩個飲料,類似於 top2() 表值聚合函數。你需要遍歷所有 5 行資料,結果是有 2 行資料的一個表。
使用者自定義表值聚合函數是通過擴充套件 TableAggregateFunction 類來實現的。一個 TableAggregateFunction 的工作過程如下。首先,它需要一個 accumulator,這個 accumulator 負責儲存聚合的中間結果。 通過呼叫 TableAggregateFunction 的 createAccumulator 方法來構造一個空的 accumulator。接下來,對於每一行資料,會呼叫 accumulate 方法來更新 accumulator。當所有資料都處理完之後,呼叫 emitValue 方法來計算和返回最終的結果。
下面幾個 TableAggregateFunction 的方法是必須要實現的:
非同步表值函數是非同步查詢外部資料系統的特殊函數。
基於Flink1.14.4叢集,有一批基於某個主鍵生成的collect函數結果資料,需要轉換為字串傳到下游Kafka。由於collect()函數生成的結果是一個多行的集合MutiSet<varchar(100)>,FlinkSQL中暫未支援concat_ws或者concat函數,因此無法將collect生成的多行結果直接通過現有SQL函數轉換為一行字串。基於以上原因,需要開發一個自定義函數實現。
CREATE TABLE "air_data_source_result" (
"id" int NOT NULL DEFAULT '0' COMMENT '主鍵',
"airlineLogo" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"airlineShortCompany" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrActCross" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrActTime" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrAirport" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrCode" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrOntimeRate" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrPlanCross" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrPlanTime" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"arrTerminal" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"checkInTable" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"checkInTableWidth" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depActCross" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depActTime" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depAirport" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depCode" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depPlanCross" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depPlanTime" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"depTerminal" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"flightNo" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"flightState" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"localDate" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"mainFlightNo" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"shareFlag" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
"stateColor" varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (1, 'https://cdn1.133.cn/ticket/airline/image_ca_cca.png', '中國國航', '', '11:11\n', '廣州白雲', 'CAN', '89.65%', '', '11:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '08:15\n', '北京首都', 'PEK', '', '08:00', 'T3', 'CA1351', '到達', '2023-02-27', '', '0', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (2, 'https://cdn1.133.cn/ticket/airline/image_zh_csz.png', '深圳航空', '', '11:11\n', '廣州白雲', 'CAN', '89.65%', '', '11:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '08:15\n', '北京首都', 'PEK', '', '08:00', 'T3', 'ZH1351', '到達', '2023-02-27', 'CA1351', '1', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (3, 'https://cdn1.133.cn/ticket/airline/image_hu_chh.png', '海南航空', '', '11:57\n', '廣州白雲', 'CAN', '75.86%', '', '11:50', 'T1B', 'https://api.133.cn/third/textImg?code=IfLOkkFeJagwbNuqYtoqNg==', '140', '', '08:51\n', '北京首都', 'PEK', '', '08:30', 'T2', 'HU7805', '到達', '2023-02-27', '', '0', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (4, 'https://cdn1.133.cn/ticket/airline/image_ca_cca.png', '中國國航', '', '12:14\n', '廣州白雲', 'CAN', '79.31%', '', '12:20', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '09:19\n', '北京首都', 'PEK', '', '09:00', 'T3', 'CA1321', '到達', '2023-02-27', '', '0', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (5, 'https://cdn1.133.cn/ticket/airline/image_zh_csz.png', '深圳航空', '', '12:14\n', '廣州白雲', 'CAN', '79.31%', '', '12:20', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '09:19\n', '北京首都', 'PEK', '', '09:00', 'T3', 'ZH1321', '到達', '2023-02-27', 'CA1321', '1', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (6, 'https://cdn1.133.cn/ticket/airline/image_hu_chh.png', '海南航空', '', '13:12\n', '廣州白雲', 'CAN', '96.55%', '', '13:40', 'T1B', 'https://api.133.cn/third/textImg?code=IfLOkkFeJagwbNuqYtoqNg==', '140', '', '10:07\n', '北京首都', 'PEK', '', '10:00', 'T2', 'HU7813', '到達', '2023-02-27', '', '0', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (7, 'https://cdn1.133.cn/ticket/airline/image_zh_csz.png', '深圳航空', '', '14:22\n', '廣州白雲', 'CAN', '82.75%', '', '14:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '11:22\n', '北京首都', 'PEK', '', '11:00', 'T3', 'ZH1315', '到達', '2023-02-27', 'CA1315', '1', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (8, 'https://cdn1.133.cn/ticket/airline/image_ca_cca.png', '中國國航', '', '14:22\n', '廣州白雲', 'CAN', '82.75%', '', '14:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '11:22\n', '北京首都', 'PEK', '', '11:00', 'T3', 'CA1315', '到達', '2023-02-27', '', '0', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (9, 'https://cdn1.133.cn/ticket/airline/image_zh_csz.png', '深圳航空', '', '15:13\n', '廣州白雲', 'CAN', '78.57%', '', '15:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '12:19\n', '北京首都', 'PEK', '', '12:00', 'T3', 'ZH1339', '到達', '2023-02-27', 'CA1339', '1', '#4273FE');
INSERT INTO `air_data`.`air_data_source_result` (`id`, `airlineLogo`, `airlineShortCompany`, `arrActCross`, `arrActTime`, `arrAirport`, `arrCode`, `arrOntimeRate`, `arrPlanCross`, `arrPlanTime`, `arrTerminal`, `checkInTable`, `checkInTableWidth`, `depActCross`, `depActTime`, `depAirport`, `depCode`, `depPlanCross`, `depPlanTime`, `depTerminal`, `flightNo`, `flightState`, `localDate`, `mainFlightNo`, `shareFlag`, `stateColor`) VALUES (10, 'https://cdn1.133.cn/ticket/airline/image_ca_cca.png', '中國國航', '', '15:13\n', '廣州白雲', 'CAN', '78.57%', '', '15:25', 'T1B', 'https://api.133.cn/third/textImg?code=FvOGTb%2Bgbgxpgw9zPNG2Qw==', '30', '', '12:19\n', '北京首都', 'PEK', '', '12:00', 'T3', 'CA1339', '到達', '2023-02-27', '', '0', '#4273FE');
create table air_data_source(
id int COMMENT '主鍵',
airlineLogo varchar(100) ,
airlineShortCompany varchar(100) ,
arrActCross varchar(100) ,
arrActTime varchar(100) ,
arrAirport varchar(100) ,
arrCode varchar(100) ,
arrOntimeRate varchar(100) ,
arrPlanCross varchar(100) ,
arrPlanTime varchar(100) ,
arrTerminal varchar(100) ,
checkInTable varchar(100) ,
checkInTableWidth varchar(100) ,
depActCross varchar(100) ,
depActTime varchar(100) ,
depAirport varchar(100) ,
depCode varchar(100) ,
depPlanCross varchar(100) ,
depPlanTime varchar(100) ,
depTerminal varchar(100) ,
flightNo varchar(100) ,
flightState varchar(100) ,
localDate varchar(100) ,
mainFlightNo varchar(100) ,
shareFlag varchar(100) ,
stateColor varchar(100)
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/air_data?serverTimezone=GMT%2B8',
'username' = 'root',
'password' = 'root',
'table-name' = 'air_data_source'
)
;
SELECT arrAirport,cast(count(airlineShortCompany) as int) as counts, collect(airlineShortCompany) as collects FROM air_data_source group by arrAirport having count(airlineShortCompany) = 2
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xxxxx.tech</groupId>
<artifactId>alarmCollectPlatform</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<flink.version>1.14.4</flink.version>
</properties>
<dependencies>
<!-- flink依賴引入-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
package com.xxxxx.tech.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
public class MultisetToString extends ScalarFunction implements ResultTypeQueryable<String> {
public String eval(@DataTypeHint("MULTISET<STRING>") Map<String, Integer> multiset) {
return multiset.toString();
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
mvn clean install
將打好的jar包上傳到Flink_HOME的lib目錄下,並重啟叢集
進入bin目錄啟動sql-client,註冊函數
select arrAirport,counts,multiset_to_string(collects) as collects from (
SELECT arrAirport,cast(count(airlineShortCompany) as int) as counts, collect(airlineShortCompany) as collects FROM air_data_source group by arrAirport having count(airlineShortCompany) = 2
) t