Spark 是一種基於記憶體的快速、通用、可延伸的巨量資料分析計算引擎
上面流程對應Hadoop的處理流程,下面對應著Spark的處理流程
Hadoop
Spark
由上面的資訊可以獲知,Spark 出現的時間相對較晚,並且主要功能主要是用於資料計算, 所以其實 Spark 一直被認為是 Hadoop 框架的升級版。
Spark or Hadoop
所謂的 Local 模式,就是不需要其他任何節點資源就可以在本地執行 Spark 程式碼的環境,一般用於教學,偵錯,演示等
在官網下載安裝包,將 spark-XX-bin-hadoopXX.tgz 檔案上傳到 Linux 並解壓縮,放置在指定位置,路徑中不要包含中文或空格。
tar -zxvf spark-XXX-bin-hadoop.XX.tgz -C /opt/module
cd /opt/module
mv spark-3.0.0-bin-hadoop3.2 spark-local
進入解壓縮後的路徑,執行如下指令
bin/spark-shell
可以在命令列中,執行scala命令,也可以呼叫spark
測試
在解壓縮資料夾下的 data 目錄中,新增 word.txt 檔案。
Hello Scala
Hello Spark
在命令列工具中執行如下程式碼指令
sc.textFile("data/word.txt").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).collect
啟動成功後,可以輸入網址進行 Web UI 監控頁面存取
http://虛擬機器器or本機ip地址:4040
退出
按鍵 Ctrl+C 或輸入 Scala 指令
:quit
/opt/module/spark-local/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_XXXXXjar \
10
local 本地模式畢竟只是用來進行練習演示的,真實工作中還是要將應用提交到對應的 叢集中去執行,這裡我們來看看只使用 Spark 自身節點執行的叢集模式,也就是我們所謂的 獨立部署(Standalone)模式。Spark 的 Standalone 模式體現了經典的 master-slave 模式。
叢集規劃:
bin/spark-submit \
--class <main-class>
--master <master-url> \
... # other options
<application-jar> \
[application-arguments]
由於 spark-shell 停止掉後,叢集監控 node1:4040 頁面就看不到歷史任務的執行情況,所以 開發時都設定歷史伺服器記錄任務執行情況
修改 spark-defaults.conf.template 檔名為 spark-defaults.conf
修改 spark-defaults.conf 檔案,設定紀錄檔儲存路徑
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:8020/logs
注意:路徑自己設定,需要啟動 hadoop 叢集,HDFS 上的 directory 目錄需要提前存在。
修改 spark-env.sh 檔案, 新增紀錄檔設定
前後路徑保持一致
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://node1:8020/logs
-Dspark.history.retainedApplications=30"
引數 1 含義:WEB UI 存取的埠號為 18080
引數 2 含義:指定歷史伺服器紀錄檔儲存路徑
引數 3 含義:指定儲存 Application 歷史記錄的個數,如果超過這個值,舊的應用程式 資訊將被刪除,這個是記憶體中的應用數,而不是頁面上顯示的應用數。
注意:每一個節點上設定保持一致
重新啟動叢集和歷史服務
/opt/module/spark-standlone/sbin/start-all.sh
/opt/module/spark-standlone/sbin/start-history-server.sh
重新執行任務
/opt/module/spark-standlone/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node1:7077 \
./examples/jars/spark-examples_XXX.jar \
10
檢視歷史服務:http://node1:18080
獨立部署(Standalone)模式由 Spark 自身提供計算資源,無需其他框架提供資源。這種方式降低了和其他第三方資源框架的耦合性,獨立性非常強。但是,Spark 主要是計算框架,而不是資源排程框架,所以本身提供的資源排程並不是它的強項,所以還是和其他專業的資源排程框架整合會更靠譜一些。
注意: 每個節點上設定相同,可設定一臺節點,然後上傳到其他節點便可
解壓縮檔案
將 spark-XXX.tgz 檔案上傳到 Linux 並解壓縮在指定位置
tar -zxvf spark-XXX.tgz -C /opt/module
cd /opt/module
mv spark-XXX spark-yarn
修改組態檔
修改 conf/spark-env.sh,新增 JAVA_HOME 和 YARN_CONF_DIR\HADOOP_CONF_DIR 設定
export JAVA_HOME=/XXX/jdk1XX
YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
HADOOP_CONF_DIR=/opt/module/hadoop/etc/hadoop
啟動HDFS和YARN叢集
/opt/module/spark-yarn/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_XXX.jar \
10
資源之間的依賴關係,不能成環,會形成死鎖
巨量資料計算引擎框架我們根據使用方式的不同一般會分為四類:
這裡所謂的有向無環圖,並不是真正意義的圖形,而是由 Spark 程式直接對映成的資料流的高階抽象模型。簡單理解就是將整個程式計算的執行過程用圖形表示出來,這樣更直觀,更便於理解,可以用於表示程式的拓撲結構。
DAG(Directed Acyclic Graph)有向無環圖是由點和線組成的拓撲圖形,該圖形具有方向,不會閉環。
基於Yarn環境
Client 模式將用於監控和排程的 Driver 模組在使用者端執行,而不是在 Yarn 中,所以一般用於測試
Cluster 模式將用於監控和排程的 Driver 模組啟動在 Yarn 叢集資源中執行。一般應用於實際生產環境
Spark 計算框架為了能夠進行高並行和高吞吐的資料處理,封裝了三巨量資料結構,用於處理不同的應用場景。
三巨量資料結構分別是:
RDD(Resilient Distributed Dataset)叫做彈性分散式資料集,是 Spark 中最基本的資料處理模型。程式碼中是一個抽象類,它代表一個彈性的、不可變、可分割區、裡面的元素可平行計算的集合。
彈性
儲存的彈性:記憶體與磁碟的自動切換
容錯的彈性:資料丟失可以自動恢復
計算的彈性:計算出錯重試機制
分片的彈性:可根據需要重新分片
分散式:資料儲存在巨量資料叢集不同節點上
資料集:RDD 封裝了計算邏輯,並不儲存資料
資料抽象:RDD 是一個抽象類,需要子類具體實現
不可變:RDD 封裝了計算邏輯,是不可以改變的,想要改變,只能產生新的 RDD,在新的 RDD 裡面封裝計算邏輯
可分割區、平行計算
RDD vs IO
RDD的資料處理方式類似於IO流,也有裝飾者設計模式
RDD的資料只有在呼叫collect方法時,才會真正執行業務邏輯操作。之前的封裝全部都是功能上的擴充套件
RDD是不儲存資料的,但是IO可以臨時儲存一部分資料
Internally, each RDD is characterized by five main properties:
分割區列表
RDD 資料結構中存在分割區列表,用於執行任務時平行計算,是實現分散式計算的重要屬性
protected def getPartitions: Array[Partition]
分割區計算函數
Spark 在計算時,是使用分割區函數對每一個分割區進行計算
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
RDD 之間的依賴關係
RDD 是計算模型的封裝,當需求中需要將多個計算模型進行組合時,就需要將多個 RDD 建立依賴關係
protected def getDependencies: Seq[Dependency[_]] = deps
分割區器(可選)
當資料為 KV 型別資料時,可以通過設定分割區器自定義資料的分割區
@transient val partitioner: Option[Partitioner] = None
首選位置(可選)
計算資料時,可以根據計算節點的狀態選擇不同的節點位置進行計算
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
資料處理過程中需要計算資源(記憶體 & CPU)和計算模型(邏輯)。執行時,需要將計算資源和計算模型進行協調和整合
Spark 框架在執行時,先申請資源,然後將應用程式的資料處理邏輯分解成一個一個的計算任務。然後將任務發到已經分配資源的計算節點上, 按照指定的計算模型進行資料計算。最後得到計算結果
RDD 是 Spark 框架中用於資料處理的核心模型,接下來我們看看,在 Yarn 環境中,RDD的工作原理
啟動 Yarn 叢集環境
Spark 通過申請資源建立排程節點和計算節點
Spark 框架根據需求將計算邏輯根據分割區劃分成不同的任務
排程節點將任務根據計算節點狀態傳送到對應的計算節點進行計算
從以上流程可以看出 RDD 在整個流程中主要用於將邏輯進行封裝,並生成 Task 傳送給 Executor 節點執行計算
在 Spark 中建立 RDD 的建立方式可以分為四種
從集合(記憶體)中建立 RDD
從集合中建立 RDD,Spark 主要提供了兩個方法:parallelize 和 makeRDD
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(List(1,2,3,4))
val rdd2 = sparkContext.makeRDD( List(1,2,3,4))
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()
從底層程式碼實現來講,makeRDD 方法其實就是 parallelize 方法
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
// makeRDD方法可以傳遞第二個引數,這個參數列示分割區的數量
// 第二個引數可以不傳遞的,那麼makeRDD方法會使用預設值 : defaultParallelism(預設並行度)
// scheduler.conf.getInt("spark.default.parallelism", totalCores)
// spark在預設情況下,從設定物件中獲取設定引數:spark.default.parallelism
// 如果獲取不到,那麼使用totalCores屬性,這個屬性取值為當前執行環境的最大可用核數
// val rdd = sc.makeRDD(List(1,2,3,4),2)
從外部儲存(檔案)建立 RDD
由外部儲存系統的資料集建立 RDD 包括:原生的檔案系統,所有 Hadoop 支援的資料集, 比如 HDFS、HBase 等
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
從其他 RDD 建立
主要是通過一個 RDD 運算完後,再產生新的 RDD
直接建立 RDD(new)
使用 new 的方式直接構造 RDD,一般由 Spark 框架自身使用
預設情況下,Spark 可以將一個作業切分多個任務後,傳送給 Executor 節點平行計算,而能夠平行計算的任務數量我們稱之為並行度。這個數量可以在構建 RDD 時指定。記住,這裡的並行執行的任務數量,並不是指的切分任務的數量,不要混淆了
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4), 4)
val fileRDD: RDD[String] = sparkContext.textFile( "input", 2)
fileRDD.collect().foreach(println)
sparkContext.stop()
讀取記憶體資料時,資料可以按照並行度的設定進行資料的分割區操作,資料分割區規則的 Spark 核心原始碼如下
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
讀取檔案資料時,資料是按照 Hadoop 檔案讀取的規則進行切片分割區,而切片規則和資料讀取的規則有些差異,具體 Spark 核心原始碼如下
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
...
for (FileStatus file: files) {
...
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
...
}
protected long computeSplitSize(long goalSize, long minSize,long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
RDD 根據資料處理方式的不同將運算元整體上分為 Value 型別、雙 Value 型別和 Key-Value 型別
Value | DESC |
---|---|
map | 將處理的資料逐條進行對映轉換,這裡的轉換可以是型別的轉換,也可以是值的轉換 |
mapPartitions | 將待處理的資料以分割區為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處理,哪怕是過濾資料 |
mapPartitionsWithIndex | 將待處理的資料以分割區為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處 理,哪怕是過濾資料,在處理時同時可以獲取當前分割區索引 |
flatMap | 將處理的資料進行扁平化後再進行對映處理,所以運算元也稱之為扁平對映 |
glom | 將同一個分割區的資料直接轉換為相同型別的記憶體陣列進行處理,分割區不變 |
groupBy | 將資料根據指定的規則進行分組, 分割區預設不變,但是資料會被打亂重新組合,我們將這樣的操作稱之為 shuffle。極限情況下,資料可能被分在同一個分割區中 一個組的資料在一個分割區中,但是並不是說一個分割區中只有一個組 |
filter | 將資料根據指定的規則進行篩選過濾,符合規則的資料保留,不符合規則的資料丟棄。 當資料進行篩選過濾後,分割區不變,但是分割區內的資料可能不均衡,生產環境下,可能會出現資料傾斜。 |
sample | 根據指定的規則從資料集中抽取資料 |
distinct | 將資料集中重複的資料去重 |
coalesce | 根據資料量縮減分割區,用於巨量資料集過濾後,提高小資料集的執行效率。當 spark 程式中,存在過多的小任務的時候,可以通過 coalesce 方法,收縮合並分割區,減少分割區的個數,減小任務排程成本 |
repartition | 該操作內部其實執行的是 coalesce 操作,引數 shuffle 的預設值為 true。無論是將分割區數多的 RDD 轉換為分割區數少的 RDD,還是將分割區數少的 RDD 轉換為分割區數多的 RDD,repartition 操作都可以完成,因為無論如何都會經 shuffle 過程 |
sortBy | 該操作用於排序資料。在排序之前,可以將資料通過 f 函數進行處理,之後按照 f 函數處理的結果進行排序,預設為升序排列。排序後新產生的 RDD 的分割區數與原 RDD 的分割區數一致。中間存在 shuffle 的過程 |
雙Value | DESC |
intersection | 對源 RDD 和引數 RDD 求交集後返回一個新的 RDD |
union | 對源 RDD 和引數 RDD 求並集後返回一個新的 RDD |
subtract | 以一個 RDD 元素為主,去除兩個 RDD 中重復元素,將其他元素保留下來 |
zip | 將兩個 RDD 中的元素,以鍵值對的形式進行合併 |
Key - Value | DESC |
partitionBy | 將資料按照指定 Partitioner 重新進行分割區。Spark 預設的分割區器是 HashPartitioner |
reduceByKey | 可以將資料按照相同的 Key 對 Value 進行聚合 |
groupByKey | 將資料來源的資料根據 key 對 value 進行分組 |
aggregateByKey | 將資料根據不同的規則進行分割區內計算和分割區間計算 |
foldByKey | 當分割區內計算規則和分割區間計算規則相同時,aggregateByKey 就可以簡化為 foldByKey |
combineByKey | 最通用的對 key-value 型 rdd 進行聚集操作的聚集函數(aggregation function),combineByKey()允許使用者返回值的型別與輸入不一致 |
sortByKey | 在一個(K,V)的 RDD 上呼叫,K 必須實現 Ordered 介面(特質),返回一個按照 key 進行排序 的 |
join | 在型別為(K,V)和(K,W)的 RDD 上呼叫,返回一個相同 key 對應的所有元素連線在一起的 (K,(V,W))的 RDD |
leftOuterJoin | 類似於 SQL 語句的左外連線 |
cogroup | (join & group)在型別為(K,V)和(K,W)的 RDD 上呼叫,返回一個(K,(Iterable,Iterable))型別的 RDD |
函數簽名
def map[U: ClassTag](f: T => U): RDD[U]
函數說明
將處理的資料逐條進行對映轉換,這裡的轉換可以是型別的轉換,也可以是值的轉換
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(
num => { num * 2 }
)
val dataRDD2: RDD[String] = dataRDD1.map(
num => { "" + num}
)
函數簽名
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
函數說明
將待處理的資料以分割區為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處理,哪怕是過濾資料
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
datas => { datas.filter(_==2) }
)
// mapPartitions : 可以以分割區為單位進行資料轉換操作
// 但是會將整個分割區的資料載入到記憶體進行參照
// 如果處理完的資料是不會被釋放掉,存在物件的參照。
// 在記憶體較小,資料量較大的場合下,容易出現記憶體溢位。
map 和 mapPartitions 的區別?
函數簽名
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
函數說明
將待處理的資料以分割區為單位傳送到計算節點進行處理,這裡的處理是指可以進行任意的處理,哪怕是過濾資料,在處理時同時可以獲取當前分割區索引
val dataRDD1 = dataRDD.mapPartitionsWithIndex(
(index, datas) => { datas.map(index, _) }
)
函數簽名
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
函數說明
將處理的資料進行扁平化後再進行對映處理,所以運算元也稱之為扁平對映
val dataRDD = sparkContext.makeRDD(List(List(1,2),List(3,4)),1)
val dataRDD1 = dataRDD.flatMap(list => list)
函數簽名
def glom(): RDD[Array[T]]
函數說明
將同一個分割區的資料直接轉換為相同型別的記憶體陣列進行處理,分割區不變
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1:RDD[Array[Int]] = dataRDD.glom()
函數簽名
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
函數說明
將資料根據指定的規則進行分組, 分割區預設不變,但是資料會被打亂重新組合,我們將這樣的操作稱之為 shuffle。極限情況下,資料可能被分在同一個分割區中
一個組的資料在一個分割區中,但是並不是說一個分割區中只有一個組
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy(_%2)
函數簽名
def filter(f: T => Boolean): RDD[T]
函數說明
將資料根據指定的規則進行篩選過濾,符合規則的資料保留,不符合規則的資料丟棄。 當資料進行篩選過濾後,分割區不變,但是分割區內的資料可能不均衡,生產環境下,可能會出現資料傾斜
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)
函數簽名
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
函數說明
根據指定的規則從資料集中抽取資料
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4
),1)
// 抽取資料不放回(伯努利演演算法)
// 伯努利演演算法:又叫 0、1 分佈。例如扔硬幣,要麼正面,要麼反面。
// 具體實現:根據種子和隨機演演算法算出一個數和第二個引數設定機率比較,小於第二個引數要,大於不要
// 第一個引數:抽取的資料是否放回,false:不放回
// 第二個引數:抽取的機率,範圍在[0,1]之間,0:全不取;1:全取;
// 第三個引數:亂數種子,如果不傳遞第三個引數,那麼使用的是當前系統時間
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取資料放回(泊松演演算法)
// 第一個引數:抽取的資料是否放回,true:放回;false:不放回
// 第二個引數:重複資料的機率,範圍大於等於 0.表示每一個元素被期望抽取到的次數
// 第三個引數:亂數種子,如果不傳遞第三個引數,那麼使用的是當前系統時間
val dataRDD2 = dataRDD.sample(true, 2)
函數簽名
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
函數說明
將資料集中重複的資料去重
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),1)
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)
函數簽名
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
:RDD[T]
函數說明
根據資料量縮減分割區,用於巨量資料集過濾後,提高小資料集的執行效率
當 spark 程式中,存在過多的小任務的時候,可以通過 coalesce 方法,收縮合並分割區,減少分割區的個數,減小任務排程成本
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)
// coalesce方法預設情況下不會將分割區的資料打亂重新組合
// 這種情況下的縮減分割區可能會導致資料不均衡,出現資料傾斜
// 如果想要讓資料均衡,可以進行shuffle處理
函數簽名
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
函數說明
該操作內部其實執行的是 coalesce 操作,引數 shuffle 的預設值為 true。無論是將分割區數多的 RDD 轉換為分割區數少的 RDD,還是將分割區數少的 RDD 轉換為分割區數多的 RDD,repartition 操作都可以完成,因為無論如何都會經 shuffle 過程
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)
兩者區別
coalesce運算元可以擴大分割區的,但是如果不進行shuffle操作,是沒有意義,不起作用。
所以如果想要實現擴大分割區的效果,需要使用shuffle操作
spark提供了一個簡化的操作
函數簽名
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
函數說明
該操作用於排序資料。在排序之前,可以將資料通過 f 函數進行處理,之後按照 f 函數處理的結果進行排序,預設為升序排列。排序後新產生的 RDD 的分割區數與原 RDD 的分割區數一致。中間存在 shuffle 的過程
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
// sortBy方法可以根據指定的規則對資料來源中的資料進行排序,預設為升序,第二個引數可以改變排序的方式
// sortBy預設情況下,不會改變分割區。但是中間存在shuffle操作
交集,並集和差集要求兩個資料來源資料型別保持一致
對源 RDD 和引數 RDD 求交集後返回一個新的 RDD
def intersection(other: RDD[T]): RDD[T]
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)
對源 RDD 和引數 RDD 求並集後返回一個新的 RDD
def subtract(other: RDD[T]): RDD[T]
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)
以一個 RDD 元素為主,去除兩個 RDD 中重複元素,將其他元素保留下來。求差集
def subtract(other: RDD[T]): RDD[T]
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)
將兩個 RDD 中的元素,以鍵值對的形式進行合併。
資料型別可以不一致
兩個資料來源要求分割區數量要保持一致
兩個資料來源要求分割區中資料數量保持一致
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.zip(dataRDD2)
函數簽名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
函數說明 將資料按照指定 Partitioner 重新進行分割區。Spark 預設的分割區器是 HashPartitioner
import org.apache.spark.HashPartitioner
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
// RDD => PairRDDFunctions
// 隱式轉換(二次編譯)
// 重分割區的分割區器與當前RDD的分割區器一樣,則不會再次分割區
val rdd2: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
函數簽名
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
函數說明
可以將資料按照相同的 Key 對 Value 進行聚合
// reduceByKey : 相同的key的資料進行value資料的聚合操作
// scala語言中一般的聚合操作都是兩兩聚合,spark基於scala開發的,所以它的聚合也是兩兩聚合
// 【1,2,3】
// 【3,3】
// 【6】
// reduceByKey中如果key的資料只有一個,是不會參與運算的。
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
spark中,shuffle操作必須落盤處理,不能在記憶體中資料等待,會導致記憶體溢位
函數簽名
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
函數說明
將資料來源的資料根據 key 對 value 進行分組
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
兩者區別
從 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前對分割區內相同 key 的資料進行預聚合(combine)功能,這樣會減少落盤的資料量,而 groupByKey 只是進行分組,不存在資料量減少的問題,reduceByKey 效能比較高。
從功能的角度:reduceByKey 其實包含分組和聚合的功能。GroupByKey 只能分組,不能聚合,所以在分組聚合的場合下,推薦使用 reduceByKey,如果僅僅是分組而不需要聚合。那麼還是隻能使用 groupByKey
函數簽名
def aggregateByKey[U: ClassTag](zeroValue: U)
(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
函數說明
將資料根據不同的規則進行分割區內計算和分割區間計算
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_)
// TODO : 取出每個分割區內相同 key 的最大值然後分割區間相加
// aggregateByKey 運算元是函數柯里化,存在兩個參數列
// 1. 第一個參數列中的參數列示初始值
// 2. 第二個參數列中含有兩個引數
// 2.1 第一個參數列示分割區內的計算規則
// 2.2 第二個參數列示分割區間的計算規則
函數簽名
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
函數說明
當分割區內計算規則和分割區間計算規則相同時,aggregateByKey 就可以簡化為 foldByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
函數簽名
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
函數說明
最通用的對 key-value 型 rdd 進行聚集操作的聚集函數(aggregation function)。類似於 aggregate(),combineByKey()允許使用者返回值的型別與輸入不一致
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),
("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
(_, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
reduceByKey、foldByKey、aggregateByKey、combineByKey 的區別?
reduceByKey: 相同 key 的第一個資料不進行任何計算,分割區內和分割區間計算規則相同
FoldByKey: 相同 key 的第一個資料和初始值進行分割區內計算,分割區內和分割區間計算規則相 同
AggregateByKey: 相同 key 的第一個資料和初始值進行分割區內計算,分割區內和分割區間計算規則可以不相同
CombineByKey: 當計算時,發現資料結構不滿足要求時,可以讓第一個資料轉換結構。分割區內和分割區間計算規則不相同
// reduceByKey:
combineByKeyWithClassTag[V](
(v: V) => v, // 第一個值不會參與計算
func, // 分割區內計算規則
func, // 分割區間計算規則
)
// aggregateByKey :
combineByKeyWithClassTag[U](
(v: V) => cleanedSeqOp(createZero(), v), // 初始值和第一個key的value值進行的分割區內資料操作
cleanedSeqOp, // 分割區內計算規則
combOp, // 分割區間計算規則
)
// foldByKey:
combineByKeyWithClassTag[V](
(v: V) => cleanedFunc(createZero(), v), // 初始值和第一個key的value值進行的分割區內資料操作
cleanedFunc, // 分割區內計算規則
cleanedFunc, // 分割區間計算規則
)
// combineByKey :
combineByKeyWithClassTag(
createCombiner, // 相同key的第一條資料進行的處理常式
mergeValue, // 表示分割區內資料的處理常式
mergeCombiners, // 表示分割區間資料的處理常式
)
函數簽名
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]
函數說明
在一個(K,V)的 RDD 上呼叫,K 必須實現 Ordered 介面(特質),返回一個按照 key 進行排序的
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)
函數簽名
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
函數說明
在型別為(K,V)和(K,W)的 RDD 上呼叫,返回一個相同 key 對應的所有元素連線在一起的 (K,(V,W))的 RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)
函數簽名
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
函數說明
類似於 SQL 語句的左外連線
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)
函數簽名
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
函數說明
在型別為(K,V)和(K,W)的 RDD 上呼叫,返回一個(K,(Iterable,Iterable))型別的 RDD
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))
val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)
函數簽名
def reduce(f: (T, T) => T): T
函數說明
聚集 RDD 中的所有元素,先聚合分割區內資料,再聚合分割區間資料
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 聚合資料
val reduceResult: Int = rdd.reduce(_+_)
函數簽名
def collect(): Array[T]
函數說明
在驅動程式中,以陣列 Array 的形式返回資料集的所有元素
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) // 收集資料到 Driver rdd.collect().foreach(println)
函數簽名
def count(): Long
函數說明
返回 RDD 中元素的個數
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的個數
val countResult: Long = rdd.count()
函數簽名
def first(): T
函數說明
返回 RDD 中的第一個元素
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的個數
val firstResult: Int = rdd.first()
println(firstResult)
函數簽名
def take(num: Int): Array[T]
函數說明
返回一個由 RDD 的前 n 個元素組成的陣列
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的個數
val takeResult: Array[Int] = rdd.take(2)
println(takeResult.mkString(","))
函數簽名
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
函數說明
返回該 RDD 排序後的前 n 個元素組成的陣列
val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
// 返回 RDD 中元素的個數
val result: Array[Int] = rdd.takeOrdered(2)
函數簽名
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
函數說明
分割區的資料通過初始值和分割區內的資料進行聚合,然後再和初始值進行分割區間的資料聚合
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
// 將該 RDD 所有元素相加得到結果
//val result: Int = rdd.aggregate(0)(_ + _, _ + _)
val result: Int = rdd.aggregate(10)(_ + _, _ + _)
函數簽名
def fold(zeroValue: T)(op: (T, T) => T): T
函數說明
摺疊操作,aggregate 的簡化版操作
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val foldResult: Int = rdd.fold(0)(_+_)
函數簽名
def countByKey(): Map[K, Long]
函數說明
摺疊操作,aggregate 的簡化版操作
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2,
"b"), (3, "c"), (3, "c")))
// 統計每種 key 的個數
val result: collection.Map[Int, Long] = rdd.countByKey()
函數簽名
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
函數說明
將資料儲存到不同格式的檔案中
// 儲存成 Text 檔案
rdd.saveAsTextFile("output")
// 序列化成物件儲存到檔案
rdd.saveAsObjectFile("output1")
// 儲存成 Sequencefile 檔案
rdd.map((_,1)).saveAsSequenceFile("output2")
函數簽名
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
函數說明
分散式遍歷 RDD 中的每一個元素,呼叫指定函數
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 收集後列印
rdd.map(num=>num).collect().foreach(println)
println("****************")
// 分散式列印
rdd.foreach(println)
從計算的角度, 運算元以外的程式碼都是在 Driver 端執行, 運算元裡面的程式碼都是在 Executor 端執行。
那麼在 scala 的函數語言程式設計中,就會導致運算元內經常會用到運算元外的資料,這樣就形成了閉包的效果,如果使用的運算元外的資料無法序列化,就意味著無法傳值給 Executor 端執行,就會發生錯誤,所以需要在執行任務計算前,檢測閉包內的物件是否可以進行序列化,這個操作我們稱之為閉包檢測。
Scala2.12 版本後閉包編譯方式發生了改變
從計算的角度, 運算元以外的程式碼都是在 Driver 端執行, 運算元裡面的程式碼都是在 Executor 端執行
object Spark_RDD_Serial {
def main(args: Array[String]): Unit = {
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive"))
val search = new Search("h")
// 會報錯
//search.getMatch1(rdd).collect().foreach(println)
// 不會報錯
search.getMatch2(rdd).collect().foreach(println)
sc.stop()
}
// 查詢物件
// 類的構造引數其實是類的屬性, 構造引數需要進行閉包檢測,其實就等同於類進行閉包檢測
class Search(query:String){
def isMatch(s: String): Boolean = {
s.contains(this.query)
}
// 函數序列化案例
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
// 屬性序列化案例
def getMatch2(rdd: RDD[String]): RDD[String] = {
val s = query
rdd.filter(x => x.contains(s))
}
}
}
RDD 血緣關係
RDD 只支援粗粒度轉換,即在大量記錄上執行的單個操作。將建立 RDD 的一系列 Lineage (血統)記錄下來,以便恢復丟失的分割區。RDD 的 Lineage 會記錄 RDD 的後設資料資訊和轉 換行為,當該 RDD 的部分分割區資料丟失時,它可以根據這些資訊來重新運算和恢復丟失的 資料分割區。
有一臺伺服器:32G記憶體,如何在記憶體中對1T資料排序
在Spark中,什麼情況下,會產生shuffle?
Spark shuffle一共經歷了這幾個過程:
2. 優化後的 HashShuffle
優化的 HashShuffle 過程就是啟用合併機制,合併機制就是複用 buffer,開啟合併機制 的設定是 spark.shuffle.consolidateFiles。該引數預設值為 false,將其設定為 true 即可開啟優化機制。通常來說,如果我們使用 HashShuffleManager,那麼都建議開啟這個選項。
這裡還是有 4 個 Tasks,資料類別還是分成 3 種型別,因為 Hash 演演算法會根據你的 Key 進行分類,在同一個程序中,無論是有多少過 Task,都會把同樣的 Key 放在同一個 Buffer 裡,然後把 Buffer 中的資料寫入以 Core 數量為單位的本地檔案中,(一個 Core 只有一種類 型的 Key 的資料),每 1 個 Task 所在的程序中,分別寫入共同程序中的 3 份本地檔案,這裡 有 4 個 Mapper Tasks,所以總共輸出是 2 個 Cores x 3 個分類檔案 = 6 個本地小檔案。
1. 普通 SortShuffle
在該模式下,資料會先寫入一個資料結構,reduceByKey 寫入 Map,一邊通過 Map 區域性聚合,一邊寫入記憶體。Join 運算元寫入 ArrayList 直接寫入記憶體中。然後需要判斷是否達到閾值,如果達到就會將記憶體資料結構的資料寫入到磁碟,清空記憶體資料結構。
在溢寫磁碟前,先根據 key 進行排序,排序過後的資料,會分批寫入到磁碟檔案中。預設批次為 10000 條,資料會以每批一萬條寫入到磁碟檔案。寫入磁碟檔案通過緩衝區溢寫的方式,每次溢寫都會產生一個磁碟檔案,也就是說一個 Task 過程會產生多個臨時檔案。
最後在每個 Task 中,將所有的臨時檔案合併,這就是 merge 過程,此過程將所有臨時檔案讀取出來,一次寫入到最終檔案。意味著一個 Task 的所有資料都在這一個檔案中。同時單獨寫一份索引檔案,標識下游各個Task的資料在檔案中的索引,start offset和end offset。
Kryo序列化
conf.registerKryoClasses(...)
預設情況下,Spark使用每個Executor 60%的記憶體空間來快取RDD,那麼只有40%的記憶體空間來存放運算元執行期間建立的物件
統一記憶體管理
統一記憶體管理機制,與靜態記憶體管理的區別在於儲存記憶體和執行記憶體共用同一塊空間,可以動態佔用對方的空閒區域,統一記憶體管理的堆內記憶體結構如圖所示:
其中最重要的優化在於動態佔用機制,其規則如下:
要儘量設定合理的並行度,來充分地利用叢集的資源,才能充分提高Spark程式的效能
可以手動使用textFile()、parallelize()等方法的第二個引數來設定並行度,也可以使用spark.default.parallelism引數,來設定統一的並行度,Spark官方推薦,給叢集的每個cpu core設定2-3個task