MapReduce實現TopN的效果

2023-07-17 15:00:36

1、背景

最近在學習Hadoop的MapReduce,此處記錄一下如何實現 TopN 的效果,以及在MapReduce中如何實現 自定義分組

2、需求

我們有一份資料,資料中存在如下3個欄位,訂單編號,訂單項訂單項價格。 輸出的資料,需求如下:

  1. 訂單編號與訂單編號之間需要正序輸出。
  2. 輸出每個訂單價格最高的2個訂單項

3、分析

  1. 訂單編號與訂單編號之間需要正序輸出,那麼訂單編號必須要作為Key,因為只有Key才有排序操作。
  2. 輸出每個訂單價格最高的2個訂單項: 這個輸出是在reduce階段,並且是每個訂單,因此需要根據訂單編號進行分組操作(前後2個key比較,相同則為一組),而分組也只有Key才有,因此就需要JavaBean(訂單編號、訂單項、訂單項價格)來作為組合Key
  3. 訂單編號與訂單編號之間需要正序輸出 && 輸出每個訂單價格最高的2個訂單項: 可以看出在Key中的排序規則為:根據訂單編號升序,然後根據訂單項價格倒序排序, 並且是根據訂單編號來分組。
  4. 我們知道預設MapReduce中預設的分割區規則是,根據key的hascode來進行分割區,而 分割區 下是有多個 分組,每個分組呼叫一次reduce方法。 而我們上方的思路是,根據訂單編號來進行分組,當我們Key是JavaBean組合Key時,相同的訂單編號所在的JavaBean會被分在一個分組嗎,這個不一定,因為JavaBean的hashcode不一定一致,因此就需要我們自定義分割區(繼承Partitioner類)。此處我們job.setNumReduceTasks設定為1個,因此不考慮這個分割區的問題
  5. 一個分割區下有多個分組,每個分組呼叫一次reduce方法。

4、準備資料

4.1 準備資料

20230713000010  item-101    10
20230713000010  item-102    30
20230713000015  item-151    10
20230713000015  item-152    20
20230713000010  item-103    20
20230713000015  item-153    30
20230713000012  item-121    50
20230713000012  item-122    10
20230713000012  item-123    30

4.2 每行資料格式

訂單編號          訂單項      訂單項價格
20230713000012  item-123    30

每行資料的分隔符為空格

4.3 期望輸出結果

20230713000010  item-102    30
20230713000010  item-103    20
20230713000012  item-121    50
20230713000012  item-123    30
20230713000015  item-153    30
20230713000015  item-152    20

5、編碼實現

5.1 引入jar包

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>3.2.2</version>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                        <mainClass>com.huan.hadoop.mr.TopNDriver</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>

5.2 編寫實體類

package com.huan.hadoop.mr;

import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 訂單資料
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:20
 */
@Getter
@Setter
public class OrderVo implements WritableComparable<OrderVo> {
    /**
     * 訂單編號
     */
    private long orderId;
    /**
     * 訂單項
     */
    private String itemId;
    /**
     * 訂單項價格
     */
    private long price;

    @Override
    public int compareTo(OrderVo o) {
        // 排序: 根據 訂單編號 升序, 如果訂單編號相同,則根據 訂單項價格 倒序
        int result = Long.compare(this.orderId, o.orderId);
        if (result == 0) {
            // 等於0說明 訂單編號 相同,則需要根據 訂單項價格 倒序
            result = -Long.compare(this.price, o.price);
        }
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // 序列化
        out.writeLong(orderId);
        out.writeUTF(itemId);
        out.writeLong(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // 反序列化
        this.orderId = in.readLong();
        this.itemId = in.readUTF();
        this.price = in.readLong();
    }

    @Override
    public String toString() {
        return this.getOrderId() + "\t" + this.getItemId() + "\t" + this.getPrice();
    }
}

  1. 此處需要實現 WritableComparable介面
  2. 需要編寫 排序序列化方法

5.3 編寫分組方法

package com.huan.hadoop.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 分組: 訂單編號相同說明是同一組,否則是不同的組
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:30
 */
public class TopNGroupingComparator extends WritableComparator {

    public TopNGroupingComparator() {
        // 第二個引數為true: 表示可以通過反射建立範例
        super(OrderVo.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // 訂單編號 相同說明是同一個物件,否則是不同的物件
        return ((OrderVo) a).getOrderId() == ((OrderVo) b).getOrderId() ? 0 : 1;
    }
}

  1. 實現 WritableComparator介面,自定義分組規則。
  2. 分組是發生在reduce階段,前後2個key比較,相同則為一組,一組呼叫一次reduce方法。

5.4 編寫 map 方法

package com.huan.hadoop.mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * map 操作: 輸出的key為OrderVo, 輸出的value為: price
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:28
 */
public class TopNMapper extends Mapper<LongWritable, Text, OrderVo, LongWritable> {

    private final OrderVo outKey = new OrderVo();
    private final LongWritable outValue = new LongWritable();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderVo, LongWritable>.Context context) throws IOException, InterruptedException {
        // 獲取一行資料 20230713000010  item-101    10
        String row = value.toString();
        // 根據 \t 進行分割
        String[] cells = row.split("\\s+");
        // 獲取訂單編號
        long orderId = Long.parseLong(cells[0]);
        // 獲取訂單項
        String itemId = cells[1];
        // 獲取訂單項價格
        long price = Long.parseLong(cells[2]);

        // 設定值
        outKey.setOrderId(orderId);
        outKey.setItemId(itemId);
        outKey.setPrice(price);
        outValue.set(price);

        // 寫出
        context.write(outKey, outValue);
    }
}

  1. map 操作: 輸出的key為OrderVo, 輸出的value為: price

5.5 編寫reduce方法

package com.huan.hadoop.mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reduce操作: Key(OrderVo)相同的分為一組, 此處 OrderVo 作為key, 分組是根據 TopNGroupingComparator 來實現,
 * 即 訂單編號 相同的認為一組
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:29
 */
public class TopNReducer extends Reducer<OrderVo, LongWritable, OrderVo, NullWritable> {

    @Override
    protected void reduce(OrderVo key, Iterable<LongWritable> values, Reducer<OrderVo, LongWritable, OrderVo, NullWritable>.Context context) throws IOException, InterruptedException {
        int topN = 0;
        // 隨著每次遍歷, key的 orderId 是相同的(因為是根據這個分組的),但是裡面的itemId和price是不同的
        for (LongWritable price : values) {
            topN++;
            if (topN > 2) {
                break;
            }
            // 注意: 此處的key每次輸出都不一樣
            context.write(key, NullWritable.get());
        }
    }
}

  1. reduce操作: Key(OrderVo)相同的分為一組, 此處 OrderVo 作為key, 分組是根據 TopNGroupingComparator 來實現,即 訂單編號 相同的認為一組.
  2. 隨著每次遍歷, key的 orderId 是相同的(因為是根據這個分組的),但是裡面的itemId和price是不同的

5.6 編寫driver類

package com.huan.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author huan.fu
 * @date 2023/7/13 - 14:29
 */
public class TopNDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // 構建設定物件
        Configuration configuration = new Configuration();
        // 使用 ToolRunner 提交程式
        int status = ToolRunner.run(configuration, new TopNDriver(), args);
        // 退出程式
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 構建Job物件範例 引數(設定物件,Job物件名稱)
        Job job = Job.getInstance(getConf(), "topN");
        // 設定mr程式執行的主類
        job.setJarByClass(TopNDriver.class);
        // 設定mr程式執行的 mapper型別和reduce型別
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);
        // 指定mapper階段輸出的kv資料型別
        job.setMapOutputKeyClass(OrderVo.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 指定reduce階段輸出的kv資料型別,業務mr程式輸出的最終型別
        job.setOutputKeyClass(OrderVo.class);
        job.setOutputValueClass(NullWritable.class);
        // 設定本例子中的輸入資料路徑和輸出資料路徑,預設輸入輸出元件為: TextInputFormat和TextOutputFormat
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 先刪除輸出目錄(方便本地測試)
        FileSystem.get(this.getConf()).delete(new Path(args[1]), true);

        // 設定分組
        job.setGroupingComparatorClass(TopNGroupingComparator.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
  1. 需要設定分組 job.setGroupingComparatorClass(TopNGroupingComparator.class);

5.7 執行結果

6、完整程式碼

https://gitee.com/huan1993/spring-cloud-parent/tree/master/hadoop/mr-topn-group