Hudi 資料湖的插入,更新,查詢,分析操作範例

2022-10-20 06:01:03

Hudi 資料湖的插入,更新,查詢,分析操作範例

作者:Grey

原文地址:

部落格園:Hudi 資料湖的插入,更新,查詢,分析操作範例

CSDN:Hudi 資料湖的插入,更新,查詢,分析操作範例

前置工作

首先,需要先完成

Linux 下搭建 Kafka 環境

Linux 下搭建 Hadoop 環境

Linux 下搭建 HBase 環境

Linux 下搭建 Hive 環境

本文基於上述四個環境已經搭建完成的基礎上進行 Hudi 資料湖的插入,更新,查詢操作。

開發環境

Scala 2.11.8

JDK 1.8

需要熟悉 Maven 構建專案和 Scala 一些基礎語法。

操作步驟

master 節點首先啟動叢集,執行:

stop-dfs.sh && start-dfs.sh

啟動 yarn,執行:

stop-yarn.sh && start-yarn.sh

然後準備一個 Mave 專案,在 src/main/resources 目錄下,將 Hadoop 的一些組態檔拷貝進來,分別是

$HADOOP_HOME/etc/hadoop/core-site.xml 檔案

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://master:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/usr/local/hadoop/tmp</value>
    </property>
</configuration>

注意,需要在你存取叢集的機器上設定 host 檔案,這樣才可以識別 master 節點。

$HADOOP_HOME/etc/hadoop/hdfs-site.xml 檔案

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
</configuration>

$HADOOP_HOME/etc/hadoop/yarn-site.xml 檔案,目前還沒有任何設定

<?xml version="1.0"?>

<configuration>
</configuration>

然後,設計實體的資料結構,

package git.snippet.entity

case class MyEntity(uid: Int,
                    uname: String,
                    dt: String
                   )

插入資料程式碼如下

package git.snippet.test


import git.snippet.entity.MyEntity
import git.snippet.util.JsonUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object DataInsertion {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
    insertData(sparkSession)
  }

  def insertData(sparkSession: SparkSession) = {
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._
    val commitTime = System.currentTimeMillis().toString //生成提交時間
    val df = sparkSession.read.text("/mydata/data1")
      .mapPartitions(partitions => {
        partitions.map(item => {
          val jsonObject = JsonUtil.getJsonData(item.getString(0))
          MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))
        })
      })
    val result = df.withColumn("ts", lit(commitTime)) //新增ts 時間戳列
      .withColumn("uuid", col("uid"))
      .withColumn("hudipart", col("dt")) //增加hudi分割區列
    result.write.format("org.apache.hudi")
      .option("hoodie.insert.shuffle.parallelism", 2)
      .option("hoodie.upsert.shuffle.parallelism", 2)
      .option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交時間列
      .option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一標示列
      .option("hoodie.table.name", "myDataTable")
      .option("hoodie.datasource.write.partitionpath.field", "hudipart") //分割區列
      .mode(SaveMode.Overwrite)
      .save("/snippet/data/hudi")
  }
}

然後,在 master 節點先準備好資料

vi data1

輸入如下資料

{'uid':1,'uname':'grey','dt':'2022/09'}
{'uid':2,'uname':'tony','dt':'2022/10'}

然後建立檔案目錄,

hdfs dfs -mkdir /mydata/

把 data1 放入目錄下

hdfs dfs -put data1 /mydata/

存取:http://192.168.100.130:50070/explorer.html

可以查到這個資料

接下來執行插入資料的 scala 程式碼,執行完畢後,驗證一下

存取:http://192.168.100.130:50070/explorer.html

可以檢視到插入的資料

準備一個 data2 檔案

cp data1 data2 && vi data2

data2 的資料更新為

{'uid':1,'uname':'grey1','dt':'2022/11'}
{'uid':2,'uname':'tony1','dt':'2022/12'}

然後執行

hdfs dfs -put data2 /mydata/

更新資料的程式碼,我們可以做如下調整,完整程式碼如下

package git.snippet.test

import git.snippet.entity.MyEntity
import git.snippet.util.JsonUtil
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object DataUpdate {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
    updateData(sparkSession)
  }

  def updateData(sparkSession: SparkSession) = {
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._
    val commitTime = System.currentTimeMillis().toString //生成提交時間
    val df = sparkSession.read.text("/mydata/data2")
      .mapPartitions(partitions => {
        partitions.map(item => {
          val jsonObject = JsonUtil.getJsonData(item.getString(0))
          MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))
        })
      })
    val result = df.withColumn("ts", lit(commitTime)) //新增ts 時間戳列
      .withColumn("uuid", col("uid")) //新增uuid 列
      .withColumn("hudipart", col("dt")) //增加hudi分割區列
    result.write.format("org.apache.hudi")
      //      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option("hoodie.insert.shuffle.parallelism", 2)
      .option("hoodie.upsert.shuffle.parallelism", 2)
      .option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交時間列
      .option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一標示列
      .option("hoodie.table.name", "myDataTable")
      .option("hoodie.datasource.write.partitionpath.field", "hudipart") //分割區列
      .mode(SaveMode.Append)
      .save("/snippet/data/hudi")
  }
}

執行更新資料的程式碼。

驗證一下,存取:http://192.168.100.130:50070/explorer.html

可以檢視到更新的資料情況

資料查詢的程式碼也很簡單,完整程式碼如下

package git.snippet.test

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DataQuery {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf().setAppName("MyFirstDataApp")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
    queryData(sparkSession)
  }


  def queryData(sparkSession: SparkSession) = {
    val df = sparkSession.read.format("org.apache.hudi")
      .load("/snippet/data/hudi/*/*")
    df.show()
    println(df.count())
  }
}

執行,輸出以下資訊,驗證成功。

資料查詢也支援很多查詢條件,比如增量查詢,按時間段查詢等。

接下來是 flink 實時資料分析的服務,首先需要在 master 上啟動 kafka,並建立 一個名字為 mytopic 的 topic,詳見Linux 下搭建 Kafka 環境

相關命令如下

建立topic

kafka-topics.sh --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --create --topic  mytopic

生產者啟動設定

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic mytopic

消費者啟動設定

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mytopic

然後執行如下程式碼

package git.snippet.analyzer;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class DataAnalyzer {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.100.130:9092");
        properties.setProperty("group.id", "snippet");
        //構建FlinkKafkaConsumer
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), properties);
        //指定偏移量
        myConsumer.setStartFromLatest();
        final DataStream<String> stream = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        stream.print();
        try {
            env.execute("DataAnalyzer");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

其中

properties.setProperty("bootstrap.servers", "192.168.100.130:9092");

根據自己的設定調整,然後通過 kakfa 的生產者使用者端輸入一些資料,這邊可以收到這個資料,驗證完畢。

完整程式碼見

data-lake