MapReduce in Python: the mapper, join2_mapper.py
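#!/usr/bin/env python
import sys

shows = []  # shows seen on channel ABC so far
for line in sys.stdin:
    line = line.strip()
    key_value = line.split(',')
    if len(key_value) != 2:
        continue  # skip blank or malformed lines
    # Channel record: remember each show that airs on ABC (once).
    if key_value[1] == 'ABC':
        if key_value[0] not in shows:
            shows.append(key_value[0])
    # Count record: emit it only if the show is already known to be on ABC.
    if key_value[1].isdigit() and (key_value[0] in shows):
        print('{0}\t{1}'.format(key_value[0], key_value[1]))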
Sample input file:
Hourly_Sports,DEF
Baked_Games,ABC
Dumb_Talking,ABC
Surreal_Talking,DEF
Cold_Sports,BAT
Hourly_Talking,XYZ
Baked_Talking,CNO
PostModern_Games,ABC
Loud_Talking,DEF
Almost_News,BAT
Hot_Talking,XYZ
Dumb_News,CNO
Surreal_News,ABC
Cold_Talking,DEF
Hourly_Show,BAT
Baked_Show,XYZ
PostModern_Talking,CNO
Loud_Show,ABC
Almost_Cooking,DEF
Hot_News,BAT
Dumb_Cooking,XYZ
Surreal_Cooking,CNO
Cold_News,ABC
Hourly_Sports,DEF
Baked_Sports,BAT
PostModern_Show,XYZ
Loud_Sports,CNO
Almost_Games,ABC
Hot_Cooking,DEF
Dumb_Games,BAT
Surreal_Games,XYZ
Cold_Cooking,CNO
Hourly_Talking,ABC
Baked_Talking,DEF
PostModern_Sports,BAT
Loud_Talking,XYZ
Almost_Talking,CNO
Hot_Games,ABC
Dumb_Talking,DEF
Surreal_Talking,BAT
Cold_Games,XYZ
Hourly_News,CNO
Baked_News,ABC
PostModern_Talking,DEF
Loud_News,BAT
Almost_Show,XYZ
Hot_Talking,CNO
Dumb_Show,ABC
Surreal_Show,DEF
Cold_Talking,BAT
Hourly_Cooking,XYZ
Baked_Cooking,CNO
PostModern_News,ABC
Loud_Cooking,DEF
Almost_Sports,BAT
Hot_Show,XYZ
Dumb_Sports,CNO
Surreal_Sports,ABC
Cold_Show,DEF
Hourly_Games,BAT
Baked_Games,XYZ
PostModern_Cooking,CNO
Loud_Games,ABC
Almost_Talking,DEF
Hot_Sports,BAT
Dumb_Talking,XYZ
Surreal_Talking,CNO
Cold_Sports,ABC
Hourly_Talking,DEF
Baked_Talking,BAT
PostModern_Games,XYZ
Loud_Talking,CNO
Almost_News,ABC
Hot_Talking,DEF
Dumb_News,BAT
Surreal_News,XYZ
Cold_Talking,CNO
Hourly_Show,ABC
Almost_Cooking,855
Baked_Games,991
Baked_News,579
Baked_Games,200
Baked_Games,533
Cold_News,590
Hourly_Show,896
$ cat j2.txt | python join2_mapper.py
Baked_Games 991
Baked_News 579
Baked_Games 200
Baked_Games 533
Cold_News 590
Hourly_Show 896
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input /user/cloudera/join2_data/join2_genchan*.txt -input /user/cloudera/join2_data/join2_gennum*.txt -output /user/cloudera/join2_f1f -mapper /home/cloudera/join2_mapper.py -reducer /home/cloudera/join2_reducer.py -numReduceTasks 0
Map-Reduce Framework
Map input records=6600
Map output records=0
Input split bytes=759
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=4419
CPU time spent (ms)=9170
Physical memory (bytes) snapshot=702300160
Virtual memory (bytes) snapshot=9022578688
Total committed heap usage (bytes)=364511232
File Input Format Counters
Bytes Read=113055
File Output Format Counters
Bytes Written=0
The problem is with the input files. I actually have six input files, listed below:
$ hdfs dfs -ls /user/cloudera/join2_data/join2_gen*.txt
-rw-r--r-- 1 cloudera cloudera 1714 2015-11-07 12:24 /user/cloudera/join2_data/join2_genchanA.txt
-rw-r--r-- 1 cloudera cloudera 3430 2015-11-07 12:24 /user/cloudera/join2_data/join2_genchanB.txt
-rw-r--r-- 1 cloudera cloudera 5152 2015-11-07 12:24 /user/cloudera/join2_data/join2_genchanC.txt
-rw-r--r-- 1 cloudera cloudera 17114 2015-11-07 12:24 /user/cloudera/join2_data/join2_gennumA.txt
-rw-r--r-- 1 cloudera cloudera 34245 2015-11-07 12:24 /user/cloudera/join2_data/join2_gennumB.txt
-rw-r--r-- 1 cloudera cloudera 51400 2015-11-07 12:24 /user/cloudera/join2_data/join2_gennumC.txt
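When I concatenate all the files into a single file and run the job on it, it works and I get the desired result. When I provide the six separate files as input, I get an empty output file. Please advise.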
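One possible explanation, offered as an assumption rather than a confirmed diagnosis: Hadoop normally gives each input file to its own map task, so every mapper process starts with an empty `shows` list. A task that reads only a gennum file never sees an ABC record, and a task that reads only a genchan file never sees a count, which would be consistent with `Map output records=0`. The sketch below simulates this locally. `run_mapper`, `chan_file`, and `num_file` are hypothetical names and data made up for illustration; the function follows the same logic as the mapper, with the duplicate check applied to the show name.

```python
def run_mapper(lines):
    """Apply the mapper's logic to an iterable of lines and return its output lines."""
    shows = []  # state is local to one mapper process
    out = []
    for line in lines:
        key_value = line.strip().split(',')
        if len(key_value) != 2:
            continue
        if key_value[1] == 'ABC':
            if key_value[0] not in shows:
                shows.append(key_value[0])
        if key_value[1].isdigit() and key_value[0] in shows:
            out.append('{0}\t{1}'.format(key_value[0], key_value[1]))
    return out


# Hypothetical miniature stand-ins for one genchan file and one gennum file.
chan_file = ['Baked_Games,ABC', 'Hourly_Sports,DEF']
num_file = ['Baked_Games,991', 'Hourly_Sports,12']

# One process reads everything, channel records first (like `cat ... | python join2_mapper.py`).
combined = run_mapper(chan_file + num_file)

# Each file is read by a separate process, each with its own empty `shows` list.
separate = run_mapper(chan_file) + run_mapper(num_file)

print(combined)  # ['Baked_Games\t991']
print(separate)  # []
```

If this is what happens on the cluster, the mapper only works when channel records and count records reach the same process, with the channel records first, which is exactly the single concatenated file case.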
Could I see your reducer? You reference one in the command, but its code is missing. Your two -input options could also be replaced with a single '-input /user/cloudera/join2_data/join2_gen*.txt'.
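Since I specified numReduceTasks = 0, the reducer won't be executed at all, right? Please correct me if I'm wrong. Because the mapper output is empty, the reducer throws an error. I tested the reducer with another mapper and it works fine, so I don't think the reducer is the problem. I can post the reducer if needed. – Praveen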
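I don't know much about Hadoop Streaming with Python, but I would try leaving out the '-reducer' option. Of course, it shouldn't change anything, but it's worth a try. Also, what happens if you pass 'j2.txt' to the job? It seems to be different from the input files you are actually using.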
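For reference, a join that does not depend on which file a map task happens to read is usually done on the reduce side: the mapper keeps no state and emits every relevant record keyed by show, and the reducer, which receives all values for a key together, decides what to keep. The following is a minimal local sketch of that idea, assuming the goal is to keep the counts of shows that air on ABC, as the posted mapper suggests. `map_line` and `reduce_sorted` are hypothetical names, the records are made up, and `sorted` plus `groupby` stand in for Hadoop's shuffle; this is not the poster's reducer.

```python
from itertools import groupby


def map_line(line):
    """Stateless map step: return (show, value) for ABC and count records, else None."""
    parts = line.strip().split(',')
    if len(parts) != 2:
        return None
    show, value = parts
    if value == 'ABC' or value.isdigit():
        return (show, value)
    return None


def reduce_sorted(pairs, channel='ABC'):
    """Reduce step over pairs sorted by show: keep counts only for shows on `channel`."""
    results = []
    for show, group in groupby(pairs, key=lambda kv: kv[0]):
        values = [v for _, v in group]  # buffer all values for this show
        if channel in values:
            results.extend((show, v) for v in values if v.isdigit())
    return results


# Hypothetical records, deliberately ordered so that counts arrive before the channel line.
lines = ['Baked_Games,991', 'Hourly_Sports,12', 'Hourly_Sports,DEF',
         'Baked_Games,ABC', 'Baked_Games,200']

# sorted() plays the role of the shuffle/sort phase between map and reduce.
pairs = sorted(p for p in map(map_line, lines) if p is not None)
joined = reduce_sorted(pairs)
print(joined)  # [('Baked_Games', '200'), ('Baked_Games', '991')]
```

On a real cluster this approach needs the reduce phase to run, so -numReduceTasks would have to be greater than 0.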