我在AWS的彈性地圖縮減羣集中運行了大量作業。大概而言,我的意思是我正在處理超過800,000個文件,每個文件有超過25,000條記錄。在我的測試運行中,我一直使用100個m1.medium斑點實例進行處理。AWS EMR機器未合併減少輸出
工作似乎正常運行,但我注意到輸出(part-00000,part-00001等)有多個輸出中列出的相同鍵的記錄。這些應該不會在EMR中降低?
任何有識之士將不勝感激。
我在AWS的彈性地圖縮減羣集中運行了大量作業。大概而言,我的意思是我正在處理超過800,000個文件,每個文件有超過25,000條記錄。在我的測試運行中,我一直使用100個m1.medium斑點實例進行處理。AWS EMR機器未合併減少輸出
工作似乎正常運行,但我注意到輸出(part-00000,part-00001等)有多個輸出中列出的相同鍵的記錄。這些應該不會在EMR中降低?
任何有識之士將不勝感激。
我遇到了同樣的問題 - 我使用EMR來創建一個 「倒排索引」 使用流API:
- 輸入S3N:// mybucket/HTML2 - 輸出S3N:// mybucket /結果-mapper S3N://mybucket/mapper.py -reducer S3N://mybucket/reduce.py
凡// mybucket/HTML2有幾個HTML文件和
mapper.py:
def main(args):
for line in sys.stdin:
line = line.strip()
words = line.split()
for word in words:
#do some preprocessing
if word.startswith("http://"):
#output the URL with a count of 1
print "%s,%s" % (word, 1)
else:
#cleanup HTML tags
url = get_url() #irrelevant
print "%s,%s" % (word, url)
if __name__ == "__main__":
main(sys.argv)
and reduce.p y是:
def main(args):
current_word = None
current_count = 0
current_url_list = []
key = None
for line in sys.stdin:
line = line.strip()
(key, val) = line.split(',', 1)
# If key is a URL - act as word count reducer
if key.startswith("http:"):
# convert count (currently a string) to int
try:
count = int(val)
except:
# count was not a number, so silently
# ignore/discard this line
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == key:
current_count += count
else:
if current_word:
#Check if previous word was a regular word
if current_word.startswith('http:'):
print '%s,%s' % (current_word, current_count)
else:
# previous word was a regular word
print '%s,%s' % (current_word, ','.join(current_url_list))
current_count = count
current_word = key
else:
#If key is a word - as act a URL-list-appending reducer
if current_word == key:
if val not in current_url_list:
current_url_list.append(val)
else: #Got to a new key
if current_word:
#Check if previous word was a URL
if(current_word.startswith("http:")):
print '%s,%s' % (current_word, current_count)
else:
# previous word was a regular word
print '%s,%s' % (current_word, ','.join(current_url_list))
current_url_list = []
current_url_list.append(val)
current_word = key
我開始使用AWS控制檯嚮導這個流動(「創建新的工作流」)和除設定輸入,輸出,地圖和減少腳本,我把一切都爲默認值(除日誌路徑)。
在輸出中,我得到的文件很少,在它們中我看到了相同的鍵(每次使用不同的值)。
也許這可以幫助闡明這個問題更多的光線並幫助解決其
在我的情況下,問題出在我的mapper腳本上 - 我用逗號分隔符從map函數發出結果,而Hadoop期待tab分隔符。 因此,key1,value1和key1,value2都被視爲不同的鍵(具有空值),而不是作爲鍵值對,因此可能最終在不同的縮減器中結束。 我通過更改我的映射器解決了問題。 也可以添加在作業流程創建一個選項來指定分隔符: -D stream.map.output.field.separator =, 託梅爾 –
有趣......我將與來試試吧標籤,看看是否它。感謝帖子! – Saul
你確定這關鍵是一個失控在映射階段的關鍵?你有任何定製的分區器或什麼? – Amar
我有一個自定義映射器,它爲鍵組合了兩個字段。我在挖掘過程中發現的一點是,每個減少任務只會結合它所擁有的數據,而不關心如何處理其他減少任務。 – Saul
你是什麼意思,不關心'處理'其他減少任務?減少任務從未與其他減少任務一起工作!根據映射器的輸出鍵對數據進行分區,並且單個鍵的所有輸出必須到同一個reducer,除非分區器中有一些定製邏輯。 – Amar