Flink 1.11: consuming Kafka and streaming writes into Hive

2020-08-12 14:30:54

There is not much material online yet about consuming Kafka with Flink SQL 1.11 and writing the stream into Hive, and most of what exists creates a Kafka table directly in Flink and writes it to Hive, where it is unclear how well deeply nested JSON is supported. Here I instead use the DataStream API to consume Kafka, parse and flatten the JSON first, then turn the stream into a temporary table, and finally stream it into Hive.
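Before the full job, a minimal sketch of the "flatten nested JSON" step with Jackson; the nested field names (request.uri, upstream.response_time) are invented for illustration and are not the fields used in the job below:

import com.fasterxml.jackson.databind.ObjectMapper

object FlattenSketch {
  private val mapper = new ObjectMapper

  // Turns one nested record such as {"request":{"uri":"/index"},"upstream":{"response_time":"0.01"}}
  // into a flat tuple; path() returns a MissingNode instead of null, so absent fields fall back to "".
  def flatten(line: String): (String, String) = {
    val root = mapper.readTree(line)
    val uri = root.path("request").path("uri").asText("")
    val upstreamResponseTime = root.path("upstream").path("response_time").asText("")
    (uri, upstreamResponseTime)
  }
}

The complete job: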

package com.xxx.xxx

import java.sql.Timestamp
import java.time.Duration
import java.util.Properties

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.catalog.hive.HiveCatalog

import scala.collection.JavaConversions._


object NginxLog2HivePartitionTime {

  def main(args: Array[String]): Unit = {

    // args(0): pass 1 to drop and recreate the target Hive table
    val ifCreateHiveTable = args(0)
    val parallelism = 3
    val kafkaBrokers = "x.x.x.x:9092"
    val jobName = "xxx"
    val topicNames = List("xxx")
    val groupName = "xxx"
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaBrokers)
    properties.setProperty("group.id", groupName)
    // Build the streaming environment
    val conf: Configuration = new Configuration()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // RocksDB state backend with incremental checkpoints, written to HDFS
    val rocksDBStateBackend = new RocksDBStateBackend("hdfs:///user/hdfs/flink1_11backend", true)
    streamEnv.setStateBackend(rocksDBStateBackend)
    streamEnv.enableCheckpointing(1000)
    streamEnv.setParallelism(parallelism)
    // Build the table environment
    val blinkTableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    val tableEnv = StreamTableEnvironment.create(streamEnv, blinkTableEnvSettings)
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(60))
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)


    // Set up the Hive catalog
    // catalog name
    val name = "myhive"
    // default database
    val defaultDatabase = "default"
    // directory containing hive-site.xml
    val hiveConfDir = "/etc/hive/conf.cloudera.hive"
    // Hive version
    val version = "2.1.1"
    val hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
    tableEnv.registerCatalog(name, hiveCatalog)
    tableEnv.useCatalog(name)
    val myConsumer = new FlinkKafkaConsumer[String](topicNames, new SimpleStringSchema(), properties)
    // To start from the latest Kafka offsets instead of the committed group offsets, uncomment:
    // myConsumer.setStartFromLatest()

    val mapper = new ObjectMapper

    val srcStream = streamEnv.addSource(myConsumer)
      .filter(_.nonEmpty)
      .map(line => {
        val rootNode = mapper.readTree(line)
        val keys = rootNode.fieldNames().toList
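        // DateUtil is a project-local helper (not shown in this post) for converting the UTC "@timestamp"
        // string to local time and between date strings and epoch milliseconds.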
        val eventTime = DateUtil.dateFormatUTC2Local(rootNode.get("@timestamp").asText())
        val eventTimeStamp = DateUtil.dateTimeToTimestampJdk8(eventTime)
        // Convert the epoch millis to a java.sql.Timestamp.
        // Flink SQL treats the rowtime as UTC, 8 hours behind the local event time, so the partitions
        // would also lag by 8 hours: an 18:00 event would land in the 10:00 partition in Hive.
        // Therefore add 8 hours to the event time and use that as the partition time, which is what we actually want.
        val eventTimeStamp1 = new Timestamp(eventTimeStamp + 8 * 60 * 60 * 1000L)
        val eventLocalTime = DateUtil.timeStampTodateTimeJdk8(eventTimeStamp, isHyphen = true)
        val systemLocalTime = DateUtil.timeStampTodateTimeJdk8(System.currentTimeMillis, isHyphen = true)
        val path = ""
        val upstream_response_time = if (keys.contains("upstream_response_time")) rootNode.get("upstream_response_time").asText() else ""
        val status = if (keys.contains("status")) rootNode.get("status").asInt() else 999
        val body_bytes_sent = if (keys.contains("body_bytes_sent")) rootNode.get("body_bytes_sent").asInt() else 999

        (systemLocalTime, eventLocalTime, eventTimeStamp1, path, upstream_response_time, status, body_bytes_sent)
      })
      .assignAscendingTimestamps(row => {
        row._3.getTime
      })

    val nginxTmpTable = tableEnv.fromDataStream(srcStream, 'systemLocalTime, 'eventLocalTime, 'eventTimeStamp1.rowtime(), 'path, 'upstream_response_time, 'status, 'body_bytes_sent)

    tableEnv.createTemporaryView("nginxTmpTable", nginxTmpTable)
    //    tableEnv.sqlQuery("select systemLocalTime,eventLocalTime, eventTimeStamp1 from nginxTmpTable").toAppendStream[Row].print("www")


    val createSql =
      """create table  xxx.ods_nginx_log_partition_time(
        |process_time String,
        |event_time String,
        |path String,
        |upstream_response_time String,
        |status Int,
        |body_bytes_sent BigInt
        |) PARTITIONED BY (
        |  pday STRING,
        |  phour STRING,
        |  pminute string
        |) STORED AS PARQUET
        |TBLPROPERTIES (
        |  'sink.partition-commit.trigger' = 'partition-time',
        |  'sink.partition-commit.delay' = '5s',
        |  'sink.partition-commit.policy.kind' = 'metastore,success-file',
        |  'format' = 'parquet',
        |  'parquet.compression'='SNAPPY',
        |  'partition.time-extractor.timestamp-pattern' = '$pday $phour:$pminute:00'
        |
        |)""".stripMargin

    if (ifCreateHiveTable.toInt == 1) {
      tableEnv.executeSql("drop table if exists xxx.ods_nginx_log_partition_time")
      tableEnv.executeSql(createSql)
    }

    tableEnv.executeSql(
      """
        |insert into xxx.ods_nginx_log_partition_time
        |select
        |systemLocalTime,
        |eventLocalTime,
        |path,
        |upstream_response_time,
        |status,
        |body_bytes_sent,
        |DATE_FORMAT(eventTimeStamp1,'yyyy-MM-dd'),
        |DATE_FORMAT(eventTimeStamp1,'HH'),
        |DATE_FORMAT(eventTimeStamp1,'mm')
        |from nginxTmpTable
        |""".stripMargin
    )

    streamEnv.execute()

  }

}
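
Note that the event-time plumbing is what makes the partition-time commit work: assignAscendingTimestamps generates the watermark and 'eventTimeStamp1.rowtime() exposes the event time to the Table API, so each partition is committed once the watermark passes its extracted time plus the configured delay. Without an event-time attribute the partition-time trigger has nothing to compare against, and the partitions would never be committed.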

POM.XML

 <properties>
        <scala.version>2.11.12</scala.version>
        <flink.version>1.11.1</flink.version>
        <jredis.version>2.9.0</jredis.version>
        <maven.version.min>3.5.2</maven.version.min>
        <scala.minor.version>2.11.12</scala.minor.version>
        <scala.complete.version>${scala.minor.version}</scala.complete.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.11</artifactId>
            <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.1</version>
           <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>janino</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>commons-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-metastore -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>2.1.1</version>
           <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.1</version>
           <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.6.5-8.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.8</version>
           <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.8</version>
           <scope>provided</scope>
        </dependency>
    </dependencies>

All the dependencies in pom.xml are set to provided scope and the corresponding jars are placed in Flink's lib directory instead; otherwise, packaging them into a fat jar leads to all kinds of messy jar conflicts.
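In practice that means copying into flink/lib roughly the jars that are not already shipped with the Flink distribution: the Hive connector (flink-connector-hive), hive-exec and hive-metastore, the Kafka connector and kafka-clients, the shaded Hadoop uber jar, and the Jackson jars.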

Submitting the job:

export HADOOP_CLASSPATH=`hadoop classpath`

/opt/flink/flink-1.11.1/bin/flink run \
-m yarn-cluster  \
-ytm 1024 \
-ynm flink-xxxx \
-c com.xxx.xxx.NginxLog2HivePartitionTime \
flink-xxxx-1.0-SNAPSHOT-jar-with-dependencies.jar

Result: