sqoop1.4.7完全支援Hadoop3.x, Hive3.x Hbase2.x

2023-09-12 12:00:19

已經修改好 儲存至雲盤 自己下載 花了時間的,記得關注我。。。
連結:https://pan.xunlei.com/s/VNe6P6Tm1A9Q-RG5GByN08rdA1
提取碼:5nke
複製這段內容後開啟手機迅雷App,檢視更方便

下載解壓直接用,裡面的內容已經改好
但是需要注意的是conf目錄下的sqoop-env.sh裡面的hadoop路徑和hive路徑改成自己的喲

準備MySQL資料

登入MySQL資料庫

mysql -u root -p123456;

建立student資料庫

create database student;

切換資料庫並匯入資料

# mysql shell中執行
use student;
source /usr/local/soft/shujia/student.sql;
source /usr/local/soft/shujia/score.sql;

另外一種匯入資料的方式

# linux shell中執行
mysql -uroot -p123456  student < /usr/local/soft/shujia/student.sql
mysql -uroot -p123456  student < /usr/local/soft/shujia/score.sql

使用Navicat執行SQL檔案

也可以通過Navicat匯入

匯出MySQL資料庫

mysqldump -u root -p123456 資料庫名>任意一個檔名.sql

import

從傳統的關係型資料庫匯入HDFS、HIVE、HBASE......

MySQLToHDFS

編寫指令碼,儲存為MySQLToHDFS.conf
sqoop執行指令碼有兩種方式:第一種方式:直接在命令列視窗中直接輸入指令碼;第二種方式是將命令封裝成一個指令碼檔案,然後使用另一個命令執行
第一種方式:
sqoop import \
--append \
--connect jdbc:mysql://master:3306/student \
--username root \
--password 123456 \
--table student \
--m 4 \
--split-by id \
--target-dir /shujia/bigdata25/sqoopdata/student1/ \
--fields-terminated-by '\t'

第二種方式:使用指令碼檔案的形式
import
--append
--connect
jdbc:mysql://master:3306/student
--username
root
--password
123456
--table
student
--m
1
--split-by
id
--target-dir
/shujia/bigdata25/sqoopdata/student2/
--fields-terminated-by
','


執行指令碼
sqoop --options-file MySQLToHDFS.conf
注意事項:

1、--m 表示指定生成多少個Map任務,不是越多越好,因為MySQL Server的承載能力有限

2、當指定的Map任務數>1,那麼需要結合--split-by引數,指定分割鍵,以確定每個map任務到底讀取哪一部分資料,最好指定數值型的列,最好指定主鍵(或者分佈均勻的列=>避免每個map任務處理的資料量差別過大)

3、如果指定的分割鍵資料分佈不均,可能導致資料傾斜問題

4、分割的鍵最好指定數值型的,而且欄位的型別為int、bigint這樣的數值型

5、編寫指令碼的時候,注意:例如:--username引數,引數值不能和引數名同一行

--username root  // 錯誤的

// 應該分成兩行
--username
root

6、執行的時候會報錯InterruptedException,hadoop2.7.6自帶的問題,忽略即可

21/01/25 14:32:32 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1252)
	at java.lang.Thread.join(Thread.java:1326)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:716)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:476)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:652)

7、實際上sqoop在讀取mysql資料的時候,用的是JDBC的方式,所以當資料量大的時候,效率不是很高

8、sqoop底層通過MapReduce完成資料匯入匯出,只需要Map任務,不許需要Reduce任務 part-m-00000

9、每個Map任務會生成一個檔案

MySQLToHive

先會將MySQL的資料匯出來並在HDFS上找個目錄臨時存放,預設為:/user/使用者名稱/表名 /user/root/student

然後再將資料載入到Hive中,載入完成後,會將臨時存放的目錄刪除 /

編寫指令碼,並儲存為MySQLToHive.conf檔案
import
--connect
jdbc:mysql://master:3306/student
--username
root
--password
123456
--table
student
--m
2
--split-by
id
--fields-terminated-by
','
--hive-import
--hive-overwrite
--hbase-create-table
--hive-database
bigdata25sqoop
--hive-table
from_mysql_student1
--direct
執行指令碼
sqoop --options-file MySQLToHive.conf
--direct

加上這個引數,可以在匯出MySQL資料的時候,使用MySQL提供的匯出工具mysqldump,加快匯出速度,提高效率

需要將master上的/usr/bin/mysqldump分發至 node1、node2的/usr/bin目錄下

scp /usr/bin/mysqldump node01:/usr/bin/
scp /usr/bin/mysqldump node02:/usr/bin/
--e引數的使用

sqoop在匯入資料時,可以使用--e搭配sql來指定查詢條件,並且還需在sql中新增$CONDITIONS,來實現並行執行mr的功能。

只要有--e+sql,就需要加$CONDITIONS,哪怕只有一個maptask。

sqoop通過繼承hadoop的並行性來執行高效的資料傳輸。 為了幫助sqoop將查詢拆分為多個可以並行傳輸的塊,需要在查詢的where子句中包含$conditions預留位置。 sqoop將自動用生成的條件替換這個預留位置,這些條件指定每個任務應該傳輸哪個資料片。


MySQLToHBase

編寫指令碼,並儲存為MySQLToHBase.conf

sqoop1.4.6 只支援 HBase1.0.1 之前的版本的自動建立 HBase 表的功能


在HBase中建立student表
create 'studentsq','cf1'
執行指令碼
sqoop --options-file MySQLToHBase.conf

export

HDFSToMySQL

編寫指令碼,並儲存為HDFSToMySQL.conf

在往關係型資料庫中匯出的時候我們要先在關係型資料庫中建立好庫以及表,這些sqoop不會幫我們完成。


先清空MySQL student表中的資料,不然會造成主鍵衝突
執行指令碼
sqoop --options-file HDFSToMySQL.conf

檢視sqoop help

sqoop help

21/04/26 15:50:36 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6
usage: sqoop COMMAND [ARGS]

Available commands:
  codegen            Generate code to interact with database records
  create-hive-table  Import a table definition into Hive
  eval               Evaluate a SQL statement and display the results
  export             Export an HDFS directory to a database table
  help               List available commands
  import             Import a table from a database to HDFS
  import-all-tables  Import tables from a database to HDFS
  import-mainframe   Import datasets from a mainframe server to HDFS
  job                Work with saved jobs
  list-databases     List available databases on a server
  list-tables        List available tables in a database
  merge              Merge results of incremental imports
  metastore          Run a standalone Sqoop metastore
  version            Display version information

See 'sqoop help COMMAND' for information on a specific command.
# 檢視import的詳細幫助
sqoop import --help
1、並行度不能太高,就是 -m
2、如果沒有主鍵的時候,-m 不是1的時候就要指定分割欄位,不然會報錯,如果有主鍵的時候,-m 不是1 可以不去指定分割欄位,預設是主鍵,不指定 -m 的時候,Sqoop會預設是分4個map任務。

Sqoop 在從HDFS中匯出到關係型資料庫時的一些問題

問題一:

在上傳過程中遇到這種問題:

ERROR tool.ExportTool: Encountered IOException running export job: java.io.IOException: No columns to generate for ClassWriter

驅動版本的過低導致的,其實在嘗試這個方法的時候我們可以先進行這樣:加一行命令,--driver com.mysql.jdbc.Driver \  然後問題解決!!!

如果新增命令之後還沒有解決就把jar包換成高點版本的。
問題二:

依舊是匯出的時候,會報錯,但是我們很神奇的發現,也有部分資料匯入了。這也就是下一個問題。

Caused by: java.lang.NumberFormatException: For input string: "null"

解決方式:因為資料有存在null值得導致的

在命令中加入一行(方式一中的修改方式,方式二也就是轉換一下格式):--input-null-string '\\N' \  

--input-null-string 
'\\N'
問題三:**

java.lang.RuntimeException: Can't parse input data: '1998/5/11'

出現像這樣的問題,大多是因為HDFS上的資料與關係型資料庫建立表的欄位型別不匹配導致的。仔細對比修改後,就不會有這個報錯啦!!

增量同步資料

我們之前匯入的都是全量匯入,一次性全部匯入,但是實際開發並不是這樣,例如web端進行使用者註冊,mysql就增加了一條資料,但是HDFS中的資料並沒有進行更新,但是又再全部匯入一次又完全沒有必要。

所以,sqoop提供了增量匯入的方法。

1、資料準備:

2、將其先用全量匯入到HDFS(hive)中去


3、先在mysql中新增一條資料,在使用命令進行追加

4、根據時間進行大量追加(不去重)

#前面的案例中,hive本身的資料也是儲存在HDFS上的,所以我今後要做增量操作的時候,需要指定HDFS上的路徑
import
--connect
jdbc:mysql://master:3306/student
--username
root
--password
123456
--table
student
--target-dir
/user/hive/warehouse/testsqoop.db/from_mysql_student
--fields-terminated-by
'\t'
--incremental 
append
--check-column
id
--last-value
3

結果:但是我們發現有兩個重複的欄位

5、往往開發中需要進行去重操作:sqoop提供了一個方法進行去重,內部是先開一個map任務將資料匯入進來,然後再開一個map任務根據指定的欄位進行合併去重

結果:

之前有重複的也進行合併去重操作,最後生成一個結果。

總結:

–check-column
用來指定一些列,這些列在增量匯入時用來檢查這些資料是否作為增量資料進行匯入,和關係型資料庫中的自增欄位及時間戳類似. 
注意:這些被指定的列的型別不能使任意字元型別,如char、varchar等型別都是不可以的,同時–check-column可以去指定多個列
–incremental
用來指定增量匯入的模式,兩種模式分別為Append和Lastmodified
–last-value
指定上一次匯入中檢查列指定欄位最大值

總結

RDBMS-->HDFS     import
HDFS--->RDBMS    export

Mysql--->HDFS(hive)
要知道你要資料的來源和資料的目的地
mysql:
--connect
jdbc:mysql://master:3306/student
--username
root
--password
123456
--table
student

hdfs:
--target-dir
/user/hive/warehouse/sqooptest.db/from_mysql_student
--fields-terminated-by
'\t'

hive:
1)
--hive-import
--hive-overwrite
--create-hive-table  (如果表不存在,自動建立,如果存在,報錯,就不需要這個引數)
--hive-database
testsqoop
--hive-table
from_mysql_student
--fields-terminated-by
'\t'
2)
--target-dir
/user/hive/warehouse/sqooptest.db/from_mysql_student
--fields-terminated-by
'\t'
# 增量需要新增的引數=================================================
--incremental 
append 
--check-column
id
--last-value
3
(或者是)------------------------------------------------------------
--fields-terminated-by
'\t'
--check-column (hive的列名)
last_mod
--incremental
lastmodified
--last-value
"2022-06-18 16:40:09"
--m
1
========================================================================
# 如果需要去重,請先搞清楚根據什麼去重,否則結果可能不是你想要的
--merge-key
name   (這裡是根據姓名去重,你可以改成自己的去重列名)

hbase:(因為我們的hbase版本是1.4.6,而sqoop1.4.6不支援hbase1.0.1以後的自動建立表,所以我們在做同步到hbase的時候,需要手動先將表建立好)
--hbase-table
studentsq
--column-family
cf1
--hbase-row-key
id  (mysql中的列名)
--m
1



HDFS--->mysql

hdfs:
--columns
id,name,age,gender,clazz
--export-dir
/shujia/bigdata17/sqoopinput/
--fields-terminated-by
','
# 如果資料分割出來的欄位值有空值,需要新增以下引數(面試可能會面到)
--null-string 
'\\N' 
--null-non-string 
'\\N'