Hadoop Streaming


Hadoop資料流是Hadoop自帶發行的實用程式。該實用程式允許建立和執行Map/Reduce任務的任何可執行檔案或指令碼對映器和/或減速器。

使用Python範例

對於Hadoop的資料流,我們考慮的字計數問題。任何工作在Hadoop中必須有兩個階段:對映器和減速器。我們使用python指令碼程式碼對映器和減速器在Hadoop下執行它。使用Perl和Ruby也是類似的。

對映階段程式碼

!/usr/bin/python
import sys
# Input takes from standard input for myline in sys.stdin: 
# Remove whitespace either side myline = myline.strip() 
# Break the line into words words = myline.split() 
# Iterate the words list for myword in words: 
# Write the results to standard output print '%s\t%s' % (myword, 1)

請確保此檔案具有執行許可權(使用chmod +x /home/ expert/hadoop-1.2.1/mapper.py)。

減速器階段程式碼

#!/usr/bin/python
from operator import itemgetter 
import sys 
current_word = ""
current_count = 0 
word = "" 
# Input takes from standard input for myline in sys.stdin: 
# Remove whitespace either side myline = myline.strip() 
# Split the input we got from mapper.py word, count = myline.split('\t', 1) 
# Convert count variable to integer 
   try: 
      count = int(count) 
except ValueError: 
   # Count was not a number, so silently ignore this line continue
if current_word == word: 
   current_count += count 
else: 
   if current_word: 
      # Write result to standard output print '%s\t%s' % (current_word, current_count) 
   current_count = count
   current_word = word
# Do not forget to output the last word if needed! 
if current_word == word: 
   print '%s\t%s' % (current_word, current_count)

儲存mapper.py和reducer.py 在 Hadoop 的主目錄對映器和減速器程式碼。確保這些檔案具有執行許可權(使用chmod +x mapper.py 和 chmod +x reducer.py)。由於python具有大小寫敏感,因此相同的程式碼可以從以下連結下載。

wordCount程式的執行

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
   -input input_dirs \ 
   -output output_dir \ 
   -mapper <path/mapper.py \ 
   -reducer <path/reducer.py

其中“\”用於續行以便於閱讀。

例如,

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

資料流工作原理

在上面的例子中,這兩個對映器和減速是從標準輸入讀取作為輸入,並輸出到標準輸出到Python指令碼。實用程式將建立一個Map/Reduce作業,並將作業提交到一個合適的叢集,並監督工作的進展情況,直至完成。

當指定對映器的指令碼,每個對映任務將啟動指令碼作為一個單獨的進程時對映器初始化。作為mapper任務執行時,輸入轉換成行給進程的標準輸入(STDIN)。在此期間,對映器收集從該方法的標準輸出(stdout)面向行輸出和每一行轉換為鍵/值對,其被收集作為對映器的輸出。預設情況下,一行到第一個製表符的字首是鍵和行(不包括製表符)的其餘部分為值。如果在該行沒有任何製表符,則整行鍵和值被視為null。然而,這可以被客製化,每次需要1個。

當指定減速指令碼,每個減速器任務將啟動指令碼作為一個單獨的進程,然後減速初始化。減速器任務執行時將其轉換其輸入鍵/值對,進入行並將該行進程的標準輸入(STDIN)。在此期間,在減速機收集來自該過程的標準輸出(stdout)的面向行的輸出,每行轉換成一個金鑰/值對,其被收集作為減速機的輸出。預設情況下,一行到第一個製表符的字首是鍵,(不包括製表符)的其餘部分的值為行。然而,這可以被客製化為每次具體要求。

重要的命令

引數 描述
-input directory/file-name 輸入位置對映。 (必填)
-output directory-name 輸出位置的減速器。 (必填)
-mapper executable or script or JavaClassName 對映器可執行檔案。 (必填)
-reducer executable or script or JavaClassName 減速器的可執行檔案。 (必填)
-file file-name 使現有的對映器,減速機,或組合的可執行本地計算節點上。
-inputformat JavaClassName 類,應該提供返回鍵/值對文字類。如果沒有指定,使用TextInputFormat作為預設。
-outputformat JavaClassName 類,提供應採取鍵/值對文字類的。如果沒有指定,使用TextOutputformat作為預設值。
-partitioner JavaClassName 類,確定哪個減少一個鍵被傳送。
-combiner streamingCommand or JavaClassName 組合可執行檔案對映輸出。
-cmdenv name=value 通過環境變數資料流的命令。
-inputreader 對於向後相容性:指定記錄讀取器類(而不是輸入格式類)。
-verbose 詳細的輸出。
-lazyOutput 建立懶輸出。例如,如果輸出格式是基於FileOutputFormat,輸出檔案僅在第一次呼叫output.collect(或Context.write)建立。
-numReduceTasks 指定減速器的數目。
-mapdebug 當map任務失敗的指令碼呼叫。
-reducedebug 指令碼呼叫時降低任務失敗。