作者:Grey
原文地址:
首先,需要先完成
本文基於上述四個環境已經搭建完成的基礎上進行 Hudi 資料湖的插入,更新,查詢操作。
Scala 2.11.8
JDK 1.8
需要熟悉 Maven 構建專案和 Scala 一些基礎語法。
master 節點首先啟動叢集,執行:
stop-dfs.sh && start-dfs.sh
啟動 yarn,執行:
stop-yarn.sh && start-yarn.sh
然後準備一個 Mave 專案,在 src/main/resources 目錄下,將 Hadoop 的一些組態檔拷貝進來,分別是
$HADOOP_HOME/etc/hadoop/core-site.xml
檔案
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://master:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/local/hadoop/tmp</value>
</property>
</configuration>
注意,需要在你存取叢集的機器上設定 host 檔案,這樣才可以識別 master 節點。
$HADOOP_HOME/etc/hadoop/hdfs-site.xml
檔案
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>
$HADOOP_HOME/etc/hadoop/yarn-site.xml
檔案,目前還沒有任何設定
<?xml version="1.0"?>
<configuration>
</configuration>
然後,設計實體的資料結構,
package git.snippet.entity
case class MyEntity(uid: Int,
uname: String,
dt: String
)
插入資料程式碼如下
package git.snippet.test
import git.snippet.entity.MyEntity
import git.snippet.util.JsonUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object DataInsertion {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
insertData(sparkSession)
}
def insertData(sparkSession: SparkSession) = {
import org.apache.spark.sql.functions._
import sparkSession.implicits._
val commitTime = System.currentTimeMillis().toString //生成提交時間
val df = sparkSession.read.text("/mydata/data1")
.mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = JsonUtil.getJsonData(item.getString(0))
MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))
})
})
val result = df.withColumn("ts", lit(commitTime)) //新增ts 時間戳列
.withColumn("uuid", col("uid"))
.withColumn("hudipart", col("dt")) //增加hudi分割區列
result.write.format("org.apache.hudi")
.option("hoodie.insert.shuffle.parallelism", 2)
.option("hoodie.upsert.shuffle.parallelism", 2)
.option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交時間列
.option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一標示列
.option("hoodie.table.name", "myDataTable")
.option("hoodie.datasource.write.partitionpath.field", "hudipart") //分割區列
.mode(SaveMode.Overwrite)
.save("/snippet/data/hudi")
}
}
然後,在 master 節點先準備好資料
vi data1
輸入如下資料
{'uid':1,'uname':'grey','dt':'2022/09'}
{'uid':2,'uname':'tony','dt':'2022/10'}
然後建立檔案目錄,
hdfs dfs -mkdir /mydata/
把 data1 放入目錄下
hdfs dfs -put data1 /mydata/
存取:http://192.168.100.130:50070/explorer.html
可以查到這個資料
接下來執行插入資料的 scala 程式碼,執行完畢後,驗證一下
存取:http://192.168.100.130:50070/explorer.html
可以檢視到插入的資料
準備一個 data2 檔案
cp data1 data2 && vi data2
data2 的資料更新為
{'uid':1,'uname':'grey1','dt':'2022/11'}
{'uid':2,'uname':'tony1','dt':'2022/12'}
然後執行
hdfs dfs -put data2 /mydata/
更新資料的程式碼,我們可以做如下調整,完整程式碼如下
package git.snippet.test
import git.snippet.entity.MyEntity
import git.snippet.util.JsonUtil
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
object DataUpdate {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
updateData(sparkSession)
}
def updateData(sparkSession: SparkSession) = {
import org.apache.spark.sql.functions._
import sparkSession.implicits._
val commitTime = System.currentTimeMillis().toString //生成提交時間
val df = sparkSession.read.text("/mydata/data2")
.mapPartitions(partitions => {
partitions.map(item => {
val jsonObject = JsonUtil.getJsonData(item.getString(0))
MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))
})
})
val result = df.withColumn("ts", lit(commitTime)) //新增ts 時間戳列
.withColumn("uuid", col("uid")) //新增uuid 列
.withColumn("hudipart", col("dt")) //增加hudi分割區列
result.write.format("org.apache.hudi")
// .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option("hoodie.insert.shuffle.parallelism", 2)
.option("hoodie.upsert.shuffle.parallelism", 2)
.option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交時間列
.option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一標示列
.option("hoodie.table.name", "myDataTable")
.option("hoodie.datasource.write.partitionpath.field", "hudipart") //分割區列
.mode(SaveMode.Append)
.save("/snippet/data/hudi")
}
}
執行更新資料的程式碼。
驗證一下,存取:http://192.168.100.130:50070/explorer.html
可以檢視到更新的資料情況
資料查詢的程式碼也很簡單,完整程式碼如下
package git.snippet.test
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DataQuery {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
queryData(sparkSession)
}
def queryData(sparkSession: SparkSession) = {
val df = sparkSession.read.format("org.apache.hudi")
.load("/snippet/data/hudi/*/*")
df.show()
println(df.count())
}
}
執行,輸出以下資訊,驗證成功。
資料查詢也支援很多查詢條件,比如增量查詢,按時間段查詢等。
接下來是 flink 實時資料分析的服務,首先需要在 master 上啟動 kafka,並建立 一個名字為 mytopic 的 topic,詳見Linux 下搭建 Kafka 環境
相關命令如下
建立topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --create --topic mytopic
生產者啟動設定
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic mytopic
消費者啟動設定
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mytopic
然後執行如下程式碼
package git.snippet.analyzer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class DataAnalyzer {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.100.130:9092");
properties.setProperty("group.id", "snippet");
//構建FlinkKafkaConsumer
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), properties);
//指定偏移量
myConsumer.setStartFromLatest();
final DataStream<String> stream = env.addSource(myConsumer);
env.enableCheckpointing(5000);
stream.print();
try {
env.execute("DataAnalyzer");
} catch (Exception e) {
e.printStackTrace();
}
}
}
其中
properties.setProperty("bootstrap.servers", "192.168.100.130:9092");
根據自己的設定調整,然後通過 kakfa 的生產者使用者端輸入一些資料,這邊可以收到這個資料,驗證完畢。
完整程式碼見
本文來自部落格園,作者:Grey Zeng,轉載請註明原文連結:https://www.cnblogs.com/greyzeng/p/16808216.html