In the blog post "Hadoop: A MapReduce Implementation of Word Count" we learned how to implement word count with Hadoop MapReduce; now let's see how to implement the same functionality with Spark.
Spark also follows a MapReduce-like model, using a "divide and aggregate" strategy to process data in a distributed, parallel fashion. Compared with Hadoop MapReduce, however, the framework has two distinguishing features:
1. It models the inputs, outputs, and intermediate data of a big-data job with a single abstraction, the Resilient Distributed Dataset (RDD), and builds a set of general-purpose operations on top of it, so users can express complex data-processing pipelines simply.
2. It uses in-memory aggregation and caching to speed up applications, which is especially beneficial for iterative and interactive workloads.
The Spark community recommends the higher-level structured APIs for structured data (Dataset, DataFrame) over the low-level RDD API, because the structured APIs carry schema information, support SQL operations, and run on the highly optimized Spark SQL engine. However, since the RDD API is more fundamental and better suited for illustrating the basic concepts and principles, the code in this post uses the RDD API.
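For reference, here is a minimal sketch of how the same count might look with the structured API (the "input" path is a placeholder; this snippet is not used in the rest of the post):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

public class StructuredWordCount {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("StructuredWordCount")
                .master("local")
                .getOrCreate();
        // spark.read().text() yields a Dataset<Row> with a single "value" column.
        Dataset<Row> counts = spark.read().text("input")
                .select(explode(split(col("value"), " ")).as("word"))
                .groupBy("word")
                .count();
        counts.show();
        spark.stop();
    }
}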
A Spark RDD/Dataset is divided into multiple partitions. Each partition maps to one or more data files, and Spark reads the input data into the RDD/Dataset through this mapping.
The number of partitions is related to the following configuration parameters:
spark.default.parallelism
(defaults to the number of CPU cores)
spark.sql.files.maxPartitionBytes
(defaults to 128 MB) the maximum number of bytes to pack into a single partition when reading files
spark.sql.files.openCostInBytes
(defaults to 4 MB) the threshold for merging small files: files smaller than this value are packed into the same partition, which prevents a large number of tiny files from each occupying a partition of their own
In the walkthrough below we assume that each file maps to one partition (in fact, because the files are so small, all three end up in the same partition; you can verify this by calling getNumPartitions() on the RDD object, as shown in the sketch below).
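As a quick check, a minimal sketch of such an inspection might look like this (the "input" path and the maxPartitionBytes setting are illustrative):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

public class PartitionCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("PartitionCheck")
                .master("local")
                // Illustrative tuning: pack at most 128 MB into a partition when reading files.
                .config("spark.sql.files.maxPartitionBytes", 134217728)
                .getOrCreate();
        JavaRDD<String> lines = spark.read().textFile("input").javaRDD();
        // With three tiny input files this typically prints 1.
        System.out.println("Number of partitions: " + lines.getNumPartitions());
        spark.stop();
    }
}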
The map stage in Spark is illustrated in the figure below:
The reduce stage in Spark is illustrated in the figure below:
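To make the two stages concrete, here is a small self-contained sketch (the sample words are illustrative) that runs both steps on an in-memory collection:

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class MapReduceToy {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Toy").master("local").getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        // Map stage: each word becomes a (word, 1) pair.
        JavaPairRDD<String, Integer> ones = sc.parallelize(Arrays.asList("Hello", "World", "Hello"))
                .mapToPair(w -> new Tuple2<>(w, 1));
        // Reduce stage: pairs with the same key are combined, here by summing the counts.
        ones.reduceByKey(Integer::sum).collect().forEach(t ->
                System.out.println(t._1() + ": " + t._2()));
        spark.stop();
    }
}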
The project structure is as follows:
Word-Count-Spark
├─ input
│ ├─ file1.txt
│ ├─ file2.txt
│ └─ file3.txt
├─ output
│ └─ result.txt
├─ pom.xml
├─ src
│ ├─ main
│ │ └─ java
│ │ └─ WordCount.java
│ └─ test
└─ target
The WordCount.java file is as follows:
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.io.*;
import java.nio.file.*;

public class WordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCount <input directory> <output directory>");
            System.exit(1);
        }
        String input_path = args[0];
        String output_path = args[1];

        SparkSession spark = SparkSession.builder()
                .appName("WordCount")
                .master("local")
                .getOrCreate();

        // Read every file under the input directory as an RDD of lines.
        JavaRDD<String> lines = spark.read().textFile(input_path).javaRDD();
        // Map: split each line into words.
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
        // Reduce: sum the counts for each word.
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

        // Collect the results to the driver and write them to a single file.
        List<Tuple2<String, Integer>> output = counts.collect();
        String filePath = Paths.get(output_path, "result.txt").toString();
        BufferedWriter out = new BufferedWriter(new FileWriter(filePath));
        for (Tuple2<?, ?> tuple : output) {
            out.write(tuple._1() + ": " + tuple._2() + "\n");
        }
        out.close();
        spark.stop();
    }
}
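Note that collect() brings all results back to the driver, which is fine for this tiny output; for larger results, one would normally let Spark write the output itself. A sketch that could replace the collect-and-write block above (coalesce(1) is only there to produce a single part file and is not advisable for genuinely large data):

// Alternative to collect() + BufferedWriter: have Spark write the output directory.
// coalesce(1) merges everything into one partition so a single part file is produced.
counts.coalesce(1).saveAsTextFile(Paths.get(output_path, "result").toString());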
The pom.xml file is configured as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.WordCount</groupId>
<artifactId>WordCount</artifactId>
<version>1.0-SNAPSHOT</version>
<name>WordCount</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<!-- Centrally define version numbers -->
<properties>
<scala.version>2.12.10</scala.version>
<scala.compat.version>2.12</scala.compat.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.timezone>UTC</project.timezone>
<java.version>11</java.version>
<scoverage.plugin.version>1.4.0</scoverage.plugin.version>
<site.plugin.version>3.7.1</site.plugin.version>
<scalatest.version>3.1.2</scalatest.version>
<scalatest-maven-plugin>2.0.0</scalatest-maven-plugin>
<scala.maven.plugin.version>4.4.0</scala.maven.plugin.version>
<maven.compiler.plugin.version>3.8.0</maven.compiler.plugin.version>
<maven.javadoc.plugin.version>3.2.0</maven.javadoc.plugin.version>
<maven.source.plugin.version>3.2.1</maven.source.plugin.version>
<maven.deploy.plugin.version>2.8.2</maven.deploy.plugin.version>
<nexus.staging.maven.plugin.version>1.6.8</nexus.staging.maven.plugin.version>
<maven.help.plugin.version>3.2.0</maven.help.plugin.version>
<maven.gpg.plugin.version>1.6</maven.gpg.plugin.version>
<maven.surefire.plugin.version>2.22.2</maven.surefire.plugin.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<spark.version>3.2.1</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!--======SCALA======-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>11</source>
<target>11</target>
<fork>true</fork>
<!-- Machine-specific path: adjust to the javac of your local JDK 11 installation. -->
<executable>/Library/Java/JavaVirtualMachines/jdk-11.0.15.jdk/Contents/Home/bin/javac</executable>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Remember to set the program arguments input and output, which denote the input and output directories (in VSCode these are set in the launch.json file). After compiling and running, you can view result.txt under the output directory:
Tom: 1
Hello: 3
Goodbye: 1
World: 2
David: 1
As we can see, the word count completed successfully.
For the Python version, first install PySpark with pip (pinned here to 3.2.1, matching the Spark version used in the Java section):
pip install pyspark==3.2.1
Note that PySpark only supports Java 8/11, so don't use a newer Java version. I'm using Java 11 here. Run java -version to check your local Java version.
(base) orion-orion@MacBook-Pro ~ % java -version
java version "11.0.15" 2022-04-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.15+8-LTS-149)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.15+8-LTS-149, mixed mode)
The project structure is as follows:
Word-Count-Spark
├─ input
│ ├─ file1.txt
│ ├─ file2.txt
│ └─ file3.txt
├─ output
│ └─ result.txt
├─ src
│ └─ word_count.py
word_count.py is written as follows:
from pyspark.sql import SparkSession
import sys
import os
from operator import add

if len(sys.argv) != 3:
    print("Usage: WordCount <input directory> <output directory>", file=sys.stderr)
    sys.exit(1)
input_path, output_path = sys.argv[1], sys.argv[2]

spark = SparkSession.builder.appName("WordCount").master("local").getOrCreate()

# Read every file under the input directory as an RDD of lines.
lines = spark.read.text(input_path).rdd.map(lambda r: r[0])
# Map each line to words, map each word to a (word, 1) pair, then sum per word.
counts = lines.flatMap(lambda s: s.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(add)

# Collect the results to the driver and write them to a single file.
output = counts.collect()
with open(os.path.join(output_path, "result.txt"), "wt") as f:
    for (word, count) in output:
        f.write(str(word) + ": " + str(count) + "\n")

spark.stop()
After running python word_count.py input output, you can view the output file result.txt under output:
Hello: 3
World: 2
Goodbye: 1
David: 1
Tom: 1
Again, the word count completed successfully.