I am trying to understand the data flow in the Python word count example for Hadoop at http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
The author starts with naive versions of the mapper and reducer. Here is the reducer (I removed some comments for brevity):
#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
The author tests the programs with:
echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
So it seems the reducer is written as though the input data for a reduce job looks like this:
aa 1
aa 1
bb 1
cc 1
cc 1
cc 1
My initial understanding was that the input data for a given reducer would contain exactly one unique key, so the example above would require three reducers. Is my understanding incorrect?
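Part of my confusion is that, as I understand it, Hadoop's default partitioner routes keys with something like hash(key) mod numReducers, which would put several distinct keys on the same reducer. A rough Python sketch of my mental model (my assumption, not Hadoop's actual Java code):

# My assumption of how the default HashPartitioner assigns keys:
# each (key, value) pair goes to reducer number hash(key) mod N,
# so one reducer can receive several distinct keys.
def partition(key, num_reducers):
    return hash(key) % num_reducers

for key in ['aa', 'bb', 'cc']:
    print('%s -> reducer %d' % (key, partition(key, 2)))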
The author then presents improved versions of the mapper and reducer. Here is the reducer:
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
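While reading this version I wrote a quick check (my own snippet, not from the tutorial) to confirm that itertools.groupby only merges consecutive equal keys, which is why this reducer appears to depend on its input already being sorted by key:

from itertools import groupby
from operator import itemgetter

# Unsorted input: the two 'foo' pairs are not adjacent...
pairs = [('foo', 1), ('bar', 1), ('foo', 1)]
# ...so groupby yields 'foo' twice instead of one merged group.
print([key for key, group in groupby(pairs, itemgetter(0))])  # ['foo', 'bar', 'foo']

# Sorted input brings equal keys together, and groupby merges them.
pairs.sort(key=itemgetter(0))
print([key for key, group in groupby(pairs, itemgetter(0))])  # ['bar', 'foo']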
The author adds the following caveat:
Note: The following Map and Reduce scripts will only work "correctly" when being run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test command "cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py" will not work correctly anymore because some functionality is intentionally outsourced to Hadoop.
I do not understand why the naive test command would not work with the new version. I thought that using sort -k1,1 would produce the same input for both versions of the reducer. What am I missing?
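To make my expectation concrete, here is how I picture the naive test pipeline, with each stage inlined (a pure-Python sketch of my own, not the tutorial's code):

from itertools import groupby
from operator import itemgetter

line = "foo foo quux labs foo bar quux"
mapped = ['%s\t%d' % (word, 1) for word in line.split()]  # mapper.py output
mapped.sort()                                             # stands in for sort -k1,1
pairs = (l.split('\t', 1) for l in mapped)                # read_mapper_output
for word, group in groupby(pairs, itemgetter(0)):         # the reducer's grouping
    print('%s\t%d' % (word, sum(int(c) for _, c in group)))

Since the sort step already puts equal keys next to each other, I would expect groupby in the new reducer to see the same grouping that Hadoop's shuffle would provide.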