RDD(Resilient Distributed Dataset)彈性分散式資料集
是Spark中最基本的資料抽象,它代表一個不可變、可分割區、裡面的元素可平行計算的集合。
1.RDD的特性
2.RDD建立方式
3.建立RDD分割區方式
通過記憶體集合建立
預設的分割區方式(取決於分配給當前應用的CPU核數)
指定分割區
// i是集合下標,length是集合長度,numSlices是分割區數
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
讀取外部檔案建立
4.RDD常用運算元
Transformation轉換運算元
當我們去呼叫的時候,會建立一個新的RDD,並不會真正執行計算操作,只是做一些計算邏輯的封裝
Action行動運算元
只有行動運算元被呼叫之後才會去做計算
行動運算元是觸發了整個作業的執行。因為轉換運算元都是懶載入,並不會立即執行。
map(fun):返回一個新的RDD,該RDD由每一個輸入元素經過fun函數轉換後組成.
當某個RDD執行map方法時,會遍歷RDD中的每一個資料項,並依次應用fun函數,產生一個新的RDD
新的RDD分割區數和舊的RDD分割區數相同
程式碼範例
def main(args: Array[String]): Unit = {
// 建立SparkConf並設定App名稱
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
// 建立SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
println("原分割區個數" + rdd.partitions.size)
val newRDD: RDD[Int] = rdd.map(_ * 2)
println("新分割區個數" + newRDD.partitions.size)
newRDD.collect().foreach(println)
// 關閉連線
sc.stop()
}
-----------------------------------------------------------
輸出結果:
原分割區個數2
新分割區個數2
2
4
6
8
以分割區為單位執行Map運算元
類似於map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函數型別必須是Iterator[T] => Iterator[U]。
假設有N個元素,有M個分割區,那麼map的函數的將被呼叫N次,而mapPartitions被呼叫M次,一個函數一次處理所有分割區。
引數是一個可迭代的集合
map是一次處理一個元素,mapPartitions是一次處理一個分割區的元素,mapPartitions適用於批次處理操作
程式碼範例(對RDD中的元素進行乘2操作)
def main(args: Array[String]): Unit = {
// 建立SparkConf並設定App名稱
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
// 建立SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// 以分割區為單位,對RDD中的元素進行對映
val newRDD: RDD[Int] = rdd.mapPartitions(datas => {
datas.map(_ * 2)
})
newRDD.collect().foreach(println)
// 關閉連線
sc.stop()
}
---------------------------------------
輸出結果
2
4
6
8
適用於批次處理操作
比如將RDD中的元素插入到資料庫中,需要資料庫連線,如果每一個元素都建立一個連線,效率很低,可以對每個分割區的元素建立一個連線
mapPartitions每次處理一批資料,這個分割區中資料處理完之後,原來的RDD中分割區的資料才會釋放,可能會導致記憶體溢位
兩個引數,一個是當前分割區的編號,另外一個是分割區的資料(可以單獨對某一個分割區進行操作)
程式碼範例
需求1
對RDD元素做對映,將元素變為元組,元組裡面分別表示分割區編號和當前元素
def main(args: Array[String]): Unit = {
// 建立SparkConf並設定App名稱
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
// 建立SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val newRDD: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(
(index, datas) => {
datas.map((index, _))
}
)
newRDD.collect().foreach(println)
// 關閉連線
sc.stop()
}
-----------------------------------------------------------------
輸出結果
(0,1)
(0,2)
(1,3)
(1,4)
需求2
第二個分割區乘2,其他分割區不變
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 3)
val newRDD: RDD[Int] = rdd.mapPartitionsWithIndex(
(index, datas) => {
index match {
case 1 => datas.map(_ * 2)
case _ => datas
}
}
)
newRDD.collect().foreach(println)
--------------------------------------
輸出結果
1
2
6
8
10
6
7
8
功能與map相似,將RDD中的每一個元素通過應用f函數依次轉換為新函數,並封裝到RDD中
跟map的區別在於:
程式碼範例
需求
對集合進行扁平化處理,將一個大的集合(裡面有子集合)裡面資料取出,放入一個大的集合當中
def main(args: Array[String]): Unit = {
// 建立SparkConf並設定App名稱
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
// 建立SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6), List(7, 8)), 2)
// 如果匿名函數輸入和輸出相同,那麼不能進行簡化
val newRDD: RDD[Int] = rdd.flatMap(datas => datas)
newRDD.collect().foreach(println)
// 關閉連線
sc.stop()
}
----------------------------------------------
1
2
3
4
5
6
7
8
def glom(): RDD[Array[T]]
將RDD中每一個分割區中元素組合成一個陣列,封裝到新的RDD當中,陣列中元素的型別保持不變
(flatMap是將整體轉換成個體,glom是將個體轉換成整體)
程式碼範例
需求
將RDD中每個分割區裡的資料放到陣列當中
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
var rdd: RDD[Int] =sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
println("----------沒有進行glom之前--------------")
rdd.mapPartitionsWithIndex(
(index, datas) => {
println("當前分割區:" + index + " " + "元素:"+ datas.mkString(","))
datas
}
).collect()
println("----------呼叫glom之後--------------")
val newRDD: RDD[Array[Int]] = rdd.glom()
newRDD.mapPartitionsWithIndex(
(index, datas) => {
println("當前分割區:" + index + " " + "元素:"+ datas.next().mkString(","))
datas
}
).collect()
sc.stop()
}
-------------------------------------------
輸出結果
----------沒有進行glom之前--------------
當前分割區:1 元素:4,5,6
當前分割區:0 元素:1,2,3
----------呼叫glom之後--------------
當前分割區:0 元素:1,2,3
當前分割區:1 元素:4,5,6
def groupBy[K](f T => K) (implicit kt: ClassTag[K]): RDD[(k, Iterable[T])]
分組,按照傳入函數的返回值進行分組,相同key對應的值放入一個迭代器(按照指定的規則,對RDD中的元素進行分組)
groupBy會存在shuffle的過程進行落盤
程式碼範例
需求
將RDD中奇數放在一組,偶數放到另一組
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] =sc.makeRDD(1 to 9, 3)
println("-------------執行分組前----------------")
rdd.mapPartitionsWithIndex(
(index, datas) => {
println(s"分割區號:$index " + "元素:" + datas.mkString(","))
datas
}
).collect()
val newRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
println("-------------執行分組後----------------")
newRDD.mapPartitionsWithIndex(
(index, datas) => {
println(s"分割區號:$index " + "元素:" + datas.mkString(","))
datas
}
).collect()
sc.stop()
}
--------------------------------------------
輸出結果
-------------執行分組前----------------
分割區號:0 元素:1,2,3
分割區號:2 元素:7,8,9
分割區號:1 元素:4,5,6
-------------執行分組後----------------
分割區號:2 元素:
分割區號:1 元素:(1,CompactBuffer(1, 3, 5, 7, 9))
分割區號:0 元素:(0,CompactBuffer(2, 4, 6, 8))
給出一個RDD,將相同的單詞放到一組
val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Peanut", "Hadoop", "Hive"))
val newRDD: RDD[(String, Iterable[String])] = rdd.groupBy(elem => elem)
newRDD.collect().foreach(println)
-------------------------------------------
輸出結果
(Hive,CompactBuffer(Hive))
(Peanut,CompactBuffer(Peanut, Peanut))
(Hadoop,CompactBuffer(Hadoop, Hadoop))
用以上的運算元來實現一個簡單版的WordCount
List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark")
首先從集合中讀取資料,用flatMap對RDD中的資料進行扁平化處理,通過map來對每一個單詞設定為元組,每個單詞對應的數為1,使用groupBy將相同的key放到一組,對相同的單詞進行累加
實現方式1
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark"))
// 對RDD中的元素進行扁平化對映
val flatMap: RDD[String] = rdd.flatMap(_.split(" "))
// 將對映後的資料進行結構轉換,為每個單詞進行計數
val mapRDD: RDD[(String, Int)] = flatMap.map((_, 1))
// 按照key對RDD中的元素進行分組
val groupByRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupBy(_._1)
// 對分組後的元素再進行對映
val resRDD: RDD[(String, Int)] = groupByRDD.map({
case (word, datas) => {
(word, datas.size)
}
})
resRDD.collect().foreach(println)
sc.stop()
}
------------------------------------------------
輸出結果
(Peanut,1)
(Spark,3)
(Hadoop,2)
實現方式2
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark"))
// 對RDD中的元素進行扁平化對映
val flatMap: RDD[String] = rdd.flatMap(_.split(" "))
// 將RDD中的單詞進行分組
val groupByRDD: RDD[(String, Iterable[String])] = flatMap.groupBy(word => word)
// 對分組後的元素再進行對映
val resRDD: RDD[(String, Int)] = groupByRDD.map({
case (word, datas) => {
(word, datas.size)
}
})
resRDD.collect().foreach(println)
sc.stop()
}
------------------------------------------------
輸出結果
(Peanut,1)
(Spark,3)
(Hadoop,2)
用以上的運算元來實現一個複雜版的WordCount
數位代表出現的次數
List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)))
// 將原來RDD中字串以及字串出現的次數進行處理,形成一個新的字串
val rdd1: RDD[String] = rdd.map{
case (str, count) => {
(str + " ") * count
}
}
println("輸出當前rdd1")
rdd1.collect().foreach(println)
println("-------------------------------------")
// 對RDD中的元素進行扁平化對映
val flatMap: RDD[String] = rdd1.flatMap(_.split(" "))
// 將RDD中的單詞進行分組
val groupByRDD: RDD[(String, Iterable[String])] = flatMap.groupBy(word => word)
// 對分組後的元素再進行對映
val resRDD: RDD[(String, Int)] = groupByRDD.map({
case (word, datas) => {
(word, datas.size)
}
})
resRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------------------------------------
輸出結果
輸出當前rdd1
Peanut Spark Peanut Spark
Hello Spark Hello Spark Hello Spark
Hello World Hello World
-------------------------------------
(Peanut,2)
(Hello,5)
(World,2)
(Spark,5)
def main(args: Array[String]): Unit = {
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)))
val flatMapRDD: RDD[(String, Int)] = rdd.flatMap {
case (words, count) => {
// 對多個單片語成的字串進行切割
words.split(" ").map((_, count))
}
}
// 按照單詞對RDD中的元素進行分組
val groupByRDD: RDD[(String, Iterable[(String, Int)])] = flatMapRDD.groupBy(_._1)
val resRDD: RDD[(String, Int)] = groupByRDD.map{
case (word, datas) =>
(word, datas.map(_._2).sum)
}
resRDD.collect().foreach(println)
}
--------------------------------------------------------------------------------
輸出結果
(Peanut,2)
(Hello,5)
(World,2)
(Spark,5)
def filter(f T=> Boolean): RDD[T]
RDD呼叫filter時,對RDD中的每一個元素應用f函數,將返回值為true的元素新增到新的RDD當中
按照指定的過濾規則,對RDD中的元素進行過濾
程式碼範例
需求1
List("Peanut", "zhangsan", "lisi", "wangwu", "xiaoliu", "xiaowang")
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(List("Peanut", "zhangsan", "lisi", "wangwu", "xiaoliu", "xiaowang"))
val newRDD: RDD[String] = rdd.filter(_.contains("xiao"))
newRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------------
輸出結果
xiaoliu
xiaowang
需求2
List(1, 2, 3, 4, 5, 6, 7, 8, 9)
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
// 把所有的奇數過濾出來
val newRDD: RDD[Int] = rdd.filter(_ % 2 == 0)
newRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------------------------
輸出結果
2
4
6
8
隨機抽樣
def sample(
withReplacement: Boolean, // true為有放回的抽樣,false為無放回抽樣
fraction: Double, // 當withReplacement = false時,表示每個元素的概率,範圍是[0,1]
// 當withReplacement = true時,表示每個元素的期望次數,範圍是 >= 0
seed: Long = Untils.random.nextLong // send表示亂數生成器種子(生成亂數的初始值,種子一樣時,生成的亂數相同)抽樣演演算法的初始值,一般不需要指定
): RDD[T]
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(1 to 10)
val newRDD: RDD[Int] = rdd.sample(true, 1)
newRDD.collect().foreach(println)
sc.stop()
}
-----------------------------------------------------
輸出結果(每次執行的結果都不同)
1
2
2
2
2
5
5
6
6
6
7
10
10
10
最後的結果是隨機的,第二個引數設定期望出現次數為1,只是期望,並不代表每個數位都出現一次。
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(1 to 10)
// 從RDD中隨機抽取資料(抽樣不放回)
val newRDD: RDD[Int] = rdd.sample(false, 0.3)
newRDD.collect().foreach(println)
sc.stop()
}
---------------------------------------------
3
5
7
takeSample可以在抽取時指定抽取的個數(在第二個引數中指定抽取的個數)
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val stds = List("張1", "張2", "張3", "張4", "張5", "張6", "張7", "張8", "張9",
"張10", "張11", "張12", "張13", "張14", "張15", "張16",
"張17", "張18", "張19", "張21", "張20", "張22")
val nameRDD: RDD[String] = sc.makeRDD(stds)
// 從上面RDD當中抽取一名幸運觀眾
val res: Array[String] = nameRDD.takeSample(false, 3)
res.foreach(println)
sc.stop()
}
------------------------------------------
輸出結果
張18
張13
張1
去重運算元
def distinct(): RDD[T]
distinct運算元是對內部元素進行去重,將去重後的結果放到新的RDD當中
預設情況下,distinct會生成與原分割區個數一致的分割區數
distinct有兩種,一種是不加引數,一種是可以指定分割區數
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 建立RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 4, 3, 3), 5)
// 對RDD中資料進行去重
val newRDD: RDD[Int] = rdd.distinct()
newRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------
輸出結果
5
1
2
3
4
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 建立RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 4, 3, 3), 5)
rdd.mapPartitionsWithIndex{
(index, datas) =>
println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
datas
}.collect()
println("------------去重前後分割區對比----------------")
// 對RDD中資料進行去重
val newRDD: RDD[Int] = rdd.distinct(2)
newRDD.mapPartitionsWithIndex{
(index, datas) =>
println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
datas
}.collect()
sc.stop()
}
--------------------------------------------------------
輸出結果
分割區:4---> 分割區元素:3,3
分割區:1---> 分割區元素:3,4
分割區:0---> 分割區元素:1,2
分割區:2---> 分割區元素:5,5
分割區:3---> 分割區元素:4,4
------------去重前後分割區對比----------------
分割區:1---> 分割區元素:1,3,5
分割區:0---> 分割區元素:4,2
coalesce運算元可以重新分割區
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 建立RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
rdd.mapPartitionsWithIndex{
(index, datas) =>
println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
datas
}.collect()
println("*****************縮減分割區前後********************")
// 縮減分割區
val newRDD: RDD[Int] = rdd.coalesce(2)
newRDD.mapPartitionsWithIndex{
(index, datas) =>
println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
datas
}.collect()
sc.stop()
}
----------------------------------------------------------
分割區:0---> 分割區元素:1,2
分割區:2---> 分割區元素:5,6
分割區:1---> 分割區元素:3,4
***************縮減分割區前後**********************
分割區:0---> 分割區元素:1,2
分割區:1---> 分割區元素:3,4,5,6
重新分割區,擴大分割區時,可以使用reparation,reparation會執行shuffle
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 建立RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
rdd.mapPartitionsWithIndex{
(index, datas) =>
println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
datas
}.collect()
// 使用reparation擴大分割區
val newRDD: RDD[Int] = rdd.repartition(4)
println("******************擴大分割區前後*******************")
newRDD.mapPartitionsWithIndex{
(index, datas) =>
println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
datas
}.collect()
sc.stop()
}
-----------------------------------
輸出結果
分割區:2---> 分割區元素:5,6
分割區:1---> 分割區元素:3,4
分割區:0---> 分割區元素:1,2
******************擴大分割區前後*******************
分割區:2---> 分割區元素:
分割區:1---> 分割區元素:
分割區:3---> 分割區元素:1,3,5
分割區:0---> 分割區元素:2,4,6
repartition的底層其實是呼叫的coalesce,只不過在呼叫時,第二個引數指定執行shuffle機制
請看原始碼:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
排序
對RDD中的元素進行排序,排序時需要指定排序規則
def sortBy[K](
f: (T) => K,
ascending: Boolean = true, // 預設為正序排序
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 建立RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 4, 6, 5, 3, 2))
// 升序排序
val sortedRDD: RDD[Int] = rdd.sortBy(num => num)
sortedRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------------------------
輸出結果
1
2
3
4
5
6
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 建立RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 4, 6, 5, 3, 2))
// 降序排序
val sortedRDD: RDD[Int] = rdd.sortBy(num => num, false)
sortedRDD.collect().foreach(println)
sc.stop()
}
-----------------------------------------------------------------
輸出結果
6
5
4
3
2
1
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val strRDD: RDD[String] = sc.makeRDD(List("1", "6", "2", "10"))
println("-----------對字串按字典序排序-------------")
// 按照字串字典順序進行排序
val sortedRDD: RDD[String] = strRDD.sortBy(elem => elem)
sortedRDD.collect().foreach(println)
println("------------將字串按照字面的數位來進行排序---------------")
// 按照字串轉換為整數後的大小進行排序
val sortedRDD2: RDD[String] = strRDD.sortBy(_.toInt)
sortedRDD2.collect().foreach(println)
sc.stop()
}
-----------------------------------------
輸出結果
-----------對字串按字典序排序-------------
1
10
2
6
------------將字串按照字面的數位來進行排序---------------
1
2
6
10
def pipe(command: String): RDD[String]
針對每一個分割區,都呼叫一次shell指令碼,返回輸出的RDD
def union(other: RDD[T]): RDD[T]
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
// 求並集
val newRDD: RDD[Int] = rdd1.union(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------
1
2
3
4
3
4
5
6
7
def intersection(other: RDD[T]): RDD[T]
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
// 求交集
val newRDD: RDD[Int] = rdd1.intersection(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
---------------------
輸出結果
3
4
def subtract(other: RDD[T]): RDD[T]
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
// 求差集
val newRDD: RDD[Int] = rdd1.subtract(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
--------------------------------------------
輸出結果
1
2
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
拉鍊的功能是將兩個RDD中的元素合併成元組,元組中key為第一個RDD中的元素,value為第二個RDD中的元素
要求兩個RDD的分割區數量以及元素數量都相等,否則會拋異常
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))
// 拉鍊
val newRDD: RDD[(Int, Int)] = rdd1.zip(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
----------------------------------------
輸出結果
(1,3)
(2,4)
(3,5)
(4,6)
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
按照key從新分割區,將RDD[K, V]中的K,按照指定Partitioner重新進行分割區如果原有的partitionRDD和現有partitionRDD是一致的話,就不進行分割區,否則會產生shuffe過程。
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 建立RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c")), 3)
rdd.mapPartitionsWithIndex(
(index, datas) =>{
println("分割區號:" + index + "---->" + "分割區資料" + datas.mkString(","))
datas
}
).collect()
println("**************分割區前後對比*****************")
val newRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
newRDD.mapPartitionsWithIndex(
(index, datas) =>{
println("分割區號:" + index + "---->" + "分割區資料" + datas.mkString(","))
datas
}
).collect()
sc.stop()
}
----------------------------------------
輸出結果
分割區號:0---->分割區資料(1,a)
分割區號:2---->分割區資料(3,c)
分割區號:1---->分割區資料(2,b)
**************分割區前後對比*****************
分割區號:1---->分割區資料(1,a),(3,c)
分割區號:0---->分割區資料(2,b)
自定義分割區器
class MyPartitioner(partitions: Int) extends Partitioner {
// 獲取分割區個數
override def numPartitions: Int = partitions
// 指定分割區規則 返回值Int表示分割區編號,從0開始
override def getPartition(key: Any): Int = {
// 此時返回為1,那麼所有的元素都會被分到1號區
1
}
}
------------------------------------------------------------------------------
將val newRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))中new HashPartitioner(2)換成new MyPartitioner(2)
輸出結果
分割區號:1---->分割區資料(2,b)
分割區號:0---->分割區資料(1,a)
分割區號:2---->分割區資料(3,c)
**************分割區前後對比*****************
分割區號:0---->分割區資料
分割區號:1---->分割區資料(1,a),(2,b),(3,c)
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
reduceByKey可以將RDD[K, V]中的元素按照相同的K對V進行聚合
可以指定新的RDD中分割區器或分割區數
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 4), ("b", 3)))
val resRDD: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
resRDD.collect().foreach(println)
sc.stop()
}
--------------------------------------------------------------
輸出結果
(a,5)
(b,5)
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
groupByKey對集合中的每個key進行操作,會將相同key對應的value放到一個集合當中,不進行聚合
可以指定新的RDD中分割區器(預設使用的分割區器是HashPartitioner)或分割區數
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 2), ("a", 4), ("b", 3)))
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
---------------------------------
輸出結果
(a,CompactBuffer(5, 4))
(b,CompactBuffer(2, 3))
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 2), ("a", 4), ("b", 3)))
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
val resRDD = groupRDD.map {
case (key, datas) => {
(key, datas.sum)
}
}
resRDD.collect().foreach(println)
sc.stop()
}
---------------------------------------------------------------
輸出結果
(a,9)
(b,5)
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
aggregateByKey是按照key對分割區內以及分割區間的資料進行處理
aggregateByKey(引數1)(引數2, 引數3)
引數1: 初始值
引數2: 分割區內計算規則
引數3: 分割區間計算規則
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 建立RDD
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("c", 5), ("b", 5), ("c", 1), ("b", 2)), 2)
// aggregateByKey實現wordCount
val newRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(_ + _, _ + _)
newRDD.collect().foreach(println)
sc.stop()
}
----------------------------------------------
輸出結果
(b,7)
(a,6)
(c,6)
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 建立RDD
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("c", 5), ("a", 2), ("a", 3), ("b", 5), ("a", 3), ("c", 3), ("b", 2)), 2)
rdd.mapPartitionsWithIndex(
(index, datas) => {
println("分割區:" + index + "---> 分割區資料:" + datas.mkString(","))
datas
}
).collect()
println("求分割區最大值")
val newRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(
// 分割區內的計算規則,找出最大值,max在進行比較時,首先是將第一個元素跟初始值比較
(x, y) => math.max(x, y),
// 分割區間計算規則
(a, b) => a + b
)
/*
// 簡化
val newRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(
// 分割區內的計算規則,找出最大值
math.max(_, _),
// 分割區間計算規則
_ + _
)*/
newRDD.collect().foreach(println)
sc.stop()
}
-----------------------------------------------------
輸出結果
分割區:0---> 分割區資料:(c,5),(a,2),(a,3)
分割區:1---> 分割區資料:(b,5),(a,3),(c,3),(b,2)
求分割區最大值
(b,5)
(a,6)
(c,8)
當分割區內計算邏輯和分割區間計算邏輯不一樣時,可以考慮使用aggregateByKey
def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
foldByKey是aggregateByKey的簡化版本,如果分割區內和分割區間計算邏輯相同,那麼可以使用foldByKey,foldByKey和reduceByKey之間的差別在於foldKey可以指定初始值
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 建立RDD
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("c", 2), ("a", 1), ("a", 2), ("b", 5), ("a", 3), ("c", 3), ("b", 2)), 2)
val newRDD: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _)
newRDD.collect().foreach(println)
sc.stop()
}
------------------------------------------------------
(b,7)
(a,6)
(c,5)
轉換結構後分割區內和分割區間操作
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)]
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)]
引數說明:
createCombiner
mergeValue
mergeCombiners
combineByKey運算元的使用場景
程式碼範例
List(("LaoWang", 90), ("LaoLiu", 80), ("LaoWang", 85), ("LaoLiu", 60)
List中表示的是學生姓名和成績,一個學生有不同科目的成績
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 需求,求出每一個學生的平均成績
// 建立RDD
val scoreRDD: RDD[(String, Int)] = sc.makeRDD(List(("LaoWang", 90), ("LaoLiu", 80), ("LaoWang", 85), ("LaoLiu", 60)), 2)
val combineRDD: RDD[(String, (Int, Int))] = scoreRDD.combineByKey(
// 初始化
(_, 1),
// 分割區內計算規則 將分數相加,每加一次,課程門數加1
(t1: (Int, Int), v) => {
(t1._1 + v, t1._2 + 1)
},
// 分割區間計算規則,將分數以及課程門數分別相加
(tup2: (Int, Int), tup3: (Int, Int)) => {
(tup2._1 + tup3._1, tup2._2 + tup2._2)
}
)
// 求平均成績
val resRDD: RDD[(String, Int)] = combineRDD.map {
case (name, (score, count)) => {
(name, score / count)
}
}
resRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------------------------------------------------------
輸出結果
(LaoLiu,70)
(LaoWang,87)
按照K進行排序
在一個(K, V)的RDD上呼叫,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]
引數說明
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// 建立RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((2, "aa"), (6, "dd"), (4, "bb"), (1, "cc")))
// 按照key對RDD中的元素進行排序 Int已經預設實現了Ordered介面
val newRDD: RDD[(Int, String)] = rdd.sortByKey()
newRDD.collect().foreach(println)
sc.stop()
}
-------------------------------------------------------------
(1,cc)
(2,aa)
(4,bb)
(6,dd)
如果想降序排序,只需在方法裡設定第一個引數為false即可
val newRDD: RDD[(Int, String)] = rdd.sortByKey(false)
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val stdList: List[(Student, Int)] = List(
(new Student("zhangsan", 18), 1),
(new Student("lisi", 12), 1),
(new Student("wangwu", 20), 1),
(new Student("wangwu", 23), 1)
)
val stdRDD: RDD[(Student, Int)] = sc.makeRDD(stdList)
val resRDD: RDD[(Student, Int)] = stdRDD.sortByKey()
resRDD.collect().foreach(println)
sc.stop()
}
-----------------------------------------------------------------------------
輸出結果
(Student(lisi, 12) ,1)
(Student(wangwu, 23) ,1)
(Student(wangwu, 20) ,1)
(Student(zhangsan, 18) ,1)
Student類定義如下
class Student(var name: String, var age: Int) extends Ordered[Student] with Serializable {
// 指定比較規則
override def compare(that: Student): Int = {
// 先按照名稱升序,如果名稱相同的話,再按照年齡降序
var res: Int = this.name.compareTo(that.name)
if (res == 0) {
// res 等於0 說明名字相同,此時按照年齡降序排序
res = that.age - this.age
}
res
}
override def toString: String = s"Student($name, $age) "
}
def mapValues[U](f: V => U): RDD[(K, U)]
對kv型別的RDD中的value部分進行對映
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d")))
val newRDD: RDD[(Int, String)] = rdd.mapValues(" --> " + _)
newRDD.collect().foreach(println)
sc.stop()
}
----------------------------------------------------
輸出結果
(1, --> a)
(2, --> c)
(1, --> b)
(3, --> d)
我們先來回顧一下SQL中的連線
join()是將相同key對應的多個value關聯在一起
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
對於兩個RDD ,型別分別為(k, v)和(k, w),呼叫join運算元時,返回一個相同key對應的所有元素組合在一起的RDD(k, (v, w)),簡單點來說,就是k不變,將value從新組合放到一起
如果key只是某一個RDD中有,那麼這個key不會關聯,類似於內連線,將兩張表完全匹配的資料給顯示出來
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))
// join運算元相當於內連線,將兩個RDD中的key相同的資料匹配,如果key匹配不上,那麼資料不關聯
val newRDD: RDD[(Int, (String, Int))] = rdd.join(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
---------------------------------------------------------------------------
輸出結果
(1,(a,3))
(1,(a,2))
(1,(b,3))
(1,(b,2))
(2,(c,5))
(3,(d,3))
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))
val newRDD: RDD[(Int, (String, Option[Int]))] = rdd.leftOuterJoin(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
-----------------------------------------------------------------------
(1,(a,Some(3)))
(1,(a,Some(2)))
(1,(b,Some(3)))
(1,(b,Some(2)))
(2,(c,Some(5)))
(3,(d,Some(3)))
(4,(e,None))
類似於全連線,但是在同一個RDD中對key聚合
操作兩個RDD中的KV元素,每個RDD中相同key中的元素分別聚合成一個集合。
舉例
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))
// cogroup
val newRDD: RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd.cogroup(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
---------------------------------------------------------------
輸出結果
(1,(CompactBuffer(a, b),CompactBuffer(3, 2)))
(2,(CompactBuffer(c),CompactBuffer(5)))
(3,(CompactBuffer(d),CompactBuffer(3)))
(4,(CompactBuffer(e),CompactBuffer()))
def reduce(f: (T, T) => T): T
聚合
f函數聚集RDD中的所有元素,先聚合分割區內資料,再聚合分割區間資料
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
// reduce
val res: Int = rdd.reduce(_ + _)
println(res)
sc.stop()
}
-----------------------------------------------------
輸出結果
10
def collect(): Array[T]
以陣列Array的形式返回資料集的所有元素
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
/*
// reduce
val res: Int = rdd.reduce(_ + _)
println(res)
*/
rdd.foreach(println)
println("-------------------------")
val ints: Array[Int] = rdd.collect()
ints.foreach(println)
sc.stop()
}
----------------------------------------------------
輸出結果
1
3
2
4
-------------------------
1
2
3
4
注意:雖然用rdd直接呼叫foreach進行輸出和先呼叫collect再進行輸出,結果都可以出來,但是呼叫collect之後,會把資料先收集到driver端,資料是全部存到一個陣列當中;而不呼叫直接輸出時,不同分割區的資料是在不同的Executor並行輸出,我們會看到沒呼叫collect進行輸出的話,元素的順序可能會跟原順序不一樣
計數,返回RDD中元素的個數
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val res: Long = rdd.count()
println(res)
sc.stop()
}
---------------
輸出結果
4
返回RDD中的第一個元素
def first(): T
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// first 返回RDD中的第一個元素
val res: Int = rdd.first()
println(res)
sc.stop()
}
-------------------
輸出結果
1
take運算元可以返回由RDD前n個元素組成的陣列
def take(num: Int): Array[T]
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// take 返回rdd前n個元素組成的陣列
val res: Array[Int] = rdd.take(3)
println(res.mkString(","))
sc.stop()
}
-----------------------------------------------
輸出結果
1,2,3
返回RDD排序後前n個元素組成的陣列
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(3, 1, 2, 5, 4), 2)
// takeOrdered(n) 獲取rdd排序後前n個元素
val ans: Array[Int] = rdd.takeOrdered(3)
println(ans.mkString(","))
sc.stop()
}
----------------------------------------------
輸出結果
1,2,3
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
在進行分割區內的邏輯時,每個分割區會通過分割區內邏輯和初始值進行聚合(例如:初始值為1,分割區計算邏輯為相加,那麼分割區內的元素聚合時,最後的結果還要加一次初始值1),然後分割區間也會通過分割區間邏輯和初始值進行操作。
程式碼範例
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
rdd.mapPartitionsWithIndex(
(index, datas) => {
println("分割區號:" + index + "-> 分割區資料:" + datas.mkString(","))
datas
}
).collect()
// agreegate
val res: Int = rdd.aggregate(1)(_ + _, _ + _)
println(res)
sc.stop()
}
---------------------------------------------------
輸出結果
分割區號:4-> 分割區資料:
分割區號:2-> 分割區資料:
分割區號:1-> 分割區資料:1
分割區號:5-> 分割區資料:3
分割區號:7-> 分割區資料:4
分割區號:0-> 分割區資料:
分割區號:6-> 分割區資料:
分割區號:3-> 分割區資料:2
19
這裡有點不太好理解,我解釋一下程式碼,程式碼首先是一個集合,資料在八個分割區中,由輸出結果可看出,1,3,5,7號分割區中有資料,分割區裡的邏輯是相加,相加時每個分割區進行分割區間的邏輯操作(相加)時,分割區裡元素相加的結果還要加上初始值1
然後分割區間的邏輯也是相加,分割區間相加時也會加上初始值
總共8個分割區,每個分割區都要加一次初始值1,總共要加8,所有分割區裡的原始資料相加總共是10,那麼現在總共是18。分割區間在進行相加操作時,還要加一次初始值,即為結果19。
讀者可以通過修改rdd分割區數以及初始值來體會這個運算元。
def fold(zeroValue: T)(op: (T, T) => T): T
fold 是aggregate的簡化,分割區內和分割區間計算規則相同
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 3)
rdd.mapPartitionsWithIndex(
(index, datas) => {
println("分割區號:" + index + "-> 分割區資料:" + datas.mkString(","))
datas
}
).collect()
val res: Int = rdd.fold(10)(_ + _)
println(res)
sc.stop()
}
---------------------------------------------------------------------------
輸出結果
分割區號:1-> 分割區資料:2
分割區號:2-> 分割區資料:3,4
分割區號:0-> 分割區資料:1
50
初始值為10,3個分割區,所以要另外加上30,分割區間操作還要加上初始值,那麼總共要另外加40,40再加上原始的總值10,結果為50
統計每種key的個數
def countByKey(): Map[K, Long]
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
val res: collection.Map[Int, Long] = rdd.countByKey()
println(res)
sc.stop()
}
----------------------------------------------------------------------------------------
輸出結果
Map(1 -> 2, 2 -> 1, 3 -> 2)
saveAsTextFile(path)
**saveAsSequenceFile(path) **
saveAsObjectFile(path)
// save 相關運算元
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 1)
// 儲存為文字檔案,指定儲存路徑
rdd.saveAsTextFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output")
// 儲存為序列化檔案
rdd.saveAsObjectFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output1")
// 儲存為SequenceFile (只支援KV型別的RDD)
rdd.map((_, 1)).saveAsSequenceFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output2")
遍歷RDD中每一個元素,前面的程式碼大多數地方都用過這個運算元了,這裡就不再贅述。