最近在學習Hadoop的MapReduce,此處記錄一下如何實現 TopN
的效果,以及在MapReduce中如何實現 自定義分組
。
我們有一份資料,資料中存在如下3個欄位,訂單編號
,訂單項
和訂單項價格
。 輸出的資料,需求如下:
訂單編號
與訂單編號之間需要正序
輸出。每個訂單價格最高的2個訂單項
。訂單編號
與訂單編號之間需要正序
輸出,那麼訂單編號
必須要作為Key
,因為只有Key才有排序操作。每個訂單價格最高的2個訂單項
: 這個輸出是在reduce
階段,並且是每個訂單
,因此需要根據訂單編號
進行分組操作(前後2個key比較,相同則為一組
),而分組也只有Key
才有,因此就需要JavaBean(訂單編號、訂單項、訂單項價格)
來作為組合Key
。訂單編號
與訂單編號之間需要正序
輸出 && 輸出每個訂單價格最高的2個訂單項
: 可以看出在Key中的排序規則為:根據訂單編號
升序,然後根據訂單項價格
倒序排序, 並且是根據訂單編號
來分組。預設MapReduce中預設的分割區規則
是,根據key的hascode來進行分割區,而 分割區 下是有多個 分組
,每個分組呼叫一次reduce方法
。 而我們上方的思路是,根據訂單編號
來進行分組,當我們Key是JavaBean組合Key時,相同的訂單編號
所在的JavaBean會被分在一個分組嗎,這個不一定,因為JavaBean的hashcode不一定一致,因此就需要我們自定義分割區(繼承Partitioner
類)。此處我們job.setNumReduceTasks設定為1個,因此不考慮這個分割區的問題
。reduce
方法。20230713000010 item-101 10
20230713000010 item-102 30
20230713000015 item-151 10
20230713000015 item-152 20
20230713000010 item-103 20
20230713000015 item-153 30
20230713000012 item-121 50
20230713000012 item-122 10
20230713000012 item-123 30
訂單編號 訂單項 訂單項價格
20230713000012 item-123 30
每行資料的分隔符為空格
20230713000010 item-102 30
20230713000010 item-103 20
20230713000012 item-121 50
20230713000012 item-123 30
20230713000015 item-153 30
20230713000015 item-152 20
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.huan.hadoop.mr.TopNDriver</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
package com.huan.hadoop.mr;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 訂單資料
*
* @author huan.fu
* @date 2023/7/13 - 14:20
*/
@Getter
@Setter
public class OrderVo implements WritableComparable<OrderVo> {
/**
* 訂單編號
*/
private long orderId;
/**
* 訂單項
*/
private String itemId;
/**
* 訂單項價格
*/
private long price;
@Override
public int compareTo(OrderVo o) {
// 排序: 根據 訂單編號 升序, 如果訂單編號相同,則根據 訂單項價格 倒序
int result = Long.compare(this.orderId, o.orderId);
if (result == 0) {
// 等於0說明 訂單編號 相同,則需要根據 訂單項價格 倒序
result = -Long.compare(this.price, o.price);
}
return result;
}
@Override
public void write(DataOutput out) throws IOException {
// 序列化
out.writeLong(orderId);
out.writeUTF(itemId);
out.writeLong(price);
}
@Override
public void readFields(DataInput in) throws IOException {
// 反序列化
this.orderId = in.readLong();
this.itemId = in.readUTF();
this.price = in.readLong();
}
@Override
public String toString() {
return this.getOrderId() + "\t" + this.getItemId() + "\t" + this.getPrice();
}
}
WritableComparable
介面排序
和序列化
方法package com.huan.hadoop.mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* 分組: 訂單編號相同說明是同一組,否則是不同的組
*
* @author huan.fu
* @date 2023/7/13 - 14:30
*/
public class TopNGroupingComparator extends WritableComparator {
public TopNGroupingComparator() {
// 第二個引數為true: 表示可以通過反射建立範例
super(OrderVo.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 訂單編號 相同說明是同一個物件,否則是不同的物件
return ((OrderVo) a).getOrderId() == ((OrderVo) b).getOrderId() ? 0 : 1;
}
}
WritableComparator
介面,自定義分組規則。reduce
階段,前後2個key比較,相同則為一組,一組呼叫一次reduce
方法。package com.huan.hadoop.mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* map 操作: 輸出的key為OrderVo, 輸出的value為: price
*
* @author huan.fu
* @date 2023/7/13 - 14:28
*/
public class TopNMapper extends Mapper<LongWritable, Text, OrderVo, LongWritable> {
private final OrderVo outKey = new OrderVo();
private final LongWritable outValue = new LongWritable();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderVo, LongWritable>.Context context) throws IOException, InterruptedException {
// 獲取一行資料 20230713000010 item-101 10
String row = value.toString();
// 根據 \t 進行分割
String[] cells = row.split("\\s+");
// 獲取訂單編號
long orderId = Long.parseLong(cells[0]);
// 獲取訂單項
String itemId = cells[1];
// 獲取訂單項價格
long price = Long.parseLong(cells[2]);
// 設定值
outKey.setOrderId(orderId);
outKey.setItemId(itemId);
outKey.setPrice(price);
outValue.set(price);
// 寫出
context.write(outKey, outValue);
}
}
package com.huan.hadoop.mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reduce操作: Key(OrderVo)相同的分為一組, 此處 OrderVo 作為key, 分組是根據 TopNGroupingComparator 來實現,
* 即 訂單編號 相同的認為一組
*
* @author huan.fu
* @date 2023/7/13 - 14:29
*/
public class TopNReducer extends Reducer<OrderVo, LongWritable, OrderVo, NullWritable> {
@Override
protected void reduce(OrderVo key, Iterable<LongWritable> values, Reducer<OrderVo, LongWritable, OrderVo, NullWritable>.Context context) throws IOException, InterruptedException {
int topN = 0;
// 隨著每次遍歷, key的 orderId 是相同的(因為是根據這個分組的),但是裡面的itemId和price是不同的
for (LongWritable price : values) {
topN++;
if (topN > 2) {
break;
}
// 注意: 此處的key每次輸出都不一樣
context.write(key, NullWritable.get());
}
}
}
package com.huan.hadoop.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author huan.fu
* @date 2023/7/13 - 14:29
*/
public class TopNDriver extends Configured implements Tool {
public static void main(String[] args) throws Exception {
// 構建設定物件
Configuration configuration = new Configuration();
// 使用 ToolRunner 提交程式
int status = ToolRunner.run(configuration, new TopNDriver(), args);
// 退出程式
System.exit(status);
}
@Override
public int run(String[] args) throws Exception {
// 構建Job物件範例 引數(設定物件,Job物件名稱)
Job job = Job.getInstance(getConf(), "topN");
// 設定mr程式執行的主類
job.setJarByClass(TopNDriver.class);
// 設定mr程式執行的 mapper型別和reduce型別
job.setMapperClass(TopNMapper.class);
job.setReducerClass(TopNReducer.class);
// 指定mapper階段輸出的kv資料型別
job.setMapOutputKeyClass(OrderVo.class);
job.setMapOutputValueClass(LongWritable.class);
// 指定reduce階段輸出的kv資料型別,業務mr程式輸出的最終型別
job.setOutputKeyClass(OrderVo.class);
job.setOutputValueClass(NullWritable.class);
// 設定本例子中的輸入資料路徑和輸出資料路徑,預設輸入輸出元件為: TextInputFormat和TextOutputFormat
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 先刪除輸出目錄(方便本地測試)
FileSystem.get(this.getConf()).delete(new Path(args[1]), true);
// 設定分組
job.setGroupingComparatorClass(TopNGroupingComparator.class);
return job.waitForCompletion(true) ? 0 : 1;
}
}
job.setGroupingComparatorClass(TopNGroupingComparator.class);
https://gitee.com/huan1993/spring-cloud-parent/tree/master/hadoop/mr-topn-group
本文來自部落格園,作者:huan1993,轉載請註明原文連結:https://www.cnblogs.com/huan1993/p/17558581.html