2012-03-27 60 views
2

已經有類似的問題被問到(和回答),但從來沒有真正在一起,我似乎無法獲得任何工作。由於我剛剛開始使用Python,所以很容易理解的東西會很棒!連接大文件,管道和獎金

我有3個大型數據文件(> 500G),我需要解壓,連接,管道到一個子進程,然後管輸出到另一個子進程。然後我需要處理我想用Python做的最終輸出。注意除了處理之外,我不需要解壓縮和/或拼接文件 - 創建一個我認爲會浪費空間的文件。這是我到目前爲止...

import gzip 
from subprocess import Popen, PIPE 

#zipped files 
zipfile1 = "./file_1.txt.gz" 
zipfile2 = "./file_2.txt.gz" 
zipfile3 = "./file_3.txt.gz" 


# Open the first pipe 
p1 = Popen(["dataclean.pl"], stdin=PIPE, stdout=PIPE) 

# Unzip the files and pipe them in (has to be a more pythonic way to do it - 
# if this is even correct) 
unzipfile1 = gzip.open(zipfile1, 'wb') 
p1.stdin.write(unzipfile1.read()) 
unzipfile1.close() 

unzipfile2 = gzip.open(zipfile2, 'wb') 
p1.stdin.write(unzipfile2.read()) 
unzipfile2.close() 

unzipfile3 = gzip.open(zipfile3, 'wb') 
p1.stdin.write(unzipfile3.read()) 
unzipfile3.close() 


# Pipe the output of p1 to p2 
p2 = Popen(["dataprocess.pl"], stdin=p1.stdout, stdout=PIPE) 

# Not sure what this does - something about a SIGPIPE 
p1.stdout.close() 

## Not sure what this does either - but it is in the pydoc 
output = p2.communicate()[0] 

## more processing of p2.stdout... 
print p2.stdout 

任何意見將不勝感激。 *作爲獎勵問題...讀取()的pydoc說:

「還要注意,在非阻塞模式下,即使沒有給出大小參數,也可能返回比請求的數據更少的數據。 「

這似乎很可怕。任何人都可以解釋它嗎?我不想只讀取數據集的一部分,認爲它是整個事情。我認爲留下文件的大小是件好事,特別是當我不知道文件的大小時。

感謝,

GK

+0

您確定要使用Python來處理超過1 TB的數據嗎?解壓縮,連接和管道正好在shell腳本或批處理文件的小巷中。 – 2012-03-27 01:52:01

+0

我會盡量避免一次加載那麼多的數據。你究竟想要對數據做什麼?你可以用一系列的發電機來完成這個任務。 – 2012-03-27 01:55:46

+0

目前使用bash腳本完成一些數據清理的Perl腳本調用,然後使用C++腳本進行一些分析(非常大的fMRI文件)。我試圖給原始的bash腳本添加更多的功能,但是它變得有點冗長乏味。我想我會給python一個去。聽起來這是一個壞主意? – user1294223 2012-03-27 02:10:53

回答

4

首先第一件事情;我認爲你的模式不正確:

unzipfile1 = gzip.open(zipfile1, 'wb') 

This should open zipfile1 for writing,not reading。我希望你的數據仍然存在。

其次,你不想試圖一次處理全部數據。您應該使用16k或32k或其他的數據塊來處理數據。 (最佳尺寸將根據許多因素而變化;如果此任務必須多次完成才能配置,因此您可以設定不同的尺寸。)

您要查找的內容可能更像是這樣的未經測試的僞代碼,代碼:

while (block = unzipfile1.read(4096*4)): 
    p1.stdin.write(a) 

如果你想多進程掛鉤在一起,Python中的管道,那麼它可能會看起來更像是這樣的:

while (block = unzipfile1.read(4096*4)): 
    p1.stdin.write(a) 
    p2.stdin.write(p1.stdout.read()) 

這使輸出p1p2儘快BLE。我已經假設p1不會產生比它給出的更多的輸入。如果p1的輸出將比輸入大10倍,那麼你應該製作另一個類似於這個循環的循環。


但是,我不得不說,這種感覺就像很多額外的工作,以複製的shell腳本:

gzip -cd file1.gz file2.gz file3.gz | dataclean.py | dataprocess.pl 

​​因爲我會自動處理該塊大小的數據傳輸VE如上所述,並且假設你dataclean.pydataprocess.pl腳本數據工作塊,而不是執行全讀取(你原來這個腳本的版本一樣),那麼就應該在靠近自己最大的能力全部被並聯運行。

+0

我想讓Python做的大部分工作是選擇我想要的某些文件(取決於一週中的某一天,文件的可用性等)。此外,還想調用不同的處理腳本,具體取決於我想要做什麼,以及文件的可用性。我想我可以用Python做這些事情,創建一個字符串,然後調用一個os.system()。這會是一個更好的主意嗎? – user1294223 2012-03-27 02:38:25

+0

如果用'os.system()'[你的意思是'subprocess.call()'](http://stackoverflow.com/questions/204017/how-do-i-execute-a-program-from-python- os-system-due-to-spaces-in-path),那麼是的。 ;)這是一個理智的方式來做到這一點。 – 2012-03-27 04:31:28

+0

如果你要用腳本做更多,那麼是的,這很有道理。李昂建議使用'subprocess.call()'值得注意。 :) – sarnold 2012-03-27 22:25:31