There is currently little material online about Flink SQL 1.11 consuming Kafka and streaming the data into Hive, and most of what exists creates a Kafka table directly in Flink and writes it to Hive; it is unclear how well that approach handles deeply nested JSON. My approach instead uses the streaming API to consume Kafka, flatten the parsed JSON first, convert it into a temporary table, and finally stream it into Hive. Here is the code:
package com.xxx.xxx
import java.sql.Timestamp
import java.time.Duration
import java.util.Properties
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import scala.collection.JavaConversions._
import org.apache.flink.table.catalog.hive.HiveCatalog
object NginxLog2HivePartitionTime {
def main(args: Array[String]): Unit = {
val ifCreateHiveTable = args(0)
val parallelism = 3
val kafkaBrokers = "x.x.x.x:9092"
val jobName = "xxx"
val topicNames = List("xxx")
val groupName = "xxx"
val properties = new Properties()
properties.setProperty("bootstrap.servers", kafkaBrokers)
properties.setProperty("group.id", groupName)
// build the streaming environment
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// RocksDB state backend on HDFS, with incremental checkpoints enabled
val rocksDBStateBackend = new RocksDBStateBackend("hdfs:///user/hdfs/flink1_11backend", true)
streamEnv.setStateBackend(rocksDBStateBackend)
streamEnv.enableCheckpointing(1000)
streamEnv.setParallelism(parallelism)
// build the table environment (Blink planner, streaming mode)
val blinkTableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(streamEnv, blinkTableEnvSettings)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(60))
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
// build the Hive catalog
// catalog name
val name = "myhive"
// default database
val defaultDatabase = "default"
// directory containing hive-site.xml
val hiveConfDir = "/etc/hive/conf.cloudera.hive"
// Hive version
val version = "2.1.1"
val hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog(name, hiveCatalog)
tableEnv.useCatalog(name)
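// from here on, unqualified table names are resolved against the "myhive" catalog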
val myConsumer = new FlinkKafkaConsumer[String](topicNames, new SimpleStringSchema(), properties)
// by default the consumer starts from the committed group offsets; uncomment to start from the latest Kafka offsets instead
// myConsumer.setStartFromLatest()
val mapper = new ObjectMapper
val srcStream = streamEnv.addSource(myConsumer)
.filter(_.nonEmpty)
.map(line => {
val rootNode = mapper.readTree(line)
val keys = rootNode.fieldNames().toList
val eventTime = DateUtil.dateFormatUTC2Local(rootNode.get("@timestamp").asText())
val eventTimeStamp = DateUtil.dateTimeToTimestampJdk8(eventTime)
// convert the epoch-millisecond timestamp to java.sql.Timestamp
// Flink SQL treats the rowtime internally as UTC, 8 hours behind the local event time, so the partition would be off by 8 hours: what should be the 18:00 partition ends up as the 10:00 partition in Hive
// therefore add 8 hours to the event time and use that as the partition time, which is the time we actually want
val eventTimeStamp1 = new Timestamp(eventTimeStamp + 8 * 60 * 60 * 1000L)
val eventLocalTime = DateUtil.timeStampTodateTimeJdk8(eventTimeStamp, isHyphen = true)
val systemLocalTime = DateUtil.timeStampTodateTimeJdk8(System.currentTimeMillis, isHyphen = true)
val path = ""
val upstream_response_time = if (keys.contains("upstream_response_time")) rootNode.get("upstream_response_time").asText() else ""
val status = if (keys.contains("status")) rootNode.get("status").asInt() else 999
val body_bytes_sent = if (keys.contains("body_bytes_sent")) rootNode.get("body_bytes_sent").asInt() else 999
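// for deeper, multi-level JSON, nested values can be flattened the same way,
// e.g. rootNode.at("/request/headers/host").asText() (hypothetical path) instead of rootNode.get(...)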
(systemLocalTime, eventLocalTime, eventTimeStamp1, path, upstream_response_time, status, body_bytes_sent)
})
.assignAscendingTimestamps(row => {
row._3.getTime
})
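// convert the flattened stream into a Table; .rowtime() marks eventTimeStamp1 as the
// event-time attribute, reusing the timestamps/watermarks assigned above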
val nginxTmpTable = tableEnv.fromDataStream(srcStream, 'systemLocalTime, 'eventLocalTime, 'eventTimeStamp1.rowtime(), 'path, 'upstream_response_time, 'status, 'body_bytes_sent)
tableEnv.createTemporaryView("nginxTmpTable", nginxTmpTable)
// tableEnv.sqlQuery("select systemLocalTime,eventLocalTime, eventTimeStamp1 from nginxTmpTable").toAppendStream[Row].print("www")
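// Hive table with time-based partition commit: a pday/phour/pminute partition is committed
// (added to the metastore and marked with a _SUCCESS file) once the watermark passes the
// partition time derived from 'partition.time-extractor.timestamp-pattern' plus the 5s delay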
val createSql =
"""create table xxx.ods_nginx_log_partition_time(
|process_time String,
|event_time String,
|path String,
|upstream_response_time String,
|status Int,
|body_bytes_sent BigInt
|) PARTITIONED BY (
| pday STRING,
| phour STRING,
| pminute string
|) STORED AS PARQUET
|TBLPROPERTIES (
| 'sink.partition-commit.trigger' = 'partition-time',
| 'sink.partition-commit.delay' = '5s',
| 'sink.partition-commit.policy.kind' = 'metastore,success-file',
| 'format' = 'parquet',
| 'parquet.compression'='SNAPPY',
| 'partition.time-extractor.timestamp-pattern' = '$pday $phour:$pminute:00'
|
|)""".stripMargin
if (ifCreateHiveTable.toInt == 1) {
tableEnv.executeSql("drop table if exists xxx.ods_nginx_log_partition_time")
tableEnv.executeSql(createSql)
}
tableEnv.executeSql(
"""
insert into xxx.ods_nginx_log_partition_time
|select
|systemLocalTime,
|eventLocalTime,
|path,
|upstream_response_time,
|status,
|body_bytes_sent,
|DATE_FORMAT(eventTimeStamp1,'yyyy-MM-dd'),
|DATE_FORMAT(eventTimeStamp1,'HH'),
|DATE_FORMAT(eventTimeStamp1,'mm')
|from nginxTmpTable
|""".stripMargin
)
streamEnv.execute()
}
}
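Note: the program above calls a small DateUtil helper that is not shown in this post. The sketch below is only my assumption of what those three methods could look like, assuming @timestamp arrives as an ISO-8601 UTC string (e.g. 2020-08-01T10:00:00.000Z) and the local time zone is Asia/Shanghai (UTC+8); the real implementation may differ.

import java.time.format.DateTimeFormatter
import java.time.{Instant, LocalDateTime, ZoneId}

// hypothetical sketch of the DateUtil helper used above, not the author's original code
object DateUtil {
  private val zone = ZoneId.of("Asia/Shanghai")
  private val hyphenFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  private val compactFmt = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

  // "2020-08-01T10:00:00.000Z" (UTC) -> "2020-08-01 18:00:00" (local time)
  def dateFormatUTC2Local(utcString: String): String =
    LocalDateTime.ofInstant(Instant.parse(utcString), zone).format(hyphenFmt)

  // "2020-08-01 18:00:00" -> epoch milliseconds
  def dateTimeToTimestampJdk8(dateTime: String): Long =
    LocalDateTime.parse(dateTime, hyphenFmt).atZone(zone).toInstant.toEpochMilli

  // epoch milliseconds -> "2020-08-01 18:00:00" (isHyphen = true) or "20200801180000"
  def timeStampTodateTimeJdk8(ts: Long, isHyphen: Boolean): String = {
    val fmt = if (isHyphen) hyphenFmt else compactFmt
    LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), zone).format(fmt)
  }
}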
pom.xml:
<properties>
<scala.version>2.11.12</scala.version>
<flink.version>1.11.1</flink.version>
<jredis.version>2.9.0</jredis.version>
<maven.version.min>3.5.2</maven.version.min>
<scala.minor.version>2.11.12</scala.minor.version>
<scala.complete.version>${scala.minor.version}</scala.complete.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-metastore -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.6.5-8.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.8</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.8</version>
<scope>provided</scope>
</dependency>
</dependencies>
All of the dependencies in pom.xml are set to provided; put the corresponding JARs into Flink's lib directory instead, as shown in the figure below. Otherwise, packing the dependencies into a fat JAR causes all kinds of messy JAR conflicts.
Submit the job:
export HADOOP_CLASSPATH=`hadoop classpath`
/opt/flink/flink-1.11.1/bin/flink run \
-m yarn-cluster \
-ytm 1024 \
-ynm flink-xxxx \
-c com.xxx.xxx.NginxLog2HivePartitionTime \
flink-xxxx-1.0-SNAPSHOT-jar-with-dependencies.jar
Result: