Doris(五) -- 資料的匯入匯出

2023-05-30 21:00:28

資料匯入

使用 Insert 方式同步資料

使用者可以通過 MySQL 協定,使用 INSERT 語句進行資料匯入
INSERT 語句的使用方式和 MySQL 等資料庫中 INSERT 語句的使用方式類似。 INSERT 語句支援以下兩種語法:

INSERT INTO table SELECT ...
INSERT INTO table VALUES(...)

對於 Doris 來說,一個 INSERT 命令就是一個完整的匯入事務。

因此不論是匯入一條資料,還是多條資料,我們都不建議在生產環境使用這種方式進行資料匯入。高頻次的 INSERT 操作會導致在儲存層產生大量的小檔案,會嚴重影響系統效能。

該方式僅用於線下簡單測試或低頻少量的操作。
或者可以使用以下方式進行批次的插入操作:

INSERT INTO example_tbl VALUES
(1000, "baidu1", 3.25)
(2000, "baidu2", 4.25)
(3000, "baidu3", 5.25);

Stream Load

用於將本地檔案匯入到doris中。Stream Load 是通過 HTTP 協定與 Doris 進行連線互動的。
該方式中涉及 HOST:PORT 都是對應的HTTP 協定埠。
• BE 的 HTTP 協定埠,預設為 8040。
• FE 的 HTTP 協定埠,預設為 8030。
但須保證使用者端所在機器網路能夠聯通FE, BE 所在機器。

-- 建立表
drop table if exists load_local_file_test;
CREATE TABLE IF NOT EXISTS load_local_file_test
(
    id INT,
    name VARCHAR(50),
    age TINYINT
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;
# 建立檔案
1,zss,28
2,lss,28
3,ww,88

# 匯入資料
## 語法範例
 curl \
 -u user:passwd \  # 賬號密碼
 -H "label:load_local_file_test" \  # 本次任務的唯一標識
 -T 檔案地址 \
 http://主機名:埠號/api/庫名/表名/_stream_load
# user:passwd 為在 Doris 中建立的使用者。初始使用者為 admin / root,密碼初始狀態下為空。
# host:port 為 BE 的 HTTP 協定埠,預設是 8040,可以在 Doris 叢集 WEB UI頁面檢視。
# label: 可以在 Header 中指定 Label 唯一標識這個匯入任務。


curl \
 -u root:123 \
 -H "label:load_local_file" \
 -H "column_separator:," \
 -T /root/data/loadfile.txt \
http://doitedu01:8040/api/test/load_local_file_test/_stream_load

curl的一些可設定的引數

  1. label: 匯入任務的標籤,相同標籤的資料無法多次匯入。(標籤預設保留30分鐘)
  2. column_separator:用於指定匯入檔案中的列分隔符,預設為\t。
  3. line_delimiter:用於指定匯入檔案中的換行符,預設為\n。
  4. columns:用於指定檔案中的列和table中列的對應關係,預設一一對應
  5. where: 用來過濾匯入檔案中的資料
  6. max_filter_ratio:最大容忍可過濾(資料不規範等原因)的資料比例。預設零容忍。資料不規範不包括通過 where 條件過濾掉的行。
  7. partitions: 用於指定這次匯入所設計的partition。如果使用者能夠確定資料對應的partition,推薦指定該項。不滿足這些分割區的資料將被過濾掉。
  8. timeout: 指定匯入的超時時間。單位秒。預設是 600 秒。可設定範圍為 1 秒 ~ 259200 秒。
  9. timezone: 指定本次匯入所使用的時區。預設為東八區。該引數會影響所有匯入涉及的和時區有關的函數結果。
  10. exec_mem_limit: 匯入記憶體限制。預設為 2GB。單位為位元組。
  11. format: 指定匯入資料格式,預設是csv,支援json格式。
  12. read_json_by_line: 布林型別,為true表示支援每行讀取一個json物件,預設值為false。
  13. merge_type: 資料的合併型別,一共支援三種型別APPEND、DELETE、MERGE 其中,APPEND是預設值,表示這批資料全部需要追加到現有資料中,DELETE 表示刪除與這批資料key相同的所有行,MERGE 語意 需要與delete 條件聯合使用,表示滿足delete 條件的資料按照DELETE 語意處理其餘的按照APPEND 語意處理, 範例:-H "merge_type: MERGE" -H "delete: flag=1"
  14. delete: 僅在 MERGE下有意義, 表示資料的刪除條件 function_column.sequence_col: 只適用於UNIQUE_KEYS,相同key列下,保證value列按照source_sequence列進行REPLACE, source_sequence可以是資料來源中的列,也可以是表結構中的一列。

建議一個匯入請求的資料量控制在 1 - 2 GB 以內。如果有大量本地檔案,可以分批並行提交。

匯入json資料

# 準備資料
{"id":1,"name":"liuyan","age":18}
{"id":2,"name":"tangyan","age":18}
{"id":3,"name":"jinlian","age":18}
{"id":4,"name":"dalang","age":18}
{"id":5,"name":"qingqing","age":18}

curl \
 -u root: \
 -H "label:load_local_file_json_20221126" \
 -H "columns:id,name,age" \
 -H "max_filter_ratio:0.1" \
 -H "timeout:1000" \
 -H "exec_mem_limit:1G" \
 -H "where:id>1" \
 -H "format:json" \
 -H "read_json_by_line:true" \
 -H "merge_type:delete" \
 -T /root/data/json.txt \
http://doitedu01:8040/api/test/load_local_file_test/_stream_load
  -H "merge_type:append" \
  # 會把id = 3 的這條資料刪除
  -H "merge_type:MERGE" \
  -H "delete:id=3"

外部儲存資料匯入(hdfs)

適用場景

• 源資料在 Broker 可以存取的儲存系統中,如 HDFS。
• 資料量在幾十到百 GB 級別。

基本原理

  1. 建立提交匯入的任務
  2. FE生成執行計劃並將執行計劃分發到多個BE節點上(每個BE節點都匯入一部分資料)
  3. BE收到執行計劃後開始執行,從broker上拉取資料到自己的節點上
  4. 所有BE都完成後,FE決定是否匯入成功,返回結果給使用者端
-- 新建一張表
drop table if exists load_hdfs_file_test1;
CREATE TABLE IF NOT EXISTS load_hdfs_file_test1
(
    id INT,
    name VARCHAR(50),
    age TINYINT
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;
將原生的資料匯入到hdfs上面
hdfs dfs -put ./loadfile.txt  hdfs://linux01:8020/
hdfs dfs -ls  hdfs://linux01:8020/
-- 匯入語法
LOAD LABEL test.label_202204(
[MERGE|APPEND|DELETE]  -- 不寫就是append
DATA INFILE
(
"file_path1"[, file_path2, ...]  -- 描述資料的路徑   這邊可以寫多個 ,以逗號分割
)
[NEGATIVE]               -- 負增長
INTO TABLE `table_name`  -- 匯入的表名字
[PARTITION (p1, p2, ...)] -- 匯入到哪些分割區,不符合這些分割區的就會被過濾掉
[COLUMNS TERMINATED BY "column_separator"]  -- 指定分隔符
[FORMAT AS "file_type"] -- 指定儲存的檔案型別
[(column_list)] -- 指定匯入哪些列 
[COLUMNS FROM PATH AS (c1, c2, ...)]  -- 從路勁中抽取的部分列
[SET (column_mapping)] -- 對於列可以做一些對映,寫一些函數
-- 這個引數要寫在要寫在set的後面
[PRECEDING FILTER predicate]  -- 在mapping前做過濾做一些過濾
[WHERE predicate]  -- 在mapping後做一些過濾  比如id>10 
[DELETE ON expr] --根據欄位去做一些抵消消除的策略  需要配合MERGE
[ORDER BY source_sequence] -- 匯入資料的時候保證資料順序
[PROPERTIES ("key1"="value1", ...)]  -- 一些設定引數
-- 將hdfs上的資料load到表中
LOAD LABEL test.label_20221125
(
DATA INFILE("hdfs://linux01:8020/test.txt")
INTO TABLE `load_hdfs_file_test`
COLUMNS TERMINATED BY ","
(id,name,age)
)
with HDFS (
"fs.defaultFS"="hdfs://linux01:8020",
"hadoop.username"="root"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);


-- 這是一個非同步的操作,所以需要去檢視下執行的狀態
show load order by createtime desc limit 1\G;

從 HDFS 匯入資料,使用萬用字元匹配兩批兩批檔案。分別匯入到兩個表中

LOAD LABEL example_db.label2
(
    DATA INFILE("hdfs://linux01:8020/input/file-10*")
    INTO TABLE `my_table1`
    PARTITION (p1)
    COLUMNS TERMINATED BY ","
    FORMAT AS "parquet"  
    (id, tmp_salary, tmp_score) 
    SET (
        salary= tmp_salary + 1000,
        score = tmp_score + 10
    ),
    DATA INFILE("hdfs://linux01:8020/input/file-20*")
    INTO TABLE `my_table2`
    COLUMNS TERMINATED BY ","
    (k1, k2, k3)
)
with HDFS (
"fs.defaultFS"="hdfs://linux01:8020",
"hadoop.username"="root"
)


-- 匯入資料,並提取檔案路徑中的分割區欄位
LOAD LABEL example_db.label10
(
    DATA INFILE("hdfs://linux01:8020/user/hive/warehouse/table_name/dt=20221125/*")
    INTO TABLE `my_table`
    FORMAT AS "csv"
    (k1, k2, k3)
    COLUMNS FROM PATH AS (dt)
)
WITH BROKER hdfs
(
    "username"="root",
    "password"="123"
);

-- 對待匯入資料進行過濾。
LOAD LABEL example_db.label6
(
    DATA INFILE("hdfs://linux01:8020/input/file")
    INTO TABLE `my_table`
    (k1, k2, k3)
    SET (
        k2 = k2 + 1
    )
        PRECEDING FILTER k1 = 1  ==》前置過濾
    WHERE k1 > k2   ==》 後置過濾
)
WITH BROKER hdfs
(
    "username"="root",
    "password"="123"
);

-- 只有原始資料中,k1 = 1,並且轉換後,k1 > k2 的行才會被匯入。

取消匯入任務

當 Broker load 作業狀態不為 CANCELLED 或 FINISHED 時,可以被使用者手動取消。
取消時需要指定待取消匯入任務的 Label 。取消匯入命令語法可執行 HELP CANCEL LOAD 檢視。

CANCEL LOAD [FROM db_name] WHERE LABEL="load_label"; 

通過外部表同步資料

Doris 可以建立外部表。建立完成後,可以通過 SELECT 語句直接查詢外部表的資料,也可以通過 INSERT INTO SELECT 的方式匯入外部表的資料。

Doris 外部表目前支援的資料來源包括:MySQL,Oracle,Hive,PostgreSQL,SQLServer,Iceberg,ElasticSearch

-- 整體語法
CREATE [EXTERNAL] TABLE table_name ( 
 col_name col_type [NULL | NOT NULL] [COMMENT "comment"] 
) ENGINE=HIVE
[COMMENT "comment"]
PROPERTIES (
-- 我要對映的hive表在哪個庫裡面
-- 對映的表名是哪一張
-- hive的後設資料服務地址
 'property_name'='property_value',
 ...
);

-- 引數說明:
-- 1.外表列 
-- 	列名要與 Hive 表一一對應
-- 	列的順序需要與 Hive 表一致
-- 	必須包含 Hive 表中的全部列
-- 	Hive 表分割區列無需指定,與普通列一樣定義即可。 
-- 2.ENGINE 需要指定為 HIVE 
-- 3.PROPERTIES 屬性: 
-- 	hive.metastore.uris:Hive Metastore 服務地址 
-- 	database:掛載 Hive 對應的資料庫名 
-- 	table:掛載 Hive 對應的表名 

完成在 Doris 中建立 Hive 外表後,除了無法使用 Doris 中的資料模型(rollup、預聚合、物化檢視等)外,與普通的 Doris OLAP 表並無區別

-- 在Hive 中建立一個測試用表:
CREATE TABLE `user_info` ( 
 `id` int, 
 `name` string, 
 `age` int
) stored as orc;

insert into user_info values (1,'zss',18);
insert into user_info values (2,'lss',20);
insert into user_info values (3,'ww',25);

-- Doris 中建立外部表
CREATE EXTERNAL TABLE `hive_user_info` (
 `id` int, 
 `name` varchar(10), 
 `age` int 
) ENGINE=HIVE 
PROPERTIES ( 
'hive.metastore.uris' = 'thrift://linux01:9083', 
'database' = 'db1', 
'table' = 'user_info' 
);

外部表建立好後,就可以直接在doris中對這個外部表進行查詢了
直接查詢外部表,無法利用到doris自身的各種查詢優化機制!

select * from hive_user_info;

-- 將資料從外部表匯入內部表
-- 資料從外部表匯入內部表後,就可以利用doris自身的查詢優勢了!
-- 假設要匯入的目標內部表為: doris_user_info  (需要提前建立)

CREATE TABLE IF NOT EXISTS doris_user_info
(
    id INT,
    name VARCHAR(50),
    age TINYINT
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;

-- 就是用sql查詢,從外部表中select出資料後,insert到內部表即可
insert into doris_user_info
select
 *
from hive_user_info;

注意:
Hive 表 Schema 變更不會自動同步,需要在 Doris 中重建 Hive 外表。
當前 Hive 的儲存格式僅支援 Text,Parquet 和 ORC 型別

Binlog Load

Binlog Load提供了一種使Doris增量同步使用者在Mysql資料庫中對資料更新操作的CDC(Change Data Capture)功能。

基本原理
當前版本設計中,Binlog Load需要依賴canal作為中間媒介,讓canal偽造成一個從節點去獲取Mysql主節點上的Binlog並解析,再由Doris去獲取Canal上解析好的資料,主要涉及Mysql端、Canal端以及Doris端

  1. FE會為每個資料同步作業啟動一個canal client,來向canal server端訂閱並獲取資料。
  2. client中的receiver將負責通過Get命令接收資料,每獲取到一個資料batch,都會由consumer根據對應表分發到不同的channel,每個channel都會為此資料batch產生一個傳送資料的子任務Task。
  3. 在FE上,一個Task是channel向BE傳送資料的子任務,裡面包含分發到當前channel的同一個batch的資料。
  4. channel控制著單個表事務的開始、提交、終止。一個事務週期內,一般會從consumer獲取到多個batch的資料,因此會產生多個向BE傳送資料的子任務Task,在提交事務成功前,這些Task不會實際生效。
  5. 滿足一定條件時(比如超過一定時間、達到提交最巨量資料大小),consumer將會阻塞並通知各個channel提交事務。
  6. 當且僅當所有channel都提交成功,才會通過Ack命令通知canal並繼續獲取並消費資料。
  7. 如果有任意channel提交失敗,將會重新從上一次消費成功的位置獲取資料並再次提交(已提交成功的channel不會再次提交以保證冪等性)。
  8. 整個資料同步作業中,FE通過以上流程不斷的從canal獲取資料並提交到BE,來完成資料同步。

Mysql端

在Mysql Cluster模式的主從同步中,二進位制紀錄檔檔案(Binlog)記錄了主節點上的所有資料變化,資料在Cluster的多個節點間同步、備份都要通過Binlog紀錄檔進行,從而提高叢集的可用性。架構通常由一個主節點(負責寫)和一個或多個從節點(負責讀)構成,所有在主節點上發生的資料變更將會複製給從節點。
注意:目前必須要使用Mysql 5.7及以上的版本才能支援Binlog Load功能。

# 開啟mysql的二進位制binlog紀錄檔功能,則需要編輯my.cnf組態檔設定一下。
find / -name my.cnf
/etc/my.cnf
# 修改mysqld中的一些組態檔
[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-format = ROW

#binlog-format 的三種模式
#ROW   記錄每一行資料的資訊
#Statement  記錄sql語句
#Mixed   上面兩種的混合

# 重啟 MySQL 使設定生效
systemctl restart mysqld 
-- 建立使用者並授權
-- 設定這些引數可以使得mysql的密碼簡單化
set global validate_password_length=4; 
set global validate_password_policy=0; 
-- 新增一個canal的使用者,讓他監聽所有庫中的所有表,並且設定密碼為canal
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;
-- 重新整理一下許可權
FLUSH PRIVILEGES;

-- 準備測試表
CREATE TABLE `user_doris2` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `gender` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

設定 Canal 端

Canal 是屬於阿里巴巴 otter 專案下的一個子專案,主要用途是基於 MySQL 資料庫增量紀錄檔解析,提供增量資料訂閱和消費,用於解決跨機房同步的業務場景,建議使用 canal 1.1.5及以上版本。

下載地址:https://github.com/alibaba/canal/releases

# 上傳並解壓 canal deployer壓縮包
mkdir /opt/apps/canal
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/apps/canal

# 在 conf 資料夾下新建目錄並重新命名 
# 一個 canal 服務中可以有多個 instance,conf/下的每一個目錄即是一個範例,每個範例下面都有獨立的組態檔
mkdir /opt/apps/canel/conf/doris

# 拷貝組態檔模板 
cp /opt/apps/canal/conf/example/instance.properties /opt/apps/canal/conf/doris/
# 修改 conf/canal.properties 的設定
vi canal.properties

# 進入找到canal.destinations = example
# 將其修改為 我們自己設定的目錄
canal.destinations = doris
# 修改 instance 組態檔
vi instance.properties 
# 修改:
canal.instance.master.address=doitedu01:3306
# 啟動
sh bin/startup.sh

注意:canal client 和 canal instance 是一一對應的,Binlog Load 已限制多個資料同步作 業不能連線到同一個 destination。

設定目標表

基本語法:
CREATE SYNC [db.]job_name
(
 channel_desc,
 channel_desc
 ...
)
binlog_desc
-- 引數說明:
-- job_name:是資料同步作業在當前資料庫內的唯一標識
-- channel_desc :用來定義任務下的資料通道,可表示 MySQL 源表到 doris 目標表的對映關係。在設定此項時,如果存在多個對映關係,必須滿足 MySQL 源表應該與 doris 目標表是一一對應關係,其他的任何對映關係(如一對多關係),檢查語法時都被視為不合法。
-- column_mapping:主要指MySQL源表和doris目標表的列之間的對映關係,如果不指定,FE 會預設源表和目標表的列按順序一一對應。但是我們依然建議顯式的指定列的對映關係,這樣當目標表的結構發生變化(比如增加一個 nullable 的列),資料同步作業依然可以進行。否則,當發生上述變動後,因為列對映關係不再一一對應,匯入將報錯。 
-- binlog_desc:定義了對接遠端 Binlog 地址的一些必要資訊,目前可支援的對接型別只有 canal 方式,所有的設定項前都需要加上 canal 字首。
-- 	canal.server.ip: canal server 的地址 
-- 	canal.server.port: canal server 的埠 
-- 	canal.destination: 前文提到的 instance 的字串標識 
-- 	canal.batchSize: 每批從 canal server 處獲取的 batch 大小的最大值,預設 8192 
-- 	canal.username: instance 的使用者名稱 
-- 	canal.password: instance 的密碼 
-- 	canal.debug: 設定為 true 時,會將 batch 和每一行資料的詳細資訊都列印出來,會影響效能。



-- Doris 建立與 Mysql 對應的目標表
CREATE TABLE `binlog_mysql` ( 
 `id` int(11) NOT NULL COMMENT "", 
 `name` VARCHAR(50) NOT NULL COMMENT "", 
 `age` int(11) NOT NULL COMMENT "" ,
 `gender` VARCHAR(50) NOT NULL COMMENT ""
) ENGINE=OLAP 
UNIQUE KEY(`id`) 
DISTRIBUTED BY HASH(`id`) BUCKETS 1; 


CREATE SYNC test.job20221228
(
 FROM test.binlog_test INTO binlog_test
)
FROM BINLOG 
(
 "type" = "canal",
 "canal.server.ip" = "linux01",
 "canal.server.port" = "11111",
 "canal.destination" = "doris",
 "canal.username" = "canal",
 "canal.password" = "canal"
);

-- 檢視作業狀態
-- 展示當前資料庫的所有資料同步作業狀態。 
SHOW SYNC JOB; 
-- 展示資料庫 `test_db` 下的所有資料同步作業狀態。 
SHOW SYNC JOB FROM `test`; 

-- 停止名稱為 `job_name` 的資料同步作業 
STOP SYNC JOB [db.]job_name 

-- 暫停名稱為 `job_name` 的資料同步作業 
PAUSE SYNC JOB [db.]job_name 

-- 恢復名稱為 `job_name` 的資料同步作業 
RESUME SYNC JOB `job_name` 

資料匯出

資料匯出(Export)是 Doris 提供的一種將資料匯出的功能。該功能可以將使用者指定的表或分割區的資料,以文字的格式,通過 Broker 程序匯出到遠端儲存上,如 HDFS / 物件儲存(支援S3協定) 等。
原理

  1. 使用者提交一個 Export 作業到 FE。
  2. FE 的 Export 排程器會通過兩階段來執行一個 Export 作業:
  3. PENDING:FE 生成 ExportPendingTask,向 BE 傳送 snapshot 命令,對所有涉及到的 Tablet 做一個快照。並生成多個查詢計劃。
  4. EXPORTING:FE 生成 ExportExportingTask,開始執行查詢計劃。

查詢計劃拆分

Export 作業會生成多個查詢計劃,每個查詢計劃負責掃描一部分 Tablet。每個查詢計劃掃描的 Tablet 個數由 FE 設定引數 export_tablet_num_per_task 指定,預設為 5。即假設一共 100 個 Tablet,則會生成 20 個查詢計劃。使用者也可以在提交作業時,通過作業屬性 tablet_num_per_task 指定這個數值。
一個作業的多個查詢計劃順序執行

查詢計劃執行

一個查詢計劃掃描多個分片,將讀取的資料以行的形式組織,每 1024 行為一個 batch,呼叫 Broker 寫入到遠端儲存上。
查詢計劃遇到錯誤會整體自動重試 3 次。如果一個查詢計劃重試 3 次依然失敗,則整個作業失敗。
Doris 會首先在指定的遠端儲存的路徑中,建立一個名為 __doris_export_tmp_12345 的臨時目錄(其中 12345 為作業 id)。匯出的資料首先會寫入這個臨時目錄。每個查詢計劃會生成一個檔案,檔名範例:

export-data-c69fcf2b6db5420f-a96b94c1ff8bccef-1561453713822

其中 c69fcf2b6db5420f-a96b94c1ff8bccef 為查詢計劃的 query id。1561453713822 為檔案生成的時間戳。當所有資料都匯出後,Doris 會將這些檔案 rename 到使用者指定的路徑中

範例:匯出到hdfs

EXPORT TABLE test.event_info_log1 -- 庫名.表名
to "hdfs://linux01:8020/event_info_log1"  -- 匯出到那裡去
PROPERTIES
(
    "label" = "event_info_log1",
    "column_separator"=",",
    "exec_mem_limit"="2147483648",
    "timeout" = "3600"
)
WITH BROKER "broker_name"
(
    "username" = "root",
    "password" = ""
);

-- 1.label:本次匯出作業的標識。後續可以使用這個標識檢視作業狀態。
-- 2.column_separator:列分隔符。預設為 \t。支援不可見字元,比如 '\x07'。
-- 3.columns:要匯出的列,使用英文狀態逗號隔開,如果不填這個引數預設是匯出表的所有列。
-- 4.line_delimiter:行分隔符。預設為 \n。支援不可見字元,比如 '\x07'。
-- 5.exec_mem_limit: 表示 Export 作業中,一個查詢計劃在單個 BE 上的記憶體使用限制。預設 2GB。單位位元組。
-- 6.timeout:作業超時時間。預設 2小時。單位秒。
-- 7.tablet_num_per_task:每個查詢計劃分配的最大分片數。預設為 5。


-- 檢視匯出狀態
show EXPORT \G;

注意事項

  1. 不建議一次性匯出大量資料。一個 Export 作業建議的匯出資料量最大在幾十 GB。過大的匯出會導致更多的垃圾檔案和更高的重試成本。
  2. 如果表資料量過大,建議按照分割區匯出。
  3. 在 Export 作業執行過程中,如果 FE 發生重啟或切主,則 Export 作業會失敗,需要使用者重新提交。
  4. 如果 Export 作業執行失敗,在遠端儲存中產生的 __doris_export_tmp_xxx 臨時目錄,以及已經生成的檔案不會被刪除,需要使用者手動刪除。
  5. 如果 Export 作業執行成功,在遠端儲存中產生的 __doris_export_tmp_xxx 目錄,根據遠端儲存的檔案系統語意,可能會保留,也可能會被清除。比如在百度物件儲存(BOS)中,通過 rename 操作將一個目錄中的最後一個檔案移走後,該目錄也會被刪除。如果該目錄沒有被清除,使用者可以手動清除
  6. 當 Export 執行完成後(成功或失敗),FE 發生重啟或切主,則 SHOW EXPORT展示的作業的部分資訊會丟失,無法檢視。
  7. Export 作業只會匯出 Base 表的資料,不會匯出 Rollup Index 的資料。
  8. Export 作業會掃描資料,佔用 IO 資源,可能會影響系統的查詢延遲

查詢結果匯出

SELECT INTO OUTFILE 語句可以將查詢結果匯出到檔案中。目前支援通過 Broker程序, 通過 S3 協定, 或直接通過 HDFS 協定,匯出到遠端儲存,如 HDFS,S3,BOS,COS (騰訊雲)上。

-- 語法
query_stmt  -- 查詢語句
INTO OUTFILE "file_path"  --匯出檔案的路勁
[format_as]  -- 指定檔案儲存的格式
[properties]  -- 一些組態檔

file_path:指向檔案儲存的路徑以及檔案字首。如 hdfs://path/to/my_file_.最終的檔名將由 my_file_,檔案序號以及檔案格式字尾組成。其中檔案序號由 0 開始,數量為檔案被分割的數量

-- 如
my_file_abcdefg_0.csv 
my_file_abcdefg_1.csv 
my_file_abcdegf_2.csv 

-- [format_as]:指定匯出格式。預設為 CSV
-- [properties]:指定相關屬性。目前支援通過 Broker 程序,hdfs協定等
-- Broker 相關屬性需加字首 broker.
-- HDFS 相關屬性需加字首 hdfs. 其中hdfs.fs.defaultFS 用於填寫 namenode地址和埠,屬於必填項。
-- 如:
("broker.prop_key" = "broker.prop_val", ...)
("hdfs.fs.defaultFS" = "xxx", "hdfs.hdfs_user" = "xxx")

-- 其他屬性:
-- column_separator:列分隔符,僅對 CSV 格式適用。預設為 \t。 
-- line_delimiter:行分隔符,僅對 CSV 格式適用。預設為 \n。 
-- max_file_size:單個檔案的最大大小。預設為 1GB。取值範圍在 5MB 到 2GB 之間。超過這個大小的檔案將會被切分。
-- schema:PARQUET 檔案 schema 資訊。僅對 PARQUET 格式適用。匯出檔案格式為 PARQUET 時,必須指定 schema。

使用 broker 方式,將簡單查詢結果匯出

select * from log_detail where id >2
INTO OUTFILE "hdfs://doitedu01:8020/doris-out/broker_a_" 
FORMAT AS CSV 
PROPERTIES 
( 
"broker.name" = "broker_name", 
"column_separator" = ",", 
"line_delimiter" = "\n", 
"max_file_size" = "100MB" 
); 

使用 HDFS 方式匯出

EXPLAIN SELECT * FROM log_detail
INTO OUTFILE "hdfs://doris-out/hdfs_" 
FORMAT AS CSV 
PROPERTIES 
( 
"fs.defaultFS" = "hdfs://doitedu01:8020", 
"hadoop.username" = "root",
"column_separator" = ","
);