MapReduce是一個分佈式運算程式的程式設計框架,是使用者開發「基於 Hadoop的數據分析應用」的核心框架。
MapReduce核心功能是將使用者編寫的業務邏輯程式碼和自帶預設元件整合成一個完整的分佈式運算程式,併發執行在一個 Hadoop叢集上。
1.1.1 優點
1.1.2 缺點
1.MapReduce核心程式設計思想,如圖所示。
1)分佈式的運算程式往往需要分成至少2個階段。
2)第一個階段的MapTask併發範例,完全並行執行,互不相幹。
3)第二個階段的ReduceTask併發範例互不相幹,但是他們的數據依賴於上一個階段的所有MapTask併發範例的輸出。
4)MapReduce程式設計模型只能包含一個Map階段和一個Reduce階段,如果使用者的業務邏輯非常複雜,那就只能多個MapReduce程式,序列執行。
總結:分析WordCount數據流走向深入理解MapReduce核心思想。
2.MapReduce進程
一個完整的Map Reduce程式在分佈式執行時有三類範例進程:
使用者編寫的程式分成三個部分:Mapper、Reducer和Driver。
1.需求
在給定的文字檔案中統計輸出每一個單詞出現的總次數
2.需求分析
按照MapReduce程式設計規範,分別編寫Mapper,Reducer,Driver。
3.環境準備
(1)建立maven工程
(2)在pom.xml檔案中新增如下依賴
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
</dependencies>
(2)在專案的src/main/resources目錄下,新建一個檔案,命名爲「log4j.properties」,在檔案中填入。
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4.編寫程式
(1)編寫Mapper類
package com.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 獲取一行
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
// 3 輸出
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
(2)編寫Reducer類
package com.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 輸出
v.set(sum);
context.write(key,v);
}
}
(3)編寫Driver驅動類
package com.mapreduce;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordcountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 1 獲取設定資訊以及封裝任務
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 設定jar載入路徑
job.setJarByClass(WordcountDriver.class);
// 3 設定map和reduce類
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
// 4 設定map輸出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 設定最終輸出kv型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//FileSystem fs = FileSystem.get(new URI("hdfs://spark1:9000"), configuration, "root");
Path outPath = new Path("hdfs://spark1:9000/meanwhileFind");// 輸出路徑
FileSystem fs = outPath.getFileSystem(configuration);// 根據輸出路徑找到檔案,參數爲組態檔
if (fs.exists(outPath)) {
fs.delete(outPath);
// fs.delete(outPath, true);true的意思是,就算output有東西,也一帶刪除,預設爲true
}
Path inpPath = new Path("hdfs://spark1:9000/wordcount.txt");
// 6 設定輸入和輸出路徑 args[0]
FileInputFormat.setInputPaths(job, inpPath);
FileOutputFormat.setOutputPath(job, outPath);
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
7.叢集上測試
(0)用maven打jar包,需要新增的打包外掛依賴
注意:mainClass部分需要替換爲自己工程主類
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin </artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.atguigu.mr.WordcountDriver</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
注意:如果工程上顯示紅叉。在專案上右鍵->maven->update project即可。
(1)將程式打成jar包,然後拷貝到Hadoop叢集中
步驟詳情:右鍵->Run as->maven install。等待編譯完成就會在專案的target資料夾中生成jar包。如果看不到。在專案上右鍵-》Refresh,即可看到。修改不帶依賴的jar包名稱爲wc.jar,並拷貝該jar包到Hadoop叢集。
(2)啓動Hadoop叢集
(3)執行WordCount程式
hadoop jar wc.jar com.mapreduce.WordcountDriver
Java型別 | Hadoop Writable型別 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
String | Text |
map | MapWritable |
array | ArrayWritable |
在開發中往往常用的基本序列化型別不能滿足所有需求,比如在Hadoop框架內部傳遞一個bean物件,那麼該物件就需要實現序列化介面。
具體實現bean物件序列化步驟如下7步。
(1)必須實現Writable介面
(2)反序列化時,需要反射呼叫空參建構函式,所以必須有空參構造
public FlowBean() {
super();
}
(3)重寫序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
(4)重寫反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
(5)注意反序列化的順序和序列化的順序完全一致
(6)要想把結果顯示在檔案中,需要重寫toString(),可用」\t」分開,方便後續用。
(7)如果需要將自定義的bean放在key中傳輸,則還需要實現Comparable介面,因爲MapReduce框中的Shuffle過程要求對key必須能排序。詳見後面排序案例。
@Override
public int compareTo(FlowBean o) {
// 倒序排列,從大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
流程
編碼
編寫流量統計的Bean物件
package com.serialize;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class FlowBean implements Writable{
private long upFlow;
private long downFlow;
private long sumFlow;
//2 反序列化時,需要反射呼叫空參建構函式,所以必須有
public FlowBean() {
super();
}
public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
//3 寫序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
//4 反序列化方法
//5 反序列化方法讀順序必須和寫序列化方法的寫順序必須一致
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
// 6 編寫toString方法,方便後續列印到文字
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
}
1.問題引出
MapTask的並行度決定Map階段的任務處理併發度,進而影響到整個Job的處理速度。
思考:1G的數據,啓動8個MapTask,可以提高叢集的併發處理能力。那麼1K的數據,也啓動8個MapTask,會提高叢集效能嗎?MapTask並行任務是否越多越好呢?哪些因素影響了MapTask並行度?
2.MapTask並行度決定機制 機製
數據塊:Block是HDFS物理上把數據分成一塊一塊。
數據切片:數據切片只是在邏輯上對輸入進行分片,並不會在磁碟上將其切分成片進行儲存。
1.Job提交流程原始碼詳解
waitForCompletion()
submit();
// 1建立連線
connect();
// 1)建立提交Job的代理
new Cluster(getConfiguration());
// (1)判斷是本地yarn還是遠端
initialize(jobTrackAddr, conf);
// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
// 1)建立給叢集提交數據的Stag路徑
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)獲取jobid ,並建立Job路徑
JobID jobId = submitClient.getNewJobID();
// 3)拷貝jar包到叢集
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)計算切片,生成切片規劃檔案
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向Stag路徑寫XML組態檔
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交Job,返回提交狀態
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
2.FileInputFormat切片原始碼解析(input.getSplits(job))
切片機制 機製
(1)簡單地按照檔案的內容長度進行切片
(2) 切片大小, 預設等於Block大小
(3)切片時不考慮數據集整體,而是逐個針對每一個檔案單獨切片
案例分析
(1)輸入數據有兩個檔案:
file l.txt320M
file 2.txt10M
(2) 經過File Input Format的切片機制 機製
運算後,形成的切片資訊如下:
file l.txt.split 1–0~128
file l.txt.split 2–128~256
file l.txt.split 3–256~320
file 2.txt.split 1–0~10M
FileInputFormat切片大小的參數設定
(1)原始碼中計算切片大小的公式
Math.max(minSize, Math.min(maxSize, blockSize) ) ;
map reduce n put.file input format.splt minsize=1預設值爲1
map reduce.n put.file input format.spl imax size=Long.MAXValue預設值Long.MAXValue
因此, 預設情況下, 切片大小=blocksize。
(2)切片大小設定
maxsize(切片最大值) :參數如果調得比blockSize小, 則會讓切片變小, 而且就等於設定的這個參數的值。
minsize(切片最小值) :參數調的比blockSize大, 則可以讓切片變得比blockSize還大。
(3) 獲取切片資訊API
//獲取切片的檔名稱
String name = inputSplit.getPath().getName();
//根據檔案型別獲取切片資訊
FileSplit inputSplit = (FileSplit) context.getInputSplit();
框架預設的TextInputFormat切片機制 機製是對任務按檔案規劃切片,不管檔案多小,都會是一個單獨的切片,都會交給一個MapTask,這樣如果有大量小檔案,就會產生大量的MapTask,處理效率極其低下。
1、應用場景:
CombineTextInputFormat用於小檔案過多的場景,它可以將多個小檔案從邏輯上規劃到一個切片中,這樣,多個小檔案就可以交給一個MapTask處理。
2、虛擬儲存切片最大值設定
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
注意:虛擬儲存切片最大值設定最好根據實際的小檔案大小情況來設定具體的值。
3、切片機制 機製
生成切片過程包括:虛擬儲存過程和切片過程二部分。
(1)虛擬儲存過程:
將輸入目錄下所有檔案大小,依次和設定的setMaxInputSplitSize值比較,如果不大於設定的最大值,邏輯上劃分一個塊。如果輸入檔案大於設定的最大值且大於兩倍,那麼以最大值切割一塊;當剩餘數據大小超過設定的最大值且不大於最大值2倍,此時將檔案均分成2個虛擬儲存塊(防止出現太小切片)。
例如setMaxInputSplitSize值爲4M,輸入檔案大小爲8.02M,則先邏輯上分成一個4M。剩餘的大小爲4.02M,如果按照4M邏輯劃分,就會出現0.02M的小的虛擬儲存檔案,所以將剩餘的4.02M檔案切分成(2.01M和2.01M)兩個檔案。
(2)切片過程:
(a)判斷虛擬儲存的檔案大小是否大於setMaxInputSplitSize值,大於等於則單獨形成一個切片。
(b)如果不大於則跟下一個虛擬儲存檔案進行合併,共同形成一個切片。
(c)測試舉例:有4個小檔案大小分別爲1.7M、5.1M、3.4M以及6.8M這四個小檔案,則虛擬儲存之後形成6個檔案塊,大小分別爲:
1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
最終會形成3個切片,大小分別爲:
(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
1.需求
將輸入的大量小檔案合併成一個切片統一處理。
(1)輸入數據
準備4個小檔案
(2)期望
期望一個切片處理4個檔案
2.實現過程
(1)不做任何處理,執行WordCount案例程式,觀察切片個數爲4。
(2)在WordcountDriver中增加如下程式碼,執行程式,並觀察執行的切片個數爲3。
(a)驅動類中新增程式碼如下:
// 如果不設定InputFormat,它預設用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虛擬儲存切片最大值設定4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
(b)執行結果爲3個切片。擴大最大值設定爲20971520執行結果爲1個切片。(number of splits:1)
思考:在執行Map Reduce程式時, 輸入的檔案格式包括:基於行的日誌檔案、二進制格式檔案、數據庫表等。那麼, 針對不同的數據型別, Map Reduce是如何讀取這些數據的呢?
FileInputFormat常見的接實現類包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定義InputFormat等。
每一行均爲一條記錄, 被分隔符分割爲key, value。可以通過在驅動類中設定conf.set(KeyValue Line Record Reader KEY_VALUE_SEPERATOR, 」\t」) ; 來設定分隔符。預設分隔符是tab() 。
以下是一個範例,輸入是一個包含4條記錄的分片。其中——>表示一個(水平方向的)製表符。
此時的鍵是每行排在製表符之前的Text序列。
line 1 —>Rich learning form
line 2 —>Intelligent learning engine
line 3 —>Learning more convenient
line 4—>From the real demand for more close to the enterprise
每條記錄表示爲以下鍵/值對:
(line 1, Rich learning form)
(line 2, Intelligent learning engine)
(line 3, Learning more convenient)
(line 4, From the realdemand for more close to the enterprise)
例統計輸入檔案中每一行的第一個單詞相同的行數。
(1)輸入數據
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
(2)期望結果數據
banzhang 2
xihuan 2
(3)程式碼實現
編寫Mapper類
package com.TextInputFormat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class KVTextMapper extends Mapper<Text,Text,Text,LongWritable>{
LongWritable v = new LongWritable(1);
@Override
protected void map(Text key, Text value, Mapper<Text, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
System.out.println("mapper key:" + key +" value:"+v);
context.write(key, v);
}
}
編寫Reducer類
package com.TextInputFormat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
LongWritable v = new LongWritable();
@Override
protected void reduce(Text k, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sum = 0L;
for (LongWritable value : values) {
sum += value.get();
}
v.set(sum);
System.out.println("reduce key:" + k +" value:"+v);
// 2 輸出
context.write(k, v);
}
}
編寫Driver類
package com.TextInputFormat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class KVTextDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"D:\\Users\\Administrator\\Desktop\\KVTextDriver.txt","hdfs://spark1:9000/out"};
Configuration conf = new Configuration();
// 設定切割符
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
// 1 獲取job物件
Job job = Job.getInstance(conf);
// 2 設定jar包位置,關聯mapper和reducer
job.setJarByClass(KVTextDriver.class);
job.setMapperClass(KVTextMapper.class);
job.setReducerClass(KVTextReducer.class);
// 3 設定map輸出kv型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 4 設定最終輸出kv型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 5 設定輸入輸出數據路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 設定輸入格式
job.setInputFormatClass(KeyValueTextInputFormat.class);
Path outPath = new Path(args[1]);// 輸出路徑
FileSystem fs = outPath.getFileSystem(conf);// 根據輸出路徑找到檔案,參數爲組態檔
if (fs.exists(outPath)) {
fs.delete(outPath);
// fs.delete(outPath, true);true的意思是,就算output有東西,也一帶刪除,預設爲true
}
// 6 設定輸出數據路徑
FileOutputFormat.setOutputPath(job,outPath);
// 7 提交job
job.waitForCompletion(true);
}
}
package com.NLineInputFormat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
private Text k = new Text();
private LongWritable v = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
// 1 獲取一行
String line = value.toString();
// 2 切割
String[] splited = line.split(" ");
// 3 回圈寫出
for (int i = 0; i < splited.length; i++) {
k.set(splited[i]);
System.out.println("mapper key:" + k +" value:"+v);
context.write(k, v);
}
}
}
編寫Reducer類
package com.NLineInputFormat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
LongWritable v = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sum = 0l;
// 1 彙總
for (LongWritable value : values) {
sum += value.get();
}
v.set(sum);
System.out.println("reduce key:" + key +" value:"+v);
// 2 輸出
context.write(key, v);
}
}
編寫Driver類
package com.NLineInputFormat;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class NLineDriver {
public static void main(String[] args)
throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設定
args = new String[] { "D:\\Users\\Administrator\\Desktop\\NLineDriver.txt", "hdfs://spark1:9000/out" };
// 1 獲取job物件
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 7設定每個切片InputSplit中劃分三條記錄
NLineInputFormat.setNumLinesPerSplit(job, 3);
// 8使用NLineInputFormat處理記錄數
job.setInputFormatClass(NLineInputFormat.class);
// 2設定jar包位置,關聯mapper和reducer
job.setJarByClass(NLineDriver.class);
job.setMapperClass(NLineMapper.class);
job.setReducerClass(NLineReducer.class);
// 3設定map輸出kv型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 4設定最終輸出kv型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
Path outPath = new Path(args[1]);// 輸出路徑
FileSystem fs = outPath.getFileSystem(configuration);// 根據輸出路徑找到檔案,參數爲組態檔
if (fs.exists(outPath)) {
fs.delete(outPath);
// fs.delete(outPath, true);true的意思是,就算output有東西,也一帶刪除,預設爲true
}
// 5設定輸入輸出數據路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, outPath);
// 6提交job
job.waitForCompletion(true);
}
}
無論HDFS還是MapReduce,在處理小檔案時效率都非常低,但又難免面臨處理大量小檔案的場景,此時,就可以自定義InputFormat實現小檔案的合併。
需求
將多個小檔案合併成一個SequenceFile檔案(SequenceFile檔案是Hadoop用來儲存二進制形式的key-value對的檔案格式),SequenceFile裏面儲存着多個檔案,儲存的形式爲檔案路徑+名稱爲key,檔案內容爲value。
程式實現
(1)自定義InputFromat
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
// 定義類繼承FileInputFormat
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
WholeRecordReader recordReader = new WholeRecordReader();
recordReader.initialize(split, context);
return recordReader;
}
}
(2)自定義RecordReader類
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeRecordReader extends RecordReader<Text, BytesWritable>{
private Configuration configuration;
private FileSplit split;
private boolean isProgress= true;
private BytesWritable value = new BytesWritable();
private Text k = new Text();
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.split = (FileSplit)split;
configuration = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (isProgress) {
// 1 定義快取區
byte[] contents = new byte[(int)split.getLength()];
FileSystem fs = null;
FSDataInputStream fis = null;
try {
// 2 獲取檔案系統
Path path = split.getPath();
fs = path.getFileSystem(configuration);
// 3 讀取數據
fis = fs.open(path);
// 4 讀取檔案內容
IOUtils.readFully(fis, contents, 0, contents.length);
// 5 輸出檔案內容
value.set(contents, 0, contents.length);
// 6 獲取檔案路徑及名稱
String name = split.getPath().toString();
// 7 設定輸出的key值
k.set(name);
} catch (Exception e) {
}finally {
IOUtils.closeStream(fis);
}
isProgress = false;
return true;
}
return false;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return k;
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
}
}
(3)編寫SequenceFileMapper類處理流程
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
@Override
protected void map(Text key, BytesWritable value,Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
(4)編寫SequenceFileReducer類處理流程
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
@Override
protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, values.iterator().next());
}
}
(5)編寫SequenceFileDriver類處理流程
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class SequenceFileDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設定
args = new String[] { "e:/input/inputinputformat", "e:/output1" };
// 1 獲取job物件
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 設定jar包儲存位置、關聯自定義的mapper和reducer
job.setJarByClass(SequenceFileDriver.class);
job.setMapperClass(SequenceFileMapper.class);
job.setReducerClass(SequenceFileReducer.class);
// 7設定輸入的inputFormat
job.setInputFormatClass(WholeFileInputformat.class);
// 8設定輸出的outputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 3 設定map輸出端的kv型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
// 4 設定最終輸出端的kv型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 5 設定輸入輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Map方法之後,Reduce方法之前的數據處理過程稱之爲Shuffle。
如圖3.4流程是整個MapReduce最全工作流程,但是Shuffle過程只是從第7步開始到第16步結束,具體Shuffle過程詳解,如下:
1)MapTask收集我們的map()方法輸出的kv對,放到記憶體緩衝區中
2)從記憶體緩衝區不斷溢位本地磁碟檔案,可能會溢位多個檔案
3)多個溢位檔案會被合併成大的溢位檔案
4)在溢位過程及合併的過程中,都要呼叫Partitioner進行分割區和針對key進行排序
5)ReduceTask根據自己的分割區號,去各個MapTask機器上取相應的結果分割區數據
6)ReduceTask會取到同一個分割區的來自不同MapTask的結果檔案,ReduceTask會將這些檔案再進行合併(歸併排序)
7)合併成大檔案後,Shuffle的過程也就結束了,後面進入ReduceTask的邏輯運算過程(從檔案中取出一個一個的鍵值對Group,呼叫使用者自定義的reduce()方法)
3. 注意
Shuffle中的緩衝區大小會影響到MapReduce程式的執行效率,原則上說,緩衝區越大,磁碟io的次數越少,執行速度就越快。
緩衝區的大小可以通過參數調整,參數:io.sort.mb預設100M。
4. 原始碼解析流程
context.write(k, NullWritable.get());
output.write(key, value);
collector.collect(key, value,partitioner.getPartition(key, value, partitions));
HashPartitioner();
collect()
close()
collect.flush()
sortAndSpill()
sort() QuickSort
mergeParts();
collector.close();
1、問題引出
要求將統計結果按照條件輸出到不同檔案中(分割區)。比如:將統計結果按照手機歸屬地不同省份輸出到不同檔案中(分割區)
2、預設Partitioner分割區
public class Hash Partitioner<K, v>extends Partitioner<K, v>{
public int getPartition(K key, V value, int numReduceTasks) {
return(key.hashCode() & Integer.MAXVALUE) % numReduceTasks;
}
}
預設分割區是根據key的hashCode對ReduceTasks個數取模得到的。使用者沒法控制哪個key儲存到哪個分割區。
3、自定義Partitioner步驟
(1) 自定義類繼承Parttioner, 重寫getPariion() 方法
public class CustomPartitioner extends Partitioner<Text, FlowBean>{
@override
public int getPartition(Text key, Flow Beanvalue, int numPartitions) {
//控制分割區程式碼邏輯
.....
return partition;
}
}
(2) 在Job驅動中, 設定自定義Partitioner
job.setPartitionerClass(CustomParttioner.class);
(3) 自定義Partition後, 要根據自定義Partitioner的邏輯設定相應數量的ReduceTask
job.setNumReduceTasks(5);
4、分割區總結
(1) 如果ReduceTask的數量>getParttion的結果數, 則會多產生幾個空的輸出檔案part-r-000xx;
(2) 如果1<ReduceTask的數量<getPartition的結果數, 則有一部分分割區數據無處安放, 會Exception;
(3) 如果ReduceTask的數量=1, 則不管MapTask端輸出多少個分割區檔案, 最終結果都交給這一個ReduceTask, 最終也就只會產生一個結果檔案part-r-00000;
(4)分割區號必須從零開始,逐一累加。
例如:假設自定義分割區數爲5,則
(1) job.setNumReduceTasks(1) ;會正常執行,只不過會產生一個輸出檔案
(2) job.setNumReduceTasks(2) ;會報錯
(3) job.setNumReduceTasks(6) ; 大於5,程式會正常執行,會產生空檔案
5、 Partition分割區例
需求
將統計結果按照手機歸屬地不同省份輸出到不同檔案中(分割區)
手機號136、137、138、139開頭都分別放到一個獨立的4個檔案中,其他開頭的放到一個檔案中。
在序列化例子的基礎上,增加一個分割區類
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 獲取電話號碼的前三位
String preNum = key.toString().substring(0, 3);
int partition = 4;
// 2 判斷是哪個省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
在驅動函數中增加自定義數據分割區設定和ReduceTask設定
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowsumDriver {
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設定
args = new String[]{"e:/output1","e:/output2"};
// 1 獲取設定資訊,或者job物件範例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定本程式的jar包所在的本地路徑
job.setJarByClass(FlowsumDriver.class);
// 3 指定本業務job要使用的mapper/Reducer業務類
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 4 指定mapper輸出數據的kv型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5 指定最終輸出的數據的kv型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 8 指定自定義數據分割區
job.setPartitionerClass(ProvincePartitioner.class);
// 9 同時指定相應數量的reduce task
job.setNumReduceTasks(5);
// 6 指定job的輸入原始檔案所在目錄
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 將job中設定的相關參數,以及job所用的java類所在的jar包, 提交給yarn去執行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
排序是MapReduce框架中最重要的操作之一。MapTask和ReduceTask均會對數據按照key進行排序。該操作屬於Hadoop的預設行爲。任何應用程式中的數據均會被排序, 而不管邏輯上是否需要。
預設排序是按照字典順序排序,且實現該排序的方法是快速排序。
對於MapTask, 它會將處理的結果暫時放到環形緩衝區中, 當環形緩衝區使用率達到一定閾值後,再對緩衝區中的數據進行一次快速排序,並將這些有序數據溢寫到磁碟上,而當數據處理完畢後,它會對磁碟上所有檔案進行歸併排序。
對於ReduceTask, 它從每個MapTask上遠端拷貝相應的數據檔案, 如果檔案大小超過一定閾值,則溢寫磁碟上,否則儲存在記憶體中。如果磁碟上檔案數目達到一定閾值,則進行一次歸併排序以生成一個更大檔案;如果記憶體中檔案大小或者數目超過一定閾值,則進行一次合併後將數據溢寫到磁碟上。當所有數據拷貝完畢後, ReduceTask統一對記憶體和磁碟上的所有數據進行一次歸併排序。
@Override
public int compareTo(FlowBean o) {
int result;
// 按照總流量大小,倒序排列
if (sumFlow > bean.getSumFlow()) {
result = -1;
}else if (sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}
(1)FlowBean物件在在需求1基礎上增加了比較功能
package com.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class FlowBean implements WritableComparable<FlowBean>{
private long upFlow;
private long downFlow;
private long sumFlow;
// 反序列化時,需要反射呼叫空參建構函式,所以必須有
public FlowBean() {
super();
}
public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
public void set(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
/**
* 序列化方法
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
/**
* 反序列化方法 注意反序列化的順序和序列化的順序完全一致
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override
public int compareTo(FlowBean o) {
int result;
// 按照總流量大小,倒序排列
System.out.println("sumFlow:" + sumFlow +" o.getSumFlow():"+o.getSumFlow());
if (sumFlow > o.getSumFlow()) {
result = -1;
}else if (sumFlow < o.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}
}
(2)編寫Mapper類
package com.WritableComparable;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
FlowBean bean = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 獲取一行
String line = value.toString();
// 2 擷取
String[] fields = line.split(" ");
// 3 封裝物件
String phoneNbr = fields[0];
long upFlow = Long.parseLong(fields[1]);
long downFlow = Long.parseLong(fields[2]);
bean.set(upFlow, downFlow);
v.set(phoneNbr);
System.out.println("mapper key:" + bean +" value:"+v);
// 4 輸出
context.write(bean, v);
}
}
(3)編寫Reducer類
package com.WritableComparable;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 回圈輸出,避免總流量相同情況
for (Text text : values) {
System.out.println("reduce key:" + text +" value:"+key);
context.write(text, key);
}
}
}
(4)編寫Driver類
package com.WritableComparable;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowCountSortDriver {
public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設定
args = new String[] { "D:\\Users\\Administrator\\Desktop\\FlowCountSortDriver.txt", "hdfs://spark1:9000/out" };
// 1 獲取設定資訊,或者job物件範例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定本程式的jar包所在的本地路徑
job.setJarByClass(FlowCountSortDriver.class);
// 3 指定本業務job要使用的mapper/Reducer業務類
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);
// 4 指定mapper輸出數據的kv型別
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5 指定最終輸出的數據的kv型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
Path outPath = new Path(args[1]);// 輸出路徑
FileSystem fs = outPath.getFileSystem(configuration);// 根據輸出路徑找到檔案,參數爲組態檔
if (fs.exists(outPath)) {
fs.delete(outPath);
// fs.delete(outPath, true);true的意思是,就算output有東西,也一帶刪除,預設爲true
}
// 6 指定job的輸入原始檔案所在目錄
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, outPath);
// 7 將job中設定的相關參數,以及job所用的java類所在的jar包, 提交給yarn去執行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
package com.atguigu.mapreduce.sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean key, Text value, int numPartitions) {
// 1 獲取手機號碼前三位
String preNum = value.toString().substring(0, 3);
int partition = 4;
// 2 根據手機號歸屬地設定分割區
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
(2)在驅動類中新增分割區類
// 載入自定義分割區類
job.setPartitionerClass(ProvincePartitioner.class);
// 設定Reducetask個數
job.setNumReduceTasks(5);
Mapper | Reducer |
---|---|
357->(3+5+7)/3=5 | (3+5+7+2+6)/5=23/5不等於(5+4)/2=9/2 |
26->(2+6)/2=4 |
(6)自定義Combiner實現步驟
(a)自定義一個Combiner繼承Reducer,重寫Reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
// 1 彙總操作
int count = 0;
for(IntWritable v :values){
count += v.get();
}
// 2 寫出
context.write(key, new IntWritable(count));
}
}
(b)在Job驅動類中設定:
job.setCombinerClass(WordcountCombiner.class);
統計過程中對每一個MapTask的輸出進行區域性彙總,以減小網路傳輸量即採用Combiner功能。
期望:Combine輸入數據多,輸出時經過合併,輸出數據降低。
方法一
1)增加一個WordcountCombiner類繼承Reducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1 彙總
int sum = 0;
for(IntWritable value :values){
sum += value.get();
}
v.set(sum);
// 2 寫出
context.write(key, v);
}
}
2)在WordcountDriver驅動類中指定Combiner
// 指定需要使用combiner,以及用哪個類作爲combiner的邏輯
job.setCombinerClass(WordcountCombiner.class);
方法二
1)將WordcountReducer作爲Combiner在WordcountDriver驅動類中指定
// 指定需要使用Combiner,以及用哪個類作爲Combiner的邏輯
job.setCombinerClass(WordcountReducer.class);
對Reduce階段的數據根據某一個或幾個欄位進行分組。
分組排序步驟:
(1)自定義類繼承WritableComparator
(2)重寫compare()方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比較的業務邏輯
return result;
}
(3)建立一個構造將比較物件的類傳給父類別
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
案例
有如下訂單數據
訂單id | 商品id | 成交金額 |
---|---|---|
0000001 | Pdt_01 | 222.8 |
Pdt_02 | 33.8 | |
0000002 | Pdt_03 | 522.8 |
Pdt_04 | 122.4 | |
Pdt_05 | 722.4 | |
0000003 | Pdt_06 | 232.8 |
Pdt_02 | 33.8 |
現在需要求出每一個訂單中最貴的商品。
期望輸出數據
1 222.8
2 722.4
3 232.8
(1)定義訂單資訊OrderBean類
package com.atguigu.mapreduce.order;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class OrderBean implements WritableComparable<OrderBean> {
private int order_id; // 訂單id號
private double price; // 價格
public OrderBean() {
super();
}
public OrderBean(int order_id, double price) {
super();
this.order_id = order_id;
this.price = price;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(order_id);
out.writeDouble(price);
}
@Override
public void readFields(DataInput in) throws IOException {
order_id = in.readInt();
price = in.readDouble();
}
@Override
public String toString() {
return order_id + "\t" + price;
}
public int getOrder_id() {
return order_id;
}
public void setOrder_id(int order_id) {
this.order_id = order_id;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
// 二次排序
@Override
public int compareTo(OrderBean o) {
int result;
if (order_id > o.getOrder_id()) {
result = 1;
} else if (order_id < o.getOrder_id()) {
result = -1;
} else {
// 價格倒序排序
result = price > o.getPrice() ? -1 : 1;
}
return result;
}
}
(2)編寫OrderSortMapper類
package com.atguigu.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
OrderBean k = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 獲取一行
String line = value.toString();
// 2 擷取
String[] fields = line.split("\t");
// 3 封裝物件
k.setOrder_id(Integer.parseInt(fields[0]));
k.setPrice(Double.parseDouble(fields[2]));
// 4 寫出
context.write(k, NullWritable.get());
}
}
(3)編寫OrderSortGroupingComparator類
package com.atguigu.mapreduce.order;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderGroupingComparator extends WritableComparator {
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;
int result;
if (aBean.getOrder_id() > bBean.getOrder_id()) {
result = 1;
} else if (aBean.getOrder_id() < bBean.getOrder_id()) {
result = -1;
} else {
result = 0;
}
return result;
}
}
(4)編寫OrderSortReducer類
package com.atguigu.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
(5)編寫OrderSortDriver類
package com.atguigu.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OrderDriver {
public static void main(String[] args) throws Exception, IOException {
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設定
args = new String[]{"e:/input/inputorder" , "e:/output1"};
// 1 獲取設定資訊
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 設定jar包載入路徑
job.setJarByClass(OrderDriver.class);
// 3 載入map/reduce類
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);
// 4 設定map輸出數據key和value型別
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 5 設定最終輸出數據的key和value型別
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
// 6 設定輸入數據和輸出數據路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8 設定reduce端的分組
job.setGroupingComparatorClass(OrderGroupingComparator.class);
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
OutputFormat是MapReduce輸出的基礎類別, 所有實現MapReduce輸出都實現了OutputFormat介面。下面 下麪我們介紹幾種常見的OutputFormat實現類。
自定義OutputFormat案例
過濾輸入的log日誌,包含atguigu的網站輸出到e:/atguigu.log,不包含atguigu的網站輸出到e:/other.log。
(1)編寫FilterMapper類
package com.OutputFormat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 寫出
context.write(value, NullWritable.get());
}
}
(2)編寫FilterReducer類
package com.OutputFormat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
Text k = new Text();
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// 1 獲取一行
String line = key.toString();
// 2 拼接
line = line + "\r\n";
// 3 設定key
k.set(line);
// 4 輸出
context.write(k, NullWritable.get());
}
}
(3)自定義一個OutputFormat類
package com.OutputFormat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{
@Override
public org.apache.hadoop.mapreduce.RecordWriter<Text, NullWritable> getRecordWriter(
org.apache.hadoop.mapreduce.TaskAttemptContext job) throws IOException, InterruptedException {
// 建立一個RecordWriter
return new FilterRecordWriter(job);
}
}
(4)編寫RecordWriter類
package com.OutputFormat;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
FSDataOutputStream atguiguOut = null;
FSDataOutputStream otherOut = null;
public FilterRecordWriter(TaskAttemptContext job) {
// 1 獲取檔案系統
FileSystem fs;
try {
fs = FileSystem.get(job.getConfiguration());
// 2 建立輸出檔案路徑
Path atguiguPath = new Path("e:/atguigu.log");
Path otherPath = new Path("e:/other.log");
// 3 建立輸出流
atguiguOut = fs.create(atguiguPath);
otherOut = fs.create(otherPath);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
// 判斷是否包含「atguigu」輸出到不同檔案
if (key.toString().contains("atguigu")) {
atguiguOut.write(key.toString().getBytes());
} else {
otherOut.write(key.toString().getBytes());
}
}
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
// 關閉資源
IOUtils.closeStream(atguiguOut);
IOUtils.closeStream(otherOut);
}
}
(5)編寫FilterDriver類
package com.OutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterDriver {
public static void main(String[] args) throws Exception {
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設定
args = new String[] { "D:\\Users\\Administrator\\Desktop\\FilterDriver.txt", "hdfs://spark1:9000/out" };
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FilterDriver.class);
job.setMapperClass(FilterMapper.class);
job.setReducerClass(FilterReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 要將自定義的輸出格式元件設定到job中
job.setOutputFormatClass(FilterOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
Path outPath = new Path(args[1]);// 輸出路徑
FileSystem fs = outPath.getFileSystem(conf);// 根據輸出路徑找到檔案,參數爲組態檔
if (fs.exists(outPath)) {
fs.delete(outPath);
// fs.delete(outPath, true);true的意思是,就算output有東西,也一帶刪除,預設爲true
}
// 雖然我們自定義了outputformat,但是因爲我們的outputformat繼承自fileoutputformat
// 而fileoutputformat要輸出一個_SUCCESS檔案,所以,在這還得指定一個輸出目錄
FileOutputFormat.setOutputPath(job,outPath);
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
MapTask工作機制 機製如圖所示。
MapTask工作機制 機製
(1)Read階段:MapTask通過使用者編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
(2)Map階段:該節點主要是將解析出的key/value交給使用者編寫map()函數處理,併產生一系列新的key/value。
(3)Collect收集階段:在使用者編寫map()函數中,當數據處理完成後,一般會呼叫OutputCollector.collect()輸出結果。在該函數內部,它會將生成的key/value分割區(呼叫Partitioner),並寫入一個環形記憶體緩衝區中。
(4)Spill階段:即「溢寫」,當環形緩衝區滿後,MapReduce會將數據寫到本地磁碟上,生成一個臨時檔案。需要注意的是,將數據寫入本地磁碟之前,先要對數據進行一次本地排序,並在必要時對數據進行合併、壓縮等操作。
溢寫階段詳情:
步驟1:利用快速排序演算法對快取區內的數據進行排序,排序方式是,先按照分割區編號Partition進行排序,然後按照key進行排序。這樣,經過排序後,數據以分割區爲單位聚集在一起,且同一分割區內所有數據按照key有序。
步驟2:按照分割區編號由小到大依次將每個分割區中的數據寫入任務工作目錄下的臨時檔案output/spillN.out(N表示當前溢寫次數)中。如果使用者設定了Combiner,則寫入檔案之前,對每個分割區中的數據進行一次聚集操作。
步驟3:將分割區數據的元資訊寫到記憶體索引數據結構SpillRecord中,其中每個分割區的元資訊包括在臨時檔案中的偏移量、壓縮前數據大小和壓縮後數據大小。如果當前記憶體索引大小超過1MB,則將記憶體索引寫到檔案output/spillN.out.index中。
(5)Combine階段:當所有數據處理完成後,MapTask對所有臨時檔案進行一次合併,以確保最終只會生成一個數據檔案。
當所有數據處理完後,MapTask會將所有臨時檔案合併成一個大檔案,並儲存到檔案output/file.out中,同時生成相應的索引檔案output/file.out.index。
在進行檔案合併過程中,MapTask以分割區爲單位進行合併。對於某個分割區,它將採用多輪遞回合併的方式。每輪合併io.sort.factor(預設10)個檔案,並將產生的檔案重新加入待合併列表中,對檔案排序後,重複以上過程,直到最終得到一個大檔案。
讓每個MapTask最終只生成一個數據檔案,可避免同時開啓大量檔案和同時讀取大量小檔案產生的隨機讀取帶來的開銷。
ReduceTask工作機制 機製,如圖所示。
MapTask =16
ReduceTask | 1 | 5 | 10 | 15 | 16 | 20 | 25 | 30 | 45 | 60 |
---|---|---|---|---|---|---|---|---|---|---|
總時間 | 892 | 146 | 110 | 92 | 88 | 100 | 128 | 101 | 145 | 104 |
1、需求
訂單數據表t_order
id | pid | amount |
---|---|---|
1001 | 01 | 1 |
1002 | 02 | 2 |
1003 | 03 | 3 |
1004 | 01 | 4 |
1005 | 02 | 5 |
1006 | 03 | 6 |
商品資訊表t_product
pid | pname |
---|---|
01 | 小米 |
02 | 華爲 |
03 | 格力 |
將商品資訊表中數據根據商品pid合併到訂單數據表中。
最終數據形式
id | pname | amount |
---|---|---|
1001 | 小米 | 1 |
1004 | 小米 | 4 |
1002 | 華爲 | 2 |
1005 | 華爲 | 5 |
1003 | 格力 | 3 |
1006 | 格力 | 6 |
2、需求分析
通過將關聯條件作爲Map輸出的key,將兩表滿足Join條件的數據並攜帶數據所來源的檔案資訊,發往同一個ReduceTask,在Reduce中進行數據的串聯。
3、程式碼實現
1)建立商品和訂合併後的Bean類
package com.atguigu.mapreduce.table;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class TableBean implements Writable {
private String order_id; // 訂單id
private String p_id; // 產品id
private int amount; // 產品數量
private String pname; // 產品名稱
private String flag; // 表的標記
public TableBean() {
super();
}
public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
super();
this.order_id = order_id;
this.p_id = p_id;
this.amount = amount;
this.pname = pname;
this.flag = flag;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
public String getOrder_id() {
return order_id;
}
public void setOrder_id(String order_id) {
this.order_id = order_id;
}
public String getP_id() {
return p_id;
}
public void setP_id(String p_id) {
this.p_id = p_id;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(order_id);
out.writeUTF(p_id);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
this.order_id = in.readUTF();
this.p_id = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.flag = in.readUTF();
}
@Override
public String toString() {
return order_id + "\t" + pname + "\t" + amount + "\t" ;
}
}
2)編寫TableMapper類
package com.atguigu.mapreduce.table;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{
String name;
TableBean bean = new TableBean();
Text k = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 1 獲取輸入檔案切片
FileSplit split = (FileSplit) context.getInputSplit();
// 2 獲取輸入檔名稱
name = split.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 獲取輸入數據
String line = value.toString();
// 2 不同檔案分別處理
if (name.startsWith("order")) {// 訂單表處理
// 2.1 切割
String[] fields = line.split("\t");
// 2.2 封裝bean物件
bean.setOrder_id(fields[0]);
bean.setP_id(fields[1]);
bean.setAmount(Integer.parseInt(fields[2]));
bean.setPname("");
bean.setFlag("order");
k.set(fields[1]);
}else {// 產品表處理
// 2.3 切割
String[] fields = line.split("\t");
// 2.4 封裝bean物件
bean.setP_id(fields[0]);
bean.setPname(fields[1]);
bean.setFlag("pd");
bean.setAmount(0);
bean.setOrder_id("");
k.set(fields[0]);
}
// 3 寫出
context.write(k, bean);
}
}
3)編寫TableReducer類
package com.atguigu.mapreduce.table;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
// 1準備儲存訂單的集合
ArrayList<TableBean> orderBeans = new ArrayList<>();
// 2 準備bean物件
TableBean pdBean = new TableBean();
for (TableBean bean : values) {
if ("order".equals(bean.getFlag())) {// 訂單表
// 拷貝傳遞過來的每條訂單數據到集閤中
TableBean orderBean = new TableBean();
try {
BeanUtils.copyProperties(orderBean, bean);
} catch (Exception e) {
e.printStackTrace();
}
orderBeans.add(orderBean);
} else {// 產品表
try {
// 拷貝傳遞過來的產品表到記憶體中
BeanUtils.copyProperties(pdBean, bean);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 3 表的拼接
for(TableBean bean:orderBeans){
bean.setPname (pdBean.getPname());
// 4 數據寫出去
context.write(bean, NullWritable.get());
}
}
}
4)編寫TableDriver類
package com.atguigu.mapreduce.table;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TableDriver {
public static void main(String[] args) throws Exception {
// 0 根據自己電腦路徑重新設定
args = new String[]{"e:/input/inputtable","e:/output1"};
// 1 獲取設定資訊,或者job物件範例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定本程式的jar包所在的本地路徑
job.setJarByClass(TableDriver.class);
// 3 指定本業務job要使用的Mapper/Reducer業務類
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
// 4 指定Mapper輸出數據的kv型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
// 5 指定最終輸出的數據的kv型別
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
// 6 指定job的輸入原始檔案所在目錄
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 將job中設定的相關參數,以及job所用的java類所在的jar包, 提交給yarn去執行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
2020-08-10 10:57:17,561 INFO [org.apache.hadoop.mapred.MapTask] - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2020-08-10 10:57:17,570 INFO [org.apache.hadoop.mapred.MapTask] - Starting flush of map output
2020-08-10 10:57:17,588 INFO [org.apache.hadoop.mapred.LocalJobRunner] - map task executor complete.
2020-08-10 10:57:17,594 WARN [org.apache.hadoop.mapred.LocalJobRunner] - job_local1090202339_0001
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.FileSplit cannot be cast to org.apache.hadoop.mapred.FileSplit
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.FileSplit cannot be cast to org.apache.hadoop.mapred.FileSplit
at com.ReduceJoin.TableMapper.setup(TableMapper.java:25)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-08-10 10:57:17,821 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local1090202339_0001 running in uber mode : false
2020-08-10 10:57:17,824 INFO [org.apache.hadoop.mapreduce.Job] - map 0% reduce 0%
2020-08-10 10:57:17,829 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local1090202339_0001 failed with state FAILED due to: NA
2020-08-10 10:57:17,840 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 0
4.測試
執行程式檢視結果
1001 小米 1
1001 小米 1
1002 華爲 2
1002 華爲 2
1003 格力 3
1003 格力 3
5.總結
缺點:這種方式中, 合併的操作是在Reduce階段完成, Reduce端的處理壓力太大, Map節點的運算負載則很低, 資源利用率不高, 且在Reduce階段極易產生數據傾斜。
解決方案:Map端實現數據合併。
1.使用場景
Map Join適用於一張表十分小、一張表很大的場景。
2.優點
在Map端快取多張表,提前處理業務邏輯,這樣增加Map端業務,減少Reduce端數據的壓力,儘可能的減少數據傾斜。
3.具體辦法:採用DistributedCache
(1)在Mapper的setup階段,將檔案讀取到快取集閤中。
(2)在驅動函數中載入快取。
// 快取普通檔案到Task執行節點。
job.addCacheFile(new URI(「file://e:/cache/pd.txt」));
實現程式碼
(1)先在驅動模組中新增快取檔案
package test;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DistributedCacheDriver {
public static void main(String[] args) throws Exception {
// 0 根據自己電腦路徑重新設定
args = new String[]{"e:/input/inputtable2", "e:/output1"};
// 1 獲取job資訊
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 設定載入jar包路徑
job.setJarByClass(DistributedCacheDriver.class);
// 3 關聯map
job.setMapperClass(DistributedCacheMapper.class);
// 4 設定最終輸出數據型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 5 設定輸入輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 載入快取數據
job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt"));
// 7 Map端Join的邏輯不需要Reduce階段,設定reduceTask數量爲0
job.setNumReduceTasks(0);
// 8 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
(2)讀取快取的檔案數據
package test;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Map<String, String> pdMap = new HashMap<>();
@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 1 獲取快取的檔案
URI[] cacheFiles = context.getCacheFiles();
String path = cacheFiles[0].getPath().toString();
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));
String line;
while(StringUtils.isNotEmpty(line = reader.readLine())){
// 2 切割
String[] fields = line.split("\t");
// 3 快取數據到集合
pdMap.put(fields[0], fields[1]);
}
// 4 關流
reader.close();
}
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 獲取一行
String line = value.toString();
// 2 擷取
String[] fields = line.split("\t");
// 3 獲取產品id
String pId = fields[1];
// 4 獲取商品名稱
String pdName = pdMap.get(pId);
// 5 拼接
k.set(line + "\t"+ pdName);
// 6 寫出
context.write(k, NullWritable.get());
}
}
Hadoop爲每個作業維護若幹內建計數器, 以描述多項指標。例如, 某些計數器記錄已處理的位元組數和記錄數,使使用者可監控已處理的輸入數據量和已產生的輸出數據量.
1.計數器API
(1)採用列舉的方式統計計數
enum My Counter(MALFORORMED,NORMAL)
//對列舉定義的自定義計數器加1
context.get Counter(MyCounter.MALFORORMED) increment(1);
(2)採用計數器組、計數器名稱的方式統計
context.get Counter(」counterGroup」, 」counter」) .increment(1);
組名和計數器名稱隨便起,但最好有意義。
(3)計數結果在程式執行後的控制檯上檢視。
計數器案例詳見數據清洗案例。
在執行核心業務MapReduce程式之前,往往要先對數據進行清洗,清理掉不符合使用者要求的數據。清理的過程往往只需要執行Mapper程式,不需要執行Reduce程式。
package com.atguigu.mapreduce.weblog;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 獲取1行數據
String line = value.toString();
// 2 解析日誌
boolean result = parseLog(line,context);
// 3 日誌不合法退出
if (!result) {
return;
}
// 4 設定key
k.set(line);
// 5 寫出數據
context.write(k, NullWritable.get());
}
// 2 解析日誌
private boolean parseLog(String line, Context context) {
// 1 擷取
String[] fields = line.split(" ");
// 2 日誌長度大於11的爲合法
if (fields.length > 11) {
// 系統計數器
context.getCounter("map", "true").increment(1);
return true;
}else {
context.getCounter("map", "false").increment(1);
return false;
}
}
}
(2)編寫LogDriver類
package com.atguigu.mapreduce.weblog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogDriver {
public static void main(String[] args) throws Exception {
// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設定
args = new String[] { "e:/input/inputlog", "e:/output1" };
// 1 獲取job資訊
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 載入jar包
job.setJarByClass(LogDriver.class);
// 3 關聯map
job.setMapperClass(LogMapper.class);
// 4 設定最終輸出型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 設定reducetask個數爲0
job.setNumReduceTasks(0);
// 5 設定輸入和輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交
job.waitForCompletion(true);
}
}
package com.atguigu.mapreduce.log;
public class LogBean {
private String remote_addr;// 記錄用戶端的ip地址
private String remote_user;// 記錄用戶端使用者名稱稱,忽略屬性"-"
private String time_local;// 記錄存取時間與時區
private String request;// 記錄請求的url與http協定
private String status;// 記錄請求狀態;成功是200
private String body_bytes_sent;// 記錄發送給用戶端檔案主體內容大小
private String http_referer;// 用來記錄從那個頁面鏈接存取過來的
private String http_user_agent;// 記錄客戶瀏覽器的相關資訊
private boolean valid = true;// 判斷數據是否合法
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\001").append(this.remote_addr);
sb.append("\001").append(this.remote_user);
sb.append("\001").append(this.time_local);
sb.append("\001").append(this.request);
sb.append("\001").append(this.status);
sb.append("\001").append(this.body_bytes_sent);
sb.append("\001").append(this.http_referer);
sb.append("\001").append(this.http_user_agent);
return sb.toString();
}
}
(2)編寫LogMapper類
package com.atguigu.mapreduce.log;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 獲取1行
String line = value.toString();
// 2 解析日誌是否合法
LogBean bean = parseLog(line);
if (!bean.isValid()) {
return;
}
k.set(bean.toString());
// 3 輸出
context.write(k, NullWritable.get());
}
// 解析日誌
private LogBean parseLog(String line) {
LogBean logBean = new LogBean();
// 1 擷取
String[] fields = line.split(" ");
if (fields.length > 11) {
// 2封裝數據
logBean.setRemote_addr(fields[0]);
logBean.setRemote_user(fields[1]);
logBean.setTime_local(fields[3].substring(1));
logBean.setRequest(fields[6]);
logBean.setStatus(fields[8]);
logBean.setBody_bytes_sent(fields[9]);
logBean.setHttp_referer(fields[10]);
if (fields.length > 12) {
logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
}else {
logBean.setHttp_user_agent(fields[11]);
}
// 大於400,HTTP錯誤
if (Integer.parseInt(logBean.getStatus()) >= 400) {
logBean.setValid(false);
}
}else {
logBean.setValid(false);
}
return logBean;
}
}
(3)編寫LogDriver類
package com.atguigu.mapreduce.log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogDriver {
public static void main(String[] args) throws Exception {
// 1 獲取job資訊
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 載入jar包
job.setJarByClass(LogDriver.class);
// 3 關聯map
job.setMapperClass(LogMapper.class);
// 4 設定最終輸出型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 5 設定輸入和輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交
job.waitForCompletion(true);
}
}
在編寫MapReduce程式時,需要考慮如下幾個方面:
輸入數據介面:InputFormat
(1) 預設使用的實現類是:TextInputFormat
(2) TextInputFormat的功能邏輯是:一次讀一行文字, 然後將該行的起始偏移量作爲key, 行內容作爲value返回。
(3) KeyValueTextInputFormat每一行均爲一條記錄, 被分隔符分割爲key,value。預設分隔符是tab(\t) 。
(4) NlineInputFormat按照指定的行數N來劃分切片。
(5) CombineTextInputFormat可以把多個小檔案合併成一個切片處理, 提高處理效率。
(6) 使用者還可以自定義InputFormat。
邏輯處理介面:Mapper
使用者根據業務需求實現其中三個方法:map();setup();cleanup()
Partitioner分割區
(1)有預設實現
HashPartitioner, 邏輯是根據key的雜湊值和numReduces來返回一個分割區號;
key.hashCode() & Integer.MAXVALUE % numReduces
(2)如果業務上有特別的需求,可以自定義分割區。
Comparable排序
(1) 當我們用自定義的物件作爲key來輸出時, 就必須要實現WritableComparable介面, 重寫其中的compareTo()方法。
(2)部分排序:對最終輸出的每一個檔案進行內部排序。
(3) 全排序:對所有數據進行排序, 通常只有一個Reduce。
(4)二次排序:排序的條件有兩個。
Combiner合併
Combiner合併可以提高程式執行效率, 減少IO傳輸。但是使用時必須不能影響原有的業務處理結果。
Reduce端分組:Grouping Comparator
在Reduce端對key進行分組。應用於:在接收的key爲bean物件時, 想讓一個或幾個欄位相同(全部欄位比較不相同) 的key進入到同一個reduce方法時, 可以採用分組排序。
邏輯處理介面:Reducer
使用者根據業務需求實現其中三個方法:reduce() setup() cleanup()
輸出數據介面:OutputFormat
(1) 預設實現類是TextOutputFormat, 功能邏輯是:將每一個KV對, 向目標文字檔案輸出一行。
(2) 將SequenceFileOutputFormat輸出作爲後續MapReduce任務的輸入, 這便是一種好的輸出格式,因爲它的格式緊湊,很容易被壓縮。
(3) 使用者還可以自定義Output Format。
壓縮格式 | hadoop自帶? | 演算法 | 副檔名 | 是否可切分 | 換成壓縮格式後,原來的程式是否需要修改 |
---|---|---|---|---|---|
DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文字處理一樣,不需要修改 |
Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文字處理一樣,不需要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文字處理一樣,不需要修改 |
LZO | 否,需要安裝 | LZO | .lzo | 是 | 需要建索引,還需要指定輸入格式 |
Snappy | 否,需要安裝 | Snappy | .snappy | 否 | 和文字處理一樣,不需要修改 |
爲了支援多種壓縮/解壓縮演算法,Hadoop引入了編碼/解碼器,如下表所示。
壓縮格式 | 對應的編碼/解碼器 |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
壓縮效能的比較
壓縮演算法 | 原始檔案大小 | 壓縮檔案大小 | 壓縮速度 | 解壓速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
LZO | 8.3GB | 2.9GB | 49.3MB/s | 74.6MB/s |
http://google.github.io/snappy/
在64位元模式的CoreI7處理器的單核上,Snappy壓縮速度約爲250 MB/s或更高,解壓縮速度約爲500 MB/s或更高。
Gzip壓縮
優點:壓縮率比較高, 而且壓縮/解壓速度也比較快; Had oop本身支援, 在應用中處理Gzip格式的檔案就和直接處理文字一樣; 大部分Linux系統都自帶Gzip命令, 使用方便。
缺點:不支援Split。
應用場景:當每個檔案壓縮之後在130M以內的(1個塊大小內),都可以考慮用Gzip壓縮格式。例如說一天或者一個小時的日誌壓縮成一個Gzip檔案。
Bzip2壓縮
優點:支援Split; 具有很高的壓縮率, 比Gzip壓縮率都高; Had oop本身自帶,使用方便。
缺點:壓縮/解壓速度慢。
應用場景:適合對速度要求不高,但需要較高的壓縮率的時候;或者輸出之後的數據比較大,處理之後的數據需要壓縮存檔減少磁碟空間並且以後數據用得比較少的情況;或者對單個很大的文字檔案想壓縮減少儲存空間,同時又需要支援Split, 而且相容之前的應用程式的情況。
Lzo壓縮
優點:壓縮/解壓速度也比較快, 合理的壓縮率; 支援Split, 是Hadoop中最流行的壓縮格式; 可以在Linux系統下安裝lzop命令, 使用方便。
缺點:壓縮率比Gzip要低一些; Had oop本身不支援, 需要安裝; 在應用中對Lzo格式的檔案需要做一些特殊處理(爲了支援Spt需要建索引, 還需要指定Input Format爲Lzo格式) 。
應用場景:一個很大的文字檔案,壓縮之後還大於200M以上的可以考慮,而且單個檔案越大, Lzo優點越越明顯。
Snappy壓縮
優點:高速壓縮速度和合理的壓縮率。
缺點:不支援Split; 壓縮率比Gzip要低; Had oop本身不支援, 需要安裝。
應用場景:當Map Reduce作業的Map輸出的數據比較大的時候, 作爲Map到Reduce的中間數據的壓縮格式; 或者作爲一個Map Reduce作業的輸出和另外一個Map Reduce作業的輸入。
壓縮可以在MapReduce作用的任意階段啓用,如圖MapReduce數據壓縮
要在Hadoop中啓用壓縮,可以設定如下參數:
參數 | 預設值 | 階段 | 建議 |
---|---|---|---|
io.compression.codecs(在core-site.xml中設定) | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec | 輸入壓縮 | Hadoop使用副檔名判斷是否支援某種編解碼器 |
mapreduce.map.output.compress(在mapred-site.xml中設定) | false | mapper輸出 | 這個參數設爲true啓用壓縮 |
mapreduce.map.output.compress.codec(在mapred-site.xml中設定) | org.apache.hadoop.io.compress.DefaultCodec | mapper輸出 | 企業多使用LZO或Snappy編解碼器在此階段壓縮數據 |
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中設定) | false | reducer輸出 | 這個參數設爲true啓用壓縮 |
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中設定) | org.apache.hadoop.io.compress. DefaultCodec | reducer輸出 | 使用標準工具或者編解碼器,如gzip和bzip2 |
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中設定) | RECORD | reducer輸出 | SequenceFile輸出使用的壓縮型別:NONE和BLOCK |
4.6.1 數據流的壓縮和解壓縮
測試一下如下壓縮方式:
表4-11
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
package com.atguigu.mapreduce.compress;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
public class TestCompress {
public static void main(String[] args) throws Exception {
compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec");
// decompress("e:/hello.txt.bz2");
}
// 1、壓縮
private static void compress(String filename, String method) throws Exception {
// (1)獲取輸入流
FileInputStream fis = new FileInputStream(new File(filename));
Class codecClass = Class.forName(method);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
// (2)獲取輸出流
FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
CompressionOutputStream cos = codec.createOutputStream(fos);
// (3)流的對拷
IOUtils.copyBytes(fis, cos, 1024*1024*5, false);
// (4)關閉資源
cos.close();
fos.close();
fis.close();
}
// 2、解壓縮
private static void decompress(String filename) throws FileNotFoundException, IOException {
// (0)校驗是否能解壓縮
CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
CompressionCodec codec = factory.getCodec(new Path(filename));
if (codec == null) {
System.out.println("cannot find codec for file " + filename);
return;
}
// (1)獲取輸入流
CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
// (2)獲取輸出流
FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));
// (3)流的對拷
IOUtils.copyBytes(cis, fos, 1024*1024*5, false);
// (4)關閉資源
cis.close();
fos.close();
}
}
4.6.2 Map輸出端採用壓縮
即使你的MapReduce的輸入輸出檔案都是未壓縮的檔案,你仍然可以對Map任務的中間結果輸出做壓縮,因爲它要寫在硬碟並且通過網路傳輸到Reduce節點,對其壓縮可以提高很多效能,這些工作只要設定兩個屬性即可,我們來看下程式碼怎麼設定。
1.給大家提供的Hadoop原始碼支援的壓縮格式有:BZip2Codec 、DefaultCodec
package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
// 開啓map端輸出壓縮
configuration.setBoolean("mapreduce.map.output.compress", true);
// 設定map端輸出壓縮方式
configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 1 : 0);
}
}
2.Mapper保持不變
package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
// 1 獲取一行
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
// 3 回圈寫出
for(String word:words){
k.set(word);
context.write(k, v);
}
}
}
3.Reducer保持不變
package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
// 1 彙總
for(IntWritable value:values){
sum += value.get();
}
v.set(sum);
// 2 輸出
context.write(key, v);
}
}
4.6.3 Reduce輸出端採用壓縮
基於WordCount案例處理。
1.修改驅動
package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 設定reduce端輸出壓縮開啓
FileOutputFormat.setCompressOutput(job, true);
// 設定壓縮的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
boolean result = job.waitForCompletion(true);
System.exit(result?1:0);
}
}
2.Mapper和Reducer保持不變(詳見4.6.2)
Yarn是一個資源排程平臺,負責爲運算程式提供伺服器運算資源,相當於一個分佈式的操作系統平臺,而MapReduce等運算程式則相當於執行於操作系統之上的應用程式。
YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等元件構成
1.Yarn執行機制 機製。
2.工作機制 機製詳解
(1)MR程式提交到用戶端所在的節點。
(2)YarnRunner向ResourceManager申請一個Application。
(3)RM將該應用程式的資源路徑返回給YarnRunner。
(4)該程式將執行所需資源提交到HDFS上。
(5)程式資源提交完畢後,申請執行mrAppMaster。
(6)RM將使用者的請求初始化成一個Task。
(7)其中一個NodeManager領取到Task任務。
(8)該NodeManager建立容器Container,併產生MRAppmaster。
(9)Container從HDFS上拷貝資源到本地。
(10)MRAppmaster向RM 申請執行MapTask資源。
(11)RM將執行MapTask任務分配給另外兩個NodeManager,另兩個NodeManager分別領取任務並建立容器。
(12)MR向兩個接收到任務的NodeManager發送程式啓動指令碼,這兩個NodeManager分別啓動MapTask,MapTask對數據分割區排序。
(13)MrAppMaster等待所有MapTask執行完畢後,向RM申請容器,執行ReduceTask。
(14)ReduceTask向MapTask獲取相應分割區的數據。
(15)程式執行完畢後,MR會向RM申請註銷自己。
1.作業提交過程之YARN。
作業提交過程之Yarn
作業提交全過程詳解
(1)作業提交
第1步:Client呼叫job.waitForCompletion方法,向整個叢集提交MapReduce作業。
第2步:Client向RM申請一個作業id。
第3步:RM給Client返回該job資源的提交路徑和作業id。
第4步:Client提交jar包、切片資訊和組態檔到指定的資源提交路徑。
第5步:Client提交完資源後,向RM申請執行MrAppMaster。
(2)作業初始化
第6步:當RM收到Client的請求後,將該job新增到容量排程器中。
第7步:某一個空閒的NM領取到該Job。
第8步:該NM建立Container,併產生MRAppmaster。
第9步:下載Client提交的資源到本地。
(3)任務分配
第10步:MrAppMaster向RM申請執行多個MapTask任務資源。
第11步:RM將執行MapTask任務分配給另外兩個NodeManager,另兩個NodeManager分別領取任務並建立容器。
(4)任務執行
第12步:MR向兩個接收到任務的NodeManager發送程式啓動指令碼,這兩個NodeManager分別啓動MapTask,MapTask對數據分割區排序。
第13步:MrAppMaster等待所有MapTask執行完畢後,向RM申請容器,執行ReduceTask。
第14步:ReduceTask向MapTask獲取相應分割區的數據。
第15步:程式執行完畢後,MR會向RM申請註銷自己。
(5)進度和狀態更新
YARN中的任務將其進度和狀態(包括counter)返回給應用管理器, 用戶端每秒(通過mapreduce.client.progressmonitor.pollinterval設定)嚮應用管理器請求進度更新, 展示給使用者。
(6)作業完成
除了嚮應用管理器請求作業進度外, 用戶端每5秒都會通過呼叫waitForCompletion()來檢查作業是否完成。時間間隔可以通過mapreduce.client.completion.pollinterval來設定。作業完成之後, 應用管理器和Container會清理工作狀態。作業的資訊會被作業歷史伺服器儲存以備之後使用者覈查。
2.作業提交過程之MapReduce.
作業提交過程之MapReduce
目前,Hadoop作業排程器主要有三種:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop2.7.2預設的資源排程器是Capacity Scheduler。
具體設定詳見:yarn-default.xml檔案
<property>
<description>The class to use as the resource scheduler.</description>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
1.先進先出排程器(FIFO)
2.容量排程器(Capacity Scheduler)
3.公平排程器(Fair Scheduler)
1.作業完成時間取決於最慢的任務完成時間
一個作業由若幹個Map任務和Reduce任務構成。因硬體老化、軟體Bug等,某些任務可能執行非常慢。
思考:系統中有99%的Map任務都完成了,只有少數幾個Map老是進度很慢,完不成,怎麼辦?
2.推測執行機制 機製
發現拖後腿的任務,比如某個任務執行速度遠慢於任務平均速度。爲拖後腿任務啓動一個備份任務,同時執行。誰先執行完,則採用誰的結果。
3.執行推測任務的前提條件
(1)每個Task只能有一個備份任務
(2)當前Job已完成的Task必須不小於0.05(5%)
(3)開啓推測執行參數設定。mapred-site.xml檔案中預設是開啓的。
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some map tasks may be executed in parallel.</description>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>
</property>
4.不能啓用推測執行機制 機製情況
(1)任務間存在嚴重的負載傾斜;
(2)特殊任務,比如任務向數據庫中寫數據。
5.演算法原理
MapReduce 跑的慢的原因, MapReduce程式效率的瓶頸在於兩點:
MapReduce優化方法主要從六個方面考慮:數據輸入、Map階段、Reduce階段、IO傳輸、數據傾斜問題和常用的調優參數。
設定參數 | 參數說明 |
---|---|
mapreduce.map.memory.mb | 一個MapTask可使用的資源上限(單位:MB),預設爲1024。如果MapTask實際使用的資源量超過該值,則會被強制殺死。 |
mapreduce.reduce.memory.mb | 一個ReduceTask可使用的資源上限(單位:MB),預設爲1024。如果ReduceTask實際使用的資源量超過該值,則會被強制殺死。 |
mapreduce.map.cpu.vcores | 每個MapTask可使用的最多cpu core數目,預設值: 1 |
mapreduce.reduce.cpu.vcores | 每個ReduceTask可使用的最多cpu core數目,預設值: 1 |
mapreduce.reduce.shuffle.parallelcopies | 每個Reduce去Map中取數據的並行數。預設值是5 |
mapreduce.reduce.shuffle.merge.percent | Buffer中的數據達到多少比例開始寫入磁碟。預設值0.66 |
mapreduce.reduce.shuffle.input.buffer.percent | Buffer大小佔Reduce可用記憶體的比例。預設值0.7 |
mapreduce.reduce.input.buffer.percent | 指定多少比例的記憶體用來存放Buffer中的數據,預設值是0.0 |
(2)應該在YARN啓動之前就設定在伺服器的組態檔中才能 纔能生效(yarn-default.xml)
設定參數 | 參數說明 |
---|---|
yarn.scheduler.minimum-allocation-mb | 給應用程式Container分配的最小記憶體,預設值:1024 |
yarn.scheduler.maximum-allocation-mb | 給應用程式Container分配的最大記憶體,預設值:8192 |
yarn.scheduler.minimum-allocation-vcores | 每個Container申請的最小CPU核數,預設值:1 |
yarn.scheduler.maximum-allocation-vcores | 每個Container申請的最大CPU核數,預設值:3 |
yarn.nodemanager.resource.memory-mb | 給Containers分配的最大實體記憶體,預設值:8192 |
(3)Shuffle效能優化的關鍵參數,應在YARN啓動之前就設定好(mapred-default.xml)
設定參數 | 參數說明 |
---|---|
mapreduce.task.io.sort.mb | Shuffle的環形緩衝區大小,預設100m |
mapreduce.map.sort.spill.percent | 環形緩衝區溢位的閾值,預設80% |
2.容錯相關參數(MapReduce效能優化)
設定參數 | 參數說明 |
---|---|
mapreduce.map.maxattempts | 每個Map Task最大重試次數,一旦重試參數超過該值,則認爲Map Task執行失敗,預設值:4。 |
mapreduce.reduce.maxattempts | 每個Reduce Task最大重試次數,一旦重試參數超過該值,則認爲Map Task執行失敗,預設值:4。 |
mapreduce.task.timeout | Task超時時間,經常需要設定的一個參數,該參數表達的意思爲:如果一個Task在一定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認爲該Task處於Block狀態,可能是卡住了,也許永遠會卡住,爲了防止因爲使用者程式永遠Block住不退出,則強制設定了一個該超時時間(單位毫秒),預設是600000。如果你的程式對每條輸入數據的處理時間過長(比如會存取數據庫,通過網路拉取數據等),建議將該參數調大,該參數過小常出現的錯誤提示是「AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.」。 |
HDFS小檔案弊端
HDFS上每個檔案都要在NameNode上建立一個索引,這個索引的大小約爲150byte,這樣當小檔案比較多的時候,就會產生很多的索引檔案,一方面會大量佔用NameNode的記憶體空間,另一方面就是索引檔案過大使得索引速度變慢。
6.3.2 HDFS小檔案解決方案
小檔案的優化無非以下幾種方式:
(1)在數據採集的時候,就將小檔案或小批數據合成大檔案再上傳HDFS。
(2)在業務處理之前,在HDFS上使用MapReduce程式對小檔案進行合併。
(3)在MapReduce處理時,可採用CombineTextInputFormat提高效率。
1.Hadoop Archive
是一個高效地將小檔案放入HDFS塊中的檔案存檔工具, 它能夠將多個小檔案打包成一個HAR檔案, 這樣就減少了Name Node的記憶體使用。
2.Sequence File
Sequence File由一系列的二進制key/value組成, 如果key爲檔名, value爲檔案內容,則可以將大批小檔案合併成一個大檔案。
3.CombineFileInputFormat
CombineFileInputFormat是一種新的InputFormat, 用於將多個檔案合併成一個單獨的Split, 另外, 它會考慮數據的儲存位置。
4.開啓JVM重用
對於大量小檔案Job, 可以開啓JVM重用會減少45%執行時間。JVM重用原理:一個Map執行在一個JVM上, 開啓重用的話, 該Map在JVM上執行完畢後, JVM繼續執行其他Map。
具體設定:map reduce.job.jvm.num tasks值在10-20之間。
宣告:文章內容來自尚硅谷,B站有免費視屏。