阿里雲體驗有獎:使用PolarDB-X與Flink搭建實時資料大屏

2022-06-29 15:01:01

體驗簡介

場景將提供一臺設定了CentOS 8.5作業系統的ECS範例(雲伺服器)。通過本教學的操作帶您體驗如何使用PolarDB-X與Flink搭建一個實時資料鏈路,模擬阿里巴巴雙十一GMV大屏。

實驗準備

1. 建立實驗資源

開始實驗之前,您需要先建立ECS範例資源。

  1. 在實驗室頁面,單擊建立資源

  2. (可選)在實驗室頁面左側導航欄中,單擊雲產品資源列表,可檢視本次實驗資源相關資訊(例如IP地址、使用者資訊等)。

說明:資源建立過程需要1~3分鐘。

2. 安裝PolarDB-X

本步驟將指導您如何安裝PolarDB-X。

  1. 安裝並啟動Docker。

    1. 執行如下命令,安裝Docker。
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun


  1. 執行如下命令,啟動Docker。
systemctl start docker


  1. 執行如下命令,安裝PolarDB-X。
docker run -d --name some-polardb-x -p 8527:8527 polardbx/polardb-x:2.1.0


3. 在PolarDB-X中準備訂單表

PolarDB-X支援通過MySQL Client命令列、第三方使用者端以及符合MySQL互動協定的第三方程式程式碼進行連線。本實驗使用MySQL Client命令列連線到PolarDB-X資料庫。

本步驟將指導您如何連線PolarDB-X資料庫,並建立測試庫、測試表和測試資料。

  1. 執行如下命令,安裝MySQL。
yum install mysql -y


  1. 執行如下命令,檢視MySQL版本號。
mysql -V


返回結果如下,表示您已成功安裝MySQL。

  1. 執行如下命令,登入PolarDB-X資料庫。

說明

  • 本實驗場景中的PolarDB-X資料庫使用者名稱和密碼已預設,請您使用下方命令登入即可。

  • 如遇到mysql: [Warning] Using a password on the command line interface can be insecure.ERROR 2013 (HY000): Lost connection to MySQL server at 'reading initial communication packet', system error: 0報錯,請您稍等一分鐘,重新執行登入命令即可。

mysql -h127.0.0.1 -P8527 -upolardbx_root -p123456


返回結果如下,表示您已成功登入PolarDB-X資料庫。

  1. 執行如下SQL語句,建立測試庫mydb。
create database mydb;


  1. 執行如下SQL語句,使用測試庫mydb。
use mydb;


  1. 執行如下SQL語句,建立訂單表orders。
CREATE TABLE `orders` (
 `order_id` int(11) NOT NULL AUTO_INCREMENT,
 `order_date` datetime NOT NULL,
 `customer_name` varchar(255) NOT NULL,
 `price` decimal(10, 5) NOT NULL,
 `product_id` int(11) NOT NULL,
 `order_status` tinyint(1) NOT NULL,
 PRIMARY KEY (`order_id`)
)AUTO_INCREMENT = 10001;


  1. 執行如下SQL語句,給訂單表orders中插入資料。
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);


  1. 輸入exit退出資料庫。

4. 執行Flink

本步驟將指導您如何下載並執行Flink。

  1. 安裝JDK。

a.執行如下命令,使用yum安裝JDK 1.8。

yum -y install java-1.8.0-openjdk*


b.執行如下命令,檢視是否安裝成功。

java -version


返回結果如下,表示您已成功安裝JDK 1.8。

  1. 下載Flink和Flink CDC MySQL Connector。

a.執行如下命令,下載Flink。

wget https://labfileapp.oss-cn-hangzhou.aliyuncs.com/PolarDB-X/flink-1.13.6-bin-scala_2.11.tgz


b.執行如下命令,解壓Flink。

tar xzvf flink-1.13.6-bin-scala_2.11.tgz


c.執行如下命令,進入lib目錄。

cd flink-1.13.6/lib/


d.執行如下命令,下載flink-sql-connector-mysql-cdc。

wget https://labfileapp.oss-cn-hangzhou.aliyuncs.com/PolarDB-X/flink-sql-connector-mysql-cdc-2.2.1.jar


e.執行如下命令,返回Flink目錄。

cd ..


  1. 啟動Flink。

a.執行如下命令,啟動Flink。

./bin/start-cluster.sh


b.執行如下命令,連線Flink。

./bin/sql-client.sh


  1. 在Flink中建立與PolarDB-X關聯的訂單表orders。

a.執行如下SQL語句,建立訂單表orders。

CREATE TABLE orders (
 order_id INT,
 order_date TIMESTAMP(0),
 customer_name STRING,
 price DECIMAL(10, 5),
 product_id INT,
 order_status BOOLEAN,
 PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '8527',
'username' = 'polardbx_root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);


b.執行如下SQL語句,檢視訂單表orders。

select * from orders;


返回結果如下,您可以檢視到PolarDB-X的訂單表orders的資料已經同步到Flink的訂單表orders中。

c.按q鍵退出。

5. 啟動壓測指令碼並實時獲取GMV

經過前面幾步操作後,我們在PolarDB-X中準備好了原始訂單表,在Flink中準備好了對應的訂單表,並通過 PolarDB-X Global Binlog與Flink CDC MySQL Connector打通了兩者之間的實時同步鏈路。 本步驟將指導您如何建立壓測指令碼,模擬雙十一零點大量訂單湧入的場景。

  1. 準備壓測指令碼。

a.在實驗頁面,單擊右上角的圖示,建立新的終端二。

b.執行如下命令,建立組態檔mysql-config.cnf。

vim mysql-config.cnf

c.將如下程式碼新增到組態檔mysql-config.cnf中。

[client]
user = "polardbx_root"
password = "123456"
host = 127.0.0.1
port = 8527

d.新增完成後的檔案內容如下所示。按下Esc鍵後,輸入:wq後按下Enter鍵儲存並退出。

e.執行如下命令,建立指令碼buy.sh

vim buy.sh

f.將如下程式碼新增到指令碼buy.sh中。

#!/bin/bash

echo "start buying..."

count=0
while :
do
 mysql --defaults-extra-file=./mysql-config.cnf -Dmydb -e "insert into orders values(default, now(), 'free6om', 1024, 102, 0)"
 let count++
 if ! (( count % 10 )); then
  let "batch = count/10"
  echo $batch": got 10 products, gave 1024¥"
 fi
 sleep 0.05
done

g.新增完成後的檔案內容如下所示。按下Esc鍵後,輸入:wq後按下Enter鍵儲存並退出。

h.執行如下命令,為指令碼buy.sh增加執行許可權。

chmod +x buy.sh

  1. 啟動Flink實時計算。

本實驗場景通過Flink SQL實時呈現GMV計算結果。

切換至終端一,在Flink中執行如下SQL語句,查詢GMV(gmv列)和訂單數(orders列)。

select 1, sum(price) as gmv, count(order_id) as orders from orders;

返回結果如下,您可在Flink的實時計算結果中檢視到實時的GMV(gmv列)和訂單數(orders列)。

  1. 啟動壓測指令碼。

a.切換至終端二,執行如下命令,啟動壓測指令碼,開始建立訂單。

./buy.sh

返回結果如下,您可看到壓測指令碼啟動後,不斷有訂單被建立出來。

b.切換至終端一,在Flink的實時計算結果中,可檢視到實時的GMV(gmv列)和訂單數(orders列)。

恭喜完成