2012-06-14 152 views
2

所以我只是想多處理並閱讀文本文檔中的每一行。有660918行,我知道所有這些都是相同的長度。雖然用下面的代碼,線條的長度似乎改變了,但我不明白爲什麼。多處理問題

import multiprocessing 

class Worker(multiprocessing.Process): 
    def __init__(self,in_q): 
     multiprocessing.Process.__init__(self) 
     self.in_q = in_q 
    def run(self):  
     while True: 
      try: 
       in_q.get() 
       temp_line = short_file.readline() 
       temp_line = temp_line.strip().split() 
       print len(temp_line) 
       self.in_q.task_done() 
      except:        
       break  

if __name__ == "__main__": 
    num_proc = 10 
    lines = 100000 #660918 is how many lines there actually are 
    in_q = multiprocessing.JoinableQueue() 
    File = 'HGDP_FinalReport_Forward.txt' 
    short_file = open(File) 

    for i in range(lines): 
     in_q.put(i)  

    for i in range(num_proc): 
     worker = Worker(in_q) 
     worker.start() 
    in_q.join() 
+2

爲什麼不讀取主進程中的行,並將它們用'multiprocessing.Pool'處理到子進程? – robert

+0

那麼上面並沒有真正回答我的問題,除非由於某種原因,彙集會使所有行讀取相同的長度。 – user1423020

+0

@ user1423020:令人驚訝的是,它實際上會使行讀取相同的長度。 – abarnert

回答

7

您正在打開主進程中的文件,然後從子進程中讀取該文件。你不能那樣做。

深層次地講,文件對象實際上是一個原始文件句柄和一個內存緩衝區。每個進程共享文件句柄,但每個進程都有自己的內存緩衝區。

假設所有的行都是50字節,而內存緩衝區是4096字節。

進程1調用readline,它從文件中讀取字節0-4095到其緩衝區,然後掃描該緩衝區以找到50字節的換行符,並返回前50個字節。到現在爲止還挺好。

進程2調用readline,它從文件讀取字節4096-8191到其緩衝區,然後掃描該緩衝區以換行。第一個是4100,它是5個字節,所以它返回前5個字節。

依此類推。

你理論上可以通過做非緩衝I/O來解決這個問題,但是真的,爲什麼?爲什麼不只是讀你的主要過程中的線?除了避免這個問題,這也可能會提高並行性--I/O本質上是順序的,因此所有這些進程將大部分時間都花在I/O上,這意味着它們對你沒有任何好處。作爲一個附註,在運行循環的頂部附近,您正在執行in_q.get()而不是self.in_q.get()。 (這是因爲in_q是一個全局變量,永遠不會消失,self.in_q只是它的一個副本,但你不想依賴它)。

+0

哇,很好的答案。我希望我可以多勞多得。 – steveha

+0

+1:非常清晰的答案。 – EOL

+0

這有幫助,但我有點困惑於如何組織代碼,以便我閱讀主要代替兒童的代碼。 – user1423020

1

因此,我將其更改爲使用Pool ,它似乎工作。以下更好?

import multiprocessing as mp 

File = 'HGDP_FinalReport_Forward.txt' 
#short_file = open(File) 
test = [] 

def pro(temp_line): 
    temp_line = temp_line.strip().split() 
    return len(temp_line) 

if __name__ == "__main__": 
    with open("HGDP_FinalReport_Forward.txt") as lines: 
     pool = mp.Pool(processes = 10) 
     t = pool.map(pro,lines.readlines()) 
    print t 
+0

你可能不應該添加這個答案,但是......我不確定你應該如何添加它。無論如何,是的,我會說它更好。首先,因爲你正在讀取父進程中的所有文件,它現在可以工作。這也意味着你正在分發CPU綁定的工作,而不是I/O綁定的工作,分佈式工作中沒有任何阻塞,這意味着更高的效率和並行性。使用使得它更短,更易於閱讀,並且更難以在(比如in_q而不是self.in_q)中產生微小的錯誤。 – abarnert