基於Spark的大規模紀錄檔分析

2023-06-15 12:05:49
摘要:本篇文章將從一個實際專案出發,分享如何使用 Spark 進行大規模紀錄檔分析,並通過程式碼演示加深讀者的理解。

本文分享自華為雲社群《【實戰經驗分享】基於Spark的大規模紀錄檔分析【上進小菜豬巨量資料系列】》,作者:上進小菜豬。

隨著網際網路的普及和應用範圍的擴大,越來越多的應用場景需要對海量資料進行高效地處理和分析,這就要求我們必須具備巨量資料技術方面的知識和技能。本篇文章將從一個實際專案出發,分享如何使用 Spark 進行大規模紀錄檔分析,並通過程式碼演示加深讀者的理解。

1.資料來源

我們的專案是針對某購物網站的存取紀錄檔進行分析,其中主要包含以下幾個欄位:

  • IP:存取的使用者端 IP 地址
  • Time:存取時間
  • Url:存取的 URL 地址
  • User-Agent:瀏覽器識別符號

原始資料規模約為 100GB,我們需要對其進行清洗、統計和分析,以得到有用的資訊和價值。

2. 資料淨化

由於原始資料存在缺失值、異常值、重複值等問題,因此我們需要進行資料淨化,主要包括以下步驟:

  1. 將原始資料進行格式轉換,方便後續處理
  2. 對 IP、Time、Url 和 User-Agent 欄位進行解析和提取
  3. 去除不合法的記錄和重複的記錄

具體程式碼實現如下:

import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat
import java.util.Locale
​
object DataCleaning {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataCleaning")
    val sc = new SparkContext(conf)
    val data = sc.textFile("hdfs://master:9000/log/access.log")
​
 // 定義時間格式及地區資訊
    val dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
​
 // 資料淨化
    val cleanData = data.map(line => {
      val arr = line.split(" ")
 if (arr.length >= 9) {
 // 解析 IP
        val ip = arr(0)
​
 // 解析時間,轉換為 Unix 時間戳
        val time = dateFormat.parse(arr(3) + " " + arr(4)).getTime / 1000// 解析 URL
        val url = urlDecode(arr(6))
​
 // 解析 UserAgent
        val ua = arr(8)
​
 (ip, time, url, ua)
 }
 }).filter(x => x != null).distinct()
​
 // 結果輸出
    cleanData.saveAsTextFile("hdfs://master:9000/cleanData")
​
    sc.stop()
 }
​
 // URL 解碼
  def urlDecode(url: String): String = {
    java.net.URLDecoder.decode(url, "utf-8")
 }
}

3. 資料統計

對於大規模資料的處理,我們可以使用 Spark 提供的強大的分散式計算能力,以提高處理效率和減少計算時間。

我們這裡使用 Spark SQL 統計每個 URL 的存取量,並輸出前 10 個存取量最高的 URL,程式碼如下:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
​
case class LogRecord(ip: String, time: Long, url: String, ua: String)
​
object DataAnalysis {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataAnalysis")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
​
 // 讀取清洗後的資料
    val cleanData = sc.textFile("hdfs://master:9000/cleanData").filter(x => x != null)
​
 // 將資料轉換為 DataFrame
 import sqlContext.implicits._
    val logDF = cleanData.map(_.split(",")).map(p => LogRecord(p(0), p(1).toLong, p(2), p(3))).toDF()
​
 // 統計每個 URL 的存取量,並按存取量降序排序
    val topUrls = logDF.groupBy("url").count().sort($"count".desc)
​
 // 輸出前 10 個存取量最高的 URL
    topUrls.take(10).foreach(println)
​
    sc.stop()
 }
}

4. 資料視覺化

資料視覺化是將處理和分析後的資料以圖表或影象的方式展示出來,有利於我們直觀地觀察資料的規律和趨勢。

我們這裡採用 Python 的 Matplotlib 庫將前 10 個存取量最高的 URL 視覺化,程式碼如下:

import matplotlib.pyplot as plt
​
# 讀取資料
with open('topUrls.txt', 'r') as f:
    line = f.readline()
    urls = []
    counts = []
 while line and len(urls) < 10:
        url, count = line.strip().split(',')
        urls.append(url)
        counts.append(int(count))
        line = f.readline()
# 繪製直方圖
plt.bar(range(10), counts, align='center')
plt.xticks(range(10), urls, rotation=90)
plt.xlabel('Url')
plt.ylabel('Count')
plt.title('Top 10 Url')
plt.show()

在進行資料淨化前,需要先對原始紀錄檔資料進行篩選,選取需要分析的欄位。然後進行資料淨化,去掉不必要的空格、特殊字元等,使資料更加規整,並增加可讀性。

下面是資料淨化的程式碼範例:

val originalRdd = spark.sparkContext.textFile("path/to/logfile")
​
val filteredRdd = originalRdd.filter(line => {
  val tokens = line.split("\t")
  tokens.length >= 10 &&
 tokens(0).matches("\d{4}-\d{2}-\d{2}") &&
 tokens(1).matches("\d{2}:\d{2}:\d{2}") &&
 tokens(2).matches("\d+") &&
 tokens(3).matches("\d+") &&
 tokens(4).matches("\d+") &&
 tokens(5).matches("\d+") &&
 tokens(6).matches(".+") &&
 tokens(7).matches(".+") &&
 tokens(8).matches(".+") &&
 tokens(9).matches(".+")
})
​
val cleanedRdd = filteredRdd.map(line => {
  val tokens = line.split("\t")
  val timestamp = s"${tokens(0)} ${tokens(1)}"
  val request = tokens(6).replaceAll(""", "")
  val responseCode = tokens(8).toInt
 (timestamp, request, responseCode)
})

​在上述程式碼中,我們首先讀取原始紀錄檔資料,並使用filter函數過濾掉不符合條件的行;然後使用map函數將資料轉換為元組的形式,並進行清洗。其中,元組的三個元素分別是時間戳、請求內容和響應狀態碼。

接下來,讓我們來介紹一下如何使用Spark進行資料統計。

資料統計是大規模資料分析中非常重要的一個環節。Spark提供了豐富的聚合函數,可用於對資料進行各種統計分析。

下面是對清洗後的資料進行統計分析的程式碼範例:

import org.apache.spark.sql.functions._
​
val df = spark.createDataFrame(cleanedRdd).toDF("timestamp", "request", "responseCode")
val totalCount = df.count()
val errorsCount = df.filter(col("responseCode") >= 400).count()
val successCount = totalCount - errorsCount
val topEndpoints = df.groupBy("request").count().orderBy(desc("count")).limit(10)
topEndpoints.show()

在上面的程式碼中,我們首先將清洗後的資料轉換為DataFrame,然後使用count函數計算總記錄數和錯誤記錄數,並計算成功記錄數。最後使用groupBy和orderBy函數按照請求內容,對資料進行分組統計,並列印出請求次數最多的前10個端點。

通過視覺化,我們可以清楚地看到前 10 個存取量最高的 URL 地址及其存取量,這對於進一步分析和優化網站的效能和使用者體驗具有重要的意義。

總結起來,這就是我們的一個巨量資料實戰專案,我們使用 Spark 統計了購物網站的存取量,並通過 Python 的 Matplotlib 庫將結果視覺化。這個過程中,我們運用了資料淨化、Spark SQL 統計和視覺化等技術,為大規模資料的處理和分析提供了有效的解決方案。

 

點選關注,第一時間瞭解華為雲新鮮技術~