@
DataX 官網地址 https://maxwells-daemon.io/
DataX GitHub原始碼地址 https://github.com/alibaba/DataX
DataX 是Alibaba集團下阿里雲 DataWorks資料整合的開源版本,用作異構資料來源離線同步工具或平臺;其實現瞭如 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、HBase、ClickHouse 等各種異構資料來源之間穩定高效的資料同步功能。本文全部內容只對最新框架3.0系列說明,最新版本為datax_v202210
為了解決異構資料來源同步問題,DataX將複雜的網狀的同步鏈路變成了星型資料鏈路,DataX作為中間傳輸載體負責連線各種資料來源。當需要接入一個新的資料來源的時候,只需要將此資料來源對接到DataX,便能跟已有的資料來源做到無縫資料同步;基於外掛式擴充套件能力上可以說DataX框架具備支援任意資料來源型別的資料同步工作的能力。
Apache Sqoop(TM)是一種用於在Apache Hadoop和結構化資料儲存(如關聯式資料庫)之間高效傳輸批次資料的工具,最新的穩定版本是1.4.7,而其Sqoop2的最新版本是1.99.7,但是1.99.7與1.4.7不相容,而且特性不完整,因此Sqoop2不用於生產部署。Sqoop1.4.7在2017年後就沒有再更新,不是說Sqoop不好,是官方已沒有需要修復的問題,穩定,據說專案PMC也都解散了。如果業務只需要對關聯式資料庫同步的HDFS(還包括hive、hbase),使用sqoop也是可以的。Sqoop也可以實現增量資料同步,比如通過查詢的sql中增加時間過濾欄位,也可以結合自身job記住帶有單調遞增的編號欄位實現增量同步。
雖然說DataX是單機版壓力大,但可以通過手工排程系統佈置多個節點分開設定來實現類似多臺分散式處理,提高處理能力。
DataX框架設計也比較簡單,與其他資料採集框架如Flume相似,採用Framework + plugin架構構建;將資料來源讀取和寫入抽象成為Reader/Writer外掛,納入到整個同步框架中。
DataX目前已經有了比較全面的外掛體系,主流的RDBMS資料庫、NOSQL、巨量資料、時序資料庫等都已經接入;DataX Framework提供了簡單的介面與外掛互動,提供簡單的外掛接入機制,只需要任意加上一種外掛,就能無縫對接其他資料來源,具體資料來源使用說明根據需要點選讀或寫檢視使用詳細介紹。下面支援型別就在DataX GitHub主頁READERME上。
DataX 支援單機多執行緒模式完成同步作業執行,這裡以一個DataX作業生命週期的時序圖,從整體架構設計非常簡要說明DataX各個模組相互關係。
核心模組介紹:
DataX排程流程拿一個舉例,比如使用者提交了一個DataX作業並設定了20個並行,目的是將一個100張分表的mysql資料同步到odps裡面。 DataX的排程決策思路是:
# 下載最新版本datax_v202210的datax
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz
# 解壓檔案
tar -xvf datax.tar.gz
# 進入根目錄
cd datax/
# 自檢指令碼
python ./bin/datax.py ./job/job.json
建立json格式作業的組態檔,可以通過檢視設定模板範例
python bin/datax.py -r streamreader -w streamwriter
在job目錄下建立stream2stream.json,vim stream2stream.json
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello,welcome to DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 5
}
}
}
}
# 執行job
python bin/datax.py job/stream2stream.json
可以通過GitHub找到支援資料通道並通過查閱讀、寫相關檔案,非常詳細,不僅包含實現原理、功能說明、約束限制,還對每一種資料通道提供了效能測試報告,可見DataX是把效能做到了極致。引數的說明
[外連圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-gtc3fCRv-1671803026649)(image-20221223135319256.png)]
需要同步資料表為test資料庫的student表
在job目錄下建立mysql2hdfs.json,vim job/mysql2hdfs.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"age"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://hadoop3:3308/test"],
"table": ["student"]
}
],
"password": "123456",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "INT"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "INT"
}
],
"defaultFS": "hdfs://hadoop1:9000",
"fieldDelimiter": "\t",
"fileName": "student.txt",
"fileType": "text",
"path": "/",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
# 執行job
python bin/datax.py job/mysql2hdfs.json
從控制檯的紀錄檔列印可以看到這個job寫入hdfs時先寫入臨時檔案,全部成功則修改檔名和路徑;如果個別失敗則整個job失敗,刪除臨時路徑。檢視hdfs上可以看到檔案已經寫入成功,並且固定加了一串字尾
點選檔案檢視內容和間隔符也是正確的
如果是HA模式可以hadoopConfig裡設定
"hadoopConfig":{
"dfs.nameservices": "testDfs",
"dfs.ha.namenodes.testDfs": "namenode1,namenode2",
"dfs.namenode.rpc-address.aliDfs.namenode1": "",
"dfs.namenode.rpc-address.aliDfs.namenode2": "",
"dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
建立一張同樣表結構的student1表,在job目錄下建立hdfs2mysql.json,vim job/hdfs2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"column": ["*"],
"defaultFS": "hdfs://hadoop1:9000",
"encoding": "UTF-8",
"fieldDelimiter": "\t",
"fileType": "text",
"path": "/student.txt__6eeb1730_21bd_40e9_a360_16de5396b140"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"name",
"age"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop3:3308/test?useUnicode=true&characterEncoding=gbk",
"table": ["student1"]
}
],
"password": "123456",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
# 由於我的mysql是8,因此需要將plugin/writer/mysqlwriter/libs的mysql-connector-java-5.1.47.jar替換為高版本,這裡就直接使用mysql-connector-java-8.0.29.jar
rm plugin/writer/mysqlwriter/libs/mysql-connector-java-5.1.47.jar
cp mysql-connector-java-8.0.29.jar plugin/writer/mysqlwriter/libs/
# 執行job
python bin/datax.py job/hdfs2mysql.json
檢視student1表已經有4條包含指定3個欄位的資料