2016-01-06 157 views
1

我想實現一個簡單的Hadoop地圖使用Cloudera的5.5.0 地圖&減少步驟應該使用Python 2.6.6實現減少例如爲什麼hadoop mapreduce與python失敗,但腳本正在命令行上工作?

問題:

  • 如果腳本正在在unix命令行上執行,他們工作得非常好,併產生預期的輸出。

cat join2 * .txt | ./join3_mapper.py |排序| ./join3_reducer.py

  • 執行腳本作爲Hadoop的任務非常失敗

Hadoop的罐子/usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input /user/cloudera/inputTV/join2_gen*.txt -output /用戶/ Cloudera的/ output_tv -mapper /home/cloudera/join3_mapper.py -reducer /home/cloudera/join3_reducer.py -numReduceTasks 1

16/01/06 12:32:32 INFO mapreduce.Job: Task Id : attempt_1452069211060_0026_r_000000_0, Status : FAILED Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1 at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325) at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538) at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134) at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:244) at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

  • 映射器的工作原理,如果hadoop的命令與-numReduceTasks 0, Hadoop的作業正在執行僅地圖步驟執行時,成功地結束,並且輸出目錄從地圖步驟包含結果的文件。

  • 我想減法步驟一定有什麼問題呢?

  • 在色彩上標準錯誤日誌不顯示任何有關:

日誌上傳時間:星期三年1月6 12時33分十秒-0800 2016 日誌長度:222 的log4j:警告沒有附加目的地可以發現記錄儀(org.apache.hadoop.ipc.Server)。 log4j:WARN請正確初始化log4j系統。 log4j:警告有關更多信息,請參見http://logging.apache.org/log4j/1.2/faq.html#noconfig

代碼的腳本: 第一檔:join3_mapper.py

#!/usr/bin/env python 

import sys 

for line in sys.stdin: 
    line  = line.strip() #strip out carriage return 
    tuple2 = line.split(",") #split line, into key and value, returns a list 

    if len(tuple2) == 2: 
     key = tuple2[0] 
     value = tuple2[1] 
     if value == 'ABC': 
     print('%s\t%s' % (key, value)) 
     elif value.isdigit(): 
     print('%s\t%s' % (key, value)) 

的第二個選項:join3_reducer.py

#!/usr/bin/env python 
import sys 

last_key  = None    #initialize these variables 
running_total = 0 
abcFound =False; 
this_key  = None 

# ----------------------------------- 
# Loop the file 
# -------------------------------- 
for input_line in sys.stdin: 
    input_line = input_line.strip() 

    # -------------------------------- 
    # Get Next Key value pair, splitting at tab 
    # -------------------------------- 
    tuple2 = input_line.split("\t") 

    this_key = tuple2[0]  
    value = tuple2[1] 
    if value.isdigit(): 
     value = int(value) 

    # --------------------------------- 
    # Key Check part 
    # if this current key is same 
    #   as the last one Consolidate 
    # otherwise Emit 
    # --------------------------------- 
    if last_key == this_key:  
     if value == 'ABC': # filter for only ABC in TV shows 
      abcFound=True; 
     else: 
      if isinstance(value, (int,long)): 
       running_total += value 

    else: 
     if last_key:   #if this key is different from last key, and the previous 
          # (ie last) key is not empy, 
          # then output 
          # the previous <key running-count> 
      if abcFound: 
       print('%s\t%s' % (last_key, running_total)) 
       abcFound=False; 

     running_total = value #reset values 
     last_key = this_key 

if last_key == this_key: 
    print('%s\t%s' % (last_key, running_total)) 

我曾嘗試聲明輸入文件到的各種不同的方式hadoop命令,沒有區別,沒有成功。

我在做什麼錯?提示,想法非常讚賞謝謝

+0

您是否需要toolrunner才能夠從命令行運行jar文件? –

+0

另外,Java程序不是jar文件嗎? –

+0

我不是自己執行一個jar文件,我正在執行hadoop命令並告訴hadoop執行聲明的jar文件。庫路徑後面的其餘部分是與hadoop-streaming.jar相關的參數,並與執行的MapReduce操作相關。是的,jar文件是java程序 –

回答

1

多麼幸運衝,與一個對天鬥,知道我得到它的工作:

由於

cat join2_gen*.txt | ./join2_mapper.py | sort | ./join2_reducer.py 

本地(UNIX)執行工作罰款我有主意,用1合併輸入文件,而不是提供6個輸入文件,因此:

cat join2_gen*.txt >> mergedinputFile.txt 

hdfs dfs -put mergedInputFile.txt /user/cloudera/input 

然後再次執行同樣的hadoop的命令,指揮輸入mergedInputFile在輸入文件夾 - >完美結果,沒問題,沒有異常工作完成。

對我來說這提出了一個問題:

  • 爲什麼有一個工作合併輸入文件,但現在提供較小的6個文件?不知道(還)
+0

我有同樣的問題!它在串行模式下工作非常完美。 –

1

嘗試將所有輸入文本文件放在一個目錄中,然後將目錄作爲輸入。通過這種方式,您不必合併所有輸入文件

相關問題