2017-06-21 80 views
0

我想在處理超過2GB的csv文件時使用多處理器。問題在於輸入只是在一個進程中被消耗,而其他進程似乎是空閒的。迭代器上的Python多處理器

以下重新創建我遇到的問題。是否有可能使用迭代器使用多進程?將內存全部輸入到內存中是不理想的。

import csv 
import multiprocessing 
import time 

def something(row): 
    # print row[0] 
    # pass 
    return row 

def main(): 
    start = time.time() 
    i = open("input.csv") 
    reader = csv.reader(i, delimiter='\t') 

    print reader.next() 

    p = multiprocessing.Pool(16) 
    print "Starting processes" 
    j = p.imap(something, reader, chunksize=10000) 

    count= 1 
    while j: 
     print j.next() 

    print time.time() - start 


if __name__ == '__main__': 
    main() 

回答

1

我認爲你很困惑「進程」與「處理器」。

您的程序肯定是同時產生多個進程,您可以在程序運行時在系統或資源監視器中進行驗證。主要使用的處理器或CPU內核數量主要取決於操作系統,並且與委派給每個進程的任務密集程度有關。

做一點點修改你的something功能,引入睡眠時間,模擬工作正在該函數來完成:

def something(row): 
    time.sleep(.4) 
    return row 

現在,先在你的文件中運行功能順序您的每一行,並注意到每個結果都會以每400毫秒一個一個的速度出現。

def main(): 
    with open("input.csv") as i: 
     reader = csv.reader(i) 
     print (next(reader)) 

     # SEQUENTIALLY: 
     for row in reader: 
      result = something(row) 
      print (result) 

現在嘗試與工人的池。保持在一個較低的數字,說4名工人,你會看到的結果是每年400毫秒,但是在4(或大致工人池中的數量)的羣體:

def main(): 
    with open("input.csv") as i: 
     reader = csv.reader(i) 
     print (next(reader)) 

     # IN PARALLEL 
     print ("Starting processes") 
     p = multiprocessing.Pool(4) 
     results = p.imap(something, reader) 
     for result in results: 
      print(result) # one result is the processing of 4 rows... 

雖然並行運行,檢查系統監視器並查找正在執行多少「python」進程。應該是一個加上工人的數量。

我希望這個解釋很有用。

+0

你的回答非常有幫助,但它似乎提出了幾個問題。當我運行代碼而沒有在()中加入睡眠時,一個進程會消耗大量內存。當我將睡眠(0.4)添加到某物中時,此問題不存在。這有什麼理由嗎? – BHa

+0

我能說什麼?這些過程確實並行運行。但是如果不知道關於你的數據和你的功能的其他信息,我不能告訴你其他任何事情。如果您的進程比CPU密集型的更多,那麼多處理可能無濟於事。 – chapelo