基於Spark對消費者行為資料進行資料分析開發案例

2023-11-10 18:00:54

原創/朱季謙

本文適合入門Spark RDD的計算處理。

在日常工作當中,經常遇到基於Spark去讀取儲存在HDFS中的批次檔案資料進行統計分析的案例,這些檔案一般以csv或者txt檔案格式存在。例如,存在這樣一份消費者行為資料,欄位包括消費者姓名,年齡,性別,月薪,消費偏好,消費領域,購物平臺,支付方式,單次購買商品數量,優惠券獲取情況,購物動機。

基於這份消費者行為資料,往往會有以下一些分析目標:

  • 使用者統計學分析:針對性別、年齡等屬性進行統計分析,瞭解消費者群體的組成和特徵。
  • 收入與購買行為的關係分析:通過比較月薪和單次購買商品數量之間的關係,探索收入水平對消費行為的影響。
  • 消費偏好和消費領域的分析:檢視不同消費者的消費偏好(例如價效比、功能性、時尚潮流等)和消費領域(例如家居用品、汽車配件、美妝護膚等),以瞭解他們的興趣和偏好。
  • 購物平臺和支付方式的分析:研究購物平臺(例如天貓、淘寶、拼多多等)和支付方式(例如微信支付、支付寶等)的選擇情況,瞭解消費者在電商平臺上的偏好。
  • 優惠券獲取情況和購物動機的關係:觀察優惠券獲取情況和購物動機之間的聯絡,探索消費者是否更傾向於使用優惠券進行購物。

針對這些需求,就可以使用Spark來讀取檔案後,進一步分析處理統計。

接下來,就是針對以上分析目標,設計一番Spark程式碼計算邏輯,由此可入門學習下Spark RDD常用用法。

獲取一份具備以下欄位的csv隨機假樣本,總共5246條資料,包括「消費者姓名,年齡,性別,月薪,消費偏好,消費領域,購物平臺,支付方式,單次購買商品數量,優惠券獲取情況,購物動機」。

Amy Harris,39,男,18561,價效比,家居用品,天貓,微信支付,10,折扣優惠,品牌忠誠
Lori Willis,33,女,14071,功能性,家居用品,蘇寧易購,貨到付款,1,折扣優惠,日常使用
Jim Williams,61,男,14145,時尚潮流,汽車配件,淘寶,微信支付,3,免費贈品,禮物贈送
Anthony Perez,19,女,11587,時尚潮流,珠寶首飾,拼多多,支付寶,5,免費贈品,商品推薦
......

將樣本存放到專案目錄為src/main/resources/consumerdata.csv,然後新建一個Scala的object類,建立一個main方法, 模擬從HDSF讀取資料,然後通過.map(_.split(","))將csv檔案每一行切割成一個陣列形式的RDD

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("consumer")
    val ss = SparkSession.builder().config(conf).getOrCreate()
    val filePath: String = "src/main/resources/consumerdata.csv"
    val consumerRDD = ss.sparkContext.textFile(filePath).map(_.split(","))

可以寫一段程式碼列印看一下consumerRDD結構——

    consumerRDD.foreach(x => {
      x.foreach(y => print(y +" "))
      println()
    })

列印結果如下——

這個RDD相當於把每一行當作裡一個Array[]陣列,第一行的Array0是消費者姓名,即Amy Harris,Array1是年齡,即39,以此類推。

消費者姓名 年齡 性別 月薪 消費偏好 消費領域 購物平臺 支付方式 單次購買商品數量 優惠券獲取情況 購物動機
Amy Harris 39 18561 價效比 家居用品 天貓 微信支付 10 折扣優惠 品牌忠誠
Lori Willis 33 14071 功能性 家居用品 蘇寧易購 貨到付款 1 折扣優惠 日常使用
。。。

獲取到該RDD後,就可以進行下一步的統計分析了。

一、統計消費者支付方式偏好分佈

這行程式碼意思,x.apply(7)表示取每一行的第八個欄位,相當陣列Array[7],第八個欄位是【支付方式】。

  • map(x=>(x.apply(7),1))表示是對RDD裡每一行出現過的支付方式欄位設定為1個,例如,第一行把原本陣列格式Array的RDD做了轉換,生成(微信支付,1)格式的新RDD,表示用微信支付的使用者出現了1次。
  • reduceByKey(_ + _)表示按RDD的key進行聚合統計,表示統計微信支付出現的次數,支付寶出現的次數等。最後,通過
  • sortBy(_._2,false)表示按照key-value當中的value進行倒序排序,false表示倒敘,true表示升序。

因此就可以按照以上格式,對文字資料裡的每一個欄位做相應分析,後文其他計算邏輯也是類似。

consumerRDD.map(x => (x.apply(7),1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)

列印結果如下:

二、統計購物平臺偏好分佈

x.apply(5)表示取每一行的第六個欄位,相當陣列Array[5],第六個欄位是【購物平臺】。

同前文的【統計消費者支付方式偏好分佈】一樣,通過map(x=>(x.apply(5),1))生成(購物平臺,1)格式的RDD,然後再通過reduceByKey運算元針對相同的key做統計,最後倒序排序。

consumerRDD.map(x => (x.apply(5), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)

列印結果——

三、統計購物偏好方式分佈

x.apply(4)表示取每一行的第五個欄位,相當陣列Array[4],第五個欄位是【消費領域】。

consumerRDD.map(x => (x.apply(4), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)

列印結果:

四、統計購物動機分佈

x.apply(10)表示取每一行的第十個欄位,相當陣列Array[10],第10個欄位是【購物動機】。

consumerRDD.map(x => (x.apply(10), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)

列印結果——

五、消費者年齡分佈

該需求通過將RDD對映成DataFrame資料集,方便用SQL語法處理,按照年齡區間分割區,分別為"0-20","21-30","31-40"

......這個分割區字串名,就相當key,value表示落在該分割區的使用者數量。這時,就可以分組做聚合統計了,統計出各個年齡段的消費者數量。

//取出consumerRDD每一行陣列需要的欄位
val rowRDD = consumerRDD.map{
  x => Row(x.apply(0),x.apply(1).toInt,x.apply(2),x.apply(3).toInt,x.apply(4),x.apply(5),x.apply(6),x.apply(7),x.apply(8).toInt,x.apply(9),x.apply(10))
}

//設定欄位對映
val schema = StructType(Seq(
  StructField("consumerName", StringType),
  StructField("age", IntegerType),
  StructField("gender", StringType),
  StructField("monthlyIncome", IntegerType),
  StructField("consumptionPreference", StringType),
  StructField("consumptionArea", StringType),
  StructField("shoppingPlatform", StringType),
  StructField("paymentMethod", StringType),
  StructField("quantityOfItemsPurchased", IntegerType),
  StructField("couponAcquisitionStatus", StringType),
  StructField("shoppingMotivation", StringType)

))
val df = ss.createDataFrame(rowRDD, schema).toDF()
//按年齡分佈計算
val agedf = df.withColumn("age_range",
  when(col("age").between(0, 20), "0-20")
    .when(col("age").between(21, 30), "21-30")
    .when(col("age").between(31, 40), "31-40")
    .when(col("age").between(41, 50), "41-50")
    .when(col("age").between(51, 60), "51-60")
    .when(col("age").between(61, 70), "61-70")
    .when(col("age").between(81, 90), "81-90")
    .when(col("age").between(91, 100), "91-100")
    .otherwise("Unknow")
)
//分組統計
val result = agedf.groupBy("age_range").agg(count("consumerName").alias("Count")).sort(desc("Count"))
result.show()

列印結果:

六、統計年齡分佈

類似年齡分佈的操作。

val sexResult = agedf.groupBy("gender").agg(count("consumerName").alias("Count")).sort(desc("Count"))
sexResult.show()

列印結果:

除了以上的統計分析案例之外,還有優惠券獲取情況和購物動機的關係、消費領域方式等統計,可以進一步拓展分析。

本文基於分析消費者行為資料,可以入門學習到,Spark如何讀取樣本檔案,通過map(_.split(","))處理樣本成一個陣列格式的RDD,基於該RDD,可以進一步通過map、reduceByKey、groupBy等運算元做處理與統計,最後獲取該樣本的資訊價值。