2017-06-16 55 views
0

當試圖使用多處理庫(python 3.5)時,我遇到了不會結束的函數。似乎所有要處理,但(主)程序不會繼續......Python多處理:導入的函數沒有結束

我目前的設置如下:

# Main.py 

import multiprocessing as mp 
import pandas as pd 
from dosomething import dosomething 

csvfolder = 'data/' 

data = pd.DataFrame([ 
     {'a':12,'b':13}, 
     {'a':2,'b':14}, 
     {'a':1,'b':23}, 
     {'a':123,'b':16}, 
     {'a':142,'b':14}, 
     ]) 

print('start') 

result = mp.Queue() 
dos = mp.Process(target=dosomething, args=(data,csvfolder,result,'dosomething')) 
dos.start() 
dos.join() 
result.get() 
print('finished') 

然後在DoSomething的我已經定義了一個函數DoSomething的,做以下操作:

# dosomething.py 
import os 
def dosomething(data,csvfolder,result,name): 
    data.to_csv(os.path.join(csvfolder,'test.csv')) 
    result.put({name:{'data':data}}) 

看來該函數按預期執行,但永遠不會結束導致主程序停頓。當結束程序我得到以下信息:

流程流程1:回溯(最近通話最後一個):文件 「/usr/lib/python3.5/multiprocessing/process.py」,線路252,在 _bootstrap util._exit_function()文件 「/usr/lib/python3.5/multiprocessing/util.py」,線314,在 _exit_function _run_finalizers()文件「/usr/lib/python3.5/multiprocessing/ util.py「,第254行,在 _run_finalizers finalizer()文件」/usr/lib/python3.5/multiprocessing/util.py「,第186行,在調用 res = self._callback(* self。 _args,**自我._kwargs)文件「/usr/lib/python3.5/multiprocessing/queues.py」,第198行,在 _finalize_join thread.join()文件「/usr/lib/python3.5/threading.py」,行1054,在加入 self._wait_for_tstate_lock()文件 「/usr/lib/python3.5/threading.py」,線1070,在_wait_for_tstate_lock elif的lock.acquire(框,超時):一個KeyboardInterrupt

基於在評論中,我瞭解到result.put()是(當使用實際數據時)很多時間,變得沒有反應。我在這個隊列中放置的結果是一個包含兩個元素的字典,其中一個是熊貓數據框(幾萬條記錄)。

我該如何解決這個問題?

+0

我跑你的代碼,它工作正常。它創建test.csv,打印「完成」並退出。 – Hannu

+0

根據您的評論編輯該問題。 –

回答

0

multiprocessing guidelines

...無論何時您使用隊列,您都需要確保放入隊列的所有項目在加入之前最終都會被移除。

只需交換Process.joinQueue.get行即可使其工作。