2012-07-05 51 views
1

我有一個大規模的日誌處理問題,我必須在hadoop集羣上運行。其任務是將日誌的每一行填入可執行文件「cmd」中,並檢查結果以決定是否保留這一行日誌。在Hadoop流媒體中損壞的python管道

由於「cmd」程序打開了一個非常大的字典,我無法負擔調用該日誌的everyline程序。我想保持它運行並將所需的輸入提供給它。蟒蛇我目前的解決方案使用子模塊,在這裏是代碼:

import sys 
from subprocess import Popen, PIPE 

def main(): 
    pp = Popen('./bqc/bqc/bqc_tool ./bqc/bqc/bqc_dict/ ./bqc/bqc/word_dict/ flag', shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE) 

    for line in sys.stdin: 
     lstr = line.strip().split('\t') 
     if len(lstr) != 7: 
      continue 
     pp.stdin.write('%s\n' % lstr[5]) 
     pp.stdin.flush() 
     out = pp.stdout.readline() 
     lout = out.strip().split('\t') 
     if len(lout) == 3 and lout[1] == '401': 
      print line.strip() 

if __name__ == '__main__': 
    main() 

上面的代碼工作找到我的本地機器測試時。 將作業提交給hadoop時,它被用作映射器。我不使用減速器,以下是配置。

hadoop streaming \ 
-input /path_to_input \ 
-output /path_to_output \ 
-mapper "python/python2.7/bin/python27.sh ./mapper.py" \ 
-cacheArchive /path_to_python/python272.tar.gz#python \ 
-cacheArchive /path_to_cmd/bqc.tar.gz#bqc \ 
-file ./mapper.py \ 
-jobconf mapred.job.name="JobName" \ 
-jobconf mapred.job.priority=HIGH 

在bqc.tar.gz這些文件看起來是這樣的:

bqc/ 
bqc/bqc_tool 
bqc/bqc_dict/ 
bqc/word_dict/ 

在我看來,該行 「-cacheArchive /path_to_cmd/bqc.tar.gz#bqc \」 應提取tar文件並將其解壓縮到名爲bqc的文件夾中。

但是,當出現以下錯誤信息提交到Hadoop集羣失敗:

Traceback (most recent call last): 
     File "./mapper.py", line 19, in 
     main() 
     File "./mapper.py", line 11, in main 
     pp.stdin.write('%s\n' % lstr[5]) 
    IOError: [Errno 32] Broken pipe 
    java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1 
     at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:335) 
     at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:590) 
     at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:152) 
     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54) 
     at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:18) 
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:388) 
     at org.apache.hadoop.mapred.Child.main(Child.java:194) 
    java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1 
     at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:335) 
     at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:590) 
     at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:163) 
     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61) 
     at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:18) 
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:388) 
     at org.apache.hadoop.mapred.Child.main(Child.java:194) 

任何人都得到一個想法?任何幫助,將不勝感激!

謝謝!

扎卡里

+0

你爲什麼認爲'./cmd'存在?在這裏如何使用Popen存在很多問題 – jfs 2012-07-05 13:07:34

+0

我同意@ J.F.Sebastian - 您確定cmd在運行時當前的工作目錄中可用嗎?你如何提交你的工作 - python代碼和這個cmd程序是否捆綁成一個zip文件,然後使用-archives generic hadoop選項進行部署? – 2012-07-05 22:57:46

+0

@ChrisWhite - 感謝您的提示。以上代碼在流式作業中用作映射器,我不需要減速器。映射器通過使用-file選項進行緩存。 cmd可執行文件的存檔及其必需文件的存檔將使用-cacheArchive選項進行分配和分發。讓我編輯並添加我使用的配置。 – FreezingGod 2012-07-06 01:45:29

回答

2

神祕解決了!這應該是由於hadoop施加的內存限制導致命令無法成功加載。該命令需要大約2G內存,hadoop配置爲每個節點大約800MB。

+0

你能否提供更多的上下文(例如,你如何表達800MB,哪個配置參數?,你是如何增加到2GB的?是否真的解決了你的問題?) – 2012-10-30 22:32:15