2016-11-24 112 views
1

我有一個multiprocessing腳本與pool.map工作。問題是並非所有進程都需要很長時間才能完成,因此有些進程會因爲等待所有進程完成而睡着(與this question中的問題相同)。一些文件在不到一秒鐘內完成,其他文件需要幾分鐘(或幾小時)。Python多處理池地圖和imap

如果我正確理解了手冊(and this post),則pool.imap並未等待所有進程完成,如果完成,它將提供一個新文件進行處理。當我嘗試這些時,腳本正在加速處理文件,小文件按預期處理,大文件(需要更多時間處理)直到最後纔會結束(在沒有通知的情況下死亡?)。這是pool.imap的正常行爲,還是我需要添加更多命令/參數?當我在else部分中添加time.sleep(100)作爲測試時,它正在處理更大的文件,但其他進程睡着了。有什麼建議麼 ?謝謝

def process_file(infile): 
    #read infile 
    #compare things in infile 
    #acquire Lock, save things in outfile, release Lock 
    #delete infile 

def main(): 
    #nprocesses = 8 
    global filename 
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9'] 
    for d in pathlist: 
     os.chdir(d)  
     todolist = [] 
     for infile in os.listdir(): 
      todolist.append(infile) 
     try: 
      p = Pool(processes=nprocesses) 
      p.imap(process_file, todolist) 
     except KeyboardInterrupt:     
      print("Shutting processes down") 
      # Optionally try to gracefully shut down the worker processes here.  
      p.close() 
      p.terminate() 
      p.join() 
     except StopIteration: 
      continue  
     else: 
      time.sleep(100) 
      os.chdir('..') 
     p.close() 
     p.join() 

if __name__ == '__main__': 
    main()  
+0

我一直在思考的'imap'問題。 'Map'正在等待所有進程完成以返回結果。 Imap'在第一個過程完成後立即返回結果,並可能終止其他過程並給出所有新的工作。這是正確的嗎? – avierstr

回答

1

由於您已將所有文件放入列表中,因此可以將它們直接放入隊列中。然後,隊列將與您的子進程共享,從隊列中獲取文件名並執行其操作。不需要兩次(首先進入列表,然後通過Pool.imap進入pickle列表)。 Pool.imap正在做同樣的事情,但沒有你知道它。

todolist = [] 
for infile in os.listdir(): 
    todolist.append(infile) 

可以被替換爲:

todolist = Queue() 
for infile in os.listdir(): 
    todolist.put(infile) 

然後將完整的解決方案看起來像:

def process_file(inqueue): 
    for infile in iter(inqueue.get, "STOP"): 
     #do stuff until inqueue.get returns "STOP" 
    #read infile 
    #compare things in infile 
    #acquire Lock, save things in outfile, release Lock 
    #delete infile 

def main(): 
    nprocesses = 8 
    global filename 
    pathlist = ['tmp0', 'tmp1', 'tmp2', 'tmp3', 'tmp4', 'tmp5', 'tmp6', 'tmp7', 'tmp8', 'tmp9'] 
    for d in pathlist: 
     os.chdir(d)  
     todolist = Queue() 
     for infile in os.listdir(): 
      todolist.put(infile) 
     process = [Process(target=process_file, 
         args=(todolist) for x in range(nprocesses)] 
     for p in process: 
      #task the processes to stop when all files are handled 
      #"STOP" is at the very end of queue 
      todolist.put("STOP") 
     for p in process: 
      p.start() 
     for p in process: 
      p.join()  
if __name__ == '__main__': 
    main() 
+0

非常感謝RaJa!現在它按我的意圖工作。爲了完整性:'範圍(nprocesses)]中的'args =(todolist)]'必須是'範圍(nprocesses)中的x'的'args =(todolist,))'''。那天晚上我一直在Queue嘗試,但到目前爲止發生了很多錯誤。現在我很清楚它是如何工作的! – avierstr