RDD提供兩種型別的操作:
在Spark中,轉換的作用是從現有資料集建立新資料集。轉換是惰性的,因為它們僅在動作需要將結果返回到驅動程式時才計算。
下面來看看一些常用的RDD轉換。
map(func)
- 它返回一個新的分散式資料集, 該資料集是通過函式func
傳遞源的每個元素而形成的。filter(func)
- 它返回一個新資料集, 該資料集是通過選擇函式func
返回true
的源元素而形成的。flatMap(func)
- 這裡,每個輸入項可以對映到零個或多個輸出項, 因此函式func
應該返回序列而不是單個項。mapPartitions(func)
- 它類似於map,但是在RDD的每個分割區(塊)上單獨執行, 因此當在型別T的RDD上執行時, func
必須是Iterator <T> => Iterator <U>
型別。mapPartitionsWithIndex(func)
- 它類似於mapPartitions
,它為func
提供了一個表示分割區索引的整數值,因此當在型別T的RDD上執行時,func
必須是型別(Int,Iterator <T>)=> Iterator <U>
。sample(withReplacement, fraction, seed)
- 它使用給定的亂數生成器種子對資料的分數部分進行取樣,有或沒有替換。union(otherDataset)
- 它返回一個新資料集,其中包含源資料集和引數中元素的並集。intersection(otherDataset)
- 它返回一個新的RDD,其中包含源資料集和引數中的元素的交集。distinct([numPartitions]))
- 它返回一個新資料集,其中包含源資料集的不同元素。groupByKey([numPartitions])
- 當在(K,V)
對的資料集上呼叫時,它返回(K,Iterable)
對的資料集。reduceByKey(func, [numPartitions])
- 當呼叫(K,V)
對的資料集時,返回(K,V)
對的資料集,其中使用給定的reduce
函式func
聚合每個鍵的值,該函式必須是型別(V,V)=>V
。aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
- 當呼叫(K,V)
對的資料集時,返回(K,U)
對的資料集,其中使用給定的組合函式和中性「零」值聚合每個鍵的值。sortByKey([ascending], [numPartitions])
- 它返回按鍵按升序或降序排序的鍵值對的資料集,如在布林ascending
引數中所指定。join(otherDataset, [numPartitions])
-當呼叫型別(K,V)
和(K,W)
的資料集時,返回(K,(V,W))
對的資料集以及每個鍵的所有元素對。通過leftOuterJoin
,rightOuterJoin
和fullOuterJoin
支援外連線。cogroup(otherDataset, [numPartitions])
-當呼叫型別(K,V)
和(K,W)
的資料集時,返回(K,(Iterable,Iterable))
元組的資料集。此操作也稱為groupWith
。cartesian(otherDataset)
-當呼叫型別為T和U的資料集時,返回(T,U)
對的資料集(所有元素對)。pipe(command, [envVars])
-通過shell命令管道RDD的每個分割區,例如, 一個Perl或bash指令碼。coalesce(numPartitions)
-它將RDD中的分割區數減少到numPartitions
。repartition(numPartitions)
-它隨機重新調整RDD中的資料,以建立更多或更少的分割區,並在它們之間進行平衡。repartitionAndSortWithinPartitions(partitioner)
- 它根據給定的分割區器對RDD進行重新分割區,並在每個生成的分割區中鍵對記錄進行排序。在Spark中,操作的作用是在對資料集執行計算後將值返回給驅動程式。
下面來看看一些常用的RDD操作。
操作 | 描述 |
---|---|
reduce(func) |
它使用函式func(它接受兩個引數並返回一個)來聚合資料集的元素。該函式應該是可交換的和關聯的,以便可以並行正確計算。 |
collect() |
它將資料集的所有元素作為陣列返回到驅動程式中。在過濾器或其他返回足夠小的資料子集的操作之後,這通常很有用。 |
count() |
它返回資料集中的元素數。 |
first() |
它返回資料集的第一個元素(類似於take(1) )。 |
take(n) |
它返回一個包含資料集的前n個元素的陣列。 |
takeSample(withReplacement, num, [seed]) |
它返回一個陣列,其中包含資料集的num個元素的隨機樣本,有或沒有替換,可選地預先指定亂數生成器種子。 |
takeOrdered(n, [ordering]) |
它使用自然順序或自定義比較器返回RDD的前n個元素。 |
saveAsTextFile(path) |
它用於將資料集的元素作為文字檔案(或文字檔案集)寫入本地檔案系統,HDFS或任何其他Hadoop支援的檔案系統的給定目錄中。 |
saveAsSequenceFile(path) |
它用於在本地檔案系統,HDFS或任何其他Hadoop支援的檔案系統中的給定路徑中將資料集的元素編寫為Hadoop SequenceFile。 |
saveAsObjectFile(path) |
它用於使用Java序列化以簡單格式編寫資料集的元素,然後可以使用SparkContext.objectFile() 載入。 |
countByKey() |
它僅適用於型別(K,V)的RDD。因此,它返回(K,Int)對的雜湊對映與每個鍵的計數。 |
foreach(func) |
它在資料集的每個元素上執行函式func 以獲得副作用,例如更新累加器或與外部儲存系統互動。 |