上萬字的spark運算元總結,附帶每個運算元的程式碼範例

2020-09-28 09:03:17



簡介

RDD(Resilient Distributed Dataset)彈性分散式資料集

是Spark中最基本的資料抽象,它代表一個不可變、可分割區、裡面的元素可平行計算的集合。

  • 彈性
    • 儲存(記憶體和磁碟可以自動切換)
    • 計算(RDD有計算出錯重試機制)
    • 容錯(資料丟失可以自動恢復)
    • 分割區
  • 分散式
    • 不同分割區中的資料會分配給叢集中的不同伺服器節點進行計算
  • 資料集
    • 集合不一樣,RDD只封裝計算邏輯,不會儲存資料
  • 資料抽象
    • RDD是抽象類,需要子類實現
  • 不可變
    • RDD封裝計算邏輯,是不可變的,若要改變,需要產生新的RDD,在新的RDD裡面封裝新的計算邏輯

1.RDD的特性

  • 1)RDD有一組分割區,通過一個分割區計算函數getPartitions來獲取分割區
  • 2)分割區計算函數 compute,可以把分割區資料給取出來
  • 3)RDD之間的依賴,通過getDependence
  • 4)有分割區器Partitioner(對於kv型別的RDD),可以控制分割區的資料流向
  • 5)資料儲存優先位置 getPreferedLocation

2.RDD建立方式

  • 1)通過記憶體集合建立
  • 2)通過讀取外部檔案建立
  • 3)通過RDD的轉換運算元得到新的RDD

3.建立RDD分割區方式

  • 通過記憶體集合建立

    • 預設的分割區方式(取決於分配給當前應用的CPU核數)

    • 指定分割區

      	// i是集合下標,length是集合長度,numSlices是分割區數
      	def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
            (0 until numSlices).iterator.map { i =>
              val start = ((i * length) / numSlices).toInt
              val end = (((i + 1) * length) / numSlices).toInt
              (start, end)
            }
          }
      

  • 讀取外部檔案建立

    • 預設的分割區方式(取決於分配給當前應用的CPU核數與2取最小值)
    • 指定分割區
      • FileInputFormat裡面有個getSplits(切片)方法
      • LineRecordReader裡面有個next(按行讀取)方法

4.RDD常用運算元

  • 轉換運算元Transformation
    • 轉換運算元執行完畢後,會建立新的RDD,並不會馬上執行計算
    • 1.map
      • 對RDD中的元素進行一個個對映
    • 2.mapPartitions
      • 以分割區為單位,對RDD中的元素對映
    • 3.mapPartitionsWithIndex
      • 以分割區為單位,對RDD中的元素進行對映,並且帶分割區編號
    • 4.flatMap
      • 對RDD中的元素進行扁平化處理(將整體拆分成個體)
    • 5.glom
      • 將RDD中每一個分割區中的單個元素,轉換為陣列
    • 6.groupBy
      • 按照一定的分組規則,對RDD中的元素進行分組
    • 7.filter
      • 按照一定的規則,對RDD中的元素進行過濾
    • 8.sample
      • 引數1:是否抽樣放回,如果是true,表示放回,否則不放回
      • 引數2:若引數1為true,那麼引數2表示期望元素出現的次數(這裡只是期望,並不代表實際次數),這個數大於等於零;引數1若為false,那麼引數2表示每一個元素出現的概率,範圍是[0, 1]
      • 引數3:隨機演演算法的初始值(一般不用設定,若指定初始值,那麼隨機生成的數相等)
      • takeSample(行動運算元):taskSample可以指定隨機抽取的個數
    • 9.distinct
      • 去重,底層是通過map + reduceByKey完成的去重操作
    • 改變分割區的兩個運算元
      • 10.coalesce
        • 一般用於縮減分割區,預設情況下是不執行shuffle機制
      • 11.repartition
        • 一般用於擴大分割區,底層呼叫的是coalesce,會將coalesce第二個引數設定為true,會執行shuffle
    • 12.sortBy
      • 按照指定規則,對RDD中的元素進行升序,預設升序(可通過引數來設定)
    • 13.pipe
      • 對於RDD中的每一個分割區,都會執行pipe運算元中指定的指令碼
    • 14.Union
      • 合集
    • 15.intersection
      • 交集
    • 16subtract
      • 差集
    • 17.zip
      • 拉鍊,必須要保證兩個RDD的分割區數以及每個分割區中元素的個數一致
    • 18.partitionBy
      • 按照指定的分割區器,通過key對RDD中的元素進行分割區
      • 預設分割區器 HashPartitioner
    • 19.reduceByKey
      • 將相同的key放在一起,對Value進行聚合操作
    • 20.groupByKey
      • 按照key對RDD中的元素進行分組
    • 21.aggregateByKey(引數1:zeroValue)(引數2:分割區內計算規則, 引數3:分割區間計算規則)
    • 22.foldByKey(引數1:zereValue)(引數2:分割區內和分割區間計算規則)
      • 是aggregateByKey的簡化,分割區內和分割區間計算規則相同
    • 23.combineByKey(引數1:對當前key的value進行轉換, 引數2:分割區內計算規則, 引數3:分割區間計算規則)
    • 24.sortByKey
      • 按照RDD中的key對元素進行排序
    • 25.mapValues
      • 只對RDD中的Value進行操作
    • join&cogroup
  • 行動運算元Action
    • 行動運算元執行後,才會觸發計算
    • 1.reduce
      • 對RDD中的元素進行聚合
    • 2.collect.foreach和foreach
      • 對RDD中的元素進行遍歷
      • collect.foreach將每一個Excutor中的資料收集到Driver,形成一個新的陣列
      • .foreach不是一個運算元,是集合的方法,是對陣列中的元素進行遍歷
    • 3.count
      • 獲取RDD中元素的個數
    • 4.countByKey
      • 獲取RDD中每個key對應的元素個數
    • 5.first
      • 獲取RDD中第一個元素
    • 6.take
      • 獲取RDD中的前幾個元素
    • 7.takeOrdered
      • 獲取排序後的RDD中的前幾個元素
    • 8.aggregate&fold
      • aggregateByKey 處理kv型別的RDD,並且在進行分割區間聚合的時候,初始值不參與運算
      • aggregate處理kv型別的RDD,但是在分割區間和分割區內聚合時初始值都會參與運算
      • fold 是aggregate的簡化版
    • 9.save相關的運算元
      • saveAsTextFile
      • saveAsObjectFile
      • saveAsSequenceFile(只針對KV型別RDD)

Transformation轉換運算元

​ 當我們去呼叫的時候,會建立一個新的RDD,並不會真正執行計算操作,只是做一些計算邏輯的封裝

  • RDD整體上分為:
    • Value型別
    • 雙Value型別
    • Key-Value型別

Action行動運算元

​ 只有行動運算元被呼叫之後才會去做計算

​ 行動運算元是觸發了整個作業的執行。因為轉換運算元都是懶載入,並不會立即執行。


Transformation轉換運算元

value型別

1.map()

  • map(fun):返回一個新的RDD,該RDD由每一個輸入元素經過fun函數轉換後組成.

  • 當某個RDD執行map方法時,會遍歷RDD中的每一個資料項,並依次應用fun函數,產生一個新的RDD

  • 新的RDD分割區數和舊的RDD分割區數相同

  • 程式碼範例

    	def main(args: Array[String]): Unit = {
    
            // 建立SparkConf並設定App名稱
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    
            // 建立SparkContext,該物件是提交Spark App的入口
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
            println("原分割區個數" + rdd.partitions.size)
    
            val newRDD: RDD[Int] = rdd.map(_ * 2)
            println("新分割區個數" + newRDD.partitions.size)
    
            newRDD.collect().foreach(println)
    
            // 關閉連線
            sc.stop()
        }
    -----------------------------------------------------------
    輸出結果:
    原分割區個數2
    新分割區個數2
    2
    4
    6
    8
    

2.mapPartitions()

  • 以分割區為單位執行Map運算元

  • 類似於map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函數型別必須是Iterator[T] => Iterator[U]。

  • 假設有N個元素,有M個分割區,那麼map的函數的將被呼叫N次,而mapPartitions被呼叫M次,一個函數一次處理所有分割區。

  • 引數是一個可迭代的集合

  • map是一次處理一個元素,mapPartitions是一次處理一個分割區的元素,mapPartitions適用於批次處理操作

  • 程式碼範例(對RDD中的元素進行乘2操作)

    	def main(args: Array[String]): Unit = {
    
            // 建立SparkConf並設定App名稱
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    
            // 建立SparkContext,該物件是提交Spark App的入口
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    
            // 以分割區為單位,對RDD中的元素進行對映
            val newRDD: RDD[Int] = rdd.mapPartitions(datas => {
                datas.map(_ * 2)
            })
            
            newRDD.collect().foreach(println)
    
            // 關閉連線
            sc.stop()
        }
    ---------------------------------------
    輸出結果
    2
    4
    6
    8
    

    適用於批次處理操作
    比如將RDD中的元素插入到資料庫中,需要資料庫連線,如果每一個元素都建立一個連線,效率很低,可以對每個分割區的元素建立一個連線

    mapPartitions每次處理一批資料,這個分割區中資料處理完之後,原來的RDD中分割區的資料才會釋放,可能會導致記憶體溢位


3.mapPartitionsWithIndex()

兩個引數,一個是當前分割區的編號,另外一個是分割區的資料(可以單獨對某一個分割區進行操作)

  • 程式碼範例

    • 需求1

    • 對RDD元素做對映,將元素變為元組,元組裡面分別表示分割區編號和當前元素

      	def main(args: Array[String]): Unit = {
      
              // 建立SparkConf並設定App名稱
              val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
      
              // 建立SparkContext,該物件是提交Spark App的入口
              val sc: SparkContext = new SparkContext(conf)
      
              val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
      
              val newRDD: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(
                  (index, datas) => {
                      datas.map((index, _))
                  }
              )
              newRDD.collect().foreach(println)
              // 關閉連線
              sc.stop()
          }
      -----------------------------------------------------------------
      輸出結果
      (0,1)
      (0,2)
      (1,3)
      (1,4)
      
    • 需求2

    • 第二個分割區乘2,其他分割區不變

      	val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 3)
          val newRDD: RDD[Int] = rdd.mapPartitionsWithIndex(
              (index, datas) => {
                  index match {
                      case 1 => datas.map(_ * 2)
                      case _ => datas
                  }
              }
          )
      	newRDD.collect().foreach(println)
      
      --------------------------------------
      輸出結果
      1
      2
      6
      8
      10
      6
      7
      8
      

4.flatMap()

功能與map相似,將RDD中的每一個元素通過應用f函數依次轉換為新函數,並封裝到RDD中

跟map的區別在於:

  • flatMap中f函數返回值是一個集合,將RDD集合中每一個元素拆分出來放到新的RDD當中

程式碼範例

  • 需求

    • 對集合進行扁平化處理,將一個大的集合(裡面有子集合)裡面資料取出,放入一個大的集合當中

      	def main(args: Array[String]): Unit = {
      
              // 建立SparkConf並設定App名稱
              val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
      
              // 建立SparkContext,該物件是提交Spark App的入口
              val sc: SparkContext = new SparkContext(conf)
      
              val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6), List(7, 8)), 2)
      
              // 如果匿名函數輸入和輸出相同,那麼不能進行簡化
              val newRDD: RDD[Int] = rdd.flatMap(datas => datas)
              newRDD.collect().foreach(println)
              // 關閉連線
              sc.stop()
          }
      
      ----------------------------------------------
      1
      2
      3
      4
      5
      6
      7
      8
      

5.glom()

def glom(): RDD[Array[T]]

將RDD中每一個分割區中元素組合成一個陣列,封裝到新的RDD當中,陣列中元素的型別保持不變

(flatMap是將整體轉換成個體,glom是將個體轉換成整體)

程式碼範例

  • 需求

    • 將RDD中每個分割區裡的資料放到陣列當中

      def main(args: Array[String]): Unit = {
              val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
              
              val sc: SparkContext = new SparkContext(conf)
      
              var rdd: RDD[Int] =sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
      
              println("----------沒有進行glom之前--------------")
              rdd.mapPartitionsWithIndex(
                (index, datas) => {
                    println("當前分割區:" + index + " " + "元素:"+ datas.mkString(","))
                    datas
                }
              ).collect()
      
              println("----------呼叫glom之後--------------")
              val newRDD: RDD[Array[Int]] = rdd.glom()
              newRDD.mapPartitionsWithIndex(
                  (index, datas) => {
                      println("當前分割區:" + index + " " + "元素:"+ datas.next().mkString(","))
                      datas
                  }
              ).collect()
              
              sc.stop()
          }
      -------------------------------------------
      
      輸出結果
      ----------沒有進行glom之前--------------
      當前分割區:1 元素:4,5,6
      當前分割區:0 元素:1,2,3
      ----------呼叫glom之後--------------
      當前分割區:0 元素:1,2,3
      當前分割區:1 元素:4,5,6
      

6.groupBy()

def groupBy[K](f T => K) (implicit kt: ClassTag[K]): RDD[(k, Iterable[T])]

分組,按照傳入函數的返回值進行分組,相同key對應的值放入一個迭代器(按照指定的規則,對RDD中的元素進行分組)

groupBy會存在shuffle的過程進行落盤

程式碼範例

  • 需求

    • 將RDD中奇數放在一組,偶數放到另一組

      	def main(args: Array[String]): Unit = {
              val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
      
              val sc: SparkContext = new SparkContext(conf)
      
              val rdd: RDD[Int] =sc.makeRDD(1 to 9, 3)
              println("-------------執行分組前----------------")
              rdd.mapPartitionsWithIndex(
                  (index, datas) => {
                      println(s"分割區號:$index " + "元素:" + datas.mkString(","))
                      datas
                  }
              ).collect()
      
              val newRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
              println("-------------執行分組後----------------")
              newRDD.mapPartitionsWithIndex(
                  (index, datas) => {
                      println(s"分割區號:$index " + "元素:" + datas.mkString(","))
                      datas
                  }
              ).collect()
      
              sc.stop()
          }
      
      --------------------------------------------
      輸出結果
      
      -------------執行分組前----------------
      分割區號:0 元素:1,2,3
      分割區號:2 元素:7,8,9
      分割區號:1 元素:4,5,6
      -------------執行分組後----------------
      分割區號:2 元素:
      分割區號:1 元素:(1,CompactBuffer(1, 3, 5, 7, 9))
      分割區號:0 元素:(0,CompactBuffer(2, 4, 6, 8))
      
    • 給出一個RDD,將相同的單詞放到一組

      val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Peanut", "Hadoop", "Hive"))
      val newRDD: RDD[(String, Iterable[String])] = rdd.groupBy(elem => elem)
      newRDD.collect().foreach(println)
      
      -------------------------------------------
      輸出結果
      (Hive,CompactBuffer(Hive))
      (Peanut,CompactBuffer(Peanut, Peanut))
      (Hadoop,CompactBuffer(Hadoop, Hadoop))
      

用以上的運算元來實現一個簡單版的WordCount

List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark")

首先從集合中讀取資料,用flatMap對RDD中的資料進行扁平化處理,通過map來對每一個單詞設定為元組,每個單詞對應的數為1,使用groupBy將相同的key放到一組,對相同的單詞進行累加

  • 實現方式1

    	def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark"))
    
            // 對RDD中的元素進行扁平化對映
            val flatMap: RDD[String] = rdd.flatMap(_.split(" "))
    
            // 將對映後的資料進行結構轉換,為每個單詞進行計數
            val mapRDD: RDD[(String, Int)] = flatMap.map((_, 1))
    
            // 按照key對RDD中的元素進行分組
            val groupByRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupBy(_._1)
    
            // 對分組後的元素再進行對映
            val resRDD: RDD[(String, Int)] = groupByRDD.map({
                case (word, datas) => {
                    (word, datas.size)
                }
            })
    
            resRDD.collect().foreach(println)
            sc.stop()
        }
    
    ------------------------------------------------
    輸出結果
    (Peanut,1)
    (Spark,3)
    (Hadoop,2)
    
  • 實現方式2

    • 其實在方式1當中,進行扁平化處理之後,不需要再將結果對映成map,可以直接對結果按照key進行groupBy,最後用map統計相同key的長度即為結果
    def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[String] = sc.makeRDD(List("Peanut", "Hadoop", "Hadoop", "Spark", "Spark", "Spark"))
            
        	// 對RDD中的元素進行扁平化對映
            val flatMap: RDD[String] = rdd.flatMap(_.split(" "))
            // 將RDD中的單詞進行分組
            val groupByRDD: RDD[(String, Iterable[String])] = flatMap.groupBy(word => word)
            // 對分組後的元素再進行對映
            val resRDD: RDD[(String, Int)] = groupByRDD.map({
                case (word, datas) => {
                    (word, datas.size)
                }
            })
            resRDD.collect().foreach(println)
            sc.stop()
        }
    ------------------------------------------------
    輸出結果
    (Peanut,1)
    (Spark,3)
    (Hadoop,2)
    

用以上的運算元來實現一個複雜版的WordCount

數位代表出現的次數

List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)
  • 方式1
    • 先用map對其進行字串拼接
    • 再進行扁平化處理,對結果按照key進行groupBy,最後用map統計相同key的長度即為結果
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
       
    	val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)))
    
        // 將原來RDD中字串以及字串出現的次數進行處理,形成一個新的字串
        val rdd1: RDD[String] = rdd.map{
            case (str, count) => {
                (str + " ") * count
            }
        }
        println("輸出當前rdd1")
        rdd1.collect().foreach(println)
        println("-------------------------------------")
        // 對RDD中的元素進行扁平化對映
        val flatMap: RDD[String] = rdd1.flatMap(_.split(" "))
        // 將RDD中的單詞進行分組
        val groupByRDD: RDD[(String, Iterable[String])] = flatMap.groupBy(word => word)
        // 對分組後的元素再進行對映
        val resRDD: RDD[(String, Int)] = groupByRDD.map({
            case (word, datas) => {
                (word, datas.size)
            }
        })
        resRDD.collect().foreach(println)
        sc.stop()
    }
-------------------------------------------------------------------
輸出結果
輸出當前rdd1
Peanut Spark Peanut Spark 
Hello Spark Hello Spark Hello Spark 
Hello World Hello World 
-------------------------------------
(Peanut,2)
(Hello,5)
(World,2)
(Spark,5)
  • 方式2
    • 將元素轉換成元組,然後進行分組統計
    • (「Peanut Spark」, 2) ----> (「Peanut」, 2) (「Spark」, 2)
    def main(args: Array[String]): Unit = {
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Peanut Spark", 2), ("Hello Spark", 3), ("Hello World", 2)))
        val flatMapRDD: RDD[(String, Int)] = rdd.flatMap {
                case (words, count) => {
                    // 對多個單片語成的字串進行切割
                    words.split(" ").map((_, count))
                }
            }

            // 按照單詞對RDD中的元素進行分組
            val groupByRDD: RDD[(String, Iterable[(String, Int)])] = flatMapRDD.groupBy(_._1)

            val resRDD: RDD[(String, Int)] = groupByRDD.map{
                case (word, datas) =>
                    (word, datas.map(_._2).sum)
            }

            resRDD.collect().foreach(println)
    }
--------------------------------------------------------------------------------
輸出結果
(Peanut,2)
(Hello,5)
(World,2)
(Spark,5)

7.filter()

def filter(f T=> Boolean): RDD[T]

RDD呼叫filter時,對RDD中的每一個元素應用f函數,將返回值為true的元素新增到新的RDD當中

按照指定的過濾規則,對RDD中的元素進行過濾

程式碼範例

  • 需求1

    List("Peanut", "zhangsan", "lisi", "wangwu", "xiaoliu", "xiaowang")
    
    • 將集合中包含"xiao"的字串提取出來
    	def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[String] = sc.makeRDD(List("Peanut", "zhangsan", "lisi", "wangwu", "xiaoliu", "xiaowang"))
    
            val newRDD: RDD[String] = rdd.filter(_.contains("xiao"))
            newRDD.collect().foreach(println)
    
            sc.stop()
        }
    
    -------------------------------------------
    輸出結果
    xiaoliu
    xiaowang
    
  • 需求2

    List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    
    • 將所有的偶數過濾出來
    	def main(args: Array[String]): Unit = {
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    
            val sc: SparkContext = new SparkContext(conf)
    
            val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
            // 把所有的奇數過濾出來
            val newRDD: RDD[Int] = rdd.filter(_ % 2 == 0)
            newRDD.collect().foreach(println)
            sc.stop()
        }
    
    -------------------------------------------------------
    輸出結果
    2
    4
    6
    8
    

8.sample()

隨機抽樣

def sample(
	withReplacement: Boolean, // true為有放回的抽樣,false為無放回抽樣
    fraction: Double,   // 當withReplacement = false時,表示每個元素的概率,範圍是[0,1]
    					// 當withReplacement = true時,表示每個元素的期望次數,範圍是 >= 0
    seed: Long = Untils.random.nextLong  // send表示亂數生成器種子(生成亂數的初始值,種子一樣時,生成的亂數相同)抽樣演演算法的初始值,一般不需要指定
): RDD[T]

程式碼範例

  • 需求1
    • 抽樣放回
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(1 to 10)

        val newRDD: RDD[Int] = rdd.sample(true, 1)
        newRDD.collect().foreach(println)
        sc.stop()
    }
-----------------------------------------------------
輸出結果(每次執行的結果都不同)
1
2
2
2
2
5
5
6
6
6
7
10
10
10

最後的結果是隨機的,第二個引數設定期望出現次數為1,只是期望,並不代表每個數位都出現一次。

  • 需求2
    • 抽樣不放回
    • 那麼此時第二個引數代表的就是每個數出現的概率,如果設為1,則每個數都出現一次
    • 若為0,則都不出現
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(1 to 10)

        // 從RDD中隨機抽取資料(抽樣不放回)
        val newRDD: RDD[Int] = rdd.sample(false, 0.3)
        newRDD.collect().foreach(println)
        sc.stop()
    }

---------------------------------------------
3
5
7

takeSample可以在抽取時指定抽取的個數(在第二個引數中指定抽取的個數)

  • 小案例
    • 從名單中抽取三名幸運觀眾
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        val sc: SparkContext = new SparkContext(conf)

        val stds = List("張1", "張2", "張3", "張4", "張5", "張6", "張7", "張8", "張9",
            "張10", "張11", "張12", "張13", "張14", "張15", "張16",
            "張17", "張18", "張19", "張21", "張20", "張22")
    
        val nameRDD: RDD[String] = sc.makeRDD(stds)
    
        // 從上面RDD當中抽取一名幸運觀眾
        val res: Array[String] = nameRDD.takeSample(false, 3)
        res.foreach(println)
        sc.stop()
    }
------------------------------------------
輸出結果
張18131

9.distinct()

去重運算元

def distinct(): RDD[T]

distinct運算元是對內部元素進行去重,將去重後的結果放到新的RDD當中

預設情況下,distinct會生成與原分割區個數一致的分割區數

distinct有兩種,一種是不加引數,一種是可以指定分割區數

程式碼範例

  • 需求1
    • 對集合中的數去重,不指定分割區數
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        // 建立RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 4, 3, 3), 5)
        // 對RDD中資料進行去重
        val newRDD: RDD[Int] = rdd.distinct()
        newRDD.collect().foreach(println)
        sc.stop()
    }

-------------------------------------
輸出結果
5
1
2
3
4
  • 需求2
    • 對集合中的資料去重,指定分割區數
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        // 建立RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 4, 3, 3), 5)

        rdd.mapPartitionsWithIndex{
            (index, datas) =>
                println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
                datas
        }.collect()

        println("------------去重前後分割區對比----------------")
        
    	// 對RDD中資料進行去重
        val newRDD: RDD[Int] = rdd.distinct(2)

        newRDD.mapPartitionsWithIndex{
            (index, datas) =>
                println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
                datas
        }.collect()
        sc.stop()
    }

--------------------------------------------------------
輸出結果
分割區:4---> 分割區元素:3,3
分割區:1---> 分割區元素:3,4
分割區:0---> 分割區元素:1,2
分割區:2---> 分割區元素:5,5
分割區:3---> 分割區元素:4,4
------------去重前後分割區對比----------------
分割區:1---> 分割區元素:1,3,5
分割區:0---> 分割區元素:4,2

10.coalesce()

coalesce運算元可以重新分割區

def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
 : RDD[T]

程式碼範例

  • 需求1
    • 縮減分割區
    • 預設情況下,如果使用coalesce擴大分割區是不起作用的,因為底層沒有執行shuffle,如果擴大分割區,使用reparation
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        // 建立RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)

        rdd.mapPartitionsWithIndex{
            (index, datas) =>
                println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
                datas
        }.collect()

        println("*****************縮減分割區前後********************")
        // 縮減分割區
        val newRDD: RDD[Int] = rdd.coalesce(2)
        newRDD.mapPartitionsWithIndex{
            (index, datas) =>
                println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
                datas
        }.collect()
        sc.stop()
    }

----------------------------------------------------------
分割區:0---> 分割區元素:1,2
分割區:2---> 分割區元素:5,6
分割區:1---> 分割區元素:3,4
***************縮減分割區前後**********************
分割區:0---> 分割區元素:1,2
分割區:1---> 分割區元素:3,4,5,6

11.reparation()

重新分割區,擴大分割區時,可以使用reparation,reparation會執行shuffle

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        // 建立RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)

        rdd.mapPartitionsWithIndex{
            (index, datas) =>
                println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
                datas
        }.collect()

        // 使用reparation擴大分割區
        val newRDD: RDD[Int] = rdd.repartition(4)
        println("******************擴大分割區前後*******************")
        newRDD.mapPartitionsWithIndex{
            (index, datas) =>
                println("分割區:"+ index + "---> 分割區元素:" + datas.mkString(","))
                datas
        }.collect()
        sc.stop()
    }

-----------------------------------
輸出結果
分割區:2---> 分割區元素:5,6
分割區:1---> 分割區元素:3,4
分割區:0---> 分割區元素:1,2
******************擴大分割區前後*******************
分割區:2---> 分割區元素:
分割區:1---> 分割區元素:
分割區:3---> 分割區元素:1,3,5
分割區:0---> 分割區元素:2,4,6

repartition的底層其實是呼叫的coalesce,只不過在呼叫時,第二個引數指定執行shuffle機制

請看原始碼:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
}
  • coalesce
    • 第二個引數預設為false,底層預設不執行shuffle
    • 一般用於縮減分割區
  • repartition
    • 底層呼叫coalesce,呼叫coalesce時第二個引數為true,執行shuffle
    • 一般用於擴大分割區

12.sortBy()

排序

對RDD中的元素進行排序,排序時需要指定排序規則

def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,  // 預設為正序排序
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 

程式碼範例

  • 需求1
    • 升序排序
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 建立RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 4, 6, 5, 3, 2))

        // 升序排序
        val sortedRDD: RDD[Int] = rdd.sortBy(num => num)
        sortedRDD.collect().foreach(println)
        sc.stop()
    }
-------------------------------------------------------
輸出結果
1
2
3
4
5
6
  • 需求2
    • 降序排序(sortBy的第二個引數需要指定為false)
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 建立RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 4, 6, 5, 3, 2))

        // 降序排序
        val sortedRDD: RDD[Int] = rdd.sortBy(num => num, false)
        
        sortedRDD.collect().foreach(println)
        sc.stop()
    }
-----------------------------------------------------------------
輸出結果
6
5
4
3
2
1
  • 需求3
    • 對字串進行排序
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        
        val strRDD: RDD[String] = sc.makeRDD(List("1", "6", "2", "10"))
        
        println("-----------對字串按字典序排序-------------")
        // 按照字串字典順序進行排序
        val sortedRDD: RDD[String] = strRDD.sortBy(elem => elem)
        sortedRDD.collect().foreach(println)
        
        println("------------將字串按照字面的數位來進行排序---------------")
        // 按照字串轉換為整數後的大小進行排序
        val sortedRDD2: RDD[String] = strRDD.sortBy(_.toInt)
        sortedRDD2.collect().foreach(println)
        
        sc.stop()
    }

-----------------------------------------
輸出結果
-----------對字串按字典序排序-------------
1
10
2
6
------------將字串按照字面的數位來進行排序---------------
1
2
6
10

13.pipe()

def pipe(command: String): RDD[String]

針對每一個分割區,都呼叫一次shell指令碼,返回輸出的RDD


雙value型別

1.union()並集

def union(other: RDD[T]): RDD[T] 

程式碼範例

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
        val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
        // 求並集
        val newRDD: RDD[Int] = rdd1.union(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
-------------------------------------
1
2
3
4
3
4
5
6
7

2.intersection()交集

def intersection(other: RDD[T]): RDD[T]

程式碼範例

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
        val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
     	// 求交集
        val newRDD: RDD[Int] = rdd1.intersection(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
---------------------
輸出結果
3
4

3.subtract ()差集

def subtract(other: RDD[T]): RDD[T]

程式碼範例

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
        val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7))
		// 求差集
        val newRDD: RDD[Int] = rdd1.subtract(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
--------------------------------------------
輸出結果
1
2

4. zip()拉鍊

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

拉鍊的功能是將兩個RDD中的元素合併成元組,元組中key為第一個RDD中的元素,value為第二個RDD中的元素

要求兩個RDD的分割區數量以及元素數量都相等,否則會拋異常

程式碼範例

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
        val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))
      
        // 拉鍊
        val newRDD: RDD[(Int, Int)] = rdd1.zip(rdd2)

        newRDD.collect().foreach(println)
        sc.stop()
    }
----------------------------------------
輸出結果
(1,3)
(2,4)
(3,5)
(4,6)

Key-Value型別

1.partitionBy()

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

按照key從新分割區,將RDD[K, V]中的K,按照指定Partitioner重新進行分割區如果原有的partitionRDD和現有partitionRDD是一致的話,就不進行分割區,否則會產生shuffe過程。

程式碼範例

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 建立RDD
        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c")), 3)
        rdd.mapPartitionsWithIndex(
          (index, datas) =>{
            println("分割區號:" + index + "---->" + "分割區資料" + datas.mkString(","))
            datas
          }
        ).collect()

        println("**************分割區前後對比*****************")
        val newRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
        newRDD.mapPartitionsWithIndex(
            (index, datas) =>{
                println("分割區號:" + index + "---->" + "分割區資料" + datas.mkString(","))
                datas
            }
        ).collect()

        sc.stop()
    }
----------------------------------------
輸出結果
分割區號:0---->分割區資料(1,a)
分割區號:2---->分割區資料(3,c)
分割區號:1---->分割區資料(2,b)
**************分割區前後對比*****************
分割區號:1---->分割區資料(1,a),(3,c)
分割區號:0---->分割區資料(2,b)

自定義分割區器

class MyPartitioner(partitions: Int) extends Partitioner {
    // 獲取分割區個數
    override def numPartitions: Int = partitions

    // 指定分割區規則  返回值Int表示分割區編號,從0開始
    override def getPartition(key: Any): Int = {
        // 此時返回為1,那麼所有的元素都會被分到1號區
        1
    }
}
------------------------------------------------------------------------------val newRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))new HashPartitioner(2)換成new MyPartitioner(2)
輸出結果
分割區號:1---->分割區資料(2,b)
分割區號:0---->分割區資料(1,a)
分割區號:2---->分割區資料(3,c)
**************分割區前後對比*****************
分割區號:0---->分割區資料
分割區號:1---->分割區資料(1,a),(2,b),(3,c)

2.reduceByKey()

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

reduceByKey可以將RDD[K, V]中的元素按照相同的K對V進行聚合

可以指定新的RDD中分割區器或分割區數

程式碼範例

  • 將集合中相同key的元組進行聚合
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 4), ("b", 3)))
        val resRDD: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
        resRDD.collect().foreach(println)
        sc.stop()
    }
--------------------------------------------------------------
輸出結果
(a,5)
(b,5)

3.groupByKey()

def groupByKey(): RDD[(K, Iterable[V])]

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

groupByKey對集合中的每個key進行操作,會將相同key對應的value放到一個集合當中,不進行聚合

可以指定新的RDD中分割區器(預設使用的分割區器是HashPartitioner)或分割區數

程式碼範例

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 2), ("a", 4), ("b", 3)))
        val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
        groupRDD.collect().foreach(println)
        sc.stop()
    }
---------------------------------
輸出結果
(a,CompactBuffer(5, 4))
(b,CompactBuffer(2, 3))
  • wordcount案例
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 2), ("a", 4), ("b", 3)))
        val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
        
        val resRDD = groupRDD.map {
            case (key, datas) => {
                (key, datas.sum)
            }
        }
        resRDD.collect().foreach(println)
        sc.stop()
    }

---------------------------------------------------------------
輸出結果
(a,9)
(b,5)
  • reduceByKey用來做聚合
  • groupByKey用來做分組

4.aggregateByKey()

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)]

aggregateByKey是按照key對分割區內以及分割區間的資料進行處理

aggregateByKey(引數1)(引數2, 引數3)

  • 引數1: 初始值

  • 引數2: 分割區內計算規則

  • 引數3: 分割區間計算規則

程式碼範例

  • 需求1
    • 用aggregateByKey實現wordCount
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 建立RDD
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("c", 5), ("b", 5), ("c", 1), ("b", 2)), 2)

        // aggregateByKey實現wordCount
        val newRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(_ + _, _ + _)
        newRDD.collect().foreach(println)

        sc.stop()
    }

----------------------------------------------
輸出結果
(b,7)
(a,6)
(c,6)
  • 需求2
    • 對每個分割區內的最大值求和
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 建立RDD
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("c", 5), ("a", 2), ("a", 3), ("b", 5), ("a", 3), ("c", 3), ("b", 2)), 2)

        rdd.mapPartitionsWithIndex(
            (index, datas) => {
              println("分割區:" + index + "---> 分割區資料:" + datas.mkString(","))
              datas
            }
        ).collect()
        println("求分割區最大值")
        val newRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(
            // 分割區內的計算規則,找出最大值,max在進行比較時,首先是將第一個元素跟初始值比較
            (x, y) => math.max(x, y),
            // 分割區間計算規則
            (a, b) => a + b
        )
        /*
        // 簡化
        val newRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(
            // 分割區內的計算規則,找出最大值
            math.max(_, _),
            // 分割區間計算規則
            _ + _
        )*/
        newRDD.collect().foreach(println)

        sc.stop()
    }

-----------------------------------------------------
輸出結果
分割區:0---> 分割區資料:(c,5),(a,2),(a,3)
分割區:1---> 分割區資料:(b,5),(a,3),(c,3),(b,2)
求分割區最大值
(b,5)
(a,6)
(c,8)

當分割區內計算邏輯和分割區間計算邏輯不一樣時,可以考慮使用aggregateByKey


5. foldByKey()

def foldByKey(
      zeroValue: V,
      partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  • 引數含義
    • zeroValue代表初始值,可以取任意型別
    • func 代表一個函數
    • Partitioner 分割區器
    • numPartitions 分割區數

foldByKey是aggregateByKey的簡化版本,如果分割區內和分割區間計算邏輯相同,那麼可以使用foldByKey,foldByKey和reduceByKey之間的差別在於foldKey可以指定初始值

程式碼範例

  • 需求
    • 按照key對分割區內和分割區間的資料進行相加(用foldByKey實現wordCount)
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 建立RDD
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("c", 2), ("a", 1), ("a", 2), ("b", 5), ("a", 3), ("c", 3), ("b", 2)), 2)
        val newRDD: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _)
        newRDD.collect().foreach(println)
        sc.stop()
    }

------------------------------------------------------
(b,7)
(a,6)
(c,5)

6.combineByKey()

轉換結構後分割區內和分割區間操作

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)]

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)]

引數說明:

  • createCombiner

    • 對RDD中當前key取出第一個value做初始化,把當前的值作為引數,對該值可以進行一些轉換操作,轉換成我們想要的格式,比如讀進來(10),可以轉換為(10, 1)
  • mergeValue

    • 分割區內的計算規則,會將當前分割區的value值(除去第一個value剩下的value),合併到初始化得到的c上面
  • mergeCombiners

    • 分割區間的計算規則

combineByKey運算元的使用場景

程式碼範例

List(("LaoWang", 90), ("LaoLiu", 80), ("LaoWang", 85), ("LaoLiu", 60)

List中表示的是學生姓名和成績,一個學生有不同科目的成績

  • 需要求出每個學生的平均成績
    • 先對讀進來的資料進行溢寫格式轉換,比如將"LaoWang"對應的90轉換為(90, 1),加上一個1是為了後面方便計算每個同學總共的科目數量
    • 然後在分割區內對分數相加,每加一次,課程門數加1
    • 分割區間,將分數以及課程門數分別相加
    • 得到每個同學的總成績以及課程數量之後,通過map來求平均值
def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 需求,求出每一個學生的平均成績
        // 建立RDD
        val scoreRDD: RDD[(String, Int)] = sc.makeRDD(List(("LaoWang", 90), ("LaoLiu", 80), ("LaoWang", 85), ("LaoLiu", 60)), 2)

        val combineRDD: RDD[(String, (Int, Int))] = scoreRDD.combineByKey(
            // 初始化
            (_, 1),
            // 分割區內計算規則 將分數相加,每加一次,課程門數加1
            (t1: (Int, Int), v) => {
                (t1._1 + v, t1._2 + 1)
            },
            // 分割區間計算規則,將分數以及課程門數分別相加
            (tup2: (Int, Int), tup3: (Int, Int)) => {
                (tup2._1 + tup3._1, tup2._2 + tup2._2)
            }
        )

        // 求平均成績
        val resRDD: RDD[(String, Int)] = combineRDD.map {
            case (name, (score, count)) => {
                (name, score / count)
            }
        }
        resRDD.collect().foreach(println)
        sc.stop()
    }

-------------------------------------------------------------------------------------
輸出結果
(LaoLiu,70)
(LaoWang,87)

7.幾種ByKey的對比

  • 如果分割區內和分割區間計算邏輯相同,並且不需要指定初始值,那麼優先使用reduceByKey
  • 如果分割區內和分割區間計算邏輯相同,並且需要指定初始值,那麼優先使用foldByKey
  • 如果分割區內和分割區間計算邏輯相同,並且需要指定初始值,那麼優先使用aggregateByKey
  • 需要對讀入的RDD中資料進行格式轉換時,並且要處理分割區內和分割區間的邏輯,那麼優先使用combineByKey

8.sortByKey()

按照K進行排序

在一個(K, V)的RDD上呼叫,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)]

引數說明

  • 第一個引數代表排序規則,true為預設,預設為升序
  • 第二個引數為分割區數,預設跟當前情況下分割區數相同

程式碼範例

  • 按照key升序排序
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        // 建立RDD
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((2, "aa"), (6, "dd"), (4, "bb"), (1, "cc")))
        // 按照key對RDD中的元素進行排序  Int已經預設實現了Ordered介面
        val newRDD: RDD[(Int, String)] = rdd.sortByKey()
        newRDD.collect().foreach(println)

        sc.stop()
    }
-------------------------------------------------------------
(1,cc)
(2,aa)
(4,bb)
(6,dd)

如果想降序排序,只需在方法裡設定第一個引數為false即可

val newRDD: RDD[(Int, String)] = rdd.sortByKey(false)
  • 如果key為自定義型別,要求必須混入Ordered特質
    • 自定義一個Student類,要求先按照名稱升序,如果名稱相同的話,再按照年齡降序
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val stdList: List[(Student, Int)] = List(
            (new Student("zhangsan", 18), 1),
            (new Student("lisi", 12), 1),
            (new Student("wangwu", 20), 1),
            (new Student("wangwu", 23), 1)
        )
        val stdRDD: RDD[(Student, Int)] = sc.makeRDD(stdList)
        val resRDD: RDD[(Student, Int)] = stdRDD.sortByKey()
        resRDD.collect().foreach(println)
        sc.stop()
    }
-----------------------------------------------------------------------------
輸出結果
(Student(lisi, 12) ,1)
(Student(wangwu, 23) ,1)
(Student(wangwu, 20) ,1)
(Student(zhangsan, 18) ,1)

Student類定義如下

class Student(var name: String, var age: Int) extends Ordered[Student] with Serializable {
    // 指定比較規則
    override def compare(that: Student): Int = {
        // 先按照名稱升序,如果名稱相同的話,再按照年齡降序
        var res: Int = this.name.compareTo(that.name)
        if (res == 0) {
            // res 等於0 說明名字相同,此時按照年齡降序排序
            res = that.age - this.age
        }
        res
    }

    override def toString: String = s"Student($name, $age) "
}

9.mapValues()

def mapValues[U](f: V => U): RDD[(K, U)]
對kv型別的RDD中的value部分進行對映

程式碼範例

  • 需求
    • 對RDD中的value新增字串" --> "
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d")))
        val newRDD: RDD[(Int, String)] = rdd.mapValues(" --> " + _)
        newRDD.collect().foreach(println)
        sc.stop()
    }

----------------------------------------------------
輸出結果
(1, --> a)
(2, --> c)
(1, --> b)
(3, --> d)

10. join()

我們先來回顧一下SQL中的連線

  • 連線
    • 兩張表或者多張表結合在一起獲取資料的過程
  • 內連線
    • 兩張表進行連線查詢,將兩張表中完全匹配的記錄查詢出來
    • 等值連線(某個屬性值相等)
      • =
    • 非等值連線(確定一個範圍)
      • between and …
    • 自連線(本張表和本張表連線)
  • 外連線
    • 兩張表進行連線查詢,將其中一張表的資料全部查詢出來,如果另外一張表中有資料無法與其匹配,則會自動模擬出空值與其進行匹配
    • 左(外)連線
      • 左連線是左邊表的所有資料都有顯示出來,右邊的表資料只顯示共同有的那部分,沒有對應的部分補空顯示
    • 右(外)連線
      • 右連線和左連線相反的
    • 全連線
      • 返回兩個表中的所有的值,沒有對應的資料則輸出為空

join()是將相同key對應的多個value關聯在一起

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] 

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

對於兩個RDD ,型別分別為(k, v)和(k, w),呼叫join運算元時,返回一個相同key對應的所有元素組合在一起的RDD(k, (v, w)),簡單點來說,就是k不變,將value從新組合放到一起

如果key只是某一個RDD中有,那麼這個key不會關聯,類似於內連線,將兩張表完全匹配的資料給顯示出來

程式碼範例

  • 將兩個RDD進行join
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
        val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))
		// join運算元相當於內連線,將兩個RDD中的key相同的資料匹配,如果key匹配不上,那麼資料不關聯
        val newRDD: RDD[(Int, (String, Int))] = rdd.join(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
---------------------------------------------------------------------------
輸出結果
(1,(a,3))
(1,(a,2))
(1,(b,3))
(1,(b,2))
(2,(c,5))
(3,(d,3))
  • 左外連線 leftOuterJoin(對於沒有匹配上的,會用空值進行匹配)
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
        val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))

        val newRDD: RDD[(Int, (String, Option[Int]))] = rdd.leftOuterJoin(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
-----------------------------------------------------------------------
(1,(a,Some(3)))
(1,(a,Some(2)))
(1,(b,Some(3)))
(1,(b,Some(2)))
(2,(c,Some(5)))
(3,(d,Some(3)))
(4,(e,None))

11.cogroup()

類似於全連線,但是在同一個RDD中對key聚合
操作兩個RDD中的KV元素,每個RDD中相同key中的元素分別聚合成一個集合。

舉例

  • RDD1(List(1,「a」), (1,「b」))
  • RDD2 (List(1, 「c」),(1,「d」))
  • cogruop之後,結果為(1,[「a」,「b」],[「c」,「d」])

程式碼範例

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "c"), (1, "b"), (3, "d"), (4, "e")))
        val rdd2: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (2, 5), (1, 2), (3, 3)))

        // cogroup
        val newRDD: RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd.cogroup(rdd2)
        newRDD.collect().foreach(println)
        sc.stop()
    }
---------------------------------------------------------------
輸出結果
(1,(CompactBuffer(a, b),CompactBuffer(3, 2)))
(2,(CompactBuffer(c),CompactBuffer(5)))
(3,(CompactBuffer(d),CompactBuffer(3)))
(4,(CompactBuffer(e),CompactBuffer()))

2.5Action行動運算元

1.reduce()

def reduce(f: (T, T) => T): T

聚合

f函數聚集RDD中的所有元素,先聚合分割區內資料,再聚合分割區間資料

程式碼範例

	 def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
        // reduce
        val res: Int = rdd.reduce(_ + _)
        println(res)
        sc.stop()
    }
-----------------------------------------------------
輸出結果
10

2.collect()

def collect(): Array[T] 

以陣列Array的形式返回資料集的所有元素

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

        /*
        // reduce
        val res: Int = rdd.reduce(_ + _)
        println(res)
        */
        rdd.foreach(println)
        println("-------------------------")

        val ints: Array[Int] = rdd.collect()
        ints.foreach(println)

        sc.stop()
    }
----------------------------------------------------
輸出結果
1
3
2
4
-------------------------
1
2
3
4

注意:雖然用rdd直接呼叫foreach進行輸出和先呼叫collect再進行輸出,結果都可以出來,但是呼叫collect之後,會把資料先收集到driver端,資料是全部存到一個陣列當中;而不呼叫直接輸出時,不同分割區的資料是在不同的Executor並行輸出,我們會看到沒呼叫collect進行輸出的話,元素的順序可能會跟原順序不一樣


3.count()

計數,返回RDD中元素的個數

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

程式碼範例

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
        val res: Long = rdd.count()
        println(res)
        sc.stop()
    }
---------------
輸出結果
4

4.first()

返回RDD中的第一個元素

def first(): T

程式碼範例

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

        // first 返回RDD中的第一個元素
        val res: Int = rdd.first()
        println(res)
        sc.stop()
    }
-------------------
輸出結果
1

5.take()

take運算元可以返回由RDD前n個元素組成的陣列

def take(num: Int): Array[T]

程式碼範例

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
        // take 返回rdd前n個元素組成的陣列
        val res: Array[Int] = rdd.take(3)
        println(res.mkString(","))
        sc.stop()
    }
-----------------------------------------------
輸出結果
1,2,3

6.takeOrdered()

返回RDD排序後前n個元素組成的陣列

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

程式碼範例

 	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(3, 1, 2, 5, 4), 2)

        // takeOrdered(n) 獲取rdd排序後前n個元素 
        val ans: Array[Int] = rdd.takeOrdered(3)
        println(ans.mkString(","))
        sc.stop()
    }
----------------------------------------------
輸出結果
1,2,3

7.aggregate()

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  • 引數解釋
    • 第一個引數是設定的初始值,這個初始值在分割區內和分割區間都會用到
    • 第二個引數是分割區內的操作邏輯
    • 第三個引數是分割區間的操作邏輯

在進行分割區內的邏輯時,每個分割區會通過分割區內邏輯和初始值進行聚合(例如:初始值為1,分割區計算邏輯為相加,那麼分割區內的元素聚合時,最後的結果還要加一次初始值1),然後分割區間也會通過分割區間邏輯和初始值進行操作。

程式碼範例

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
        rdd.mapPartitionsWithIndex(
            (index, datas) => {
                println("分割區號:" + index + "-> 分割區資料:" + datas.mkString(","))
                datas
            }
        ).collect()
        // agreegate
        val res: Int = rdd.aggregate(1)(_ + _, _ + _)
        println(res)
        sc.stop()
    }
---------------------------------------------------
輸出結果
分割區號:4-> 分割區資料:
分割區號:2-> 分割區資料:
分割區號:1-> 分割區資料:1
分割區號:5-> 分割區資料:3
分割區號:7-> 分割區資料:4
分割區號:0-> 分割區資料:
分割區號:6-> 分割區資料:
分割區號:3-> 分割區資料:2
19

這裡有點不太好理解,我解釋一下程式碼,程式碼首先是一個集合,資料在八個分割區中,由輸出結果可看出,1,3,5,7號分割區中有資料,分割區裡的邏輯是相加,相加時每個分割區進行分割區間的邏輯操作(相加)時,分割區裡元素相加的結果還要加上初始值1

然後分割區間的邏輯也是相加,分割區間相加時也會加上初始值

總共8個分割區,每個分割區都要加一次初始值1,總共要加8,所有分割區裡的原始資料相加總共是10,那麼現在總共是18。分割區間在進行相加操作時,還要加一次初始值,即為結果19。

讀者可以通過修改rdd分割區數以及初始值來體會這個運算元。


8.fold()

def fold(zeroValue: T)(op: (T, T) => T): T

fold 是aggregate的簡化,分割區內和分割區間計算規則相同

	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 3)
        rdd.mapPartitionsWithIndex(
            (index, datas) => {
                println("分割區號:" + index + "-> 分割區資料:" + datas.mkString(","))
                datas
            }
        ).collect()

        val res: Int = rdd.fold(10)(_ + _)
        println(res)
        sc.stop()
    }
---------------------------------------------------------------------------
輸出結果
分割區號:1-> 分割區資料:2
分割區號:2-> 分割區資料:3,4
分割區號:0-> 分割區資料:1
50

初始值為10,3個分割區,所以要另外加上30,分割區間操作還要加上初始值,那麼總共要另外加40,40再加上原始的總值10,結果為50


9.countByKey()

統計每種key的個數

def countByKey(): Map[K, Long]
  • 引數解釋
    • 第一個引數代表出現的key
    • 第二個引數為此key出現的次數
	def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)

        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
        val res: collection.Map[Int, Long] = rdd.countByKey()
        println(res)
        sc.stop()
    }
----------------------------------------------------------------------------------------
輸出結果
Map(1 -> 2, 2 -> 1, 3 -> 2)

10.save運算元

saveAsTextFile(path)

  • 儲存成Text檔案
    • 將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統
    • 對於每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文字

**saveAsSequenceFile(path) **

  • 儲存成Sequencefile檔案
    • 將資料集中的元素以Hadoop Sequencefile的格式儲存到指定的目錄下
    • 可以使HDFS或者其他Hadoop支援的檔案系統。

saveAsObjectFile(path)

  • 序列化成物件儲存到檔案
    • 將RDD中的元素序列化成物件,儲存到檔案中。
		// save 相關運算元
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 1)

        // 儲存為文字檔案,指定儲存路徑
        rdd.saveAsTextFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output")

        // 儲存為序列化檔案
        rdd.saveAsObjectFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output1")

        // 儲存為SequenceFile (只支援KV型別的RDD)
        rdd.map((_, 1)).saveAsSequenceFile("D:\\IDEA_code\\bigData\\Spark_code\\spark\\output2")

11.foreach()

遍歷RDD中每一個元素,前面的程式碼大多數地方都用過這個運算元了,這裡就不再贅述。