
I am trying to implement an algorithm in Hadoop. I tried to execute part of the code in Hadoop, but the streaming job fails (Hadoop streaming job in Python):

$ /home/hadoop/hadoop/bin/hadoop jar contrib/streaming/hadoop-*-streaming.jar -file /home/hadoop/hadoop/PR/mapper.py -mapper mapper.py -file /home/hadoop/hadoop/PR/reducer.py -reducer reducer.py -input pagerank/* -output PRoutput6 

packageJobJar: [/home/hadoop/hadoop/PR/mapper.py, /home/hadoop/hadoop/PR/reducer.py, /home/hadoop/hadoop/tmp/dir/hadoop-hadoop/hadoop-unjar7101759175212283428/] [] /tmp/streamjob6286075675343269479.jar tmpDir=null 

11/04/23 01:03:24 INFO mapred.FileInputFormat: Total input paths to process : 1 

11/04/23 01:03:24 INFO streaming.StreamJob: getLocalDirs(): [/home/hadoop/hadoop/tmp/dir/hadoop-hadoop/mapred/local] 

11/04/23 01:03:24 INFO streaming.StreamJob: Running job: job_201104222325_0021 

11/04/23 01:03:24 INFO streaming.StreamJob: To kill this job, run: 

11/04/23 01:03:24 INFO streaming.StreamJob: /home/hadoop/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201104222325_0021 

11/04/23 01:03:24 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201104222325_0021 

11/04/23 01:03:25 INFO streaming.StreamJob: map 0% reduce 0% 

11/04/23 01:03:31 INFO streaming.StreamJob: map 50% reduce 0% 

11/04/23 01:03:41 INFO streaming.StreamJob: map 50% reduce 17% 

11/04/23 01:03:56 INFO streaming.StreamJob: map 100% reduce 100% 

11/04/23 01:03:56 INFO streaming.StreamJob: To kill this job, run: 

11/04/23 01:03:56 INFO streaming.StreamJob: /home/hadoop/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:54311 -kill job_201104222325_0021 

11/04/23 01:03:56 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201104222325_0021 

11/04/23 01:03:56 ERROR streaming.StreamJob: Job not Successful! 

11/04/23 01:03:56 INFO streaming.StreamJob: killJob... 

Streaming Job Failed! 

mapper.py

#!/usr/bin/env python 
import sys 
import itertools 

def ipsum(input_key, input_value_list): 
    # Sum all values collected under one intermediate key.
    return sum(input_value_list) 

n = 20  # works up to about 1000000 pages 
i = {} 
for j in xrange(n): 
    i[j] = [1.0/n, 0, []]  # [initial rank, out-degree, outlink list]
j = 0 
u = 0 
for line in sys.stdin: 
    # First n lines: one out-degree per page.
    if j < n: 
        i[j][1] = int(line) 
    j = j + 1 

    # Remaining lines: comma-separated outlink lists, "-1" meaning none.
    if j > n: 
        if line != "-1\n": 
            i[u][2] = line.split(',') 
        else: 
            i[u][2] = [] 
        u = u + 1 
for j in xrange(n): 
    if i[j][1] != 0: 
        i[j][2] = map(int, i[j][2]) 

# Pages with a recorded out-degree of 0 contribute their rank under key 1.
intermediate = [] 
for (input_key, input_value) in i.items(): 
    if input_value[1] == 0: 
        intermediate.extend([(1, input_value[0])]) 
grp = {} 
for key, group in itertools.groupby(sorted(intermediate), lambda x: x[0]): 
    grp[key] = [y for x, y in group] 
iplist = [ipsum(intermediate_key, grp[intermediate_key]) for intermediate_key in grp] 

# For those same pages, emit (page, 0.0) plus a share for each recorded outlink.
inter = [] 
for (input_key, input_value) in i.items(): 
    if input_value[1] == 0: 
        inter.extend([(input_key, 0.0)] + 
                     [(outlink, input_value[0]/input_value[1]) for outlink in input_value[2]]) 

for value in inter: 
    print '%s %s' % (value[0], value[1]) 

reducer.py

#!/usr/bin/env python 
import sys 

for line in sys.stdin: 
    # Each input line is "key value" with a single space separator.
    input_key, input_value = line.split(' ', 1) 
    input_key = int(input_key.strip()) 
    input_value = float(input_value.strip()) 
    print str(input_key) + ' ' + str(input_value) 

I am not sure whether the error is in my code or in my Hadoop configuration, because I am able to run the code successfully with: $ cat /home/hadoop/hadoop/PR/pagerank/input.txt | python /home/hadoop/hadoop/PR/mapper.py | sort | python /home/hadoop/hadoop/PR/reducer.py
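(One caveat about that pipeline, as an aside rather than a claim about the actual failure: Hadoop streaming runs the shipped scripts as executables, so the shebang line and the execute bit are only exercised when the scripts are invoked directly, which `python mapper.py` never tests. A closer local approximation would be:)

$ chmod +x /home/hadoop/hadoop/PR/mapper.py /home/hadoop/hadoop/PR/reducer.py 
$ cat /home/hadoop/hadoop/PR/pagerank/input.txt | /home/hadoop/hadoop/PR/mapper.py | sort | /home/hadoop/hadoop/PR/reducer.py 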

Any help would be appreciated. Thanks.

Answers


My guess is that your data is the culprit. Parsing a float out of a string (or a similar conversion) may fail on the real data in a way that never shows up in your local test data. You can probably pin it down with exception handling or assertions.
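As a minimal sketch of that suggestion (assuming the failure really is a parse error, which this answer only guesses at), the reducer's conversions could be wrapped so that one bad line is logged instead of killing the task:

#!/usr/bin/env python 
# Sketch: defensive version of the reducer's parsing loop. 
import sys 

for line in sys.stdin: 
    try: 
        input_key, input_value = line.split(' ', 1) 
        input_key = int(input_key.strip()) 
        input_value = float(input_value.strip()) 
    except ValueError: 
        # stderr ends up in the task logs, so the offending line is visible there. 
        sys.stderr.write('reducer: could not parse %r\n' % line) 
        continue 
    print str(input_key) + ' ' + str(input_value) 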


Look at the job details page URL in the output. In your case: http://localhost:50030/jobdetails.jsp?jobid=job_201104222325_0021

Click the "Last 8KB" (or similar) log link for the tasks listed in the failed-mappers column, and you will (most likely) see the Python exception you hit.
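If digging through the web UI gets tedious, a common streaming trick (my own sketch, not something this answer proposes) is to have the script emit its own diagnostics on stderr, which Hadoop captures into those same task logs; stderr lines in the reporter:counter format even update job counters. The counter names below are arbitrary:

#!/usr/bin/env python 
# Sketch: self-reporting diagnostics inside a streaming mapper. 
import sys 

count = 0 
for line in sys.stdin: 
    count += 1 
    if count <= 5: 
        # Plain stderr output shows up in the task's log next to any traceback. 
        sys.stderr.write('mapper input line %d: %r\n' % (count, line)) 
    # Hadoop streaming interprets this stderr format as a counter update 
    # ("Debug" / "input_lines" are hypothetical names chosen for this sketch). 
    sys.stderr.write('reporter:counter:Debug,input_lines,1\n') 
    print line, 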