使用者可以通過 MySQL 協定,使用 INSERT 語句進行資料匯入
INSERT 語句的使用方式和 MySQL 等資料庫中 INSERT 語句的使用方式類似。 INSERT 語句支援以下兩種語法:
INSERT INTO table SELECT ...
INSERT INTO table VALUES(...)
對於 Doris 來說,一個 INSERT 命令就是一個完整的匯入事務。
因此不論是匯入一條資料,還是多條資料,我們都不建議在生產環境使用這種方式進行資料匯入。高頻次的 INSERT 操作會導致在儲存層產生大量的小檔案,會嚴重影響系統效能。
該方式僅用於線下簡單測試或低頻少量的操作。
或者可以使用以下方式進行批次的插入操作:
INSERT INTO example_tbl VALUES
(1000, "baidu1", 3.25)
(2000, "baidu2", 4.25)
(3000, "baidu3", 5.25);
用於將本地檔案匯入到doris中。Stream Load 是通過 HTTP 協定與 Doris 進行連線互動的。
該方式中涉及 HOST:PORT 都是對應的HTTP 協定埠。
• BE 的 HTTP 協定埠,預設為 8040。
• FE 的 HTTP 協定埠,預設為 8030。
但須保證使用者端所在機器網路能夠聯通FE, BE 所在機器。
-- 建立表
drop table if exists load_local_file_test;
CREATE TABLE IF NOT EXISTS load_local_file_test
(
id INT,
name VARCHAR(50),
age TINYINT
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;
# 建立檔案
1,zss,28
2,lss,28
3,ww,88
# 匯入資料
## 語法範例
curl \
-u user:passwd \ # 賬號密碼
-H "label:load_local_file_test" \ # 本次任務的唯一標識
-T 檔案地址 \
http://主機名:埠號/api/庫名/表名/_stream_load
# user:passwd 為在 Doris 中建立的使用者。初始使用者為 admin / root,密碼初始狀態下為空。
# host:port 為 BE 的 HTTP 協定埠,預設是 8040,可以在 Doris 叢集 WEB UI頁面檢視。
# label: 可以在 Header 中指定 Label 唯一標識這個匯入任務。
curl \
-u root:123 \
-H "label:load_local_file" \
-H "column_separator:," \
-T /root/data/loadfile.txt \
http://doitedu01:8040/api/test/load_local_file_test/_stream_load
建議一個匯入請求的資料量控制在 1 - 2 GB 以內。如果有大量本地檔案,可以分批並行提交。
# 準備資料
{"id":1,"name":"liuyan","age":18}
{"id":2,"name":"tangyan","age":18}
{"id":3,"name":"jinlian","age":18}
{"id":4,"name":"dalang","age":18}
{"id":5,"name":"qingqing","age":18}
curl \
-u root: \
-H "label:load_local_file_json_20221126" \
-H "columns:id,name,age" \
-H "max_filter_ratio:0.1" \
-H "timeout:1000" \
-H "exec_mem_limit:1G" \
-H "where:id>1" \
-H "format:json" \
-H "read_json_by_line:true" \
-H "merge_type:delete" \
-T /root/data/json.txt \
http://doitedu01:8040/api/test/load_local_file_test/_stream_load
-H "merge_type:append" \
# 會把id = 3 的這條資料刪除
-H "merge_type:MERGE" \
-H "delete:id=3"
• 源資料在 Broker 可以存取的儲存系統中,如 HDFS。
• 資料量在幾十到百 GB 級別。
-- 新建一張表
drop table if exists load_hdfs_file_test1;
CREATE TABLE IF NOT EXISTS load_hdfs_file_test1
(
id INT,
name VARCHAR(50),
age TINYINT
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;
將原生的資料匯入到hdfs上面
hdfs dfs -put ./loadfile.txt hdfs://linux01:8020/
hdfs dfs -ls hdfs://linux01:8020/
-- 匯入語法
LOAD LABEL test.label_202204(
[MERGE|APPEND|DELETE] -- 不寫就是append
DATA INFILE
(
"file_path1"[, file_path2, ...] -- 描述資料的路徑 這邊可以寫多個 ,以逗號分割
)
[NEGATIVE] -- 負增長
INTO TABLE `table_name` -- 匯入的表名字
[PARTITION (p1, p2, ...)] -- 匯入到哪些分割區,不符合這些分割區的就會被過濾掉
[COLUMNS TERMINATED BY "column_separator"] -- 指定分隔符
[FORMAT AS "file_type"] -- 指定儲存的檔案型別
[(column_list)] -- 指定匯入哪些列
[COLUMNS FROM PATH AS (c1, c2, ...)] -- 從路勁中抽取的部分列
[SET (column_mapping)] -- 對於列可以做一些對映,寫一些函數
-- 這個引數要寫在要寫在set的後面
[PRECEDING FILTER predicate] -- 在mapping前做過濾做一些過濾
[WHERE predicate] -- 在mapping後做一些過濾 比如id>10
[DELETE ON expr] --根據欄位去做一些抵消消除的策略 需要配合MERGE
[ORDER BY source_sequence] -- 匯入資料的時候保證資料順序
[PROPERTIES ("key1"="value1", ...)] -- 一些設定引數
-- 將hdfs上的資料load到表中
LOAD LABEL test.label_20221125
(
DATA INFILE("hdfs://linux01:8020/test.txt")
INTO TABLE `load_hdfs_file_test`
COLUMNS TERMINATED BY ","
(id,name,age)
)
with HDFS (
"fs.defaultFS"="hdfs://linux01:8020",
"hadoop.username"="root"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
-- 這是一個非同步的操作,所以需要去檢視下執行的狀態
show load order by createtime desc limit 1\G;
從 HDFS 匯入資料,使用萬用字元匹配兩批兩批檔案。分別匯入到兩個表中
LOAD LABEL example_db.label2
(
DATA INFILE("hdfs://linux01:8020/input/file-10*")
INTO TABLE `my_table1`
PARTITION (p1)
COLUMNS TERMINATED BY ","
FORMAT AS "parquet"
(id, tmp_salary, tmp_score)
SET (
salary= tmp_salary + 1000,
score = tmp_score + 10
),
DATA INFILE("hdfs://linux01:8020/input/file-20*")
INTO TABLE `my_table2`
COLUMNS TERMINATED BY ","
(k1, k2, k3)
)
with HDFS (
"fs.defaultFS"="hdfs://linux01:8020",
"hadoop.username"="root"
)
-- 匯入資料,並提取檔案路徑中的分割區欄位
LOAD LABEL example_db.label10
(
DATA INFILE("hdfs://linux01:8020/user/hive/warehouse/table_name/dt=20221125/*")
INTO TABLE `my_table`
FORMAT AS "csv"
(k1, k2, k3)
COLUMNS FROM PATH AS (dt)
)
WITH BROKER hdfs
(
"username"="root",
"password"="123"
);
-- 對待匯入資料進行過濾。
LOAD LABEL example_db.label6
(
DATA INFILE("hdfs://linux01:8020/input/file")
INTO TABLE `my_table`
(k1, k2, k3)
SET (
k2 = k2 + 1
)
PRECEDING FILTER k1 = 1 ==》前置過濾
WHERE k1 > k2 ==》 後置過濾
)
WITH BROKER hdfs
(
"username"="root",
"password"="123"
);
-- 只有原始資料中,k1 = 1,並且轉換後,k1 > k2 的行才會被匯入。
當 Broker load 作業狀態不為 CANCELLED 或 FINISHED 時,可以被使用者手動取消。
取消時需要指定待取消匯入任務的 Label 。取消匯入命令語法可執行 HELP CANCEL LOAD 檢視。
CANCEL LOAD [FROM db_name] WHERE LABEL="load_label";
Doris 可以建立外部表。建立完成後,可以通過 SELECT 語句直接查詢外部表的資料,也可以通過 INSERT INTO SELECT 的方式匯入外部表的資料。
Doris 外部表目前支援的資料來源包括:MySQL,Oracle,Hive,PostgreSQL,SQLServer,Iceberg,ElasticSearch
-- 整體語法
CREATE [EXTERNAL] TABLE table_name (
col_name col_type [NULL | NOT NULL] [COMMENT "comment"]
) ENGINE=HIVE
[COMMENT "comment"]
PROPERTIES (
-- 我要對映的hive表在哪個庫裡面
-- 對映的表名是哪一張
-- hive的後設資料服務地址
'property_name'='property_value',
...
);
-- 引數說明:
-- 1.外表列
-- 列名要與 Hive 表一一對應
-- 列的順序需要與 Hive 表一致
-- 必須包含 Hive 表中的全部列
-- Hive 表分割區列無需指定,與普通列一樣定義即可。
-- 2.ENGINE 需要指定為 HIVE
-- 3.PROPERTIES 屬性:
-- hive.metastore.uris:Hive Metastore 服務地址
-- database:掛載 Hive 對應的資料庫名
-- table:掛載 Hive 對應的表名
完成在 Doris 中建立 Hive 外表後,除了無法使用 Doris 中的資料模型(rollup、預聚合、物化檢視等)外,與普通的 Doris OLAP 表並無區別
-- 在Hive 中建立一個測試用表:
CREATE TABLE `user_info` (
`id` int,
`name` string,
`age` int
) stored as orc;
insert into user_info values (1,'zss',18);
insert into user_info values (2,'lss',20);
insert into user_info values (3,'ww',25);
-- Doris 中建立外部表
CREATE EXTERNAL TABLE `hive_user_info` (
`id` int,
`name` varchar(10),
`age` int
) ENGINE=HIVE
PROPERTIES (
'hive.metastore.uris' = 'thrift://linux01:9083',
'database' = 'db1',
'table' = 'user_info'
);
外部表建立好後,就可以直接在doris中對這個外部表進行查詢了
直接查詢外部表,無法利用到doris自身的各種查詢優化機制!
select * from hive_user_info;
-- 將資料從外部表匯入內部表
-- 資料從外部表匯入內部表後,就可以利用doris自身的查詢優勢了!
-- 假設要匯入的目標內部表為: doris_user_info (需要提前建立)
CREATE TABLE IF NOT EXISTS doris_user_info
(
id INT,
name VARCHAR(50),
age TINYINT
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;
-- 就是用sql查詢,從外部表中select出資料後,insert到內部表即可
insert into doris_user_info
select
*
from hive_user_info;
注意:
Hive 表 Schema 變更不會自動同步,需要在 Doris 中重建 Hive 外表。
當前 Hive 的儲存格式僅支援 Text,Parquet 和 ORC 型別
Binlog Load提供了一種使Doris增量同步使用者在Mysql資料庫中對資料更新操作的CDC(Change Data Capture)功能。
基本原理
當前版本設計中,Binlog Load需要依賴canal作為中間媒介,讓canal偽造成一個從節點去獲取Mysql主節點上的Binlog並解析,再由Doris去獲取Canal上解析好的資料,主要涉及Mysql端、Canal端以及Doris端
在Mysql Cluster模式的主從同步中,二進位制紀錄檔檔案(Binlog)記錄了主節點上的所有資料變化,資料在Cluster的多個節點間同步、備份都要通過Binlog紀錄檔進行,從而提高叢集的可用性。架構通常由一個主節點(負責寫)和一個或多個從節點(負責讀)構成,所有在主節點上發生的資料變更將會複製給從節點。
注意:目前必須要使用Mysql 5.7及以上的版本才能支援Binlog Load功能。
# 開啟mysql的二進位制binlog紀錄檔功能,則需要編輯my.cnf組態檔設定一下。
find / -name my.cnf
/etc/my.cnf
# 修改mysqld中的一些組態檔
[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-format = ROW
#binlog-format 的三種模式
#ROW 記錄每一行資料的資訊
#Statement 記錄sql語句
#Mixed 上面兩種的混合
# 重啟 MySQL 使設定生效
systemctl restart mysqld
-- 建立使用者並授權
-- 設定這些引數可以使得mysql的密碼簡單化
set global validate_password_length=4;
set global validate_password_policy=0;
-- 新增一個canal的使用者,讓他監聽所有庫中的所有表,並且設定密碼為canal
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;
-- 重新整理一下許可權
FLUSH PRIVILEGES;
-- 準備測試表
CREATE TABLE `user_doris2` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`gender` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
Canal 是屬於阿里巴巴 otter 專案下的一個子專案,主要用途是基於 MySQL 資料庫增量紀錄檔解析,提供增量資料訂閱和消費,用於解決跨機房同步的業務場景,建議使用 canal 1.1.5及以上版本。
下載地址:https://github.com/alibaba/canal/releases
# 上傳並解壓 canal deployer壓縮包
mkdir /opt/apps/canal
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/apps/canal
# 在 conf 資料夾下新建目錄並重新命名
# 一個 canal 服務中可以有多個 instance,conf/下的每一個目錄即是一個範例,每個範例下面都有獨立的組態檔
mkdir /opt/apps/canel/conf/doris
# 拷貝組態檔模板
cp /opt/apps/canal/conf/example/instance.properties /opt/apps/canal/conf/doris/
# 修改 conf/canal.properties 的設定
vi canal.properties
# 進入找到canal.destinations = example
# 將其修改為 我們自己設定的目錄
canal.destinations = doris
# 修改 instance 組態檔
vi instance.properties
# 修改:
canal.instance.master.address=doitedu01:3306
# 啟動
sh bin/startup.sh
注意:canal client 和 canal instance 是一一對應的,Binlog Load 已限制多個資料同步作 業不能連線到同一個 destination。
基本語法:
CREATE SYNC [db.]job_name
(
channel_desc,
channel_desc
...
)
binlog_desc
-- 引數說明:
-- job_name:是資料同步作業在當前資料庫內的唯一標識
-- channel_desc :用來定義任務下的資料通道,可表示 MySQL 源表到 doris 目標表的對映關係。在設定此項時,如果存在多個對映關係,必須滿足 MySQL 源表應該與 doris 目標表是一一對應關係,其他的任何對映關係(如一對多關係),檢查語法時都被視為不合法。
-- column_mapping:主要指MySQL源表和doris目標表的列之間的對映關係,如果不指定,FE 會預設源表和目標表的列按順序一一對應。但是我們依然建議顯式的指定列的對映關係,這樣當目標表的結構發生變化(比如增加一個 nullable 的列),資料同步作業依然可以進行。否則,當發生上述變動後,因為列對映關係不再一一對應,匯入將報錯。
-- binlog_desc:定義了對接遠端 Binlog 地址的一些必要資訊,目前可支援的對接型別只有 canal 方式,所有的設定項前都需要加上 canal 字首。
-- canal.server.ip: canal server 的地址
-- canal.server.port: canal server 的埠
-- canal.destination: 前文提到的 instance 的字串標識
-- canal.batchSize: 每批從 canal server 處獲取的 batch 大小的最大值,預設 8192
-- canal.username: instance 的使用者名稱
-- canal.password: instance 的密碼
-- canal.debug: 設定為 true 時,會將 batch 和每一行資料的詳細資訊都列印出來,會影響效能。
-- Doris 建立與 Mysql 對應的目標表
CREATE TABLE `binlog_mysql` (
`id` int(11) NOT NULL COMMENT "",
`name` VARCHAR(50) NOT NULL COMMENT "",
`age` int(11) NOT NULL COMMENT "" ,
`gender` VARCHAR(50) NOT NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1;
CREATE SYNC test.job20221228
(
FROM test.binlog_test INTO binlog_test
)
FROM BINLOG
(
"type" = "canal",
"canal.server.ip" = "linux01",
"canal.server.port" = "11111",
"canal.destination" = "doris",
"canal.username" = "canal",
"canal.password" = "canal"
);
-- 檢視作業狀態
-- 展示當前資料庫的所有資料同步作業狀態。
SHOW SYNC JOB;
-- 展示資料庫 `test_db` 下的所有資料同步作業狀態。
SHOW SYNC JOB FROM `test`;
-- 停止名稱為 `job_name` 的資料同步作業
STOP SYNC JOB [db.]job_name
-- 暫停名稱為 `job_name` 的資料同步作業
PAUSE SYNC JOB [db.]job_name
-- 恢復名稱為 `job_name` 的資料同步作業
RESUME SYNC JOB `job_name`
資料匯出(Export)是 Doris 提供的一種將資料匯出的功能。該功能可以將使用者指定的表或分割區的資料,以文字的格式,通過 Broker 程序匯出到遠端儲存上,如 HDFS / 物件儲存(支援S3協定) 等。
原理
Export 作業會生成多個查詢計劃,每個查詢計劃負責掃描一部分 Tablet。每個查詢計劃掃描的 Tablet 個數由 FE 設定引數 export_tablet_num_per_task 指定,預設為 5。即假設一共 100 個 Tablet,則會生成 20 個查詢計劃。使用者也可以在提交作業時,通過作業屬性 tablet_num_per_task 指定這個數值。
一個作業的多個查詢計劃順序執行
一個查詢計劃掃描多個分片,將讀取的資料以行的形式組織,每 1024 行為一個 batch,呼叫 Broker 寫入到遠端儲存上。
查詢計劃遇到錯誤會整體自動重試 3 次。如果一個查詢計劃重試 3 次依然失敗,則整個作業失敗。
Doris 會首先在指定的遠端儲存的路徑中,建立一個名為 __doris_export_tmp_12345 的臨時目錄(其中 12345 為作業 id)。匯出的資料首先會寫入這個臨時目錄。每個查詢計劃會生成一個檔案,檔名範例:
export-data-c69fcf2b6db5420f-a96b94c1ff8bccef-1561453713822
其中 c69fcf2b6db5420f-a96b94c1ff8bccef 為查詢計劃的 query id。1561453713822 為檔案生成的時間戳。當所有資料都匯出後,Doris 會將這些檔案 rename 到使用者指定的路徑中
範例:匯出到hdfs
EXPORT TABLE test.event_info_log1 -- 庫名.表名
to "hdfs://linux01:8020/event_info_log1" -- 匯出到那裡去
PROPERTIES
(
"label" = "event_info_log1",
"column_separator"=",",
"exec_mem_limit"="2147483648",
"timeout" = "3600"
)
WITH BROKER "broker_name"
(
"username" = "root",
"password" = ""
);
-- 1.label:本次匯出作業的標識。後續可以使用這個標識檢視作業狀態。
-- 2.column_separator:列分隔符。預設為 \t。支援不可見字元,比如 '\x07'。
-- 3.columns:要匯出的列,使用英文狀態逗號隔開,如果不填這個引數預設是匯出表的所有列。
-- 4.line_delimiter:行分隔符。預設為 \n。支援不可見字元,比如 '\x07'。
-- 5.exec_mem_limit: 表示 Export 作業中,一個查詢計劃在單個 BE 上的記憶體使用限制。預設 2GB。單位位元組。
-- 6.timeout:作業超時時間。預設 2小時。單位秒。
-- 7.tablet_num_per_task:每個查詢計劃分配的最大分片數。預設為 5。
-- 檢視匯出狀態
show EXPORT \G;
注意事項
SELECT INTO OUTFILE 語句可以將查詢結果匯出到檔案中。目前支援通過 Broker程序, 通過 S3 協定, 或直接通過 HDFS 協定,匯出到遠端儲存,如 HDFS,S3,BOS,COS (騰訊雲)上。
-- 語法
query_stmt -- 查詢語句
INTO OUTFILE "file_path" --匯出檔案的路勁
[format_as] -- 指定檔案儲存的格式
[properties] -- 一些組態檔
file_path:指向檔案儲存的路徑以及檔案字首。如 hdfs://path/to/my_file_.最終的檔名將由 my_file_,檔案序號以及檔案格式字尾組成。其中檔案序號由 0 開始,數量為檔案被分割的數量
-- 如
my_file_abcdefg_0.csv
my_file_abcdefg_1.csv
my_file_abcdegf_2.csv
-- [format_as]:指定匯出格式。預設為 CSV
-- [properties]:指定相關屬性。目前支援通過 Broker 程序,hdfs協定等
-- Broker 相關屬性需加字首 broker.
-- HDFS 相關屬性需加字首 hdfs. 其中hdfs.fs.defaultFS 用於填寫 namenode地址和埠,屬於必填項。
-- 如:
("broker.prop_key" = "broker.prop_val", ...)
("hdfs.fs.defaultFS" = "xxx", "hdfs.hdfs_user" = "xxx")
-- 其他屬性:
-- column_separator:列分隔符,僅對 CSV 格式適用。預設為 \t。
-- line_delimiter:行分隔符,僅對 CSV 格式適用。預設為 \n。
-- max_file_size:單個檔案的最大大小。預設為 1GB。取值範圍在 5MB 到 2GB 之間。超過這個大小的檔案將會被切分。
-- schema:PARQUET 檔案 schema 資訊。僅對 PARQUET 格式適用。匯出檔案格式為 PARQUET 時,必須指定 schema。
使用 broker 方式,將簡單查詢結果匯出
select * from log_detail where id >2
INTO OUTFILE "hdfs://doitedu01:8020/doris-out/broker_a_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "broker_name",
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "100MB"
);
使用 HDFS 方式匯出
EXPLAIN SELECT * FROM log_detail
INTO OUTFILE "hdfs://doris-out/hdfs_"
FORMAT AS CSV
PROPERTIES
(
"fs.defaultFS" = "hdfs://doitedu01:8020",
"hadoop.username" = "root",
"column_separator" = ","
);