Structured Streaming快速入門詳解(8)

2020-08-13 10:31:51

接着上一篇《Spark Streaming快速入門系列(7)》,這算是Spark的終結篇了,從Spark的入門到現在的Structured Streaming,相信很多人學完之後,應該對Spark摸索的差不多了,Spark是一個很重要的技術點,希望我的文章能給大家帶來幫助。

在这里插入图片描述

第一章 Structured Streaming曲折發展史

1.1. Spark Streaming

在这里插入图片描述

Spark Streaming針對實時數據流,提供了一套可延伸、高吞吐、可容錯的流式計算模型。Spark Streaming接收實時數據源的數據,切分成很多小的batches,然後被Spark Engine執行,產出同樣由很多小的batchs組成的結果流。本質上,這是一種micro-batch(微批次處理)的方式處理

不足在於處理延時較高(無法優化到秒以下的數量級), 無法支援基於event_time的時間視窗做聚合邏輯。

1.2. Structured Streaming

1.2.1. 介紹

●官網

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

●簡介

spark在2.0版本中發佈了新的流計算的API,Structured Streaming/結構化流。
Structured Streaming是一個基於Spark SQL引擎的可延伸、容錯的流處理引擎。統一了流、批的程式設計模型,可以使用靜態數據批次處理一樣的方式來編寫流式計算操作。並且支援基於event_time的時間視窗的處理邏輯。

隨着數據不斷地到達,Spark 引擎會以一種增量的方式來執行這些操作,並且持續更新結算結果。可以使用Scala、Java、Python或R中的DataSet/DataFrame API來表示流聚合、事件時間視窗、流到批連線等。此外,Structured Streaming會通過checkpoint和預寫日誌等機制 機製來實現Exactly-Once語意。
簡單來說,對於開發人員來說,根本不用去考慮是流式計算,還是批次處理,只要使用同樣的方式來編寫計算操作即可,Structured Streaming提供了快速、可延伸、容錯、端到端的一次性流處理,而使用者無需考慮更多細節

預設情況下,結構化流式查詢使用微批次處理引擎進行處理,該引擎將數據流作爲一系列小批次處理作業進行處理,從而實現端到端的延遲,最短可達100毫秒,並且完全可以保證一次容錯。自Spark 2.3以來,引入了一種新的低延遲處理模式,稱爲連續處理,它可以在至少一次保證的情況下實現低至1毫秒的端到端延遲。也就是類似於 Flink 那樣的實時流,而不是小批次處理。實際開發可以根據應用程式要求選擇處理模式,但是連續處理在使用的時候仍然有很多限制,目前大部分情況還是應該採用小批次模式。

1.2.2. API

1.Spark Streaming 時代 -DStream-RDD

Spark Streaming 採用的數據抽象是DStream,而本質上就是時間上連續的RDD,
對數據流的操作就是針對RDD的操作

在这里插入图片描述

2.Structured Streaming 時代 - DataSet/DataFrame -RDD

Structured Streaming是Spark2.0新增的可延伸和高容錯性的實時計算框架,它構建於Spark SQL引擎,把流式計算也統一到DataFrame/Dataset裡去了。
Structured Streaming 相比於 Spark Streaming 的進步就類似於 Dataset 相比於 RDD 的進步

在这里插入图片描述

1.2.3. 主要優勢

1.簡潔的模型。Structured Streaming 的模型很簡潔,易於理解。使用者可以直接把一個流想象成是無限增長的表格。

2.一致的 API。由於和 Spark SQL 共用大部分 API,對 Spaprk SQL 熟悉的使用者很容易上手,程式碼也十分簡潔。font color=red>同時批次處理和流處理程式還可以共用程式碼,不需要開發兩套不同的程式碼,顯著提高了開發效率。

3.卓越的效能。Structured Streaming 在與 Spark SQL 共用 API 的同時,也直接使用了 Spark SQL 的 Catalyst 優化器和 Tungsten,數據處理效能十分出色。此外,Structured Streaming 還可以直接從未來 Spark SQL 的各種效能優化中受益。

4.多語言支援。Structured Streaming 直接支援目前 Spark SQL 支援的語言,包括 Scala,Java,Python,R 和 SQL。使用者可以選擇自己喜歡的語言進行開發。

1.2.4. 程式設計模型

●程式設計模型概述

一個流的數據源從邏輯上來說就是一個不斷增長的動態表格,隨着時間的推移,新數據被持續不斷地新增到表格的末尾。

對動態數據源進行實時查詢,就是對當前的表格內容執行一次 SQL 查詢。
數據查詢,使用者通過觸發器(Trigger)設定時間(毫秒級)。也可以設定執行週期。

一個流的輸出有多種模式,既可以是基於整個輸入執行查詢後的完整結果,也可以選擇只輸出與上次查詢相比的差異,或者就是簡單地追加最新的結果。

這個模型對於熟悉 SQL 的使用者來說很容易掌握,對流的查詢跟查詢一個表格幾乎完全一樣,十分簡潔,易於理解

●核心思想

在这里插入图片描述

Structured Streaming最核心的思想就是將實時到達的數據不斷追加到unbound table無界表,到達流的每個數據項(RDD)就像是表中的一個新行被附加到無邊界的表中.這樣使用者就可以用靜態結構化數據的批次處理查詢方式進行流計算,如可以使用SQL對到來的每一行數據進行實時查詢處理;(SparkSQL+SparkStreaming=StructuredStreaming)

●應用場景

Structured Streaming將數據源對映爲類似於關係數據庫中的表,然後將經過計算得到的結果對映爲另一張表,完全以結構化的方式去操作流式數據,這種程式設計模型非常有利於處理分析結構化的實時數據;

== ●WordCount圖解==

在这里插入图片描述

如圖所示,

第一行表示從socket不斷接收數據,
第二行可以看成是之前提到的「unbound table",
第三行爲最終的wordCounts是結果集。
當有新的數據到達時,Spark會執行「增量"查詢,並更新結果集;
該範例設定爲Complete Mode(輸出所有數據),因此每次都將所有數據輸出到控制檯;

1.在第1秒時,此時到達的數據爲"cat dog"和"dog dog",因此我們可以得到第1秒時的結果集cat=1 dog=3,並輸出到控制檯;

2.當第2秒時,到達的數據爲"owl cat",此時"unbound table"增加了一行數據"owl cat",執行word count查詢並更新結果集,可得第2秒時的結果集爲cat=2 dog=3 owl=1,並輸出到控制檯;

3.當第3秒時,到達的數據爲"dog"和"owl",此時"unbound table"增加兩行數據"dog"和"owl",執行word count查詢並更新結果集,可得第3秒時的結果集爲cat=2 dog=4 owl=2;

這種模型跟其他很多流式計算引擎都不同。大多數流式計算引擎都需要開發人員自己來維護新數據與歷史數據的整合併進行聚合操作。
然後我們就需要自己去考慮和實現容錯機制 機製、數據一致性的語意等。
然而在structured streaming的這種模式下,spark會負責將新到達的數據與歷史數據進行整合,並完成正確的計算操作,同時更新result table,不需要我們去考慮這些事情。

第二章 Structured Streaming實戰

2.1. 建立Source

spark 2.0中初步提供了一些內建的source支援。

Socket source (for testing): 從socket連線中讀取文字內容。
File source: 以數據流的方式讀取一個目錄中的檔案。支援text、csv、json、parquet等檔案型別。
Kafka source: 從Kafka中拉取數據,與0.10或以上的版本相容,後面單獨整合Kafka

2.1.1. 讀取Socket數據

●準備工作

nc -lk 9999
hadoop spark sqoop hadoop spark hive hadoop

●程式碼演示

package cn.itcast.structedstreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object WordCount {
  def main(args: Array[String]): Unit = {
    //1.建立SparkSession,因爲StructuredStreaming的數據模型也是DataFrame/DataSet
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.接收數據
    val dataDF: DataFrame = spark.readStream
      .option("host", "node01")
      .option("port", 9999)
      .format("socket")
      .load()
    //3.處理數據
    import spark.implicits._
    val dataDS: Dataset[String] = dataDF.as[String]
    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)
    //result.show()
    //Queries with streaming sources must be executed with writeStream.start();
    result.writeStream
      .format("console")//往控制檯寫
      .outputMode("complete")//每次將所有的數據寫出
      .trigger(Trigger.ProcessingTime(0))//觸發時間間隔,0表示儘可能的快
      //.option("checkpointLocation","./ckp")//設定checkpoint目錄,socket不支援數據恢復,所以第二次啓動會報錯,需要注掉
      .start()//開啓
      .awaitTermination()//等待停止
  }
}
2.1.2. 讀取目錄下文字數據

spark應用可以監聽某一個目錄,而web服務在這個目錄上實時產生日誌檔案,這樣對於spark應用來說,日誌檔案就是實時數據
Structured Streaming支援的檔案型別有text,csv,json,parquet

●準備工作

在people.json檔案輸入如下數據:
{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}
注意:檔案必須是被移動到目錄中的,且檔名不能有特殊字元

●需求

使用Structured Streaming統計年齡小於25歲的人羣的愛好排行榜

●程式碼演示

package cn.itcast.structedstreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
  * {"name":"json","age":23,"hobby":"running"}
  * {"name":"charles","age":32,"hobby":"basketball"}
  * {"name":"tom","age":28,"hobby":"football"}
  * {"name":"lili","age":24,"hobby":"running"}
  * {"name":"bob","age":20,"hobby":"swimming"}
  * 統計年齡小於25歲的人羣的愛好排行榜
  */
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //1.建立SparkSession,因爲StructuredStreaming的數據模型也是DataFrame/DataSet
    val spark: SparkSession = 
SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    val Schema: StructType = new StructType()
      .add("name","string")
      .add("age","integer")
      .add("hobby","string")
    //2.接收數據
    import spark.implicits._
    // Schema must be specified when creating a streaming source DataFrame.
    val dataDF: DataFrame = 
spark.readStream.schema(Schema).json("D:\\data\\spark\\data")
    //3.處理數據
    val result: Dataset[Row] =
 dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc)
    //4.輸出結果
    result.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}

2.2. 計算操作

獲得到Source之後的基本數據處理方式和之前學習的DataFrame、DataSet一致,不再贅述

2.3. 輸出

計算結果可以選擇輸出到多種裝置並進行如下設定
1.output mode:以哪種方式將result table的數據寫入sink
2.format/output sink的一些細節:數據格式、位置等。
3.query name:指定查詢的標識。類似tempview的名字
4.trigger interval:觸發間隔,如果不指定,預設會盡可能快速地處理數據
5.checkpoint地址:一般是hdfs上的目錄。注意:Socket不支援數據恢復,如果設定了,第二次啓動會報錯 ,Kafka支援
2.3.1. output mode

在这里插入图片描述

每當結果表更新時,我們都希望將更改後的結果行寫入外部接收器。

這裏有三種輸出模型:
1.Append mode:輸出新增的行,預設模式。每次更新結果集時,只將新新增到結果集的結果行輸出到接收器。僅支援新增到結果表中的行永遠不會更改的查詢。因此,此模式保證每行僅輸出一次。例如,僅查詢select,where,map,flatMap,filter,join等會支援追加模式。不支援聚合

2.Complete mode: 所有內容都輸出,每次觸發後,整個結果表將輸出到接收器。聚合查詢支援此功能。僅適用於包含聚合操作的查詢。

3.Update mode: 輸出更新的行,每次更新結果集時,僅將被更新的結果行輸出到接收器(自Spark 2.1.1起可用),不支援排序

2.3.2. output sink

在这里插入图片描述

●使用說明

File sink 輸出到路徑
支援parquet檔案,以及append模式

writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()

Kafka sink 輸出到kafka內的一到多個topic

writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()

Foreach sink 對輸出中的記錄執行任意計算。

writeStream
    .foreach(...)
    .start()

Console sink (for debugging) 當有觸發器時,將輸出列印到控制檯。

writeStream
    .format("console")
    .start()

Memory sink (for debugging) - 輸出作爲記憶體表儲存在記憶體中.

writeStream
    .format("memory")
    .queryName("tableName")
    .start()

●官網範例程式碼

// ========== DF with no aggregations ==========

val noAggDF = deviceDataDf.select("device").where("signal > 10")   

// Print new data to console

noAggDF.writeStream.format("console").start()

// Write new data to Parquet files

noAggDF.writeStream.format("parquet").option("checkpointLocation", 
"path/to/checkpoint/dir").option("path", "path/to/destination/dir").start()

// ========== DF with aggregation ==========

val aggDF = df.groupBy("device").count()

// Print updated aggregations to console

aggDF.writeStream.outputMode("complete").format("console").start()

// Have all the aggregates in an in-memory table

aggDF.writeStream.queryName("aggregates").outputMode("complete").format("memory").start()

spark.sql("select * from aggregates").show()   

// interactively query in-memory table

第三章 StructuredStreaming與其他技術整合

3.1. 整合Kafka

3.1.1. 官網介紹

http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

在这里插入图片描述
●Creating a Kafka Source for Streaming Queries

// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to multiple topics(多個topic)
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to a pattern(訂閱萬用字元topic)
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

●Creating a Kafka Source for Batch Queries(kafka批次處理查詢)

// Subscribe to 1 topic 
//defaults to the earliest and latest offsets(預設爲最早和最新偏移)
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to multiple topics, (多個topic)
//specifying explicit Kafka offsets(指定明確的偏移量)
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to a pattern, (訂閱萬用字元topic)at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

●注意:讀取後的數據的Schema是固定的,包含的列如下

在这里插入图片描述

●注意:下面 下麪的參數是不能被設定的,否則kafka會拋出異常:

group.id:kafka的source會在每次query的時候自定建立唯一的group id
auto.offset.reset :爲了避免每次手動設定startingoffsets的值,structured streaming在內部消費時會自動管理offset。這樣就能保證訂閱動態的topic時不會丟失數據。startingOffsets在流處理時,只會作用於第一次啓動時,之後的處理都會自動的讀取儲存的offset。
key.deserializer,value.deserializer,key.serializer,value.serializer 序列化與反序列化,都是ByteArraySerializer
enable.auto.commit:Kafka源不支援提交任何偏移量

在这里插入图片描述

3.1.2. 整合環境準備

●啓動kafka

/export/servers/kafka/bin/kafka-server-start.sh -daemon 
/export/servers/kafka/config/server.properties 

●向topic中生產數據

/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic  spark_kafka

程式碼實現

package cn.itcast.structedstreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object KafkaStructuredStreamingDemo {
  def main(args: Array[String]): Unit = {
    //1.建立SparkSession
    val spark: SparkSession = 
SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    //2.連線Kafka消費數據
    val dataDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node01:9092")
      .option("subscribe", "spark_kafka")
      .load()
    //3.處理數據
    //注意:StructuredStreaming整合Kafka獲取到的數據都是位元組型別,所以需要按照官網要求,
//轉成自己的實際型別
    val dataDS: Dataset[String] = dataDF.selectExpr("CAST(value AS STRING)").as[String]
    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)
    result.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .option("truncate",false)//超過長度的列不截斷顯示,即完全顯示
      .start()
      .awaitTermination()
  }
}

3.2. 整合MySQL

3.2.1. 簡介

●需求

我們開發中經常需要將流的運算結果輸出到外部數據庫,例如MySQL中,但是比較遺憾Structured Streaming API不支援外部數據庫作爲接收器

如果將來加入支援的話,它的API將會非常的簡單比如:
format(「jdbc」).option(「url」,「jdbc:mysql://…」).start()
但是目前我們只能自己自定義一個JdbcSink,繼承ForeachWriter並實現其方法

●參考網站

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

在这里插入图片描述

3.2.2. 程式碼演示
package cn.itcast.structedstreaming

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger


object JDBCSinkDemo {
  def main(args: Array[String]): Unit = {
    //1.建立SparkSession
    val spark: SparkSession = 
SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    //2.連線Kafka消費數據
    val dataDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node01:9092")
      .option("subscribe", "spark_kafka")
      .load()
    //3.處理數據
    //注意:StructuredStreaming整合Kafka獲取到的數據都是位元組型別,所以需要按照官網要求,轉成自己的實際型別
    val dataDS: Dataset[String] = dataDF.selectExpr("CAST(value AS STRING)").as[String]
    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)
    val writer = new JDBCSink("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
    result.writeStream
      .foreach(writer)
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }

  class JDBCSink(url:String,username:String,password:String) extends ForeachWriter[Row] with Serializable{
    var connection:Connection = _ //_表示佔位符,後面會給變數賦值
    var preparedStatement: PreparedStatement = _
    //開啓連線
    override def open(partitionId: Long, version: Long): Boolean = {
      connection = DriverManager.getConnection(url, username, password)
      true
    }

    /*
    CREATE TABLE `t_word` (
        `id` int(11) NOT NULL AUTO_INCREMENT,
        `word` varchar(255) NOT NULL,
        `count` int(11) DEFAULT NULL,
        PRIMARY KEY (`id`),
        UNIQUE KEY `word` (`word`)
      ) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
     */
    //replace INTO `bigdata`.`t_word` (`id`, `word`, `count`) VALUES (NULL, NULL, NULL);
    //處理數據--存到MySQL
    override def process(row: Row): Unit = {
      val word: String = row.get(0).toString
      val count: String = row.get(1).toString
      println(word+":"+count)
      //REPLACE INTO:表示如果表中沒有數據這插入,如果有數據則替換
      //注意:REPLACE INTO要求表有主鍵或唯一索引
      val sql = "REPLACE INTO `t_word` (`id`, `word`, `count`) VALUES (NULL, ?, ?);"
      preparedStatement = connection.prepareStatement(sql)
      preparedStatement.setString(1,word)
      preparedStatement.setInt(2,Integer.parseInt(count))
      preparedStatement.executeUpdate()
    }

    //關閉資源
    override def close(errorOrNull: Throwable): Unit = {
      if (connection != null){
        connection.close()
      }
      if(preparedStatement != null){
        preparedStatement.close()
      }
    }
  }
}

Spark到這也就結束了,以後博主會給你們更新在工作中遇到的各種BUG,以及分享給你們一些在工作中的經驗。

在这里插入图片描述