文章都在個人部落格網站:https://www.ikeguang.com/ 同步,歡迎存取。
最近看到有人在用flink sql的頁面管理平臺,大致看了下,嘗試安裝使用,比原生的flink sql介面確實好用多了,我們看下原生的,通過bin/sql-client.sh命令進入那個黑框,一隻松鼠,對,就是那個介面。。。。
這個工具不是Flink官方出的,是一個國內的小夥伴寫的,Github地址:
https://github.com/zhp8341/flink-streaming-platform-web
根據github上,作者的描述,flink-streaming-patform-web主要功能:
是不是覺得很強大,很多同學已經摩拳擦掌想試試了。
這裡只介紹flink on yarn模式的安裝,如果你的hadoop叢集已經安裝好了,大概半個小時就能好;否則,安裝hadoop叢集可老費事兒了。總體步驟如下:
這裡假設你的hadoop叢集是好的,yarn是可以正常使用的,8088埠可以存取,如下:
flink on yarn,只需要下載一個flink安裝包即可使用,下載命令:
http://archive.apache.org/dist/flink/flink-1.13.5/flink-1.13.5-bin-scala_2.11.tgz
解壓
tar -xvf flink-1.13.5-bin-scala_2.11.tgz
關鍵:這裡問題來了,我的flink怎麼識別hadoop呢,需要設定一個環境變數,編輯 /etc/profile,鍵入內容:
export HADOOP_CONF_DIR=填你的hadoop組態檔目錄,比如我的是/usr/local/hadoop2.8/etc/hadoop/conf
export HADOOP_CLASSPATH=`hadoop classpath`
好了,這樣一個flink on yarn的環境就搭建好了。
官方地址文章開頭已經給出,在github找到下載地址: https://github.com/zhp8341/flink-streaming-platform-web/releases/,我下載的版本是這個。
為什麼我下的是適配flink 1.14.3的,我前面安裝flink1.13.5,我也是下了一堆flink,經過嘗試,才發現flink1.13.5這個版本,適配flink-streaming-platform-web tagV20220625。
解壓後,修改組態檔:application.properties,懂的人知道這個其實是個springboot的組態檔。
#### jdbc資訊
server.port=9084
spring.datasource.url=jdbc:mysql://192.168.1.1:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=bigdata
spring.datasource.password=bigdata
這裡設定了一個資料庫,需要自己新建一下,建表語句作者給出了:https://github.com/zhp8341/flink-streaming-platform-web/blob/master/docs/sql/flink_web.sql,把這段sql執行一下,在flink_web資料庫建相應的一些整個系統執行需要的表。
啟動web服務
# bin目錄下面的命令
啟動 : sh deploy.sh start
停止 : sh deploy.sh stop
服務啟動後,通過9084埠在瀏覽器存取
這一步很關鍵,頁面上點選系統設定,進入設定頁面:
這裡的引數意義:
經過測試,設定這3個引數即可使用。
這裡以官方demo為例,[demo1 單流kafka寫入mysqld 參考](https://github.com/zhp8341/flink-streaming-platform-web/blob/master/docs/sql_demo/demo_1.md),這是一個通過flink sql消費kafka,聚合結果寫入mysql的例子。
CREATE TABLE sync_test_1 (
`day_time` varchar(64) NOT NULL,
`total_gmv` bigint(11) DEFAULT NULL,
PRIMARY KEY (`day_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
因為涉及到kafka和mysql,需要對應的connector依賴jar包,下圖中標註出來了,放在Flink的lib目錄(/var/lib/hadoop-hdfs/flink-1.13.5/lib)下面:
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.5/flink-connector-jdbc_2.11-1.13.5.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.13.5/flink-connector-kafka_2.11-1.13.5.jar
技巧:下載Flink的依賴jar包,有個地方下載很方便,地址是:
https://repo1.maven.org/maven2/org/apache/flink/
這樣依賴,一切都準備好了。
在web頁面新建sql流任務:
我建的一個,任務屬性我是這樣填寫的:
sql指令碼內容:
create table flink_test_1 (
id BIGINT,
day_time VARCHAR,
amnount BIGINT,
proctime AS PROCTIME ()
)
with (
'connector' = 'kafka',
'topic' = 'flink_connector',
'properties.bootstrap.servers' = 'kafka-001:9092',
'properties.group.id' = 'flink_gp_test1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
CREATE TABLE sync_test_1 (
day_time string,
total_gmv bigint,
PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.1.1:3306/flink_web?characterEncoding=UTF-8',
'table-name' = 'sync_test_1',
'username' = 'bigdata',
'password' = 'bigdata'
);
INSERT INTO sync_test_1
SELECT day_time,SUM(amnount) AS total_gmv
FROM flink_test_1
GROUP BY day_time;
建立好任務後,啟動任務吧。
啟動後,可以在yarn的8088埠頁面看到起了一個application,名稱是新建任務填寫的名稱加flink@字首:
這個任務,我們點進去看,通過管理平臺提交的sql任務確實跑起來了,這個頁面瞭解Flink的同學就很熟悉了:
其實,這段sql指令碼,我們可以在flink的bin/sql-client.sh進入的那個小松鼠的黑框裡面執行的,你們可以試一下。
kafka控制檯往主題裡面寫資料,主題不存在會自動建立:
我們再看看mysql裡面:
資料已經進來了。
我們可以看到,flink-streaming-platform-web這個工具只是讓我們不需要在這個黑框裡面寫sql了,而是在網頁上面寫sql,系統會把寫的sql進行校驗給flink去執行,不管是flink-streaming-platform-web網頁也好,還是那個黑框sql控制檯,都是使用者端,本質上都是flink提供的一些table api去執行任務。