hadoop MapReduce運營商案例關於使用者基站停留資料統計

2022-06-24 21:01:14

如果需要檔案和程式碼的話可評論區留言郵箱,我給你發原始碼

本文來自部落格園,作者:Arway,轉載請註明原文連結:https://www.cnblogs.com/cenjw/p/hadoop-mapReduce-operator-case.html

實驗要求

統計每個使用者在不同時段中各個基站的停留時間。

1.功能描述

使用者的手機,連線到不同的基站會產生一條記錄。
資料格式為:使用者標識 裝置標識 基站位置 通訊的日期 通訊時間
example: 0000009999 0054785806 00000089 2016-02-21 21:55:37

需要得到的資料格式為:
使用者標識 時段 基站位置 停留時間
example: 0000000001 09-18 00000003 15
使用者0000000001在09-18點這個時間段在基站00000003停留了15分鐘

2.實現思路

程式執行支援傳入時間段,比如「09-18-24」,表示分為0點到9點,9點到18點,18點到24點三個時間段。

  • (1)Mapper階段
    對輸入的資料,算出它屬於哪個時間段。
    k1:每行記錄在文字中的偏移量。
    v2:一條記錄
    k2用「使用者ID,時間段」輸出。
    v2用「基站位置,時間」。時間用unix time

  • (2)Reducer階段
    對獲取的v3(v3是一個集合,每個元素是v2,相當於按照k2對v2分組)進行排序,以時間升序排序。
    計算兩兩之間的時間間隔,儲存到另一個集合中,兩個不同的時間間隔中,從基站A移動到基站B,這樣獲取到在A基站的停留的時間。
    同理從基站B移動到基站C,基站C移動到基站D,依次類推,所有的時間都獲取到。再把時間累加起來,就可以獲取到總的時間。

本文來自部落格園,作者:Arway,轉載請註明原文連結:https://www.cnblogs.com/cenjw/p/hadoop-mapReduce-operator-case.html

程式碼實現

PhoneMain.java

package phoneMapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

/**
 * Created by ue50 on 11/13/19.
 */
public class PhoneMain
{
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException
    {
        //String.equals()比較字串的值是否相同
        if(args == null || "0".equals(args[0]))
        {
            throw new RuntimeException("argument is not right!");
        }
        //Configuration是作業的設定資訊類
        Configuration configuration = new Configuration();
        //set(String name, String value)設定設定項
        configuration.set("timeRange", args[0]);

        Job job = Job.getInstance(configuration);
        job.setJarByClass(PhoneMain.class);

        job.setMapperClass(PhoneMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputKeyClass(Text.class);

        job.setReducerClass(PhoneReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //FileInputFormat.setInputPaths(job, new Path("hdfs://xdata-m0:8020/user/ue50/pos.txt"));
        //FileOutputFormat.setOutputPath(job, new Path("hdfs://xdata-m0:8020/user/ue50/out"));

        FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.waitForCompletion(true);
    }
}

Mapper階段
PhoneMapper.java

package phoneMapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * Created by ue50 on 11/13/19.
 */
public class PhoneMapper extends Mapper<LongWritable, Text, Text, Text>
{
    private int[] timeRangeList;
    @Override
    //setup()被MapReduce框架僅且執行一次,在執行Map任務前,進行相關變數或者資源的集中初始化工作
    protected void setup(Context context) throws IOException,InterruptedException
    {
        //Configuration是作業的設定資訊類,通過Configuration可以實現在多個mapper和多個reducer任務之間共用資訊
        Configuration configuration = context.getConfiguration();

        //get(String name)根據設定項的鍵name獲取相應的值
        String timeRange = configuration.get("timeRange");//執行時傳入的時間段,比如「09-18-24」
        String[] timeRangeString = timeRange.split("-");

        timeRangeList = new int[timeRangeString.length];
        for(int i = 0; i < timeRangeString.length;i++)
        {
            //timeRangeList陣列儲存傳入的時間,如:09、18、24
            timeRangeList[i] = Integer.parseInt(timeRangeString[i]);
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String values[] = value.toString().split("\\s+");//對一條記錄"使用者標識 裝置標識   基站位置 通訊的時間"按空格拆分
        String userId = values[0];//使用者標識
        String baseStation = values[2];//基站位置
        String timeString = values[4];//存取時間,如:21:55:37

        String[] times = timeString.split(":");//對存取時間按':'拆分
        int hour = Integer.parseInt(times[0]);//小時

        //startHour、endHour時間段的起止時間
        int startHour = 0;
        int endHour = 0;
        for(int i = 0; i < timeRangeList.length; i++)
        {
            if(hour < timeRangeList[i])
            {
                if(i == 0)
                {
                    startHour = 0;
                }
                else
                {
                    startHour = timeRangeList[i-1];
                }
                endHour = timeRangeList[i];
                break;
            }
        }

        if(startHour == 0 && endHour == 0)
        {
            return;
        }

        //k2:使用者標識  時間段  v2:基站位置-存取時間
        context.write(new Text(userId + "\t" + startHour + "-" + endHour + "\t"), new Text(baseStation + "-" + timeString));
    }
}

Reducer階段

package phoneMapReduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Created by ue50 on 11/13/19.
 */
public class PhoneReducer extends Reducer<Text, Text, Text, LongWritable>
{
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        List<String> valueList = new LinkedList<String>();//基於連結串列的動態陣列

        //Map是一種把鍵物件和值物件對映的集合,TreeMap是一個有序的key-value集合,
        //它是通過紅黑樹實現的,TreeMap中的元素預設按照key的自然排序排列
        Map<String, Long> residenceTimeMap = new TreeMap<String, Long>();

        for(Text value : values)
        {
            String item = value.toString();
            valueList.add(item);//"基站位置-存取時間"的集合
        }

        if(valueList == null || valueList.size() <= 1)
        {
            return;
        }

        //Comparator是比較器
        //Collections.sort()方法中的自定義比較器,根據比較器的實現邏輯對valueList進行排序
        Collections.sort(valueList, new Comparator<String>() {//匿名內部類
            @Override
            //重寫比較器中的比較方法:compare方法
            public int compare(String o1, String o2) {
                o1 = o1.split("-")[1];
                o2 = o2.split("-")[1];
                return o1.compareTo(o2);//根據存取時間對valueList排序,第一個引數.compareTo(第二個引數)升序
            }
        });


        for(int i = 0;i < valueList.size()-1; i++)
        {
            String station = valueList.get(i).split("-")[0];//基站位置
            String time1 = valueList.get(i).split("-")[1];//存取時間
            String time2 = valueList.get(i + 1).split("-")[1];

            //對日期/時間進行格式化,HH:24小時制
            DateFormat dateFormat = new SimpleDateFormat("HH:hh:ss");
            //Date物件用於處理日期與時間
            Date date1 = null;
            Date date2 = null;
            try{
                date1 = dateFormat.parse(time1);//parse():把String型的字串轉換成特定格式的Date型別
                date2 = dateFormat.parse(time2);
            }catch (ParseException e)
            {
                e.printStackTrace();
            }

            //date1.before(date2),當date1小於date2時,返回TRUE,當大於等於時,返回false;
            if(date1.before(date2))
            {
                long time = date2.getTime() - date1.getTime();//getTime方法返回的是毫秒數

                Long count = residenceTimeMap.get(station);//返回key關聯的值,沒有值返回null
                if(count == null)
                {
                    residenceTimeMap.put(station, time);//<基站位置,停留時間>
                }
                else
                {
                    residenceTimeMap.put(station, count + time);//將停留時間累積
                }
            }
        }

        valueList = null;

        //TreeMap的keySet():以升序返回一個具有TreeMap鍵的Set檢視
        Set<String> keySet = residenceTimeMap.keySet();//keySet:<基站位置>
        for(String mapKey : keySet)
        {
            long minute = residenceTimeMap.get(mapKey);//停留時間毫秒
            minute = minute/1000/60;//分鐘
            //minute = minute/1000;//秒

            context.write(new Text(key +"\t" + mapKey +"\t"), new LongWritable(minute));
        }

        residenceTimeMap = null;
    }
}

如果需要檔案和程式碼的話可評論區留言郵箱,我給你發原始碼