在本章中,將討論如何將Apache Kafka與Spark Streaming API整合。
Spark Streaming API支援實時資料流的可延伸,高吞吐量,容錯流處理。 資料可以從Kafka,Flume,Twitter等許多來源獲取,並且可以使用複雜演算法進行處理,例如:對映,縮小,連線和視窗等高階功能。 最後,處理後的資料可以推播到檔案系統,資料庫和現場儀表板上。 彈性分散式資料集(RDD)是Spark的基礎資料結構。 它是一個不可變的分散式物件集合。 RDD中的每個資料集都被劃分為邏輯分割區,這些分割區可以在叢集的不同節點上進行計算。
Kafka是Spark串流媒體的潛在訊息傳遞和整合平台。 Kafka充當實時資料流的中心樞紐,並使用Spark Streaming中的複雜演算法進行處理。 資料處理完成後,Spark Streaming可以將結果發佈到HDFS,資料庫或儀表板中的另一個Kafka主題中。 下圖描述了概念流程。
現在,詳細介紹一下Kafka-Spark API。
SparkConf API
它代表Spark應用程式的組態。 用於將各種Spark引數設定為鍵值對。
SparkConf類具有以下方法 -
set(string key, string value)
? 設定組態變數。remove(string key)
? 從組態中刪除鍵。setAppName(string name)
? 為應用程式設定應用程式名稱。get(string key)
? 獲得鍵。StreamingContext API
這是Spark功能的主要入口點。 SparkContext表示與Spark群集的連線,並且可用於在群集上建立RDD,累加器和廣播變數。 簽名的定義如下所示。
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
mesos://host:port
,spark://host:port
,local [4]
)。public StreamingContext(SparkConf conf, Duration batchDuration)
通過提供新的SparkContext所需的組態來建立StreamingContext。
KafkaUtils API
KafkaUtils API用於將Kafka叢集連線到Spark流。 該API具有如下定義的重要方法createStream簽名。
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
上面顯示的方法用於建立從Kafka Brokers中提取訊息的輸入流。
ssc
- StreamingContext物件。zkQuorum
- Zookeeper仲裁。groupId
- 此消費者的組ID。topics
- 返回要消費的主題地圖。storageLevel
- 用於儲存接收物件的儲存級別。KafkaUtils API還有另一種方法createDirectStream,它用於建立一個輸入流,直接從Kafka Brokers中提取訊息而不使用任何接收器。 此流可以保證來自Kafka的每條訊息都只包含在一次轉換中。
範例應用程式在Scala中完成。 要編譯應用程式,請下載並安裝sbt,scala構建工具(與maven類似)。 主應用程式程式碼如下所示。
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
構建指令碼
spark-kafka整合取決於spark
,spark流和spark Kafka整合jar。 建立一個新的檔案build.sbt
並指定應用程式的詳細資訊及其依賴關係。 sbt
將在編譯和打包應用程式時下載必要的jar。
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
編譯/包
執行以下命令來編譯和打包應用程式的jar檔案。 需要將jar檔案提交到spark控制臺來執行應用程式。
sbt package
提交給Spark
啟動Kafka Producer CLI(在前一章中介紹),建立一個名稱為my-first-topic
的新主題,並提供一些範例訊息,如下所示。
Another spark test message
執行以下命令將應用程式提交到 spark 控制台。
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
這個應用程式的輸出範例如下所示。
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..