ETL之apache hop資料增量同步功能

2023-08-22 06:02:10

ETL增量資料抽取CDC

概念:Change Data Capture,變化的資料捕獲,也稱:【增量資料抽取】(名詞解釋)

CDC是一種實現資料的增量抽取解決方案,是實現【ETL整體解決方案】中的一項子方案/子問題。(對CDC的定位)

如何捕獲變化的資料是增量抽取的關鍵,對捕獲方法一般有2點要求:

  • 準確性:能夠將業務系統中的變化資料準確地捕獲到;
  • 效能:儘量減少對業務系統造成太大的壓力,影響現有業務。

2 CDC 常見解決方案
按CDC方案的任一操作是否對資料來源系統產生影響(效能、功能等),分為:【侵入式CDC】、【非侵入式CDC】
按CDC方案所抽取的資料與資料來源系統的變化資料是否在規定時間內同步,分為:【同步CDC】、【非同步CDC】

一、侵入式

1、基於觸發器
建立中間表,編寫觸發器或者在後端服務插入增刪改的操作記錄

2、基於時間戳
區分插入操作和更新操作:只有當源系統包含了插入時間戳和更新時間戳兩個欄位,才能區別插入和更新,否則無法區分。
刪除記錄的操作:不能捕獲到刪除操作,除非是邏輯刪除,即記錄沒有真的刪除,只是做了邏輯上的標誌。
多次更新檢測:如果在一次同步週期內,資料被更新了多次,只能同步最後一次更新操作,中間的更新操作都丟失了。
實時能力:時間戳和基於序列的資料抽取一般適用於批次操作,不適合於實時場景下的資料載入。

二、非侵入式

3、基於快照
1基於快照的CDC可檢測到插入、更新和刪除的資料 (相比基於時間戳的CDC的優點)
2需要大量儲存空間來儲存快照
4、基於紀錄檔
源資料庫會把每個插入、更新、刪除操作記錄到紀錄檔裡。
通過分析已經發生的事件提交(commit)的紀錄檔記錄來得到增量資料資訊,有一定的時間延遲。
【特點】複雜、非同步、非侵入式

參考檔案:

https://zhuanlan.zhihu.com/p/362471672

https://www.cnblogs.com/johnnyzen/p/12781942.html

基於以上的幾種增量同步方式優缺點,採用第一種基於觸發器方式
本文中的範例是源資料庫Sql Server 資料庫的資料同步到目標資料庫MySql資料庫中,被同步的源資料庫為Test,Schema 為innovator,表名為MyTable

​ 流程示意圖

一、在資料庫層面上的系列操作

表的建立SQL如下

CREATE TABLE [innovator].[MyTable] (
  [Id] char(32) COLLATE Chinese_PRC_CI_AS  NOT NULL  PRIMARY KEY ,
  [Name] nvarchar(255) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [CreatedTime] datetime  NOT NULL,
)  
1、建立SQL Server中間資料庫temp_db,需要同步的所有表放在dbo下
2、建立需要同步的中間表,比如 ,中間表的表名addOrEdit_MyTable,欄位和源表一樣
-- 只複製基礎表結構不復制索引觸發器
SELECT * INTO temp_db.dbo.addOrEdit_MyTable FROM Test.innovator.MyTable WHERE 1 = 0;
3、建立需要同步刪除的中間表Table_Delete
CREATE TABLE [dbo].[Table_Delete] (
  [Id] char(32) COLLATE Chinese_PRC_CI_AS  NOT NULL  PRIMARY KEY,
  [TableName] nvarchar(255) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [DeletedTime] datetime  NOT NULL,
)  
4、在源資料庫Test裡的MyTable表裡建立兩個觸發器

(1) 插入修改觸發器

CREATE TRIGGER [innovator].[insertUpdateTrigger_MyTable]
ON [innovator].[MyTable]
WITH EXECUTE AS CALLER
FOR INSERT, UPDATE
AS
BEGIN
  -- Type the SQL Here.
  --檢查插入或更新的資料在A表中是否存在,有則更新,無則新增
	if EXISTS(select 1 from temp_db.dbo.addOrEdit_MyTable as A,inserted as B where A.id=B.id)
		 UPDATE  A set 
			 A.Name=B.Name,A.CreatedTime=B.CreatedTime
		 from temp_db.dbo.addOrEdit_MyTable A join inserted B on A.id= B.id
	else
		 insert into temp_db.dbo.addOrEdit_MyTable select * from inserted
END

(2) 刪除觸發器

CREATE TRIGGER [innovator].[deleteTrigger_MyTable]
ON [innovator].[MyTable]
WITH EXECUTE AS CALLER
FOR DELETE
AS
BEGIN
  -- Type the SQL Here.
	insert into temp_db.dbo.Table_Delete select Id ,'MyTable' as TableName, GETDATE() as DeletedTime from deleted;
END

(3) 觸發器圖片參考

5、建立Mysql目標資料庫 Test_Mysql,字元集選擇為utf8mb4
 -- CREATE DATABASE Test_Mysql DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_chinese_ci;
 CREATE DATABASE IF NOT EXISTS Test_Mysql DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_general_ci;
6、在Test_Mysql資料庫裡建立表MyTable
DROP TABLE IF EXISTS `MyTable`;
CREATE TABLE `MyTable`  (
  `Id` char(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `Name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `CreatedTime` datetime(0) NOT NULL,
  PRIMARY KEY (`ID`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
7、可能存在觸發器中不能存取temp_db資料庫,解決如下:
-- 1、查詢所有資料庫資訊
SELECT name, database_id, is_trustworthy_on FROM sys.databases
-- 2、修改 SQL Server 的範例是否信任該資料庫以及其中的內容(注意:必須是 sysadmin 固定伺服器角色的成員才能設定此選項。)
ALTER  DATABASE  temp_db set TRUSTWORTHY ON 
-- 3、查詢系統所有使用者
SELECT name FROM Sysusers
-- 4、給使用者innovator_regular授權存取temp_db資料庫
ALTER AUTHORIZATION ON DATABASE::temp_db TO innovator_regular

二、在apache hop上編寫工作流和管道

這裡偷點懶,不想再寫一遍這些管道和工作流,拿之前寫的截個圖

1、編寫源表資料第一次全量同步到目標表的管道,命名為Init_MyTable(比如下面示意圖中的Init_HPART_)

(1) Table input就是源資料庫Sql Server裡的表,Insert/update 裡就是被插入修改同步操作的目標MySql表

(2) Table input 範例圖

把上圖的範例sql改為下面的

SELECT
 Id,
 Name,
 CONVERT(VARCHAR(100),CreatedTime,21) AS CreatedTime
 FROM innovator.MyTable

(3) Insert/update 範例圖

2、編寫源表資料第一次全量同步到目標表的初始化工作流,命名為Init_Wrokflow(比如下面示意圖中的InitDataTable_Wrokflow) (只執行一次)

3、編寫增量同步新增修改操作的管道,命名為AddOrEdit_MyTable(比如下面示意圖中的PR_AddOrEdit)

(1) Table input 範例圖

把上圖的範例sql改為下面的

SELECT
 Id,
 Name,
 CONVERT(VARCHAR(100),CreatedTime,21) AS CreatedTime
 FROM dbo.addOrEdit_MyTable

(2) Insert/update 範例圖

(3) Delete 範例圖

4、編寫增量同步刪除操作的管道,命名為Delete_MyTable(比如下面示意圖中的PR_Delete)

(1) Table input 範例圖

這裡Sql改為

SELECT Id,TableName,DeletedTime FROM dbo.Table_Delete where TableName='MyTable'

(2) Delete 範例圖

(3) Delete2 範例圖

5、編寫增量同步的工作流,命名為MyTable_Workflow(比如下面示意圖中的PR_workflow)(定時執行)

其中 PR_AddOrEdit.hpl就是步驟3中的同步新增修改操作的管道,PR_Delete.hpl就是步驟四中的增量同步刪除操作的管道

(1) Start定時執行示意圖

(2) 工作流並行執行各個管道任務示意圖

以上是基於觸發器模式增量資料同步的hop web設計、建模,下一步需要在生產環境中執行設計的工作流/管道檔案

Hop Web ,Hop Gui fat client 是幫助資料工程師通過視覺化方式設計資料淨化流程的。

Hop run是本地命令列,來執行設計好的資料淨化流程的。

Hop server是管理和執行本地或遠端的資料淨化流程的

也可以使用Apache Airflow等非自帶工具,來執行。