Spark 3.x Spark Core詳解 & 效能優化

2022-06-01 12:02:06

Spark Core

1. 概述

Spark 是一種基於記憶體的快速、通用、可延伸的巨量資料分析計算引擎

1.1 Hadoop vs Spark

上面流程對應Hadoop的處理流程,下面對應著Spark的處理流程

Hadoop

  • Hadoop 是由 java 語言編寫的,在分散式伺服器叢集上儲存海量資料並執行分散式 分析應用的開源框架
  • 作為 Hadoop 分散式檔案系統,HDFS 處於 Hadoop 生態圈的最下層,儲存著所有的 數 據 , 支援著 Hadoop的所有服務 。 它的理論基礎源於Google 的 The GoogleFile System 這篇論文,它是 GFS 的開源實現。
  • MapReduce 是一種程式設計模型,Hadoop 根據 Google 的 MapReduce 論文將其實現, 作為 Hadoop 的分散式計算模型,是 Hadoop 的核心。基於這個框架,分散式並行 程式的編寫變得異常簡單。綜合了 HDFS 的分散式儲存和 MapReduce 的分散式計 算,Hadoop 在處理海量資料時,效能橫向擴充套件變得非常容易。
  • HBase 是對 Google 的 Bigtable 的開源實現,但又和 Bigtable 存在許多不同之處。 HBase 是一個基於 HDFS 的分散式資料庫,擅長實時地隨機讀/寫超大規模資料集。 它也是 Hadoop 非常重要的元件。

Spark

  • Spark 是一種由 Scala 語言開發的快速、通用、可延伸的巨量資料分析引擎
  • Spark Core 中提供了 Spark 最基礎與最核心的功能
  • Spark SQL 是 Spark 用來操作結構化資料的元件。通過 Spark SQL,使用者可以使用 SQL 或者 Apache Hive 版本的 SQL 方言(HQL)來查詢資料。
  • Spark Streaming 是 Spark 平臺上針對實時資料進行流式計算的元件,提供了豐富的處理資料流的 API。

由上面的資訊可以獲知,Spark 出現的時間相對較晚,並且主要功能主要是用於資料計算, 所以其實 Spark 一直被認為是 Hadoop 框架的升級版。

Spark or Hadoop

  • Hadoop MapReduce 由於其設計初衷並不是為了滿足迴圈迭代式資料流處理,因此在多並行執行的資料可複用場景(如:機器學習、圖挖掘演演算法、互動式資料探勘演演算法)中存在諸多計算效率等問題。
  • 所以 Spark 應運而生,Spark 就是在傳統的 MapReduce 計算框架的基礎上,利用其計算過程的優化,從而大大加快了資料分析、挖掘的執行和讀寫速 度,並將計算單元縮小到更適合平行計算和重複使用的 RDD 計算模型。
  • Spark 是一個分散式資料快速分析專案。它的核心技術是彈性分散式資料集(Resilient Distributed Datasets),提供了比 MapReduce 豐富的模型,可以快速在記憶體中對資料集進行多次迭代,來支援複雜的資料探勘演演算法和圖形計算演演算法。
  • Spark 和Hadoop 的根本差異是多個作業之間的資料通訊問題 : Spark 多個作業之間資料 通訊是基於記憶體,而 Hadoop 是基於磁碟。

1.2 Spark 核心模組

2.1 Local模式

所謂的 Local 模式,就是不需要其他任何節點資源就可以在本地執行 Spark 程式碼的環境,一般用於教學,偵錯,演示等

2.1.1 安裝部署

官網下載安裝包,將 spark-XX-bin-hadoopXX.tgz 檔案上傳到 Linux 並解壓縮,放置在指定位置,路徑中不要包含中文或空格。

tar -zxvf spark-XXX-bin-hadoop.XX.tgz -C /opt/module
cd /opt/module
mv spark-3.0.0-bin-hadoop3.2 spark-local

2.1.2 啟動Local環境

  1. 進入解壓縮後的路徑,執行如下指令

    bin/spark-shell
    

    可以在命令列中,執行scala命令,也可以呼叫spark

    測試

    在解壓縮資料夾下的 data 目錄中,新增 word.txt 檔案。

    Hello Scala
    Hello Spark
    

    在命令列工具中執行如下程式碼指令

    sc.textFile("data/word.txt").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).collect
    

  2. 啟動成功後,可以輸入網址進行 Web UI 監控頁面存取

    http://虛擬機器器or本機ip地址:4040
    
  3. 退出

    按鍵 Ctrl+C 或輸入 Scala 指令

    :quit
    

2.1.3 提交應用

/opt/module/spark-local/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_XXXXXjar \
10
  1. --class
    • 表示要執行程式的主類,此處可以更換為自己寫的應用程式
  2. --master local[2]
    • 部署模式,預設為本地模式,數位表示分配的虛擬 CPU 核數量
  3. spark-examples_XXX.jar
    • 執行的應用類所在的 jar 包(根據實際版本輸入),實際使用時,可以設定自己的 jar 包
  4. 數位 10 表示程式的入口引數,用於設定當前應用的任務數量

2.2 Standlone模式

local 本地模式畢竟只是用來進行練習演示的,真實工作中還是要將應用提交到對應的 叢集中去執行,這裡我們來看看只使用 Spark 自身節點執行的叢集模式,也就是我們所謂的 獨立部署(Standalone)模式。Spark 的 Standalone 模式體現了經典的 master-slave 模式。

叢集規劃:

2.2.4 提交引數說明

bin/spark-submit \
--class <main-class>
--master <master-url> \
... # other options
<application-jar> \
[application-arguments]

2.2.5 設定歷史服務

由於 spark-shell 停止掉後,叢集監控 node1:4040 頁面就看不到歷史任務的執行情況,所以 開發時都設定歷史伺服器記錄任務執行情況

  1. 修改 spark-defaults.conf.template 檔名為 spark-defaults.conf

    修改 spark-defaults.conf 檔案,設定紀錄檔儲存路徑

    spark.eventLog.enabled true
    spark.eventLog.dir hdfs://node1:8020/logs
    

    注意:路徑自己設定,需要啟動 hadoop 叢集,HDFS 上的 directory 目錄需要提前存在。

  2. 修改 spark-env.sh 檔案, 新增紀錄檔設定

    前後路徑保持一致

    export SPARK_HISTORY_OPTS="
    -Dspark.history.ui.port=18080
    -Dspark.history.fs.logDirectory=hdfs://node1:8020/logs
    -Dspark.history.retainedApplications=30"
    

    引數 1 含義:WEB UI 存取的埠號為 18080

    引數 2 含義:指定歷史伺服器紀錄檔儲存路徑

    引數 3 含義:指定儲存 Application 歷史記錄的個數,如果超過這個值,舊的應用程式 資訊將被刪除,這個是記憶體中的應用數,而不是頁面上顯示的應用數。

注意:每一個節點上設定保持一致

重新啟動叢集和歷史服務

/opt/module/spark-standlone/sbin/start-all.sh
/opt/module/spark-standlone/sbin/start-history-server.sh

重新執行任務

/opt/module/spark-standlone/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node1:7077 \
./examples/jars/spark-examples_XXX.jar \
10

檢視歷史服務http://node1:18080

2.3 YARN模式

獨立部署(Standalone)模式由 Spark 自身提供計算資源,無需其他框架提供資源。這種方式降低了和其他第三方資源框架的耦合性,獨立性非常強。但是,Spark 主要是計算框架,而不是資源排程框架,所以本身提供的資源排程並不是它的強項,所以還是和其他專業的資源排程框架整合會更靠譜一些。

2.3.1 安裝部署

注意: 每個節點上設定相同,可設定一臺節點,然後上傳到其他節點便可

解壓縮檔案

將 spark-XXX.tgz 檔案上傳到 Linux 並解壓縮在指定位置

tar -zxvf spark-XXX.tgz -C /opt/module 
cd /opt/module
mv spark-XXX spark-yarn

修改組態檔

修改 conf/spark-env.sh,新增 JAVA_HOME 和 YARN_CONF_DIR\HADOOP_CONF_DIR 設定

export JAVA_HOME=/XXX/jdk1XX
YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
HADOOP_CONF_DIR=/opt/module/hadoop/etc/hadoop

2.3.2 啟動叢集

啟動HDFS和YARN叢集

2.3.3 提交應用

/opt/module/spark-yarn/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_XXX.jar \
10

資源之間的依賴關係,不能成環,會形成死鎖

巨量資料計算引擎框架我們根據使用方式的不同一般會分為四類:

  • 其中第一類就是 Hadoop 所承載的 MapReduce,它將計算分為兩個階段,分別為 Map 階段 和 Reduce 階段
  • 對於上層應用來說,就不得不想方設法去拆分演演算法,甚至於不得不在上層應用實現多個 Job 的串聯,以完成一個完整的演演算法,例如迭代計算。 由於這樣的弊端,催生了支援 DAG 框 架的產生。
  • 因此,支援 DAG 的框架被劃分為第二代計算引擎。如 Tez 以及更上層的 Oozie。
  • 接下來就是以 Spark 為代表的第三代的計算引擎。第三代計算引擎的特點主要是 Job 內部的 DAG 支援(不跨越 Job),以及實時計算。

這裡所謂的有向無環圖,並不是真正意義的圖形,而是由 Spark 程式直接對映成的資料流的高階抽象模型。簡單理解就是將整個程式計算的執行過程用圖形表示出來,這樣更直觀,更便於理解,可以用於表示程式的拓撲結構。

DAG(Directed Acyclic Graph)有向無環圖是由點和線組成的拓撲圖形,該圖形具有方向,不會閉環。

3.4 提交流程

基於Yarn環境

3.4.1 Yarn Client 模式

Client 模式將用於監控和排程的 Driver 模組在使用者端執行,而不是在 Yarn 中,所以一般用於測試

  • Driver 在任務提交的本地機器上執行
  • Driver 啟動後會和 ResourceManager 通訊申請啟動 ApplicationMaster
  • ResourceManager 分配 container,在合適的 NodeManager 上啟動 ApplicationMaster,負責向 ResourceManager 申請 Executor 記憶體
  • ResourceManager 接到 ApplicationMaster 的資源申請後會分配 container,然後 ApplicationMaster 在資源分配指定的 NodeManager 上啟動 Executor 程序
  • Executor 程序啟動後會向 Driver 反向註冊,Executor 全部註冊完成後 Driver 開始執行 main 函數
  • 之後執行到 Action 運算元時,觸發一個 Job,並根據寬依賴開始劃分 stage,每個 stage 生成對應的 TaskSet,之後將 task 分發到各個 Executor 上執行。

3.4.2 Yarn Cluster 模式

Cluster 模式將用於監控和排程的 Driver 模組啟動在 Yarn 叢集資源中執行。一般應用於實際生產環境

  • 在 YARN Cluster 模式下,任務提交後會和 ResourceManager 通訊申請啟動 ApplicationMaster
  • 隨後 ResourceManager 分配 container,在合適的 NodeManager 上啟動 ApplicationMaster,此時的 ApplicationMaster 就是 Driver
  • Driver 啟動後向 ResourceManager 申請 Executor 記憶體,ResourceManager 接到 ApplicationMaster 的資源申請後會分配 container,然後在合適的 NodeManager 上啟動 Executor 程序
  • Executor 程序啟動後會向 Driver 反向註冊,Executor 全部註冊完成後 Driver 開始執行 main 函數
  • 之後執行到 Action 運算元時,觸發一個 Job,並根據寬依賴開始劃分 stage,每個 stage 生成對應的 TaskSet,之後將 task 分發到各個 Executor 上執行。

4. Spark 核心程式設計

Spark 計算框架為了能夠進行高並行和高吞吐的資料處理,封裝了三巨量資料結構,用於處理不同的應用場景。

三巨量資料結構分別是:

  • RDD : 彈性分散式資料集
  • 累加器:分散式共用只寫變數
  • 廣播變數:分散式共用唯讀變數

4.1 RDD

4.1.1 什麼是 RDD

RDD(Resilient Distributed Dataset)叫做彈性分散式資料集,是 Spark 中最基本的資料處理模型。程式碼中是一個抽象類,它代表一個彈性的、不可變、可分割區、裡面的元素可平行計算的集合。

  • 彈性

    • 儲存的彈性:記憶體與磁碟的自動切換

    • 容錯的彈性:資料丟失可以自動恢復

    • 計算的彈性:計算出錯重試機制

    • 分片的彈性:可根據需要重新分片

  • 分散式:資料儲存在巨量資料叢集不同節點上

  • 資料集:RDD 封裝了計算邏輯,並不儲存資料

  • 資料抽象:RDD 是一個抽象類,需要子類具體實現

  • 不可變:RDD 封裝了計算邏輯,是不可以改變的,想要改變,只能產生新的 RDD,在新的 RDD 裡面封裝計算邏輯

  • 可分割區、平行計算

RDD vs IO

RDD的資料處理方式類似於IO流,也有裝飾者設計模式

RDD的資料只有在呼叫collect方法時,才會真正執行業務邏輯操作。之前的封裝全部都是功能上的擴充套件

RDD是不儲存資料的,但是IO可以臨時儲存一部分資料

4.1.2 核心屬性

Internally, each RDD is characterized by five main properties:

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

分割區列表

  • RDD 資料結構中存在分割區列表,用於執行任務時平行計算,是實現分散式計算的重要屬性

    protected def getPartitions: Array[Partition]
    

分割區計算函數

  • Spark 在計算時,是使用分割區函數對每一個分割區進行計算

    @DeveloperApi
    def compute(split: Partition, context: TaskContext): Iterator[T]
    

RDD 之間的依賴關係

  • RDD 是計算模型的封裝,當需求中需要將多個計算模型進行組合時,就需要將多個 RDD 建立依賴關係

    protected def getDependencies: Seq[Dependency[_]] = deps
    

分割區器(可選)

  • 當資料為 KV 型別資料時,可以通過設定分割區器自定義資料的分割區

    @transient val partitioner: Option[Partitioner] = None
    

首選位置(可選)

  • 計算資料時,可以根據計算節點的狀態選擇不同的節點位置進行計算

    protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    

4.1.3 執行原理

  • 資料處理過程中需要計算資源(記憶體 & CPU)和計算模型(邏輯)。執行時,需要將計算資源和計算模型進行協調和整合

  • Spark 框架在執行時,先申請資源,然後將應用程式的資料處理邏輯分解成一個一個的計算任務。然後將任務發到已經分配資源的計算節點上, 按照指定的計算模型進行資料計算。最後得到計算結果

RDD 是 Spark 框架中用於資料處理的核心模型,接下來我們看看,在 Yarn 環境中,RDD的工作原理

  1. 啟動 Yarn 叢集環境

  2. Spark 通過申請資源建立排程節點和計算節點

  3. Spark 框架根據需求將計算邏輯根據分割區劃分成不同的任務

  4. 排程節點將任務根據計算節點狀態傳送到對應的計算節點進行計算

從以上流程可以看出 RDD 在整個流程中主要用於將邏輯進行封裝,並生成 Task 傳送給 Executor 節點執行計算

4.1.4 RDD 建立

在 Spark 中建立 RDD 的建立方式可以分為四種

  1. 集合(記憶體)中建立 RDD

    從集合中建立 RDD,Spark 主要提供了兩個方法:parallelize 和 makeRDD

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sparkContext = new SparkContext(sparkConf)
    val rdd1 = sparkContext.parallelize(List(1,2,3,4))
    val rdd2 = sparkContext.makeRDD( List(1,2,3,4))
    rdd1.collect().foreach(println)
    rdd2.collect().foreach(println)
    sparkContext.stop()
    

    從底層程式碼實現來講,makeRDD 方法其實就是 parallelize 方法

    def makeRDD[T: ClassTag](
    	seq: Seq[T],
    	numSlices: Int = defaultParallelism): RDD[T] = withScope {
    		parallelize(seq, numSlices)
    	}
    
    // makeRDD方法可以傳遞第二個引數,這個參數列示分割區的數量
    // 第二個引數可以不傳遞的,那麼makeRDD方法會使用預設值 : defaultParallelism(預設並行度)
    //    scheduler.conf.getInt("spark.default.parallelism", totalCores)
    //    spark在預設情況下,從設定物件中獲取設定引數:spark.default.parallelism
    //    如果獲取不到,那麼使用totalCores屬性,這個屬性取值為當前執行環境的最大可用核數
    // val rdd = sc.makeRDD(List(1,2,3,4),2)
    
  2. 外部儲存(檔案)建立 RDD

    由外部儲存系統的資料集建立 RDD 包括:原生的檔案系統,所有 Hadoop 支援的資料集, 比如 HDFS、HBase 等

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sparkContext = new SparkContext(sparkConf)
    val fileRDD: RDD[String] = sparkContext.textFile("input")
    fileRDD.collect().foreach(println)
    sparkContext.stop()
    
  3. 從其他 RDD 建立

    主要是通過一個 RDD 運算完後,再產生新的 RDD

  4. 直接建立 RDD(new)

    使用 new 的方式直接構造 RDD,一般由 Spark 框架自身使用

4.1.5 RDD並行度與分割區

預設情況下,Spark 可以將一個作業切分多個任務後,傳送給 Executor 節點平行計算,而能夠平行計算的任務數量我們稱之為並行度。這個數量可以在構建 RDD 時指定。記住,這裡的並行執行的任務數量,並不是指的切分任務的數量,不要混淆了

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4), 4)
val fileRDD: RDD[String] = sparkContext.textFile( "input", 2)
fileRDD.collect().foreach(println)
sparkContext.stop()

讀取記憶體資料時,資料可以按照並行度的設定進行資料的分割區操作,資料分割區規則的 Spark 核心原始碼如下

// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
	(0 until numSlices).iterator.map { i =>
		val start = ((i * length) / numSlices).toInt
		val end = (((i + 1) * length) / numSlices).toInt
		(start, end)
	}
}

讀取檔案資料時,資料是按照 Hadoop 檔案讀取的規則進行切片分割區,而切片規則和資料讀取的規則有些差異,具體 Spark 核心原始碼如下

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 long totalSize = 0; // compute total size
 for (FileStatus file: files) { // check we have valid files
     if (file.isDirectory()) {
     	throw new IOException("Not a file: "+ file.getPath());
 	}
 	totalSize += file.getLen();
 }
 long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
 long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
 FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

 ...

 for (FileStatus file: files) {

 ...

 if (isSplitable(fs, path)) {
     long blockSize = file.getBlockSize();
     long splitSize = computeSplitSize(goalSize, minSize, blockSize);
     ...
 }
 protected long computeSplitSize(long goalSize, long minSize,long blockSize) {
 	return Math.max(minSize, Math.min(goalSize, blockSize));
 }

4.2 RDD 轉換運算元

RDD 根據資料處理方式的不同將運算元整體上分為 Value 型別、雙 Value 型別和 Key-Value 型別

Value DESC
map 將處理的資料逐條進行對映轉換,這裡的轉換可以是型別的轉換,也可以是值的轉換
mapPartitions 將待處理的資料以分割區為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處理,哪怕是過濾資料
mapPartitionsWithIndex 將待處理的資料以分割區為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處 理,哪怕是過濾資料,在處理時同時可以獲取當前分割區索引
flatMap 將處理的資料進行扁平化後再進行對映處理,所以運算元也稱之為扁平對映
glom 將同一個分割區的資料直接轉換為相同型別的記憶體陣列進行處理,分割區不變
groupBy 將資料根據指定的規則進行分組, 分割區預設不變,但是資料會被打亂重新組合,我們將這樣的操作稱之為 shuffle。極限情況下,資料可能被分在同一個分割區中 一個組的資料在一個分割區中,但是並不是說一個分割區中只有一個組
filter 將資料根據指定的規則進行篩選過濾,符合規則的資料保留,不符合規則的資料丟棄。 當資料進行篩選過濾後,分割區不變,但是分割區內的資料可能不均衡,生產環境下,可能會出現資料傾斜。
sample 根據指定的規則從資料集中抽取資料
distinct 將資料集中重複的資料去重
coalesce 根據資料量縮減分割區,用於巨量資料集過濾後,提高小資料集的執行效率。當 spark 程式中,存在過多的小任務的時候,可以通過 coalesce 方法,收縮合並分割區,減少分割區的個數,減小任務排程成本
repartition 該操作內部其實執行的是 coalesce 操作,引數 shuffle 的預設值為 true。無論是將分割區數多的 RDD 轉換為分割區數少的 RDD,還是將分割區數少的 RDD 轉換為分割區數多的 RDD,repartition 操作都可以完成,因為無論如何都會經 shuffle 過程
sortBy 該操作用於排序資料。在排序之前,可以將資料通過 f 函數進行處理,之後按照 f 函數處理的結果進行排序,預設為升序排列。排序後新產生的 RDD 的分割區數與原 RDD 的分割區數一致。中間存在 shuffle 的過程
雙Value DESC
intersection 對源 RDD 和引數 RDD 求交集後返回一個新的 RDD
union 對源 RDD 和引數 RDD 求並集後返回一個新的 RDD
subtract 以一個 RDD 元素為主,除兩個 RDD 中復元素,將其他元素保留下來
zip 將兩個 RDD 中的元素,以鍵值對的形式進行合併
Key - Value DESC
partitionBy 將資料按照指定 Partitioner 重新進行分割區。Spark 預設的分割區器是 HashPartitioner
reduceByKey 可以將資料按照相同的 Key 對 Value 進行聚合
groupByKey 將資料來源的資料根據 key 對 value 進行分組
aggregateByKey 將資料根據不同的規則進行分割區內計算和分割區間計算
foldByKey 當分割區內計算規則和分割區間計算規則相同時,aggregateByKey 就可以簡化為 foldByKey
combineByKey 通用的對 key-value 型 rdd 進行聚集操作的聚集函數(aggregation function),combineByKey()允許使用者返回值的型別與輸入不一致
sortByKey 在一個(K,V)的 RDD 上呼叫,K 必須實現 Ordered 介面(特質),返回一個按照 key 進行排序 的
join 在型別為(K,V)和(K,W)的 RDD 上呼叫,返回一個相同 key 對應的所有元素連線在一起的 (K,(V,W))的 RDD
leftOuterJoin 類似於 SQL 語句的左外連線
cogroup (join & group)在型別為(K,V)和(K,W)的 RDD 上呼叫,返回一個(K,(Iterable,Iterable))型別的 RDD

4.2.1 value

map

  • 函數簽名

    def map[U: ClassTag](f: T => U): RDD[U]
    
  • 函數說明

    將處理的資料逐條進行對映轉換,這裡的轉換可以是型別的轉換,也可以是值的轉換

    val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
    val dataRDD1: RDD[Int] = dataRDD.map(
     	num => { num * 2 }
    )
    val dataRDD2: RDD[String] = dataRDD1.map(
     	num => { "" + num}
    )
    

mapPartitions

  • 函數簽名

    def mapPartitions[U: ClassTag]( 
        f: Iterator[T] => Iterator[U], 
        preservesPartitioning: Boolean = false): RDD[U] 
    
  • 函數說明

    將待處理的資料以分割區為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處理,哪怕是過濾資料

    val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
     datas => { datas.filter(_==2) }
    )
    
    // mapPartitions : 可以以分割區為單位進行資料轉換操作
    //                 但是會將整個分割區的資料載入到記憶體進行參照
    //                 如果處理完的資料是不會被釋放掉,存在物件的參照。
    //                 在記憶體較小,資料量較大的場合下,容易出現記憶體溢位。
    

map 和 mapPartitions 的區別?

  • Map 運算元是分割區內一個資料一個資料的執行,類似於序列操作。而 mapPartitions 運算元是以分割區為單位進行批次處理操作
  • Map 運算元主要目的將資料來源中的資料進行轉換和改變。但是不會減少或增多資料。
  • MapPartitions 運算元需要傳遞一個迭代器,返回一個迭代器,沒有要求的元素的個數保持不變,所以可以增加或減少資料
  • Map 運算元因為類似於序列操作,所以效能比較低,而是 mapPartitions 運算元類似於批次處理,所以效能較高。但是 mapPartitions 運算元會長時間佔用記憶體,那麼這樣會導致記憶體可能不夠用,出現記憶體溢位的錯誤。所以在記憶體有限的情況下,不推薦使用。

mapPartitionsWithIndex

  • 函數簽名

    def mapPartitionsWithIndex[U: ClassTag](
    	f: (Int, Iterator[T]) => Iterator[U],
    	preservesPartitioning: Boolean = false): RDD[U]
    
  • 函數說明

    將待處理的資料以分割區為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處理,哪怕是過濾資料,在處理時同時可以獲取當前分割區索引

    val dataRDD1 = dataRDD.mapPartitionsWithIndex(
    	(index, datas) => { datas.map(index, _) }
    )
    

flatMap

  • 函數簽名

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
    
  • 函數說明

    將處理的資料進行扁平化後再進行對映處理,所以運算元也稱之為扁平對映

    val dataRDD = sparkContext.makeRDD(List(List(1,2),List(3,4)),1)
    val dataRDD1 = dataRDD.flatMap(list => list)
    

glom

  • 函數簽名

    def glom(): RDD[Array[T]]
    
  • 函數說明

    將同一個分割區的資料直接轉換為相同型別的記憶體陣列進行處理,分割區不變

    val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
    val dataRDD1:RDD[Array[Int]] = dataRDD.glom()
    

groupBy

  • 函數簽名

    def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
    
  • 函數說明

    將資料根據指定的規則進行分組, 分割區預設不變,但是資料會被打亂重新組合,我們將這樣的操作稱之為 shuffle。極限情況下,資料可能被分在同一個分割區中

    一個組的資料在一個分割區中,但是並不是說一個分割區中只有一個組

    val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
    val dataRDD1 = dataRDD.groupBy(_%2)
    

filter

  • 函數簽名

    def filter(f: T => Boolean): RDD[T]
    
  • 函數說明

    將資料根據指定的規則進行篩選過濾,符合規則的資料保留,不符合規則的資料丟棄。 當資料進行篩選過濾後,分割區不變,但是分割區內的資料可能不均衡,生產環境下,可能會出現資料傾斜

    val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
    val dataRDD1 = dataRDD.filter(_%2 == 0)
    

sample

  • 函數簽名

    def sample(
        withReplacement: Boolean,
        fraction: Double,
        seed: Long = Utils.random.nextLong): RDD[T]
    
  • 函數說明

    根據指定的規則從資料集中抽取資料

    val dataRDD = sparkContext.makeRDD(List(
     1,2,3,4
    ),1)
    // 抽取資料不放回(伯努利演演算法)
    // 伯努利演演算法:又叫 0、1 分佈。例如扔硬幣,要麼正面,要麼反面。
    // 具體實現:根據種子和隨機演演算法算出一個數和第二個引數設定機率比較,小於第二個引數要,大於不要
    // 第一個引數:抽取的資料是否放回,false:不放回
    // 第二個引數:抽取的機率,範圍在[0,1]之間,0:全不取;1:全取;
    // 第三個引數:亂數種子,如果不傳遞第三個引數,那麼使用的是當前系統時間
    val dataRDD1 = dataRDD.sample(false, 0.5)
    // 抽取資料放回(泊松演演算法)
    // 第一個引數:抽取的資料是否放回,true:放回;false:不放回
    // 第二個引數:重複資料的機率,範圍大於等於 0.表示每一個元素被期望抽取到的次數
    // 第三個引數:亂數種子,如果不傳遞第三個引數,那麼使用的是當前系統時間
    val dataRDD2 = dataRDD.sample(true, 2)
    

distinct

  • 函數簽名

    def distinct()(implicit ord: Ordering[T] = null): RDD[T]
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    
  • 函數說明

    將資料集中重複的資料去重

    val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),1)
    val dataRDD1 = dataRDD.distinct()
    val dataRDD2 = dataRDD.distinct(2)
    

coalesce

  • 函數簽名

    def coalesce(numPartitions: Int, shuffle: Boolean = false,
    	partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
    	(implicit ord: Ordering[T] = null)
    	:RDD[T]
    
  • 函數說明

    根據資料量縮減分割區,用於巨量資料集過濾後,提高小資料集的執行效率

    當 spark 程式中,存在過多的小任務的時候,可以通過 coalesce 方法,收縮合並分割區,減少分割區的個數,減小任務排程成本

    val dataRDD = sparkContext.makeRDD(List(
     1,2,3,4,1,2
    ),6)
    val dataRDD1 = dataRDD.coalesce(2)
    
    // coalesce方法預設情況下不會將分割區的資料打亂重新組合
    // 這種情況下的縮減分割區可能會導致資料不均衡,出現資料傾斜
    // 如果想要讓資料均衡,可以進行shuffle處理
    

repartition

  • 函數簽名

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    
  • 函數說明

    該操作內部其實執行的是 coalesce 操作,引數 shuffle 的預設值為 true。無論是將分割區數多的 RDD 轉換為分割區數少的 RDD,還是將分割區數少的 RDD 轉換為分割區數多的 RDD,repartition 操作都可以完成,因為無論如何都會經 shuffle 過程

    val dataRDD = sparkContext.makeRDD(List(
     1,2,3,4,1,2
    ),2)
    val dataRDD1 = dataRDD.repartition(4)
    

兩者區別

coalesce運算元可以擴大分割區的,但是如果不進行shuffle操作,是沒有意義,不起作用。
所以如果想要實現擴大分割區的效果,需要使用shuffle操作
spark提供了一個簡化的操作

  • 縮減分割區:coalesce,如果想要資料均衡,可以採用shuffle
  • 擴大分割區:repartition, 底層程式碼呼叫的就是coalesce,而且肯定採用shuffle

sortBy

  • 函數簽名

    def sortBy[K](
     f: (T) => K,
     ascending: Boolean = true,
     numPartitions: Int = this.partitions.length)
     (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
    
  • 函數說明

    該操作用於排序資料。在排序之前,可以將資料通過 f 函數進行處理,之後按照 f 函數處理的結果進行排序,預設為升序排列。排序後新產生的 RDD 的分割區數與原 RDD 的分割區數一致。中間存在 shuffle 的過程

    val dataRDD = sparkContext.makeRDD(List(
     1,2,3,4,1,2
    ),2)
    val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
    
    // sortBy方法可以根據指定的規則對資料來源中的資料進行排序,預設為升序,第二個引數可以改變排序的方式
    // sortBy預設情況下,不會改變分割區。但是中間存在shuffle操作
    

4.2.2 double value

交集,並集和差集要求兩個資料來源資料型別保持一致

intersection

對源 RDD 和引數 RDD 求交集後返回一個新的 RDD

def intersection(other: RDD[T]): RDD[T]

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)

union

對源 RDD 和引數 RDD 求並集後返回一個新的 RDD

def subtract(other: RDD[T]): RDD[T]

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)

subtract

以一個 RDD 元素為主,去除兩個 RDD 中重複元素,將其他元素保留下來。求差集

def subtract(other: RDD[T]): RDD[T]

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)

zip

將兩個 RDD 中的元素,以鍵值對的形式進行合併。

  • 資料型別可以不一致

  • 兩個資料來源要求分割區數量要保持一致

  • 兩個資料來源要求分割區中資料數量保持一致

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.zip(dataRDD2)

4.2.3 key-value

partitionBy

  • 函數簽名

    def partitionBy(partitioner: Partitioner): RDD[(K, V)]
    
  • 函數說明 將資料按照指定 Partitioner 重新進行分割區。Spark 預設的分割區器是 HashPartitioner

    import org.apache.spark.HashPartitioner
    
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3) 
    // RDD => PairRDDFunctions
    // 隱式轉換(二次編譯)
    // 重分割區的分割區器與當前RDD的分割區器一樣,則不會再次分割區
    val rdd2: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
    

reduceByKey

  • 函數簽名

    def reduceByKey(func: (V, V) => V): RDD[(K, V)]
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
    
  • 函數說明

    可以將資料按照相同的 Key 對 Value 進行聚合

    // reduceByKey : 相同的key的資料進行value資料的聚合操作
    // scala語言中一般的聚合操作都是兩兩聚合,spark基於scala開發的,所以它的聚合也是兩兩聚合
    // 【1,2,3】
    // 【3,3】
    // 【6】
    // reduceByKey中如果key的資料只有一個,是不會參與運算的。
    
    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = dataRDD1.reduceByKey(_+_)
    val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
    

groupByKey

spark中,shuffle操作必須落盤處理,不能在記憶體中資料等待,會導致記憶體溢位

  • 函數簽名

    def groupByKey(): RDD[(K, Iterable[V])]
    def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
    
  • 函數說明

    將資料來源的資料根據 key 對 value 進行分組

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = dataRDD1.groupByKey()
    val dataRDD3 = dataRDD1.groupByKey(2)
    val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
    

兩者區別

  • 從 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前對分割區內相同 key 的資料進行預聚合(combine)功能,這樣會減少落盤的資料量,而 groupByKey 只是進行分組,不存在資料量減少的問題,reduceByKey 效能比較高。

  • 從功能的角度:reduceByKey 其實包含分組和聚合的功能。GroupByKey 只能分組,不能聚合,所以在分組聚合的場合下,推薦使用 reduceByKey,如果僅僅是分組而不需要聚合。那麼還是隻能使用 groupByKey

aggregateByKey

  • 函數簽名

    def aggregateByKey[U: ClassTag](zeroValue: U)
    	(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
    
  • 函數說明

    將資料根據不同的規則進行分割區內計算和分割區間計算

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_)
    
    // TODO : 取出每個分割區內相同 key 的最大值然後分割區間相加
    // aggregateByKey 運算元是函數柯里化,存在兩個參數列
    // 1. 第一個參數列中的參數列示初始值
    // 2. 第二個參數列中含有兩個引數
    // 2.1 第一個參數列示分割區內的計算規則
    // 2.2 第二個參數列示分割區間的計算規則
    

foldByKey

  • 函數簽名

    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
    
  • 函數說明

    當分割區內計算規則和分割區間計算規則相同時,aggregateByKey 就可以簡化為 foldByKey

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
    

combineByKey

  • 函數簽名

    def combineByKey[C](
    	createCombiner: V => C,
    	mergeValue: (C, V) => C,
    	mergeCombiners: (C, C) => C): RDD[(K, C)]
    
  • 函數說明

    最通用的對 key-value 型 rdd 進行聚集操作的聚集函數(aggregation function)。類似於 aggregate(),combineByKey()允許使用者返回值的型別與輸入不一致

    val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),
    ("a", 95), ("b", 98))
    val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
    val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
    	(_, 1),
    	(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
    	(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    )
    

reduceByKey、foldByKey、aggregateByKey、combineByKey 的區別?

  • reduceByKey: 相同 key 的第一個資料不進行任何計算,分割區內和分割區間計算規則相同

  • FoldByKey: 相同 key 的第一個資料和初始值進行分割區內計算,分割區內和分割區間計算規則相 同

  • AggregateByKey: 相同 key 的第一個資料和初始值進行分割區內計算,分割區內和分割區間計算規則可以不相同

  • CombineByKey: 當計算時,發現資料結構不滿足要求時,可以讓第一個資料轉換結構。分割區內和分割區間計算規則不相同

// reduceByKey:
combineByKeyWithClassTag[V](
    (v: V) => v, // 第一個值不會參與計算
    func, // 分割區內計算規則
    func, // 分割區間計算規則
)

// aggregateByKey :
combineByKeyWithClassTag[U](
    (v: V) => cleanedSeqOp(createZero(), v), // 初始值和第一個key的value值進行的分割區內資料操作
    cleanedSeqOp, // 分割區內計算規則
    combOp,       // 分割區間計算規則
)

// foldByKey:
combineByKeyWithClassTag[V](
    (v: V) => cleanedFunc(createZero(), v), // 初始值和第一個key的value值進行的分割區內資料操作
    cleanedFunc,  // 分割區內計算規則
    cleanedFunc,  // 分割區間計算規則
)

// combineByKey :
combineByKeyWithClassTag(
    createCombiner,  // 相同key的第一條資料進行的處理常式
    mergeValue,      // 表示分割區內資料的處理常式
    mergeCombiners,  // 表示分割區間資料的處理常式
 )

sortByKey

  • 函數簽名

    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
     : RDD[(K, V)]
    
  • 函數說明

    在一個(K,V)的 RDD 上呼叫,K 必須實現 Ordered 介面(特質),返回一個按照 key 進行排序的

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
    val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)
    

join

  • 函數簽名

    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    
  • 函數說明

    在型別為(K,V)和(K,W)的 RDD 上呼叫,返回一個相同 key 對應的所有元素連線在一起的 (K,(V,W))的 RDD

    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
    rdd.join(rdd1).collect().foreach(println)
    

leftOuterJoin

  • 函數簽名

    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
    
  • 函數說明

    類似於 SQL 語句的左外連線

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
    val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)
    

cogroup

  • 函數簽名

    def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
    
  • 函數說明

    在型別為(K,V)和(K,W)的 RDD 上呼叫,返回一個(K,(Iterable,Iterable))型別的 RDD

    val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))
    val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))
    val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)
    

4.3 RDD 行動運算元

4.3.1 reduce

  • 函數簽名

    def reduce(f: (T, T) => T): T
    
  • 函數說明

    聚集 RDD 中的所有元素,先聚合分割區內資料,再聚合分割區間資料

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 聚合資料
    val reduceResult: Int = rdd.reduce(_+_)
    

4.3.2 collect

  • 函數簽名

    def collect(): Array[T]
    
  • 函數說明

    在驅動程式中,以陣列 Array 的形式返回資料集的所有元素

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) // 收集資料到 Driver rdd.collect().foreach(println)
    

4.3.3 count

  • 函數簽名

    def count(): Long
    
  • 函數說明

    返回 RDD 中元素的個數

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 返回 RDD 中元素的個數
    val countResult: Long = rdd.count()
    

4.3.4 first

  • 函數簽名

    def first(): T
    
  • 函數說明

    返回 RDD 中的第一個元素

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 返回 RDD 中元素的個數
    val firstResult: Int = rdd.first()
    println(firstResult)
    

4.3.5 take

  • 函數簽名

    def take(num: Int): Array[T]
    
  • 函數說明

    返回一個由 RDD 的前 n 個元素組成的陣列

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 返回 RDD 中元素的個數
    val takeResult: Array[Int] = rdd.take(2)
    println(takeResult.mkString(","))
    

4.3.6 takeOrdered

  • 函數簽名

    def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
    
  • 函數說明

    返回該 RDD 排序後的前 n 個元素組成的陣列

    val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
    // 返回 RDD 中元素的個數
    val result: Array[Int] = rdd.takeOrdered(2)
    

4.3.7 aggregate

  • 函數簽名

    def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
    
  • 函數說明

    分割區的資料通過初始值和分割區內的資料進行聚合,然後再和初始值進行分割區間的資料聚合

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
    // 將該 RDD 所有元素相加得到結果
    //val result: Int = rdd.aggregate(0)(_ + _, _ + _)
    val result: Int = rdd.aggregate(10)(_ + _, _ + _)
    

4.3.8 fold

  • 函數簽名

    def fold(zeroValue: T)(op: (T, T) => T): T
    
  • 函數說明

    摺疊操作,aggregate 的簡化版操作

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    val foldResult: Int = rdd.fold(0)(_+_)
    

4.3.9 countByKey

  • 函數簽名

    def countByKey(): Map[K, Long]
    
  • 函數說明

    摺疊操作,aggregate 的簡化版操作

    val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2,
    "b"), (3, "c"), (3, "c")))
    // 統計每種 key 的個數
    val result: collection.Map[Int, Long] = rdd.countByKey()
    

4.3.10 save 相關運算元

  • 函數簽名

    def saveAsTextFile(path: String): Unit
    def saveAsObjectFile(path: String): Unit
    def saveAsSequenceFile(
        path: String,
        codec: Option[Class[_ <: CompressionCodec]] = None): Unit
    
  • 函數說明

    將資料儲存到不同格式的檔案中

    // 儲存成 Text 檔案
    rdd.saveAsTextFile("output")
    // 序列化成物件儲存到檔案
    rdd.saveAsObjectFile("output1")
    // 儲存成 Sequencefile 檔案
    rdd.map((_,1)).saveAsSequenceFile("output2")
    

4.3.11 foreach

  • 函數簽名

    def foreach(f: T => Unit): Unit = withScope {
    	val cleanF = sc.clean(f)
    	sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }
    
  • 函數說明

    分散式遍歷 RDD 中的每一個元素,呼叫指定函數

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 收集後列印
    rdd.map(num=>num).collect().foreach(println)
    println("****************")
    // 分散式列印
    rdd.foreach(println)
    

4.4 RDD 序列化

4.4.1 閉包檢查

從計算的角度, 運算元以外的程式碼都是在 Driver 端執行, 運算元裡面的程式碼都是在 Executor 端執行。

那麼在 scala 的函數語言程式設計中,就會導致運算元內經常會用到運算元外的資料,這樣就形成了閉包的效果,如果使用的運算元外的資料無法序列化,就意味著無法傳值給 Executor 端執行,就會發生錯誤,所以需要在執行任務計算前,檢測閉包內的物件是否可以進行序列化,這個操作我們稱之為閉包檢測。

Scala2.12 版本後閉包編譯方式發生了改變

從計算的角度, 運算元以外的程式碼都是在 Driver 端執行, 運算元裡面的程式碼都是在 Executor 端執行

object Spark_RDD_Serial {

    def main(args: Array[String]): Unit = {
        val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparConf)
        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive"))
        val search = new Search("h")
        // 會報錯
        //search.getMatch1(rdd).collect().foreach(println)
       	// 不會報錯
        search.getMatch2(rdd).collect().foreach(println)
        sc.stop()
    }
    // 查詢物件
    // 類的構造引數其實是類的屬性, 構造引數需要進行閉包檢測,其實就等同於類進行閉包檢測
    class Search(query:String){

        def isMatch(s: String): Boolean = {
            s.contains(this.query)
        }
        // 函數序列化案例
        def getMatch1 (rdd: RDD[String]): RDD[String] = {
            rdd.filter(isMatch)
        }
        // 屬性序列化案例
        def getMatch2(rdd: RDD[String]): RDD[String] = {
            val s = query
            rdd.filter(x => x.contains(s))
        }
    }
}

4.5 RDD 依賴關係

RDD 血緣關係

RDD 只支援粗粒度轉換,即在大量記錄上執行的單個操作。將建立 RDD 的一系列 Lineage (血統)記錄下來,以便恢復丟失的分割區。RDD 的 Lineage 會記錄 RDD 的後設資料資訊和轉 換行為,當該 RDD 的部分分割區資料丟失時,它可以根據這些資訊來重新運算和恢復丟失的 資料分割區。

有一臺伺服器:32G記憶體,如何在記憶體中對1T資料排序

  • 先抽樣,看分佈,然後分塊(至少32塊),對每個塊進行排序,最後合併

5.2 shuffle

在Spark中,什麼情況下,會產生shuffle?

  • reduceByKey,groupByKey,sortByKey,countByKey,join等等

Spark shuffle一共經歷了這幾個過程:

  1. 未優化的 Hash Based Shuffle
  2. 優化後的 Hash Based Shuffle
  3. Sort-Based Shuffle

5.2.1 ShuffleMapStage 與 ResultStage

2. 優化後的 HashShuffle

優化的 HashShuffle 過程就是啟用合併機制,合併機制就是複用 buffer,開啟合併機制 的設定是 spark.shuffle.consolidateFiles。該引數預設值為 false,將其設定為 true 即可開啟優化機制。通常來說,如果我們使用 HashShuffleManager,那麼都建議開啟這個選項。

這裡還是有 4 個 Tasks,資料類別還是分成 3 種型別,因為 Hash 演演算法會根據你的 Key 進行分類,在同一個程序中,無論是有多少過 Task,都會把同樣的 Key 放在同一個 Buffer 裡,然後把 Buffer 中的資料寫入以 Core 數量為單位的本地檔案中,(一個 Core 只有一種類 型的 Key 的資料),每 1 個 Task 所在的程序中,分別寫入共同程序中的 3 份本地檔案,這裡 有 4 個 Mapper Tasks,所以總共輸出是 2 個 Cores x 3 個分類檔案 = 6 個本地小檔案。

5.2.3 SortShuffle 解析

1. 普通 SortShuffle

在該模式下,資料會先寫入一個資料結構,reduceByKey 寫入 Map,一邊通過 Map 區域性聚合,一邊寫入記憶體。Join 運算元寫入 ArrayList 直接寫入記憶體中。然後需要判斷是否達到閾值,如果達到就會將記憶體資料結構的資料寫入到磁碟,清空記憶體資料結構。

溢寫磁碟前,先根據 key 進行排序,排序過後的資料,會分批寫入到磁碟檔案中。預設批次為 10000 條,資料會以每批一萬條寫入到磁碟檔案。寫入磁碟檔案通過緩衝區溢寫的方式,每次溢寫都會產生一個磁碟檔案,也就是說一個 Task 過程會產生多個臨時檔案

最後在每個 Task 中,將所有的臨時檔案合併,這就是 merge 過程,此過程將所有臨時檔案讀取出來,一次寫入到最終檔案。意味著一個 Task 的所有資料都在這一個檔案中。同時單獨寫一份索引檔案,標識下游各個Task的資料在檔案中的索引,start offset和end offset。

6. 效能優化

6.1 高效能序列化庫

  • Spark傾向於序列化的便捷性,預設使用了Java序列化機制
  • Java序列化機制的效能並不高,序列化的速度相對較慢,而且序列化以後的資料,相對來說比較大,比較佔用記憶體空間
  • Spark提供了兩種序列化機制:Java序列化和Kryo序列化

Kryo序列化

  • Kryo序列化比Java序列化更快,而且序列化後的資料更小,通常小十倍
  • 如果要使用Kryo序列化機制,首先要用SparkConf和Spark序列化器設定為KryoSerializer
  • 使用Kryo時,針對需要序列化的類,需要預先進行註冊,這樣才能獲得最佳效能,如果不註冊,Kryo必須時刻儲存型別的全類名,反而佔用不少記憶體
  • Spark預設對Scala中常用的型別自動在Kryo進行了註冊
  • 如果在運算元中,使用了外部的自定義型別的物件,那麼還是需要對其進行註冊
    • 格式:conf.registerKryoClasses(...)
  • 注意:如果要序列化的自定義的型別,欄位特別多,此時就需要對Kryo本身進行優化,因為Kryo內部的換儲存可能不夠存放那麼大的class物件
    • 需要呼叫SparkConf.set()方法,設定spark.kryoserializer.buffer.mb引數的值,將其調大,預設值為2,單位是MB

6.2 持久化&checkpoint

  • 針對程式中多次被transformation或者action操作的RDD進行持久化操作,避免對一個RDD反覆進行計算,再進一步優化,使用序列化Kryo的持久化級別
  • 為了保證RDD持久化資料在可能丟失的情況下還能實現高可靠,則需要對RDD執行CheckPoint操作

6.3 JVM垃圾回收調優

預設情況下,Spark使用每個Executor 60%的記憶體空間來快取RDD,那麼只有40%的記憶體空間來存放運算元執行期間建立的物件

  • 如果垃圾回收頻繁發生,就需要對這個比例進行調優,通過引數spark.storage.memoryFraction來修改比例

統一記憶體管理

統一記憶體管理機制,與靜態記憶體管理的區別在於儲存記憶體和執行記憶體共用同一塊空間,可以動態佔用對方的空閒區域,統一記憶體管理的堆內記憶體結構如圖所示:

其中最重要的優化在於動態佔用機制,其規則如下:

  1. 設定基本的儲存記憶體和執行記憶體區域(spark.storage.storageFraction 引數),該設定確定了雙方各自擁有的空間的範圍
  2. 雙方的空間都不足時,則儲存到硬碟;若己方空間不足而對方空餘時,可借用對方的空間;(儲存空間不足是指不足以放下一個完整的 Block)
  3. 執行記憶體的空間被對方佔用後,可讓對方將佔用的部分轉存到硬碟,然後」歸還」借用的空間
  4. 儲存記憶體的空間被對方佔用後,無法讓對方」歸還」,因為需要考慮 Shuffle 過程中的很多因素,實現起來較為複雜。

6.4 提高並行度

  • 要儘量設定合理的並行度,來充分地利用叢集的資源,才能充分提高Spark程式的效能

  • 可以手動使用textFile()、parallelize()等方法的第二個引數來設定並行度,也可以使用spark.default.parallelism引數,來設定統一的並行度,Spark官方推薦,給叢集的每個cpu core設定2-3個task