Doris(七) -- 修改表、動態和臨時分割區、join的優化

2023-06-01 21:01:19

修改表

修改表名

-- 1.將名為 table1 的表修改為 table2
ALTER TABLE table1 RENAME table2;

-- 範例
ALTER TABLE aggregate_test RENAME aggregate_test1;

-- 2.將表 example_table 中名為 rollup1 的 rollup index 修改為 rollup2
ALTER TABLE base_table_name RENAME ROLLUP old_rollup_name new_rollup_name;

ALTER TABLE ex_user RENAME ROLLUP rollup_u_cost new_rollup_u_cost;

desc ex_user all;

-- 3.將表 example_table 中名為 p1 的 partition 修改為 p2
ALTER TABLE example_table RENAME PARTITION old_partition_name new_partition_name ;

-- 範例:
ALTER TABLE expamle_range_tbl RENAME PARTITION p201701 newp201701; 

show partitions from expamle_range_tbl \G;

表結構變更

使用者可以通過 Schema Change 操作來修改已存在表的 Schema。目前 Doris 支援以下幾種修改:
• 增加、刪除列
• 修改列型別
• 調整列順序
• 增加、修改 Bloom Filter index
• 增加、刪除 bitmap index

原理介紹

執行 Schema Change 的基本過程,是通過原 Index 的資料,生成一份新 Schema 的 Index 的資料。其中主要需要進行兩部分資料轉換:
一是已存在的歷史資料的轉換;
二是在 Schema Change 執行過程中,新到達的匯入資料的轉換。

建立作業
Schema Change 的建立是一個非同步過程,作業提交成功後,使用者需要通過 SHOW ALTER TABLE COLUMN 命令來檢視作業進度。

-- 語法:
ALTER TABLE [database.]table alter_clause;

schema change 的 alter_clause 支援如下幾種修改方式:
1.向指定 index 的指定位置新增一列

ALTER TABLE db.table_name
-- 如果增加的是key列 那麼,需要在 列型別後面增加key 這個關鍵字
-- 如果增加的是value列 那麼,是聚合表模型,需要指定列的聚合型別   如果是明細模型和唯一模型,不需要指定
ADD COLUMN column_name column_type [KEY | agg_type] [DEFAULT "default_value"]
[AFTER column_name|FIRST]  -- 確定列的位置   如果不寫,預設插在最後
[TO rollup_index_name]   -- 如果你是針對rollup表新增一個列,那麼這個列明基表中不能有
[PROPERTIES ("key"="value", ...)]

-- 明細模型中新增value列
ALTER TABLE test.expamle_range_tbl ADD COLUMN abc varchar AFTER age;

-- 明細模型中新增key 列
ALTER TABLE test.expamle_range_tbl ADD COLUMN abckey varchar key AFTER user_id;

-- 聚合模型中新增一個value列
mysql> ALTER TABLE test.ex_user ADD COLUMN abckey int sum AFTER cost;

注意:

  • 聚合模型如果增加 value 列,需要指定 agg_type
  • 非聚合模型(如 DUPLICATE KEY)如果增加key列,需要指定KEY關鍵字
  • 不能在 rollup index 中增加 base index 中已經存在的列(如有需要,可以重新建立一個 rollup index)

範例:

-- 源schema:

+-----------+-------+------+------+------+---------+-------+
| IndexName | Field | Type | Null | Key  | Default | Extra |
+-----------+-------+------+------+------+---------+-------+
| tbl1      | k1    | INT  | No   | true | N/A     |       |
|           | k2    | INT  | No   | true | N/A     |       |
|           | k3    | INT  | No   | true | N/A     |       |
|           |       |      |      |      |         |       |
| rollup2   | k2    | INT  | No   | true | N/A     |       |
|           |       |      |      |      |         |       |
| rollup1   | k1    | INT  | No   | true | N/A     |       |
|           | k2    | INT  | No   | true | N/A     |       |
+-----------+-------+------+------+------+---------+-------+

-- 源schema中沒有k4和k5列,所以可以往rollup表中新增 k4和k5列,在往rollup表中新增的過程,也會往base表中新增一份
ALTER TABLE tbl1
ADD COLUMN k4 INT default "1" to rollup1,
ADD COLUMN k4 INT default "1" to rollup2,
ADD COLUMN k5 INT default "1" to rollup2;

-- 改變完成後,Schema 變為       base表中也會相應的新增k4和k5
+-----------+-------+------+------+------+---------+-------+
| IndexName | Field | Type | Null | Key  | Default | Extra |
+-----------+-------+------+------+------+---------+-------+
| tbl1      | k1    | INT  | No   | true | N/A     |       |
|           | k2    | INT  | No   | true | N/A     |       |
|           | k3    | INT  | No   | true | N/A     |       |
|           | k4    | INT  | No   | true | 1       |       |
|           | k5    | INT  | No   | true | 1       |       |
|           |       |      |      |      |         |       |
| rollup2   | k2    | INT  | No   | true | N/A     |       |
|           | k4    | INT  | No   | true | 1       |       |
|           | k5    | INT  | No   | true | 1       |       |
|           |       |      |      |      |         |       |
| rollup1   | k1    | INT  | No   | true | N/A     |       |
|           | k2    | INT  | No   | true | N/A     |       |
|           | k4    | INT  | No   | true | 1       |       |
+-----------+-------+------+------+------+---------+-------+

-- 這樣的匯入方式錯誤
-- 因為base表中已經存在k3,匯入的時候無法將base表中在新增一個叫k3的列,重複
ALTER TABLE tbl1
ADD COLUMN k3 INT default "1" to rollup1

2.向指定 index 新增多列

ALTER TABLE db.table_name
ADD COLUMN (column_name1 column_type [KEY | agg_type] DEFAULT "default_value", ...)
[TO rollup_index_name]
[PROPERTIES ("key"="value", ...)]

-- 新增的時候根據key和value列,新增在對應的列之後
ALTER TABLE test.expamle_range_tbl ADD COLUMN (abc int,bcd int);

mysql> ALTER TABLE test.expamle_range_tbl ADD COLUMN (a int key ,b int);
Query OK, 0 rows affected (0.01 sec)

mysql> desc expamle_range_tbl all;


3.從指定 index 中刪除一列

ALTER TABLE db.table_name
DROP COLUMN column_name
[FROM rollup_index_name]

-- 刪除明細表中的value列
ALTER TABLE test.expamle_range_tbl DROP COLUMN abc;

-- 刪除明細表中的key列
ALTER TABLE test.expamle_range_tbl DROP COLUMN abckey;

-- 刪除聚合模型中的value列
ALTER TABLE test.ex_user DROP COLUMN abckey;

-- 注意:
-- 不能刪除分割區列
-- 如果是從 base index 中刪除列,則如果 rollup index 中包含該列,也會被刪除

4.修改指定 index 的列型別以及列位置

ALTER TABLE db.table_name
MODIFY COLUMN column_name column_type [KEY | agg_type] [NULL | NOT NULL] [DEFAULT "default_value"]
[AFTER column_name|FIRST]
[FROM rollup_index_name]
[PROPERTIES ("key"="value", ...)]

-- 注意:
-- 聚合模型如果修改 value 列,需要指定 agg_type
-- 非聚合型別如果修改key列,需要指定KEY關鍵字
-- 分割區列和分桶列不能做任何修改

5.對指定 index 的列進行重新排序

ALTER TABLE db.table_name
ORDER BY (column_name1, column_name2, ...)
[FROM rollup_index_name]
[PROPERTIES ("key"="value", ...)]

-- 注意:
-- index 中的所有列都要寫出來
-- value 列在 key 列之後

範例:

-- 1.向 example_rollup_index 的 col1 後新增一個key列 new_col(非聚合模型)
ALTER TABLE example_db.my_table
ADD COLUMN new_col INT KEY DEFAULT "0" AFTER col1
TO example_rollup_index;

-- 2.向example_rollup_index的col1後新增一個value列new_col(非聚合模型)
ALTER TABLE example_db.my_table  
ADD COLUMN new_col INT DEFAULT "0" AFTER col1 
TO example_rollup_index;

-- 3.向example_rollup_index的col1後新增一個key列new_col(聚合模型)
ALTER TABLE example_db.my_table   
ADD COLUMN new_col INT DEFAULT "0" AFTER col1    
TO example_rollup_index;

-- 4.向example_rollup_index的col1後新增一個value列new_col SUM聚合型別(聚合模型)
ALTER TABLE example_db.my_table   
ADD COLUMN new_col INT SUM DEFAULT "0" AFTER col1    
TO example_rollup_index;

-- 5.向 example_rollup_index 新增多列(聚合模型)
ALTER TABLE example_db.my_table
ADD COLUMN (col1 INT DEFAULT "1", col2 FLOAT SUM DEFAULT "2.3")
TO example_rollup_index;

-- 6.從 example_rollup_index 刪除一列
ALTER TABLE example_db.my_table
DROP COLUMN col2
FROM example_rollup_index;

-- 7.修改 base index 的 key 列 col1 的型別為 BIGINT,並移動到 col2 列後面。
ALTER TABLE example_db.my_table 
MODIFY COLUMN col1 BIGINT KEY DEFAULT "1" AFTER col2;

-- 注意:無論是修改 key 列還是 value 列都需要宣告完整的 column 資訊

-- 8.修改 base index 的 val1 列最大長度。原 val1 為 (val1 VARCHAR(32) REPLACE DEFAULT "abc")
ALTER TABLE example_db.my_table 
MODIFY COLUMN val1 VARCHAR(64) REPLACE DEFAULT "abc";

-- 9.重新排序 example_rollup_index 中的列(設原列順序為:k1,k2,k3,v1,v2)
ALTER TABLE example_db.my_table
ORDER BY (k3,k1,k2,v2,v1)
FROM example_rollup_index;

-- 10.同時執行兩種操作
ALTER TABLE example_db.my_table
ADD COLUMN v2 INT MAX DEFAULT "0" AFTER k2 TO example_rollup_index,
ORDER BY (k3,k1,k2,v2,v1) FROM example_rollup_index;

檢視作業

SHOW ALTER TABLE COLUMN 可以檢視當前正在執行或已經完成的 Schema Change 作業。當一次 Schema Change 作業涉及到多個 Index 時,該命令會顯示多行,每行對應一個 Index

SHOW ALTER TABLE COLUMN\G;
*************************** 1. row ***************************
        JobId: 20021
    TableName: tbl1
   CreateTime: 2019-08-05 23:03:13
   FinishTime: 2019-08-05 23:03:42
    IndexName: tbl1
      IndexId: 20022
OriginIndexId: 20017
SchemaVersion: 2:792557838
TransactionId: 10023
        State: FINISHED
          Msg: 
     Progress: NULL
      Timeout: 86400
1 row in set (0.00 sec)

-- JobId:每個 Schema Change 作業的唯一 ID。
-- TableName:Schema Change 對應的基表的表名。
-- CreateTime:作業建立時間。
-- FinishedTime:作業結束時間。如未結束,則顯示 "N/A"。
-- IndexName: 本次修改所涉及的某一個 Index 的名稱。
-- IndexId:新的 Index 的唯一 ID。
-- OriginIndexId:舊的 Index 的唯一 ID。
-- SchemaVersion:以 M:N 的格式展示。其中 M 表示本次 Schema Change 變更的版本,N 表示對應的 Hash 值。每次 Schema Change,版本都會遞增。
-- TransactionId:轉換歷史資料的分水嶺 transaction ID。
-- State:作業所在階段。
-- 	PENDING:作業在佇列中等待被排程。
-- 	WAITING_TXN:等待分水嶺 transaction ID 之前的匯入任務完成。
-- 	RUNNING:歷史資料轉換中。
-- 	FINISHED:作業成功。
-- 	CANCELLED:作業失敗。
-- Msg:如果作業失敗,這裡會顯示失敗資訊。
-- Progress:作業進度。只有在 RUNNING 狀態才會顯示進度。進度是以 M/N 的形式顯示。其中 N 為 Schema Change 涉及的總副本數。M 為已完成歷史資料轉換的副本數。
-- Timeout:作業超時時間。單位秒。

取消作業

在作業狀態不為 FINISHED 或 CANCELLED 的情況下,可以通過以下命令取消Schema Change作業:

CANCEL ALTER TABLE COLUMN FROM tbl_name;

注意事項
• 一張表在同一時間只能有一個 Schema Change 作業在執行。
• Schema Change 操作不阻塞匯入和查詢操作。
• 分割區列和分桶列不能修改。
• 如果 Schema 中有 REPLACE 方式聚合的 value 列,則不允許刪除 Key 列。
• 如果刪除 Key 列,Doris 無法決定 REPLACE 列的取值。
• Unique 資料模型表的所有非 Key 列都是 REPLACE 聚合方式。
• 在新增聚合型別為 SUM 或者 REPLACE 的 value 列時,該列的預設值對歷史資料沒有含義。
• 因為歷史資料已經失去明細資訊,所以預設值的取值並不能實際反映聚合後的取值。
• 當修改列型別時,除 Type 以外的欄位都需要按原列上的資訊補全。
• 如修改列 k1 INT SUM NULL DEFAULT "1" 型別為 BIGINT,則需執行命令如下:
• ALTER TABLE tbl1 MODIFY COLUMN k1 BIGINT SUM NULL DEFAULT "1";
• 注意,除新的列型別外,如聚合方式,Nullable 屬性,以及預設值都要按照原資訊補全。
• 不支援修改列名稱、聚合型別、Nullable 屬性、預設值以及列註釋。

partition的增減

-- 1.增加分割區, 使用預設分桶方式:現有分割區 \[MIN, 2013-01-01),增加分割區 \[2013-01-01, 2014-01-01)
ALTER TABLE example_db.my_table ADD PARTITION p1 VALUES LESS THAN ("2014-01-01");

-- 2.增加分割區,使用新的分桶數
ALTER TABLE example_db.my_table ADD PARTITION p1 VALUES LESS THAN ("2015-01-01") 
DISTRIBUTED BY HASH(k1) BUCKETS 20; 

-- 3.增加分割區,使用新的副本數 
ALTER TABLE example_db.my_table ADD PARTITION p1 VALUES LESS THAN ("2015-01-01") 
("replication_num"="1"); 

-- 4.修改分割區副本數 
ALTER TABLE example_db.my_table MODIFY PARTITION p1 SET("replication_num"="1"); 
-- 5.批次修改指定分割區
ALTER TABLE example_db.my_table MODIFY PARTITION (p1, p2, p4) SET("in_memory"="true"); 

-- 6.批次修改所有分割區 
ALTER TABLE example_db.my_table MODIFY PARTITION (*) SET("storage_medium"="HDD"); 

-- 7.刪除分割區 
ALTER TABLE example_db.my_table DROP PARTITION p1; 
-- 8.增加一個指定上下界的分割區 
ALTER TABLE example_db.my_table ADD PARTITION p1 VALUES [("2014-01-01"), ("2014-02-01")); 

rollup的增減

-- 1.建立 index: example_rollup_index,基於 base index(k1,k2,k3,v1,v2)。列式儲存。 
ALTER TABLE example_db.my_table ADD ROLLUP example_rollup_index(k1, k3, v1, v2);

-- 2.建立 index: example_rollup_index2,基於 example_rollup_index(k1,k3,v1,v2)
ALTER TABLE example_db.my_table ADD ROLLUP example_rollup_index2 (k1, v1) 
FROM example_rollup_index;

-- 3.建立 index: example_rollup_index3, 基於base index (k1,k2,k3,v1), 自定義rollup超時時間一小時
ALTER TABLE example_db.my_table ADD ROLLUP example_rollup_index(k1, k3, v1) 
PROPERTIES("timeout" = "3600"); 

-- 4.刪除 index: example_rollup_index2
ALTER TABLE example_db.my_table DROP ROLLUP example_rollup_index2; 

動態分割區和臨時分割區

動態分割區

原理

在某些使用場景下,使用者會將表按照天進行分割區劃分,每天定時執行例行任務,這時需要使用方手動管理分割區,否則可能由於使用方沒有建立分割區導致資料匯入失敗,這給使用方帶來了額外的維護成本。通過動態分割區功能,使用者可以在建表時設定動態分割區的規則。FE 會啟動一個後臺執行緒,根據使用者指定的規則建立或刪除分割區。使用者也可以在執行時對現有規則進行變更。

使用方式

動態分割區的規則可以在建表時指定,或者在執行時進行修改。當前僅支援對單分割區列的分割區表設定動態分割區規則

-- 建表時指定
CREATE TABLE tbl1
(...)
PROPERTIES
(
-- 新增動態分割區的規則
    "dynamic_partition.prop1" = "value1",
    "dynamic_partition.prop2" = "value2",
    ...
)

-- 執行時修改
ALTER TABLE tbl1 SET
(
    "dynamic_partition.prop1" = "value1",
    "dynamic_partition.prop2" = "value2",
    ...
)

動態分割區規則引數

  1. dynamic_partition.enable:是否開啟動態分割區特性。預設是true
  2. dynamic_partition.time_unit:動態分割區排程的單位。可指定為 HOUR、DAY、WEEK、MONTH。分別表示按小時、按天、按星期、按月進行分割區建立或刪除。
  3. dynamic_partition.time_zone:動態分割區的時區,如果不填寫,則預設為當前機器的系統的時區
  4. dynamic_partition.start:動態分割區的起始偏移,為負數。以當天(星期/月)為基準,分割區範圍在此偏移之前的分割區將會被刪除。如果不填寫,則預設為 -2147483648,即不刪除歷史分割區。
  5. dynamic_partition.end:動態分割區的結束偏移,為正數。根據 time_unit 屬性的不同,以當天(星期/月)為基準,提前建立對應範圍的分割區。
  6. dynamic_partition.prefix:動態建立的分割區名字首。
  7. dynamic_partition.buckets:動態建立的分割區所對應的分桶數量
  8. dynamic_partition.replication_num:動態建立的分割區所對應的副本數量,如果不填寫,則預設為該表建立時指定的副本數量
  9. dynamic_partition.start_day_of_week:當 time_unit 為 WEEK 時,該引數用於指定每週的起始點。取值為 1 到 7。其中 1 表示週一,7 表示週日。預設為 1,即表示每週以週一為起始點。
  10. dynamic_partition.start_day_of_month:當 time_unit 為 MONTH 時,該引數用於指定每月的起始日期。取值為 1 到 28。其中 1 表示每月1號,28 表示每月28號。預設為 1,即表示每月以1號位起始點。暫不支援以29、30、31號為起始日,以避免因閏年或閏月帶來的歧義
  11. dynamic_partition.create_history_partition:為 true 時代表可以建立歷史分割區,預設是false
  12. dynamic_partition.history_partition_num:當 create_history_partition 為 true 時,該引數用於指定建立歷史分割區數量。預設值為 -1, 即未設定。
  13. dynamic_partition.hot_partition_num:指定最新的多少個分割區為熱分割區。對於熱分割區,系統會自動設定其 storage_medium 引數為SSD,並且設定 storage_cooldown_time 。hot_partition_num:設定往前 n 天和未來所有分割區為熱分割區,並自動設定冷卻時間
  14. dynamic_partition.reserved_history_periods:需要保留的歷史分割區的時間範圍。

修改動態分割區屬性

ALTER TABLE tbl1 SET
(
    "dynamic_partition.prop1" = "value1",
    ...
);


ALTER TABLE partition_test SET
(
    "dynamic_partition.time_unit" = "week",
    "dynamic_partition.start" = "-1",
    "dynamic_partition.end" = "1"
);

某些屬性的修改可能會產生衝突。假設之前分割區粒度為 DAY,並且已經建立了如下分割區:

p20200519: ["2020-05-19", "2020-05-20")
p20200520: ["2020-05-20", "2020-05-21")
p20200521: ["2020-05-21", "2020-05-22")

如果此時將分割區粒度改為 MONTH,則系統會嘗試建立範圍為 ["2020-05-01", "2020-06-01") 的分割區,而該分割區的分割區範圍和已有分割區衝突,所以無法建立。而範圍為 ["2020-06-01", "2020-07-01") 的分割區可以正常建立。因此,2020-05-22 到 2020-05-30 時間段的分割區,需要自行填補。

檢視動態分割區表排程情況

-- 通過以下命令可以進一步檢視當前資料庫下,所有動態分割區表的排程情況:
SHOW DYNAMIC PARTITION TABLES;

-- LastUpdateTime: 最後一次修改動態分割區屬性的時間
-- LastSchedulerTime: 最後一次執行動態分割區排程的時間
-- State: 最後一次執行動態分割區排程的狀態
-- LastCreatePartitionMsg: 最後一次執行動態新增分割區排程的錯誤資訊
-- LastDropPartitionMsg: 最後一次執行動態刪除分割區排程的錯誤資訊

臨時分割區

規則

• 臨時分割區的分割區列和正式分割區相同,且不可修改。
• 一張表所有臨時分割區之間的分割區範圍不可重疊,但臨時分割區的範圍和正式分割區範圍可以重疊。
• 臨時分割區的分割區名稱不能和正式分割區以及其他臨時分割區重複。

操作

臨時分割區支援新增、刪除、替換操作。

新增臨時分割區

可以通過 ALTER TABLE ADD TEMPORARY PARTITION 語句對一個表新增臨時分割區:

ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp1 VALUES LESS THAN("2020-02-01");


ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp2 VALUES LESS THAN("2020-02-02")
("in_memory" = "true", "replication_num" = "1")
DISTRIBUTED BY HASH(k1) BUCKETS 5;


ALTER TABLE tbl3 ADD TEMPORARY PARTITION tp1 VALUES IN ("Beijing", "Shanghai");

ALTER TABLE tbl3 ADD TEMPORARY PARTITION tp1 VALUES IN ("Beijing", "Shanghai")
("in_memory" = "true", "replication_num" = "1")
DISTRIBUTED BY HASH(k1) BUCKETS 5;


-- 新增操作的一些說明:
-- 臨時分割區的新增和正式分割區的新增操作相似。臨時分割區的分割區範圍獨立於正式分割區。
-- 臨時分割區可以獨立指定一些屬性。包括分桶數、副本數、是否是記憶體表、儲存媒介等資訊。

刪除臨時分割區

-- 可以通過 ALTER TABLE DROP TEMPORARY PARTITION 語句刪除一個表的臨時分割區:
ALTER TABLE tbl1 DROP TEMPORARY PARTITION tp1;
-- 刪除臨時分割區,不影響正式分割區的資料。

替換分割區

可以通過 ALTER TABLE REPLACE PARTITION 語句將一個表的正式分割區替換為臨時分割區。

-- 正式分割區替換成臨時分割區以後,正是分割區的資料會被刪除,並且這個過程是不可逆的
-- 用之前要小心
ALTER TABLE tbl1 REPLACE PARTITION (p1) WITH TEMPORARY PARTITION (tp1);

ALTER TABLE partition_test REPLACE PARTITION (p20230104) WITH TEMPORARY PARTITION (tp1);

ALTER TABLE tbl1 REPLACE PARTITION (p1, p2) WITH TEMPORARY PARTITION (tp1, tp2)
PROPERTIES (
    "strict_range" = "false",
    "use_temp_partition_name" = "true"
);

-- strict_range:預設為 true。
-- 	對於 Range 分割區,當該引數為 true 時,表示要被替換的所有正式分割區的範圍並集需要和替換的臨時分割區的範圍並集完全相同。當置為 false 時,只需要保證替換後,新的正式分割區間的範圍不重疊即可。
-- 	對於 List 分割區,該引數恆為 true。要被替換的所有正式分割區的列舉值必須和替換的臨時分割區列舉值完全相同。
-- use_temp_partition_name:預設為 false。當該引數為 false,並且待替換的分割區和替換分割區的個數相同時,則替換後的正式分割區名稱維持不變。如果為 true,則替換後,正式分割區的名稱為替換分割區的名稱。


LTER TABLE tbl1 REPLACE PARTITION (p1) WITH TEMPORARY PARTITION (tp1);
-- use_temp_partition_name 預設為 false,則在替換後,分割區的名稱依然為 p1,但是相關的資料和屬性都替換為 tp1 的。 如果 use_temp_partition_name 預設為 true,則在替換後,分割區的名稱為 tp1。p1 分割區不再存在。

ALTER TABLE tbl1 REPLACE PARTITION (p1, p2) WITH TEMPORARY PARTITION (tp1);
-- use_temp_partition_name 預設為 false,但因為待替換分割區的個數和替換分割區的個數不同,則該引數無效。替換後,分割區名稱為 tp1,p1 和 p2 不再存在。


-- 替換操作的一些說明:
-- 分割區替換成功後,被替換的分割區將被刪除且不可恢復。

資料的匯入和查詢

匯入臨時分割區

根據匯入方式的不同,指定匯入臨時分割區的語法稍有差別。這裡通過範例進行簡單說明

-- 查詢結果用insert匯入
INSERT INTO tbl TEMPORARY PARTITION(tp1, tp2, ...) SELECT ....

-- 檢視資料
SELECT ... FROM
tbl1 TEMPORARY PARTITION(tp1, tp2, ...)
JOIN
tbl2 TEMPORARY PARTITION(tp1, tp2, ...)
ON ...
WHERE ...;

doris中join的優化原理

Shuffle Join(Partitioned Join)

和mr中的shuffle過程是一樣的,針對每個節點上的資料進行shuffle,相同資料分發到下游的節點上的join方式叫shuffle join

-- 訂單表
CREATE TABLE  test.order_info_shuffle
(
 `order_id` varchar(20) COMMENT "訂單id",
 `user_id` varchar(20) COMMENT "使用者id",
 `goods_id` VARCHAR(20) COMMENT "商品id",
 `goods_num` Int COMMENT "商品數量",
 `price` double COMMENT "商品價格"
)
duplicate KEY(`order_id`)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;

-- 匯入資料:
insert into test.order_info_shuffle values\
('o001','u001','g001',1,9.9 ),\
('o001','u001','g002',2,19.9),\
('o001','u001','g003',2,39.9),\
('o002','u002','g001',3,9.9 ),\
('o002','u002','g002',1,19.9),\
('o003','u002','g003',1,39.9),\
('o003','u002','g002',2,19.9),\
('o003','u002','g004',3,99.9),\
('o003','u002','g005',1,99.9),\
('o004','u003','g001',2,9.9 ),\
('o004','u003','g002',1,19.9),\
('o004','u003','g003',4,39.9),\
('o004','u003','g004',1,99.9),\
('o004','u003','g005',4,89.9);


-- 商品表
CREATE TABLE  test.goods_shuffle
(
 `goods_id` VARCHAR(20) COMMENT "商品id",
 `goods_name`  VARCHAR(20) COMMENT "商品名稱",
 `category_id` VARCHAR(20) COMMENT "商品品類id"
)
duplicate KEY(`goods_id`)
DISTRIBUTED BY HASH(`goods_id`) BUCKETS 5;

-- 匯入資料:
insert into test.goods_shuffle values\
('g001','iphon13','c001'),\
('g002','ipad','c002'),\
('g003','xiaomi12','c001'),\
('g004','huaweip40','c001'),\
('g005','headset','c003');


-- sql範例
EXPLAIN 
select 
oi.order_id,
oi.user_id,
oi.goods_id,
gs.goods_name,
gs.category_id,
oi.goods_num,
oi.price
from order_info_shuffle as oi
-- 我們可以不指定哪一種join方式,doris會自己根據資料的實際情況幫我們選擇
JOIN goods_shuffle as gs
on oi.goods_id = gs.goods_id;

EXPLAIN select 
oi.order_id,
oi.user_id,
oi.goods_id,
gs.goods_name,
gs.category_id,
oi.goods_num,
oi.price
from order_info_shuffle as oi
-- 可以顯式的hint指定我們想要的join型別
JOIN [broadcast] goods_shuffle as gs
on oi.goods_id = gs.goods_id;

適用場景:不管資料量,不管是大表join大表還是大表join小表都可以用
優點:通用
缺點:需要shuffle記憶體和網路開銷比較大,效率不高

Broadcast Join

當一個大表join小表的時候,將小表廣播到每一個大表所在的每一個節點上(以hash表的形式放在記憶體中)這樣的方式叫做Broadcast Join,類似於mr裡面的一個map端join

-- 顯式使用 Broadcast Join:
EXPLAIN 
select 
oi.order_id,
oi.user_id,
oi.goods_id,
gs.goods_name,
gs.category_id,
oi.goods_num,
oi.price
from order_info_broadcast as oi
JOIN [broadcast] goods_broadcast as gs
on oi.goods_id = gs.goods_id;

適用場景:
左表join右表,要求左表的資料量相對來說比較大,右表資料量比較小
優點:避免了shuffle,提高了運算效率
缺點:有限制,必須右表資料量比較小

Bucket Shuffle Join

利用建表時候分桶的特性,當join的時候,join的條件和左表的分桶欄位一樣的時候,將右表按照左表分桶的規則進行shuffle操作,使右表中需要join的資料落在左表中需要join資料的BE節點上的join方式叫做Bucket Shuffle Join。

-- 從 0.14 版本開始預設為 true,新版本可以不用設定這個引數了!
show variables like '%bucket_shuffle_join%'; 
set enable_bucket_shuffle_join = true;
-- 通過 explain 檢視 join 型別
EXPLAIN 
select 
oi.order_id,
oi.user_id,
oi.goods_id,
gs.goods_name,
gs.category_id,
oi.goods_num,
oi.price
from order_info_bucket as oi
-- 目前 Bucket Shuffle Join不能像Shuffle Join那樣可以顯示指定Join方式,
-- 只能讓執行引擎自動選擇,
-- 選擇的順序:Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join。
JOIN goods_bucket as gs
where oi.goods_id = gs.goods_id;




EXPLAIN select 
oi.order_id,
oi.user_id,
oi.goods_id,
gs.goods_name,
gs.category_id,
oi.goods_num,
oi.price
from order_info_bucket as oi
-- 目前 Bucket Shuffle Join不能像Shuffle Join那樣可以顯示指定Join方式,
-- 只能讓執行引擎自動選擇,
-- 選擇的順序:Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join。
JOIN goods_bucket1 as gs
where oi.goods_id = gs.goods_id;

-- 注意事項:
-- Bucket Shuffle Join 只生效於 Join 條件為等值的場景
-- Bucket Shuffle Join 要求左表的分桶列的型別與右表等值 join 列的型別需要保持一致,否則無法進行對應的規劃。 
-- Bucket Shuffle Join 只作用於 Doris 原生的 OLAP 表,對於 ODBC,MySQL,ES 等外表,當其作為左表時是無法規劃生效的。 
-- Bucket Shuffle Join只能保證左表為單分割區時生效。所以在 SQL 執行之中,需要儘量使用 where 條件使分割區裁剪的策略能夠生效。

Colocation Join

中文意思叫位置協同分組join,指需要join的兩份資料都在同一個BE節點上,這樣在join的時候,直接本地join計算即可,不需要進行shuffle。

名詞解釋
• Colocation Group(位置協同組CG):在同一個 CG內的 Table 有著相同的 Colocation Group Schema,並且有著相同的資料分片分佈(滿足三個條件)。
• Colocation Group Schema(CGS):用於描述一個 CG 中的 Table,和 Colocation 相關的通用 Schema 資訊。包括分桶列型別,分桶數以及分割區的副本數等。

使用限制

  1. 建表時兩張表的分桶列的型別和數量需要完全一致,並且桶數一致,才能保證多張表的資料分片能夠一一對應的進行分佈控制。
  2. 同一個 CG 內所有表的所有分割區(Partition)的副本數必須一致。如果不一致,可能出現某一個Tablet 的某一個副本,在同一個 BE 上沒有其他的表分片的副本對應
  3. 同一個 CG 內的表,分割區的個數、範圍以及分割區列的型別不要求一致。

使用案例

-- 建兩張表,分桶列都為 int 型別,且桶的個數都是 5 個。副本數都為預設副本數

-- 編寫查詢語句,並檢視執行計劃
EXPLAIN 
select 
oi.order_id,
oi.user_id,
oi.goods_id,
gs.goods_name,
gs.category_id,
oi.goods_num,
oi.price
from order_info_colocation as oi
-- 目前 Colocation Join不能像Shuffle Join那樣可以顯示指定Join方式,
-- 只能讓執行引擎自動選擇,
-- 選擇的順序:Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join。
JOIN goods_colocation as gs
where oi.goods_id = gs.goods_id;


-- 檢視 Group
SHOW PROC '/colocation_group';

-- 當 Group 中最後一張表徹底刪除後(徹底刪除是指從回收站中刪除。通常,一張表通過DROP TABLE 命令刪除後,會在回收站預設停留一天的時間後,再刪除),該 Group 也會被自動刪除。
-- 修改表 Colocate Group 屬性
ALTER TABLE tbl SET ("colocate_with" = "group2");
-- 如果被修改的表原來有group,那麼會直接將原來的group刪除後建立新的group, 如果原來沒有組,就直接建立

-- 刪除表的 Colocation 屬性
ALTER TABLE tbl SET ("colocate_with" = ""); 
-- 當對一個具有 Colocation 屬性的表進行增加分割區(ADD PARTITION)、修改副本數時,Doris 會檢查修改是否會違反 Colocation Group Schema,如果違反則會拒絕

Runtime Filter

Runtime Filter會在有join動作的 sql執行時,建立一個HashJoinNode和一個ScanNode來對join的資料進行過濾優化,使得join的時候資料量變少,從而提高效率

使用

-- 指定 RuntimeFilter 型別 
set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";

set runtime_filter_type="MIN_MAX";


引數解釋:

  • runtime_filter_type: 包括Bloom Filter、MinMax Filter、IN predicate、IN Or Bloom Filter
    • Bloom Filter: 針對右表中的join欄位的所有資料標註在一個布隆過濾器中,從而判斷左表中需要join的資料在還是不在
    • MinMax Filter: 獲取到右表表中資料的最大值和最小值,看左表中檢視,將超出這個最大值最小值範圍的資料過濾掉
    • IN predicate: 將右表中需要join欄位所有資料構建一個IN predicate,再去左表表中過濾無意義資料
  • runtime_filter_wait_time_ms: 左表的ScanNode等待每個Runtime Filter的時間,預設1000ms
  • runtime_filters_max_num: 每個查詢可應用的Runtime Filter中Bloom Filter的最大數量,預設10
  • runtime_bloom_filter_min_size: Runtime Filter中Bloom Filter的最小長度,預設1M
  • runtime_bloom_filter_max_size: Runtime Filter中Bloom Filter的最大長度,預設16M
  • runtime_bloom_filter_size: Runtime Filter中Bloom Filter的預設長度,預設2M
  • runtime_filter_max_in_num: 如果join右表資料行數大於這個值,我們將不生成IN predicate,預設102400

範例

-- 建表
CREATE TABLE test (t1 INT) DISTRIBUTED BY HASH (t1) BUCKETS 2  
PROPERTIES("replication_num" = "1"); 

INSERT INTO test VALUES (1), (2), (3), (4); 

CREATE TABLE test2 (t2 INT) DISTRIBUTED BY HASH (t2) BUCKETS 2  
PROPERTIES("replication_num" = "1"); 

INSERT INTO test2 VALUES (3), (4), (5); 

-- 檢視執行計劃
set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";

EXPLAIN SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;