SparkCore之RDD程式設計進階

2020-08-12 08:07:28

累加器

累加器用來對資訊進行聚合,通常在向 Spark傳遞函數時,比如使用 map() 函數或者用 filter() 傳條件時,可以使用驅動器程式中定義的變數,但是叢集中執行的每個任務都會得到這些變數的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變數。如果我們想實現所有分片處理時更新共用變數的功能,那麼累加器可以實現我們想要的效果

系統累加器

針對一個輸入的日誌檔案,如果我們想計算檔案中所有空行的數量,我們可以編寫以下程式

scala> val notice = sc.textFile("./NOTICE")
notice: org.apache.spark.rdd.RDD[String] = ./NOTICE MapPartitionsRDD[40] at textFile at <console>:32
scala> val blanklines = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
blanklines: org.apache.spark.Accumulator[Int] = 0
scala> val tmp = notice.flatMap(line => {
     |    if (line == "") {
     |       blanklines += 1
     |    }
     |    line.split(" ")
     | })
tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at flatMap at <console>:36
scala> tmp.count()
res31: Long = 3213
scala> blanklines.value
res32: Int = 171

累加器的用法如下所示。
通過在驅動器中呼叫SparkContext.accumulator(initialValue)方法,建立出存有初始值的累加器。返回值爲 org.apache.spark.Accumulator[T] 物件,其中 T 是初始值 initialValue 的型別。Spark閉包裡的執行器程式碼可以使用累加器的 += 方法(在Java中是 add)增加累加器的值。 驅動器程式可以呼叫累加器的value屬性(在Java中使用value()或setValue())來存取累加器的值。
注意:工作節點上的任務不能存取累加器的值。從這些任務的角度來看,累加器是一個只寫變數。
對於要在行動操作中使用的累加器,Spark只會把每個任務對各累加器的修改應用一次。因此,如果想要一個無論在失敗還是重複計算時都絕對可靠的累加器,我們必須把它放在 foreach() 這樣的行動操作中。轉化操作中累加器可能會發生不止一次更新

自定義累加器

自定義累加器型別的功能在1.X版本中就已經提供了,但是使用起來比較麻煩,在2.0版本後,累加器的易用性有了較大的改進,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義型別累加器的實現方式。實現自定義型別累加器需要繼承AccumulatorV2並至少覆寫下例中出現的方法,下面 下麪這個累加器可以用於在程式執行過程中收集一些文字類資訊,最終以Set[String]的形式返回

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._
class LogAccumulator extends org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] {
  private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
  override def isZero: Boolean = {
    _logArray.isEmpty
  }
  override def reset(): Unit = {
    _logArray.clear()
  }
  override def add(v: String): Unit = {
    _logArray.add(v)
  }
  override def merge(other: org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]]): Unit = {
    other match {
      case o: LogAccumulator => _logArray.addAll(o.value)
    }
  }
  override def value: java.util.Set[String] = {
    java.util.Collections.unmodifiableSet(_logArray)
  }
  override def copy():org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] = {
    val newAcc = new LogAccumulator()
    _logArray.synchronized{
      newAcc._logArray.addAll(_logArray)
    }
    newAcc
  }
}
// 過濾掉帶字母的
object LogAccumulator {
  def main(args: Array[String]) {
    val conf=new SparkConf().setAppName("LogAccumulator")
    val sc=new SparkContext(conf)
    val accum = new LogAccumulator
    sc.register(accum, "logAccum")
    val sum = sc.parallelize(Array("1", "2a", "3", "4b", "5", "6", "7cd", "8", "9"), 2).filter(line => {
      val pattern = """^-?(\d+)"""
      val flag = line.matches(pattern)
      if (!flag) {
        accum.add(line)
      }
      flag
    }).map(_.toInt).reduce(_ + _)
    println("sum: " + sum)
    for (v <- accum.value) print(v + "")
    println()
    sc.stop()
  }
}

廣播變數(調優策略)

廣播變數用來高效分發較大的物件。向所有工作節點發送一個較大的只讀值,以供一個或多個Spark操作使用。比如,如果你的應用需要向所有節點發送一個較大的只讀查詢表,甚至是機器學習演算法中的一個很大的特徵向量,廣播變數用起來都很順手。 在多個並行操作中使用同一個變數,但是 Spark會爲每個任務分別發送。

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)
scala> broadcastVar.value
res33: Array[Int] = Array(1, 2, 3)

使用廣播變數的過程如下:

  1. 通過對一個型別 T 的物件呼叫 SparkContext.broadcast 建立出一個 Broadcast[T] 物件。 任何可序列化的型別都可以這麼實現。
  2. 通過 value 屬性存取該物件的值(在 Java 中爲 value() 方法)。
  3. 變數只會被髮到各個節點一次,應作爲只讀值處理(修改這個值不會影響到別的節點)。

关注微信公众号
簡書:https://www.jianshu.com/u/0278602aea1d
CSDN:https://blog.csdn.net/u012387141