首先,Hadoop會把輸入資料劃分成等長的輸入分片(input split) 或分片傳送到MapReduce。Hadoop為每個分片建立一個map任務,由它來執行使用者自定義的map函數以分析每個分片中的記錄。在我們的單詞計數例子中,輸入是多個檔案,一般一個檔案對應一個分片,如果檔案太大則會劃分為多個分片。map函數的輸入以<key, value>
形式做為輸入,value
為檔案的每一行,key
為該行在檔案中的偏移量(一般我們會忽視)。這裡map函數起到的作用為將每一行進行分詞為多個word
,並在context
中寫入<word, 1>
以代表該單詞出現一次。
map過程的示意圖如下:
mapper程式碼編寫如下:
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//每次處理一行,一個mapper裡的value為一行,key為該行在檔案中的偏移量
StringTokenizer iter = new StringTokenizer(value.toString());
while (iter.hasMoreTokens()) {
word.set(iter.nextToken());
// 向context中寫入<word, 1>
context.write(word, one);
System.out.println(word);
}
}
}
如果我們能夠並行處理分片(不一定是完全並行),且分片是小塊的資料,那麼處理過程將會有一個好的負載平衡。但是如果分片太小,那麼管理分片與map任務建立將會耗費太多時間。對於大多數作業,理想分片大小為一個HDFS塊的大小,預設是64MB。
map任務的執行節點和輸入資料的儲存節點相同時,Hadoop的效能能達到最佳,這就是計算機系統中所謂的data locality optimization(資料區域性性優化)。而最佳分片大小與塊大小相同的原因就在於,它能夠保證一個分片儲存在單個節點上,再大就不能了。
接下來我們看reducer的編寫。reduce任務的多少並不是由輸入大小來決定,而是需要人工單獨指定的(預設為1個)。和上面map不同的是,reduce任務不再具有本地讀取的優勢————一個reduce任務的輸入往往來自於所有mapper的輸出,因此map和reduce之間的資料流被稱為 shuffle(洗牌) 。Hadoop會先按照key-value對進行排序,然後將排序好的map的輸出通過網路傳輸到reduce任務執行的節點,並在那裡進行合併,然後傳遞到使用者定義的reduce函數中。
reduce 函數示意圖如下:
reducer程式碼編寫如下:
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
關於VSCode+Java+Maven+Hadoop開發環境搭建,可以參見我的部落格《VSCode+Maven+Hadoop開發環境搭建》,此處不再贅述。這裡展示我們的專案架構圖:
Word-Count-Hadoop
├─ input
│ ├─ file1
│ ├─ file2
│ └─ file3
├─ output
├─ pom.xml
├─ src
│ └─ main
│ └─ java
│ └─ WordCount.java
└─ target
WordCount.java
程式碼如下:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount{
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//每次處理一行,一個mapper裡的value為一行,key為該行在檔案中的偏移量
StringTokenizer iter = new StringTokenizer(value.toString());
while (iter.hasMoreTokens()) {
word.set(iter.nextToken());
// 向context中寫入<word, 1>
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word_count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
//此處的Combine操作意為即第每個mapper工作完了先區域性reduce一下,最後再全域性reduce
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//第0個引數是輸入目錄,第1個引數是輸出目錄
//先判斷output path是否存在,如果存在則刪除
Path path = new Path(args[1]);//
FileSystem fileSystem = path.getFileSystem(conf);
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);
}
//設定輸入目錄和輸出目錄
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
pom.xml
中記得設定Hadoop的依賴環境:
...
<!-- 集中定義版本號 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<hadoop.version>3.3.1</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- 匯入hadoop依賴環境 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
...
</project>
此外,因為我們的程式自帶輸入引數,我們還需要在VSCode的launch.json
中設定輸入引數intput
(代表輸入目錄)和output
(代表輸出目錄):
...
"args": [
"input",
"output"
],
...
編譯執行完畢後,可以檢視output
資料夾下的part-r-00000
檔案:
David 1
Goodbye 1
Hello 3
Tom 1
World 2
可見我們的程式正確地完成了單詞計數的功能。