序列化想必大家都很熟悉了,物件在進行網路傳輸過程中,需要序列化之後才能傳輸到使用者端,或者使用者端的資料序列化之後送達到伺服器端
序列化的標準解釋如下:
序列化就是把記憶體中的物件,轉換成位元組序列(或其他資料傳輸協定)以便於儲存到磁碟(持久化)和網路傳輸
對應的反序列化為序列化的逆向過程
反序列化就是將收到位元組序列(或其他資料傳輸協定)或者是磁碟的持久化資料,轉換成記憶體中的物件
一般來說,程式動態建立出來的「活的」 物件只生存在記憶體裡,一旦服務停機或斷電就沒了。而且「活」物件只能存活於本地程序,不能傳送到網路上其他的伺服器或者程序中使用。 然而通過序列化之後,則可以儲存「活的」物件,從而進行網路傳輸,提供給其他程序或機器使用。
在Java中,建立一個物件如果希望這個物件是序列化的物件,只需要實現Serializable介面即可,但Java的序列化在Hadoop看來,是一個重量級序列化框架,一個物件被序列化後,會附帶很多額外的資訊(各種校驗資訊,Header,繼承體系等),從而不便於在網路中高效傳輸。所以,Hadoop自己開發了一套序列化機制,只需要物件實現Writable介面,重寫裡面的兩個方法。
在真實的業務場景中,類似於wordcount那樣的單個字串的場景很少,而且無法應對各種複雜的巨量資料場景和海量資料的處理業務,因此在傳輸過程中,為了更加靈活的進行資料在Map、Reduce中的傳輸,將解析到的資料以序列化物件的方式傳輸,是非常便捷的
在Hadoop中,具體實現bean物件序列化步驟如下7步:
業務需求描述,如下資料為從某個地方匯出來的一批統計手機號峰值流量和低谷流量的文字檔案,現在的業務需求是,通過程式,最終輸出各個手機號對應的峰值流量、低谷流量以及總流量的統計分析檔案
那麼最終的效果可按如下格式輸出
瞭解了上面的業務後,下面開始按照前面描述的幾個步驟進行編碼實現
1、定義一個封裝手機流量各個屬性的物件
從wordcount的案例中我們瞭解了使用mapreduce編碼的基本編碼套路,即map邏輯中讀取原始資料檔案,然後傳遞到reduce中
同樣,在這裡的map邏輯中,需要讀取上面的原始的流量文字檔案,但是既然在reduce中要能實現最終的統計輸出,那麼從map中出來的資料格式,必然是已經處理好的bean物件,key為手機號,而value值則為封裝了當前手機號對應的峰值流量、低谷流量以及計算的總流量資訊
瞭解了這一點,就大概知道這個bean物件該如何定義了
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class PhoneBean implements Writable {
//峰值流量
private long upFlow;
//低谷流量
private long downFlow;
//總流量
private long sumFlow;
//提供無參構造
public PhoneBean() {
}
//提供三個引數的getter和setter方法
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
//實現序列化和反序列化方法,注意順序一定要保持一致
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
//重寫ToString方法
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
2、自定義Mapper類
該類讀取和解析文字檔案,將各個手機號的屬性封裝到PhoneBean物件中,並輸出到Reduce使用
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PhoneMapper extends Mapper<LongWritable, Text, Text, PhoneBean> {
private Text outK = new Text();
private PhoneBean outV = new PhoneBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//分割資料
String[] split = line.split("\t");
//抓取需要的資料:手機號,上行流量,下行流量
String phone = split[1];
String max = split[3];
String mine = split[4];
//封裝outK outV
outK.set(phone);
outV.setUpFlow(Long.parseLong(max));
outV.setDownFlow(Long.parseLong(mine));
outV.setSumFlow();
//寫出outK outV
context.write(outK, outV);
}
}
4、自定義Reduce類
關於Reduce中的入參型別和出參型別,到這裡想必都已經瞭解,就不再過多解釋了
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.LinkedList;
public class PhoneMapper extends Mapper<LongWritable, Text, Text, PhoneBean> {
private Text outK = new Text();
private PhoneBean outV = new PhoneBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//分割資料
String[] splits = line.split("\t");
LinkedList<String> linkedList = new LinkedList<>();
for(String str:splits){
if(StringUtils.isNotEmpty(str)){
linkedList.add(str.trim());
}
}
//抓取需要的資料:手機號,上行流量,下行流量
String phone = linkedList.get(1);
String max = linkedList.get(3);
String mine = linkedList.get(4);
//封裝outK outV
outK.set(phone);
outV.setUpFlow(Long.parseLong(max));
outV.setDownFlow(Long.parseLong(mine));
outV.setSumFlow();
//寫出outK outV
context.write(outK, outV);
}
}
5、job類
依照wordcount案例中的模板做即可
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PhoneJob {
public static void main(String[] args) throws Exception {
//1 獲取job物件
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 關聯本Driver類
job.setJarByClass(PhoneJob.class);
//3 關聯Mapper和Reducer
job.setMapperClass(PhoneMapper.class);
job.setReducerClass(PhoneReducer.class);
//4 設定Map端輸出KV型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PhoneBean.class);
//5 設定程式最終輸出的KV型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(PhoneBean.class);
//6 設定程式的輸入輸出路徑
String inPath = "F:\\網路硬碟\\csv\\phone_data.txt";
String outPath = "F:\\網路硬碟\\csv\\out.txt";
FileInputFormat.setInputPaths(job, new Path(inPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
//7 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
}
執行這段程式,觀察是否在輸出的目標路徑下,生成了統計結果
開啟最後那個檔案,然後對比下原始的檔案,正好滿足預期的業務需求