Hadoop MapReduce


MapReduce它可以編寫應用程式來處理海量資料,並行,大叢集的普通硬體,以可靠的方式的框架。

MapReduce是什麼?

MapReduce是一種處理技術和程式模型基於Java的分散式計算。 MapReduce演算法包含了兩項重要任務,即Map 和 Reduce。Map採用了一組資料,並將其轉換成另一組資料,其中,各個元件被分解成元組(鍵/值對)。其次,減少任務,這需要從Map 作為輸入並組合那些資料元組成的一組小的元組輸出。作為MapReduce暗示的名稱的序列在Map作業之後執行reduce任務。

MapReduce主要優點是,它很容易大規模資料處理在多個計算節點。下面MapReduce模型中,資料處理的原語被稱為對映器和減速器。分解資料處理應用到對映器和減速器有時是普通的。但是編寫MapReduce形式的應用,擴充套件應用程式執行在幾百,幾千,甚至幾萬機叢集中的僅僅是一個組態的更改。這個簡單的可延伸性是吸引了眾多程式員使用MapReduce模型。

演算法

  • 通常MapReduce範例是基於向傳送計算機資料的位置!

  • MapReduce計劃分三個階段執行,即對映階段,shuffle階段,並減少階段。

    • 對映階段:對映或對映器的工作是處理輸入資料。一般輸入資料是在檔案或目錄的形式,並且被儲存在Hadoop的檔案系統(HDFS)。輸入檔案被傳遞到由線對映器功能線路。對映器處理該資料,並建立資料的若干小塊。

    • 減少階段:這個階段是:Shuffle階段和Reduce階段的組合。減速器的工作是處理該來自對映器中的資料。處理之後,它產生一組新的輸出,這將被儲存在HDFS。

  • 在一個MapReduce工作,Hadoop的傳送Map和Reduce任務到叢集的相應伺服器。

  • 框架管理資料傳遞例如發出任務的所有節點之間的叢集周圍的詳細資訊,驗證任務完成,和複製資料。

  • 大部分的計算發生在與在本地磁碟上,可以減少網路通訊量資料的節點。

  • 給定的任務完成後,將群集收集並減少了資料,以形成一個合適的結果,並且將其傳送回Hadoop伺服器。

MapReduce Algorithm

輸入和輸出(Java透檢視)

MapReduce框架上的<key, value>對操作,也就是框架檢視的輸入工作作為一組<key, value>對,並產生一組<key, value>對作為作業的輸出可以在不同的型別。

鍵和值類在框架連載的方式,因此,需要實現介面。此外,鍵類必須實現可寫,可比的介面,以方便框架排序。MapReduce工作的輸入和輸出型別:(輸入)<k1, v1> ->對映 - ><k2, v2>-> reduce - ><k3, v3>(輸出)。

  輸入 輸出
Map <k1, v1> list (<k2, v2>)
Reduce <k2, list(v2)> list (<k3, v3>)

術語

  • PayLoad - 應用程式實現對映和減少功能,形成工作的核心。

  • Mapper - 對映器的輸入鍵/值對對映到一組中間鍵/值對。

  • NamedNode - 節點管理Hadoop分散式檔案系統(HDFS)。

  • DataNode - 節點資料呈現在任何處理發生之前。

  • MasterNode - 節點所在JobTracker執行並接受來自用戶端作業請求。

  • SlaveNode - 節點所在Map和Reduce程式執行。

  • JobTracker - 排程作業並跟蹤作業分配給任務跟蹤器。

  • Task Tracker - 跟蹤任務和報告狀態的JobTracker。

  • Job -程式在整個資料集對映器和減速的執行。

  • Task - 一個對映程式的執行或對資料的一個片段的減速器。

  • Task Attempt - 一種嘗試的特定範例在SlaveNode執行任務。

範例場景

下面給出是關於一個組織的電消耗量的資料。它包含了每月的用電量,各年的平均。

  Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec Avg
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

如果上述資料作為輸入,我們需要編寫應用程式來處理它而產生的結果,如發現最大使用量,最低使用年份,依此類推。這是一個輕鬆取勝用於記錄有限數目的程式設計器。他們將編寫簡單地邏輯,以產生所需的輸出,並且將資料傳遞到寫入的應用程式。

但是,代表一個特定狀態下所有的大規模產業的電力消耗資料。

當我們編寫應用程式來處理這樣的大量資料,

  • 他們需要大量的時間來執行。
  • 將會有一個很大的網路流量,當我們將資料從源到網路伺服器等。

為了解決這些問題,使用MapReduce框架。

輸入資料

上述資料被儲存為 sample.txt 並作為輸入。輸入檔案看起來如下所示。

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45 

範例程式

下面給出的是使用MapReduce框架的樣本資料的程式。

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits 
{ 
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   { 
      
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      Reporter reporter) throws IOException 
      { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens())
            {
               lasttoken=s.nextToken();
            } 
            
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   } 
   
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements 
   Reducer< Text, IntWritable, Text, IntWritable > 
   {  
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
         OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
         { 
            int maxavg=30; 
            int val=Integer.MIN_VALUE; 
            
            while (values.hasNext()) 
            { 
               if((val=values.next().get())>maxavg) 
               { 
                  output.collect(key, new IntWritable(val)); 
               } 
            } 
 
         } 
   }  
   
   
   //Main function 
   public static void main(String args[])throws Exception 
   { 
      JobConf conf = new JobConf(Eleunits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
} 

儲存上述程式作為ProcessUnits.java。編譯和執行的程式如下的說明。

編譯和執行進程單位程式

讓我們假設是在Hadoop的使用者(如/home/hadoop)的主目錄。

按照下面給出編譯和執行上面程式的步驟。

第1步

下面的命令是建立一個目錄來儲存編譯的Java類。

$ mkdir units 

第2步

下載Hadoop-core-1.2.1.jar,它用於編譯和執行MapReduce程式。存取以下連結 http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1下載JAR。假設下載的檔案夾是 /home/hadoop/.

第3步

下面的命令用於編譯ProcessUnits.java程式並建立一個jar程式。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ . 

第4步

下面的命令用來建立一個輸入目錄在HDFS中。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir 

第5步

下面的命令用於複製命名sample.txt在HDFS輸入目錄中輸入檔案。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir 

第6步

下面的命令用來驗證在輸入目錄中的檔案。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/ 

第7步

下面的命令用於通過從輸入目錄以輸入檔案來執行Eleunit_max應用。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir 

等待一段時間,直到執行檔案。在執行後,如下圖所示,輸出將包含輸入分割的數目,對映任務數,減速器任務的數量等。

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
File System Counters 
 
FILE: Number of bytes read=61 
FILE: Number of bytes written=279400 
FILE: Number of read operations=0 
FILE: Number of large read operations=0   
FILE: Number of write operations=0 
HDFS: Number of bytes read=546 
HDFS: Number of bytes written=40 
HDFS: Number of read operations=9 
HDFS: Number of large read operations=0 
HDFS: Number of write operations=2 Job Counters 


   Launched map tasks=2  
   Launched reduce tasks=1 
   Data-local map tasks=2  
   Total time spent by all maps in occupied slots (ms)=146137 
   Total time spent by all reduces in occupied slots (ms)=441   
   Total time spent by all map tasks (ms)=14613 
   Total time spent by all reduce tasks (ms)=44120 
   Total vcore-seconds taken by all map tasks=146137 
   
   Total vcore-seconds taken by all reduce tasks=44120 
   Total megabyte-seconds taken by all map tasks=149644288 
   Total megabyte-seconds taken by all reduce tasks=45178880 
   
Map-Reduce Framework 
 
Map input records=5  
   Map output records=5   
   Map output bytes=45  
   Map output materialized bytes=67  
   Input split bytes=208 
   Combine input records=5  
   Combine output records=5 
   Reduce input groups=5  
   Reduce shuffle bytes=6  
   Reduce input records=5  
   Reduce output records=5  
   Spilled Records=10  
   Shuffled Maps =2  
   Failed Shuffles=0  
   Merged Map outputs=2  
   GC time elapsed (ms)=948  
   CPU time spent (ms)=5160  
   Physical memory (bytes) snapshot=47749120  
   Virtual memory (bytes) snapshot=2899349504  
   Total committed heap usage (bytes)=277684224
     
File Output Format Counters 
 
   Bytes Written=40 

第8步

下面的命令用來驗證在輸出檔案夾所得檔案。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/ 

第9步

下面的命令是用來檢視輸出Part-00000檔案。該檔案由HDFS產生。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000 

下面是由MapReduce的程式所產生的輸出。

1981    34 
1984    40 
1985    45 

第10步

以下命令用於從HDFS輸出檔案夾複製到本地檔案系統進行分析。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop 

重要命令

所有的Hadoop命令是由$HADOOP_HOME/bin/hadoop命令呼叫。不帶任何引數執行Hadoop指令碼列印所有命令的描述。

Usage : hadoop [--config confdir] COMMAND

下表列出了可用的選項及其說明。

操作 描述
namenode -format 格式化DFS檔案系統。
secondarynamenode 執行DFS二次名稱節點。
namenode 執行DFS名稱節點。
datanode 執行DFS的Datanode。
dfsadmin 執行DFS管理用戶端。
mradmin 執行對映,減少管理用戶端。
fsck 執行DFS檔案系統檢查工具。
fs 執行一個通用的檔案系統的使用者用戶端。
balancer 執行叢集平衡工具。
oiv 適用於離線FsImage檢視器的fsimage。
fetchdt 從NameNode獲取團令牌。
jobtracker 執行MapReduce工作跟蹤節點。
pipes 執行管道的工作。
tasktracker 執行MapReduce任務跟蹤節點。
historyserver 執行作業歷史記錄伺服器作為一個獨立的守護行程。
job 操縱MapReduce工作。
queue 獲取有關作業佇列資訊。
version 列印版本。
jar <jar> 執行一個jar檔案。
distcp <srcurl> <desturl> 複製檔案或目錄的遞迴。
distcp2 <srcurl> <desturl> DistCp第2版。
archive -archiveName NAME -p 建立一個Hadoop的歸檔。
<parent path> <src>* <dest>  
classpath 列印需要得到Hadoop jar和所需要的庫的類路徑。
daemonlog 為每個守護行程獲取/設定紀錄檔級別

如何與MapReduce工作互動

Usage: hadoop job [GENERIC_OPTIONS]

以下是在一個Hadoop的作業可用通用選項。

GENERIC_OPTIONS 描述
-submit <job-file> 提交作業。
status <job-id> 列印對映,並減少完成的百分比以及所有的工作的計數器。
counter <job-id> <group-name> <countername> 列印的計數器值。
-kill <job-id> 終止任務。
-events <job-id> <fromevent-#> <#-of-events> 列印接收到JobTracker為給定範圍內的事件的詳細資訊。
-history [all] <jobOutputDir> - history < jobOutputDir> 列印作業的詳細資訊,未能終止提示詳細資訊。有關作業的更多詳細資訊,如每個任務取得成功的任務,任務可以嘗試通過指定[all]選項中檢視。
-list[all] 顯示所有作業。-list 只顯示尚未完成的作業。
-kill-task <task-id> 終止任務。終止任務不計入失敗的嘗試。
-fail-task <task-id> 失敗的任務。失敗的任務都算對失敗的嘗試。
   
set-priority <job-id> <priority> 更改作業的優先順序。允許優先順序值:VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

要檢視作業的狀態

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004 

要檢視作業歷史在output-dir

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output 

終止任務

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004