原創/朱季謙
本文適合入門Spark RDD的計算處理。
在日常工作當中,經常遇到基於Spark去讀取儲存在HDFS中的批次檔案資料進行統計分析的案例,這些檔案一般以csv或者txt檔案格式存在。例如,存在這樣一份消費者行為資料,欄位包括消費者姓名,年齡,性別,月薪,消費偏好,消費領域,購物平臺,支付方式,單次購買商品數量,優惠券獲取情況,購物動機。
基於這份消費者行為資料,往往會有以下一些分析目標:
針對這些需求,就可以使用Spark來讀取檔案後,進一步分析處理統計。
接下來,就是針對以上分析目標,設計一番Spark程式碼計算邏輯,由此可入門學習下Spark RDD常用用法。
獲取一份具備以下欄位的csv隨機假樣本,總共5246條資料,包括「消費者姓名,年齡,性別,月薪,消費偏好,消費領域,購物平臺,支付方式,單次購買商品數量,優惠券獲取情況,購物動機」。
Amy Harris,39,男,18561,價效比,家居用品,天貓,微信支付,10,折扣優惠,品牌忠誠
Lori Willis,33,女,14071,功能性,家居用品,蘇寧易購,貨到付款,1,折扣優惠,日常使用
Jim Williams,61,男,14145,時尚潮流,汽車配件,淘寶,微信支付,3,免費贈品,禮物贈送
Anthony Perez,19,女,11587,時尚潮流,珠寶首飾,拼多多,支付寶,5,免費贈品,商品推薦
......
將樣本存放到專案目錄為src/main/resources/consumerdata.csv,然後新建一個Scala的object類,建立一個main方法, 模擬從HDSF讀取資料,然後通過.map(_.split(","))將csv檔案每一行切割成一個陣列形式的RDD
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("consumer")
val ss = SparkSession.builder().config(conf).getOrCreate()
val filePath: String = "src/main/resources/consumerdata.csv"
val consumerRDD = ss.sparkContext.textFile(filePath).map(_.split(","))
可以寫一段程式碼列印看一下consumerRDD結構——
consumerRDD.foreach(x => {
x.foreach(y => print(y +" "))
println()
})
列印結果如下——
這個RDD相當於把每一行當作裡一個Array[]陣列,第一行的Array0是消費者姓名,即Amy Harris,Array1是年齡,即39,以此類推。
消費者姓名 | 年齡 | 性別 | 月薪 | 消費偏好 | 消費領域 | 購物平臺 | 支付方式 | 單次購買商品數量 | 優惠券獲取情況 | 購物動機 |
---|---|---|---|---|---|---|---|---|---|---|
Amy Harris | 39 | 男 | 18561 | 價效比 | 家居用品 | 天貓 | 微信支付 | 10 | 折扣優惠 | 品牌忠誠 |
Lori Willis | 33 | 女 | 14071 | 功能性 | 家居用品 | 蘇寧易購 | 貨到付款 | 1 | 折扣優惠 | 日常使用 |
。。。 |
獲取到該RDD後,就可以進行下一步的統計分析了。
這行程式碼意思,x.apply(7)表示取每一行的第八個欄位,相當陣列Array[7],第八個欄位是【支付方式】。
因此就可以按照以上格式,對文字資料裡的每一個欄位做相應分析,後文其他計算邏輯也是類似。
consumerRDD.map(x => (x.apply(7),1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
列印結果如下:
x.apply(5)表示取每一行的第六個欄位,相當陣列Array[5],第六個欄位是【購物平臺】。
同前文的【統計消費者支付方式偏好分佈】一樣,通過map(x=>(x.apply(5),1))生成(購物平臺,1)格式的RDD,然後再通過reduceByKey運算元針對相同的key做統計,最後倒序排序。
consumerRDD.map(x => (x.apply(5), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
列印結果——
x.apply(4)表示取每一行的第五個欄位,相當陣列Array[4],第五個欄位是【消費領域】。
consumerRDD.map(x => (x.apply(4), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
列印結果:
x.apply(10)表示取每一行的第十個欄位,相當陣列Array[10],第10個欄位是【購物動機】。
consumerRDD.map(x => (x.apply(10), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
列印結果——
該需求通過將RDD對映成DataFrame資料集,方便用SQL語法處理,按照年齡區間分割區,分別為"0-20","21-30","31-40"
......這個分割區字串名,就相當key,value表示落在該分割區的使用者數量。這時,就可以分組做聚合統計了,統計出各個年齡段的消費者數量。
//取出consumerRDD每一行陣列需要的欄位
val rowRDD = consumerRDD.map{
x => Row(x.apply(0),x.apply(1).toInt,x.apply(2),x.apply(3).toInt,x.apply(4),x.apply(5),x.apply(6),x.apply(7),x.apply(8).toInt,x.apply(9),x.apply(10))
}
//設定欄位對映
val schema = StructType(Seq(
StructField("consumerName", StringType),
StructField("age", IntegerType),
StructField("gender", StringType),
StructField("monthlyIncome", IntegerType),
StructField("consumptionPreference", StringType),
StructField("consumptionArea", StringType),
StructField("shoppingPlatform", StringType),
StructField("paymentMethod", StringType),
StructField("quantityOfItemsPurchased", IntegerType),
StructField("couponAcquisitionStatus", StringType),
StructField("shoppingMotivation", StringType)
))
val df = ss.createDataFrame(rowRDD, schema).toDF()
//按年齡分佈計算
val agedf = df.withColumn("age_range",
when(col("age").between(0, 20), "0-20")
.when(col("age").between(21, 30), "21-30")
.when(col("age").between(31, 40), "31-40")
.when(col("age").between(41, 50), "41-50")
.when(col("age").between(51, 60), "51-60")
.when(col("age").between(61, 70), "61-70")
.when(col("age").between(81, 90), "81-90")
.when(col("age").between(91, 100), "91-100")
.otherwise("Unknow")
)
//分組統計
val result = agedf.groupBy("age_range").agg(count("consumerName").alias("Count")).sort(desc("Count"))
result.show()
列印結果:
類似年齡分佈的操作。
val sexResult = agedf.groupBy("gender").agg(count("consumerName").alias("Count")).sort(desc("Count"))
sexResult.show()
列印結果:
除了以上的統計分析案例之外,還有優惠券獲取情況和購物動機的關係、消費領域方式等統計,可以進一步拓展分析。
本文基於分析消費者行為資料,可以入門學習到,Spark如何讀取樣本檔案,通過map(_.split(","))處理樣本成一個陣列格式的RDD,基於該RDD,可以進一步通過map、reduceByKey、groupBy等運算元做處理與統計,最後獲取該樣本的資訊價值。