Flink SQL管理平臺flink-streaming-platform-web安裝搭建

2022-12-02 12:01:49

文章都在個人部落格網站:https://www.ikeguang.com/ 同步,歡迎存取。

最近看到有人在用flink sql的頁面管理平臺,大致看了下,嘗試安裝使用,比原生的flink sql介面確實好用多了,我們看下原生的,通過bin/sql-client.sh命令進入那個黑框,一隻松鼠,對,就是那個介面。。。。

這個工具不是Flink官方出的,是一個國內的小夥伴寫的,Github地址:

https://github.com/zhp8341/flink-streaming-platform-web

根據github上,作者的描述,flink-streaming-patform-web主要功能:

  • [1] 任務支援單流 、雙流、 單流與維表等。
  • [2] 支援本地模式、yarn-per模式、STANDALONE模式 Application模式
  • [3] 支援catalog、hive。
  • [4] 支援自定義udf、聯結器等,完全相容官方聯結器。
  • [5] 支援sql的線上開發,語法提示,格式化。
  • [6] 支援釘釘告警、自定義回撥告警、自動拉起任務。
  • [7] 支援自定義Jar提交任務。
  • [8] 支援多版本flink版本(需要使用者編譯對應flink版本)。
  • [9] 支援自動、手動savepoint備份,並且從savepoint恢復任務。
  • [10] 支援批任務如:hive。
  • [11] 聯結器、udf等三jar管理

是不是覺得很強大,很多同學已經摩拳擦掌想試試了。

安裝

這裡只介紹flink on yarn模式的安裝,如果你的hadoop叢集已經安裝好了,大概半個小時就能好;否則,安裝hadoop叢集可老費事兒了。總體步驟如下:

第一步 hadoop叢集

這裡假設你的hadoop叢集是好的,yarn是可以正常使用的,8088埠可以存取,如下:

flink on yarn,只需要下載一個flink安裝包即可使用,下載命令:

http://archive.apache.org/dist/flink/flink-1.13.5/flink-1.13.5-bin-scala_2.11.tgz

解壓

tar -xvf flink-1.13.5-bin-scala_2.11.tgz

關鍵:這裡問題來了,我的flink怎麼識別hadoop呢,需要設定一個環境變數,編輯 /etc/profile,鍵入內容:

export HADOOP_CONF_DIR=填你的hadoop組態檔目錄,比如我的是/usr/local/hadoop2.8/etc/hadoop/conf
export HADOOP_CLASSPATH=`hadoop classpath`

好了,這樣一個flink on yarn的環境就搭建好了。

官方地址文章開頭已經給出,在github找到下載地址: https://github.com/zhp8341/flink-streaming-platform-web/releases/,我下載的版本是這個。

為什麼我下的是適配flink 1.14.3的,我前面安裝flink1.13.5,我也是下了一堆flink,經過嘗試,才發現flink1.13.5這個版本,適配flink-streaming-platform-web tagV20220625。

解壓後,修改組態檔:application.properties,懂的人知道這個其實是個springboot的組態檔。

#### jdbc資訊
server.port=9084
spring.datasource.url=jdbc:mysql://192.168.1.1:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=bigdata
spring.datasource.password=bigdata

這裡設定了一個資料庫,需要自己新建一下,建表語句作者給出了:https://github.com/zhp8341/flink-streaming-platform-web/blob/master/docs/sql/flink_web.sql,把這段sql執行一下,在flink_web資料庫建相應的一些整個系統執行需要的表。

啟動web服務

# bin目錄下面的命令

啟動 : sh deploy.sh  start
停止 :  sh deploy.sh  stop

服務啟動後,通過9084埠在瀏覽器存取

這一步很關鍵,頁面上點選系統設定,進入設定頁面:

這裡的引數意義:

  1. Flink使用者端目錄:就是安裝的Flink目錄;
  2. Flink管理平臺目錄:就是下載的flink-streaming-platform-web放的目錄;
  3. yarn RM http地址:就是yarn.resourcemanager.webapp.address,通常是8088埠;

經過測試,設定這3個引數即可使用。

第五步 執行demo

這裡以官方demo為例,[demo1 單流kafka寫入mysqld 參考](https://github.com/zhp8341/flink-streaming-platform-web/blob/master/docs/sql_demo/demo_1.md),這是一個通過flink sql消費kafka,聚合結果寫入mysql的例子。

  1. 在flink_web資料建表
CREATE TABLE sync_test_1 (
  `day_time` varchar(64) NOT NULL,
  `total_gmv` bigint(11) DEFAULT NULL,
  PRIMARY KEY (`day_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  1. 下載依賴jar包

因為涉及到kafka和mysql,需要對應的connector依賴jar包,下圖中標註出來了,放在Flink的lib目錄(/var/lib/hadoop-hdfs/flink-1.13.5/lib)下面:

wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.5/flink-connector-jdbc_2.11-1.13.5.jar

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.13.5/flink-connector-kafka_2.11-1.13.5.jar

技巧:下載Flink的依賴jar包,有個地方下載很方便,地址是:

https://repo1.maven.org/maven2/org/apache/flink/

這樣依賴,一切都準備好了。

在web頁面新建sql流任務:

我建的一個,任務屬性我是這樣填寫的:

sql指令碼內容:

create table flink_test_1 ( 
    id BIGINT,
    day_time VARCHAR,
    amnount BIGINT,
    proctime AS PROCTIME ()
)
with ( 
    'connector' = 'kafka',
    'topic' = 'flink_connector',
    'properties.bootstrap.servers' = 'kafka-001:9092', 
    'properties.group.id' = 'flink_gp_test1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
CREATE TABLE sync_test_1 (
    day_time string,
    total_gmv bigint,
    PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.1.1:3306/flink_web?characterEncoding=UTF-8',
    'table-name' = 'sync_test_1',
    'username' = 'bigdata',
    'password' = 'bigdata'
);
INSERT INTO sync_test_1 
SELECT day_time,SUM(amnount) AS total_gmv
FROM flink_test_1
GROUP BY day_time;

建立好任務後,啟動任務吧。

啟動後,可以在yarn的8088埠頁面看到起了一個application,名稱是新建任務填寫的名稱加flink@字首:

這個任務,我們點進去看,通過管理平臺提交的sql任務確實跑起來了,這個頁面瞭解Flink的同學就很熟悉了:

其實,這段sql指令碼,我們可以在flink的bin/sql-client.sh進入的那個小松鼠的黑框裡面執行的,你們可以試一下。

kafka控制檯往主題裡面寫資料,主題不存在會自動建立:

我們再看看mysql裡面:

資料已經進來了。

我們可以看到,flink-streaming-platform-web這個工具只是讓我們不需要在這個黑框裡面寫sql了,而是在網頁上面寫sql,系統會把寫的sql進行校驗給flink去執行,不管是flink-streaming-platform-web網頁也好,還是那個黑框sql控制檯,都是使用者端,本質上都是flink提供的一些table api去執行任務。