MapReduce它可以編寫應用程式來處理海量資料,並行,大叢集的普通硬體,以可靠的方式的框架。
MapReduce是一種處理技術和程式模型基於Java的分散式計算。 MapReduce演算法包含了兩項重要任務,即Map 和 Reduce。Map採用了一組資料,並將其轉換成另一組資料,其中,各個元件被分解成元組(鍵/值對)。其次,減少任務,這需要從Map 作為輸入並組合那些資料元組成的一組小的元組輸出。作為MapReduce暗示的名稱的序列在Map作業之後執行reduce任務。
MapReduce主要優點是,它很容易大規模資料處理在多個計算節點。下面MapReduce模型中,資料處理的原語被稱為對映器和減速器。分解資料處理應用到對映器和減速器有時是普通的。但是編寫MapReduce形式的應用,擴充套件應用程式執行在幾百,幾千,甚至幾萬機叢集中的僅僅是一個組態的更改。這個簡單的可延伸性是吸引了眾多程式員使用MapReduce模型。
通常MapReduce範例是基於向傳送計算機資料的位置!
MapReduce計劃分三個階段執行,即對映階段,shuffle階段,並減少階段。
對映階段:對映或對映器的工作是處理輸入資料。一般輸入資料是在檔案或目錄的形式,並且被儲存在Hadoop的檔案系統(HDFS)。輸入檔案被傳遞到由線對映器功能線路。對映器處理該資料,並建立資料的若干小塊。
減少階段:這個階段是:Shuffle階段和Reduce階段的組合。減速器的工作是處理該來自對映器中的資料。處理之後,它產生一組新的輸出,這將被儲存在HDFS。
在一個MapReduce工作,Hadoop的傳送Map和Reduce任務到叢集的相應伺服器。
框架管理資料傳遞例如發出任務的所有節點之間的叢集周圍的詳細資訊,驗證任務完成,和複製資料。
大部分的計算發生在與在本地磁碟上,可以減少網路通訊量資料的節點。
給定的任務完成後,將群集收集並減少了資料,以形成一個合適的結果,並且將其傳送回Hadoop伺服器。
MapReduce框架上的<key, value>對操作,也就是框架檢視的輸入工作作為一組<key, value>對,並產生一組<key, value>對作為作業的輸出可以在不同的型別。
鍵和值類在框架連載的方式,因此,需要實現介面。此外,鍵類必須實現可寫,可比的介面,以方便框架排序。MapReduce工作的輸入和輸出型別:(輸入)<k1, v1> ->對映 - ><k2, v2>-> reduce - ><k3, v3>(輸出)。
輸入 | 輸出 | |
---|---|---|
Map | <k1, v1> | list (<k2, v2>) |
Reduce | <k2, list(v2)> | list (<k3, v3>) |
PayLoad - 應用程式實現對映和減少功能,形成工作的核心。
Mapper - 對映器的輸入鍵/值對對映到一組中間鍵/值對。
NamedNode - 節點管理Hadoop分散式檔案系統(HDFS)。
DataNode - 節點資料呈現在任何處理發生之前。
MasterNode - 節點所在JobTracker執行並接受來自用戶端作業請求。
SlaveNode - 節點所在Map和Reduce程式執行。
JobTracker - 排程作業並跟蹤作業分配給任務跟蹤器。
Task Tracker - 跟蹤任務和報告狀態的JobTracker。
Job -程式在整個資料集對映器和減速的執行。
Task - 一個對映程式的執行或對資料的一個片段的減速器。
Task Attempt - 一種嘗試的特定範例在SlaveNode執行任務。
下面給出是關於一個組織的電消耗量的資料。它包含了每月的用電量,各年的平均。
Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec | Avg | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
如果上述資料作為輸入,我們需要編寫應用程式來處理它而產生的結果,如發現最大使用量,最低使用年份,依此類推。這是一個輕鬆取勝用於記錄有限數目的程式設計器。他們將編寫簡單地邏輯,以產生所需的輸出,並且將資料傳遞到寫入的應用程式。
但是,代表一個特定狀態下所有的大規模產業的電力消耗資料。
當我們編寫應用程式來處理這樣的大量資料,
為了解決這些問題,使用MapReduce框架。
上述資料被儲存為 sample.txt 並作為輸入。輸入檔案看起來如下所示。
1979 23 23 2 43 24 25 26 26 26 26 25 26 25 1980 26 27 28 28 28 30 31 31 31 30 30 30 29 1981 31 32 32 32 33 34 35 36 36 34 34 34 34 1984 39 38 39 39 39 41 42 43 40 39 38 38 40 1985 38 39 39 39 39 41 41 41 00 40 39 39 45
下面給出的是使用MapReduce框架的樣本資料的程式。
package hadoop; import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class ProcessUnits { //Mapper class public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable ,/*Input key Type */ Text, /*Input value Type*/ Text, /*Output key Type*/ IntWritable> /*Output value Type*/ { //Map function public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String lasttoken = null; StringTokenizer s = new StringTokenizer(line,"\t"); String year = s.nextToken(); while(s.hasMoreTokens()) { lasttoken=s.nextToken(); } int avgprice = Integer.parseInt(lasttoken); output.collect(new Text(year), new IntWritable(avgprice)); } } //Reducer class public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > { //Reduce function public void reduce( Text key, Iterator <IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxavg=30; int val=Integer.MIN_VALUE; while (values.hasNext()) { if((val=values.next().get())>maxavg) { output.collect(key, new IntWritable(val)); } } } } //Main function public static void main(String args[])throws Exception { JobConf conf = new JobConf(Eleunits.class); conf.setJobName("max_eletricityunits"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(E_EMapper.class); conf.setCombinerClass(E_EReduce.class); conf.setReducerClass(E_EReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
儲存上述程式作為ProcessUnits.java。編譯和執行的程式如下的說明。
讓我們假設是在Hadoop的使用者(如/home/hadoop)的主目錄。
按照下面給出編譯和執行上面程式的步驟。
下面的命令是建立一個目錄來儲存編譯的Java類。
$ mkdir units
下載Hadoop-core-1.2.1.jar,它用於編譯和執行MapReduce程式。存取以下連結 http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1下載JAR。假設下載的檔案夾是 /home/hadoop/.
下面的命令用於編譯ProcessUnits.java程式並建立一個jar程式。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java $ jar -cvf units.jar -C units/ .
下面的命令用來建立一個輸入目錄在HDFS中。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
下面的命令用於複製命名sample.txt在HDFS輸入目錄中輸入檔案。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
下面的命令用來驗證在輸入目錄中的檔案。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
下面的命令用於通過從輸入目錄以輸入檔案來執行Eleunit_max應用。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段時間,直到執行檔案。在執行後,如下圖所示,輸出將包含輸入分割的數目,對映任務數,減速器任務的數量等。
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully 14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=61 FILE: Number of bytes written=279400 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=546 HDFS: Number of bytes written=40 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=146137 Total time spent by all reduces in occupied slots (ms)=441 Total time spent by all map tasks (ms)=14613 Total time spent by all reduce tasks (ms)=44120 Total vcore-seconds taken by all map tasks=146137 Total vcore-seconds taken by all reduce tasks=44120 Total megabyte-seconds taken by all map tasks=149644288 Total megabyte-seconds taken by all reduce tasks=45178880 Map-Reduce Framework Map input records=5 Map output records=5 Map output bytes=45 Map output materialized bytes=67 Input split bytes=208 Combine input records=5 Combine output records=5 Reduce input groups=5 Reduce shuffle bytes=6 Reduce input records=5 Reduce output records=5 Spilled Records=10 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=948 CPU time spent (ms)=5160 Physical memory (bytes) snapshot=47749120 Virtual memory (bytes) snapshot=2899349504 Total committed heap usage (bytes)=277684224 File Output Format Counters Bytes Written=40
下面的命令用來驗證在輸出檔案夾所得檔案。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
下面的命令是用來檢視輸出Part-00000檔案。該檔案由HDFS產生。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
下面是由MapReduce的程式所產生的輸出。
1981 34 1984 40 1985 45
以下命令用於從HDFS輸出檔案夾複製到本地檔案系統進行分析。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
所有的Hadoop命令是由$HADOOP_HOME/bin/hadoop命令呼叫。不帶任何引數執行Hadoop指令碼列印所有命令的描述。
Usage : hadoop [--config confdir] COMMAND
下表列出了可用的選項及其說明。
操作 | 描述 |
---|---|
namenode -format | 格式化DFS檔案系統。 |
secondarynamenode | 執行DFS二次名稱節點。 |
namenode | 執行DFS名稱節點。 |
datanode | 執行DFS的Datanode。 |
dfsadmin | 執行DFS管理用戶端。 |
mradmin | 執行對映,減少管理用戶端。 |
fsck | 執行DFS檔案系統檢查工具。 |
fs | 執行一個通用的檔案系統的使用者用戶端。 |
balancer | 執行叢集平衡工具。 |
oiv | 適用於離線FsImage檢視器的fsimage。 |
fetchdt | 從NameNode獲取團令牌。 |
jobtracker | 執行MapReduce工作跟蹤節點。 |
pipes | 執行管道的工作。 |
tasktracker | 執行MapReduce任務跟蹤節點。 |
historyserver | 執行作業歷史記錄伺服器作為一個獨立的守護行程。 |
job | 操縱MapReduce工作。 |
queue | 獲取有關作業佇列資訊。 |
version | 列印版本。 |
jar <jar> | 執行一個jar檔案。 |
distcp <srcurl> <desturl> | 複製檔案或目錄的遞迴。 |
distcp2 <srcurl> <desturl> | DistCp第2版。 |
archive -archiveName NAME -p | 建立一個Hadoop的歸檔。 |
<parent path> <src>* <dest> | |
classpath | 列印需要得到Hadoop jar和所需要的庫的類路徑。 |
daemonlog | 為每個守護行程獲取/設定紀錄檔級別 |
Usage: hadoop job [GENERIC_OPTIONS]
以下是在一個Hadoop的作業可用通用選項。
GENERIC_OPTIONS | 描述 |
---|---|
-submit <job-file> | 提交作業。 |
status <job-id> | 列印對映,並減少完成的百分比以及所有的工作的計數器。 |
counter <job-id> <group-name> <countername> | 列印的計數器值。 |
-kill <job-id> | 終止任務。 |
-events <job-id> <fromevent-#> <#-of-events> | 列印接收到JobTracker為給定範圍內的事件的詳細資訊。 |
-history [all] <jobOutputDir> - history < jobOutputDir> | 列印作業的詳細資訊,未能終止提示詳細資訊。有關作業的更多詳細資訊,如每個任務取得成功的任務,任務可以嘗試通過指定[all]選項中檢視。 |
-list[all] | 顯示所有作業。-list 只顯示尚未完成的作業。 |
-kill-task <task-id> | 終止任務。終止任務不計入失敗的嘗試。 |
-fail-task <task-id> | 失敗的任務。失敗的任務都算對失敗的嘗試。 |
set-priority <job-id> <priority> | 更改作業的優先順序。允許優先順序值:VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW |
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> e.g. $ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004