2013-08-17 50 views
0

算例如我想在Python 的瞭解Hadoop的字數例如http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/數據流在Python

筆者與映射器和減速器的天真的版本開始。這裏是減速機(我刪除了簡潔一些評論)

#!/usr/bin/env python 

from operator import itemgetter 
import sys 

current_word = None 
current_count = 0 
word = None 

# input comes from STDIN 
for line in sys.stdin: 
    line = line.strip() 

    word, count = line.split('\t', 1) 

    try: 
     count = int(count) 
    except ValueError: 
     continue 

    if current_word == word: 
     current_count += count 
    else: 
     if current_word: 
      # write result to STDOUT 
      print '%s\t%s' % (current_word, current_count) 
     current_count = count 
     current_word = word 

# do not forget to output the last word if needed! 
if current_word == word: 
    print '%s\t%s' % (current_word, current_count) 

經筆者測試程序有:

echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py 

所以好像減速作業的輸入數據是象減速機寫的是:

aa 1 
aa 1 
bb 1 
cc 1 
cc 1 
cc 1 

我最初對減速器的理解是給定減速器的輸入數據將包含一個唯一鍵。所以在前面的例子中,需要3個減速器工作。我的理解不正確?

然後,作者介紹了mapper和reducer的改進版本。這裏是減速機:

#!/usr/bin/env python 
"""A more advanced Reducer, using Python iterators and generators.""" 

from itertools import groupby 
from operator import itemgetter 
import sys 

def read_mapper_output(file, separator='\t'): 
    for line in file: 
     yield line.rstrip().split(separator, 1) 

def main(separator='\t'): 
    # input comes from STDIN (standard input) 
    data = read_mapper_output(sys.stdin, separator=separator) 

    for current_word, group in groupby(data, itemgetter(0)): 
     try: 
      total_count = sum(int(count) for current_word, count in group) 
      print "%s%s%d" % (current_word, separator, total_count) 
     except ValueError: 
      # count was not a number, so silently discard this item 
      pass 

if __name__ == "__main__": 
    main() 

筆者增加了以下警告:

注:以下Map和Reduce腳本將只工作「正常」 在Hadoop的上下文中運行時,即作爲Mapreduce作業中的映射器和Reducer在 中。這意味着運行天真的測試命令「cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py「將不再正確工作 ,因爲某些功能故意將 外包給Hadoop。

我不明白爲什麼樸素測試命令不適用於新版本。我認爲使用sort -k1,1會爲這兩個版本的減速器產生相同的輸入。我錯過了什麼?

回答

0

關於你的第一個問題:「我最初對減速器的理解是,給定減速器的輸入數據將包含一個唯一鍵,因此在前面的例子中,需要3個減速器作業,我的理解是否不正確?

MapReduce抽象與Hadoop的抽象實現有所不同。在抽象中,縮減器與唯一鍵相關聯。另一方面,Hadoop實現爲同一個reducer分配了幾個鍵(以避免關閉進程並啓動新進程的成本)。特別是,在Hadoop流媒體中,reducer接收與特定數量的鍵(它可以是零個,一個或多個鍵)相對應的鍵值對,但您可以保證與某個鍵相關聯的鍵值對連續地彼此相連。

例如,讓我們用輸入「foo foo quux labs foo bar quux」來計算您的字數。然後可能是reducer接收到輸入「bar 1 \ nfoo 1 \ nfoo 1 \ nfoo1」,而另一個reducer接收到「labs 1 \ nquux 1 \ nquux 1」。運行的實際Reducer進程的數量由您使用選項mapred.reduce.tasks決定。例如使用2個減速,你可以做

$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=2 -mapper .... 

關於你提到的第二個問題,我同意你的說法sort -k1,1會做的伎倆,所以我也不看這個問題。