累加器用來對資訊進行聚合,通常在向 Spark傳遞函數時,比如使用 map() 函數或者用 filter() 傳條件時,可以使用驅動器程式中定義的變數,但是叢集中執行的每個任務都會得到這些變數的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變數。如果我們想實現所有分片處理時更新共用變數的功能,那麼累加器可以實現我們想要的效果
針對一個輸入的日誌檔案,如果我們想計算檔案中所有空行的數量,我們可以編寫以下程式
scala> val notice = sc.textFile("./NOTICE")
notice: org.apache.spark.rdd.RDD[String] = ./NOTICE MapPartitionsRDD[40] at textFile at <console>:32
scala> val blanklines = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
blanklines: org.apache.spark.Accumulator[Int] = 0
scala> val tmp = notice.flatMap(line => {
| if (line == "") {
| blanklines += 1
| }
| line.split(" ")
| })
tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at flatMap at <console>:36
scala> tmp.count()
res31: Long = 3213
scala> blanklines.value
res32: Int = 171
累加器的用法如下所示。
通過在驅動器中呼叫SparkContext.accumulator(initialValue)方法,建立出存有初始值的累加器。返回值爲 org.apache.spark.Accumulator[T] 物件,其中 T 是初始值 initialValue 的型別。Spark閉包裡的執行器程式碼可以使用累加器的 += 方法(在Java中是 add)增加累加器的值。 驅動器程式可以呼叫累加器的value屬性(在Java中使用value()或setValue())來存取累加器的值。
注意:工作節點上的任務不能存取累加器的值。從這些任務的角度來看,累加器是一個只寫變數。
對於要在行動操作中使用的累加器,Spark只會把每個任務對各累加器的修改應用一次。因此,如果想要一個無論在失敗還是重複計算時都絕對可靠的累加器,我們必須把它放在 foreach() 這樣的行動操作中。轉化操作中累加器可能會發生不止一次更新
自定義累加器型別的功能在1.X版本中就已經提供了,但是使用起來比較麻煩,在2.0版本後,累加器的易用性有了較大的改進,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義型別累加器的實現方式。實現自定義型別累加器需要繼承AccumulatorV2並至少覆寫下例中出現的方法,下面 下麪這個累加器可以用於在程式執行過程中收集一些文字類資訊,最終以Set[String]的形式返回
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._
class LogAccumulator extends org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] {
private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
override def isZero: Boolean = {
_logArray.isEmpty
}
override def reset(): Unit = {
_logArray.clear()
}
override def add(v: String): Unit = {
_logArray.add(v)
}
override def merge(other: org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]]): Unit = {
other match {
case o: LogAccumulator => _logArray.addAll(o.value)
}
}
override def value: java.util.Set[String] = {
java.util.Collections.unmodifiableSet(_logArray)
}
override def copy():org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] = {
val newAcc = new LogAccumulator()
_logArray.synchronized{
newAcc._logArray.addAll(_logArray)
}
newAcc
}
}
// 過濾掉帶字母的
object LogAccumulator {
def main(args: Array[String]) {
val conf=new SparkConf().setAppName("LogAccumulator")
val sc=new SparkContext(conf)
val accum = new LogAccumulator
sc.register(accum, "logAccum")
val sum = sc.parallelize(Array("1", "2a", "3", "4b", "5", "6", "7cd", "8", "9"), 2).filter(line => {
val pattern = """^-?(\d+)"""
val flag = line.matches(pattern)
if (!flag) {
accum.add(line)
}
flag
}).map(_.toInt).reduce(_ + _)
println("sum: " + sum)
for (v <- accum.value) print(v + "")
println()
sc.stop()
}
}
廣播變數用來高效分發較大的物件。向所有工作節點發送一個較大的只讀值,以供一個或多個Spark操作使用。比如,如果你的應用需要向所有節點發送一個較大的只讀查詢表,甚至是機器學習演算法中的一個很大的特徵向量,廣播變數用起來都很順手。 在多個並行操作中使用同一個變數,但是 Spark會爲每個任務分別發送。
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)
scala> broadcastVar.value
res33: Array[Int] = Array(1, 2, 3)
使用廣播變數的過程如下:
簡書:https://www.jianshu.com/u/0278602aea1d
CSDN:https://blog.csdn.net/u012387141