2013-07-12 44 views
2

我在AWS的彈性地圖縮減羣集中運行了大量作業。大概而言,我的意思是我正在處理超過800,000個文件,每個文件有超過25,000條記錄。在我的測試運行中,我一直使用100個m1.medium斑點實例進行處理。AWS EMR機器未合併減少輸出

工作似乎正常運行,但我注意到輸出(part-00000,part-00001等)有多個輸出中列出的相同鍵的記錄。這些應該不會在EMR中降低?

任何有識之士將不勝感激。

+0

你確定這關鍵是一個失控在映射階段的關鍵?你有任何定製的分區器或什麼? – Amar

+0

我有一個自定義映射器,它爲鍵組合了兩個字段。我在挖掘過程中發現的一點是,每個減少任務只會結合它所擁有的數據,而不關心如何處理其他減少任務。 – Saul

+0

你是什麼意思,不關心'處理'其他減少任務?減少任務從未與其他減少任務一起工作!根據映射器的輸出鍵對數據進行分區,並且單個鍵的所有輸出必須到同一個reducer,除非分區器中有一些定製邏輯。 – Amar

回答

1

我遇到了同樣的問題 - 我使用EMR來創建一個 「倒排索引」 使用流API:

- 輸入S3N:// mybucket/HTML2 - 輸出S3N:// mybucket /結果-mapper S3N://mybucket/mapper.py -reducer S3N://mybucket/reduce.py

凡// mybucket/HTML2有幾個HTML文件和

mapper.py:

def main(args): 
    for line in sys.stdin: 
     line = line.strip() 
     words = line.split()    
     for word in words: 
      #do some preprocessing 
      if word.startswith("http://"): 
       #output the URL with a count of 1 
       print "%s,%s" % (word, 1) 
      else:   
       #cleanup HTML tags 

       url = get_url() #irrelevant 
       print "%s,%s" % (word, url) 

if __name__ == "__main__": 
    main(sys.argv) 

and reduce.p y是:

def main(args): 
    current_word = None 
    current_count = 0 
    current_url_list = [] 
    key = None 

    for line in sys.stdin: 
     line = line.strip() 
     (key, val) = line.split(',', 1) 

     # If key is a URL - act as word count reducer 
     if key.startswith("http:"): 
      # convert count (currently a string) to int 
      try: 
       count = int(val) 
      except: 
       # count was not a number, so silently 
       # ignore/discard this line 
       continue 

      # this IF-switch only works because Hadoop sorts map output 
      # by key (here: word) before it is passed to the reducer 
      if current_word == key: 
       current_count += count 
      else: 
       if current_word: 
        #Check if previous word was a regular word 
        if current_word.startswith('http:'): 
         print '%s,%s' % (current_word, current_count) 
        else: 
         # previous word was a regular word 
         print '%s,%s' % (current_word, ','.join(current_url_list)) 
       current_count = count 
       current_word = key 
     else: 
      #If key is a word - as act a URL-list-appending reducer 
      if current_word == key: 
       if val not in current_url_list: 
        current_url_list.append(val) 
      else: #Got to a new key 
       if current_word: 
        #Check if previous word was a URL 
        if(current_word.startswith("http:")): 
         print '%s,%s' % (current_word, current_count) 
        else: 
         # previous word was a regular word 
         print '%s,%s' % (current_word, ','.join(current_url_list)) 
       current_url_list = [] 
       current_url_list.append(val) 
       current_word = key 

我開始使用AWS控制檯嚮導這個流動(「創建新的工作流」)和除設定輸入,輸出,地圖和減少腳本,我把一切都爲默認值(除日誌路徑)。

在輸出中,我得到的文件很少,在它們中我看到了相同的鍵(每次使用不同的值)。

也許這可以幫助闡明這個問題更多的光線並幫助解決其

+0

在我的情況下,問題出在我的mapper腳本上 - 我用逗號分隔符從map函數發出結果,而Hadoop期待tab分隔符。 因此,key1,value1和key1,value2都被視爲不同的鍵(具有空值),而不是作爲鍵值對,因此可能最終在不同的縮減器中結束。 我通過更改我的映射器解決了問題。 也可以添加在作業流程創建一個選項來指定分隔符: -D stream.map.output.field.separator =, 託梅爾 –

+0

有趣......我將與來試試吧標籤,看看是否它。感謝帖子! – Saul