7

Broken pipe error causes streaming Elastic MapReduce job on AWS to fail

Everything works fine locally when I run the following:

cat input | python mapper.py | sort | python reducer.py 

However, when I run the streaming MapReduce job on AWS Elastic MapReduce, the job does not complete successfully. mapper.py runs partway through (I know this because it writes to stderr along the way) before the task fails with:

java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109) 
    at java.io.DataOutputStream.write(DataOutputStream.java:90) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 


2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:106) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:124) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished 
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Bad file descriptor 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:106) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579) 
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished 
2012-03-26 07:19:05,405 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-13): MRErrorThread done 
2012-03-26 07:19:05,408 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1 
2012-03-26 07:19:05,519 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds. 
2012-03-26 07:19:05,520 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation 
2012-03-26 07:19:05,522 WARN org.apache.hadoop.mapred.Child (main): Error running child 
java.io.IOException: log:null 
R/W/S=7018/3/0 in:NA [rec/s] out:NA [rec/s] 
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null 
HOST=null 
USER=hadoop 
HADOOP_USER=null 
last Hadoop input: |null| 
last tool output: |text/html 1| 
Date: Mon Mar 26 07:19:05 UTC 2012 
java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109) 
    at java.io.DataOutputStream.write(DataOutputStream.java:90) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 


    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:125) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 
2012-03-26 07:19:05,525 INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task 
2012-03-26 07:19:05,526 INFO org.apache.hadoop.mapred.DirectFileOutputCommitter (main): Nothing to clean up on abort since there are no temporary files written 

Here is mapper.py. The mapper is killed by a "Broken pipe" error, which I was able to retrieve from the logs of the task attempt after it failed. Note that I write to stderr to provide myself with debugging information:

#!/usr/bin/env python

import sys
from warc import ARCFile

def main():
    warc_file = ARCFile(fileobj=sys.stdin)
    for web_page in warc_file:
        print >> sys.stderr, '%s\t%s' % (web_page.header.content_type, 1) # For debugging
        print '%s\t%s' % (web_page.header.content_type, 1)
    print >> sys.stderr, 'done' # For debugging

if __name__ == "__main__":
    main()

Here is what I get on stderr of the task attempt when mapper.py runs:

text/html 1 
text/html 1 
text/html 1 

Basically, the loop runs three times and then stops abruptly, without Python throwing any error. (Note: it should be outputting thousands of lines.) Even an uncaught exception should have appeared in stderr.

Since the MapReduce job runs fine on my local machine, my guess is that the problem is in how Hadoop handles the output printed by mapper.py, but I'm fairly ignorant about what the problem could be.
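To separate the warc parsing from the streaming plumbing, a stripped-down stand-in for mapper.py can simply count raw stdin lines. This is only a minimal sketch (the output key name is a placeholder), but it would show whether Hadoop actually delivers the whole input:

#!/usr/bin/env python

import sys

def main():
    # No warc parsing at all: just drain stdin and count the lines, so
    # an early exit here would implicate Hadoop rather than ARCFile.
    count = 0
    for line in sys.stdin:
        count += 1
    print >> sys.stderr, 'consumed %d lines' % count # For debugging
    print 'lines\t%d' % count

if __name__ == "__main__":
    main()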

Answers

9

Your streaming process (your Python script) is terminating prematurely. This could be because it thinks the input is complete (e.g., it interprets an EOF) or because of a swallowed exception. Either way, Hadoop is still trying to feed your script via STDIN, but since the application has terminated (and STDIN is therefore no longer a valid file descriptor), you get the BrokenPipe error. I would suggest adding stderr traces in your script to see which line of input is causing the problem. Happy coding,

-Geoff
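One way to implement that suggestion is to interpose a logging wrapper between sys.stdin and the parser. This is a minimal sketch: TracingInput is a made-up helper, and ARCFile may need more of the file interface than is wrapped here.

#!/usr/bin/env python

import sys

class TracingInput(object):
    # Wraps a file object and reports every read to stderr, so the last
    # data the mapper saw before dying shows up in the task attempt logs.
    def __init__(self, fileobj):
        self._fileobj = fileobj

    def read(self, *args):
        data = self._fileobj.read(*args)
        print >> sys.stderr, 'read %d bytes' % len(data)
        return data

    def readline(self, *args):
        line = self._fileobj.readline(*args)
        print >> sys.stderr, 'readline: %r' % line[:80]
        return line

# In mapper.py this would become (hypothetical usage):
# warc_file = ARCFile(fileobj=TracingInput(sys.stdin))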

+4

babonk, can you provide details on how you used this suggestion to solve your problem? – 2013-07-26 00:57:57

+0

Same here. I apparently have a similar error: http://stackoverflow.com/questions/18556270/aws-elastic-mapreduce-doesnt-seem-to-be-being-converting-the-streaming-to-j, and given that it works when piped, I don't know how to "fix" it for streaming. – Mittenchops 2013-09-09 03:55:30

1

I have no experience with Hadoop on AWS, but I got the same error on a regular Hadoop cluster - in my case the problem was how I started Python: -mapper ./mapper.py -reducer ./reducer.py worked, but -mapper python mapper.py didn't.

You also seem to be using the non-standard Python package warc - do you submit the necessary files with the streaming job? -cacheFile and -cacheArchive might help.
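For instance, the package could be shipped alongside the job with -cacheArchive so it gets unpacked into the task's working directory. This is only a sketch: the jar path, the HDFS location, and the archive name are placeholders, and it assumes warc.zip contains the warc package at its top level.

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming.jar \
    -input s3://mybucket/input \
    -output s3://mybucket/output \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py \
    -cacheArchive hdfs:///user/hadoop/warc.zip#warc

The #warc suffix makes the unpacked archive appear under a symlink named warc in the task's working directory, which is what lets the mapper import the package.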

+0

How do you include non-standard Python packages? AWS Elastic MapReduce in particular doesn't seem to offer options like cacheFiles. – Mittenchops 2013-09-09 03:57:28

6

This was said above, but let me try to clarify - you must block on stdin, even if you don't need it. This is not the same as a Linux pipe, so don't let that fool you. Intuitively, what happens is that Streaming stands up your executable and then says, "Wait here while I go get input for you." If your executable stops for any reason before Streaming has sent you 100% of the input, Streaming says, "Hey, where did that executable go that I stood up? ... Hmmm ... the pipe is broken, let me raise that exception!" So here is some Python code; all it does is what cat does, but you will note that this code does not exit until all of the input has been processed, and that is the key point:

#!/usr/bin/python
import sys

while True:
    s = sys.stdin.readline()
    if not s: # readline() returns '' only at EOF, i.e. once all input is consumed
        break
    sys.stdout.write(s)
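Run locally, this behaves exactly like cat; a quick check (assuming the script is saved as cat_mapper.py and made executable):

cat input | ./cat_mapper.py

It copies its input unchanged, but never exits before stdin is exhausted, which is exactly what Streaming expects.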
+1

I was getting this error because I wasn't consuming any input. I added this code (even though it doesn't do anything useful for me) and the error went away. – schoon 2014-08-20 15:58:46