2015-03-19

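Passing file objects through a queue in Python

I have a multiprocessing task that processes input data and writes the results to temporary files (for later use). However, when I try to pass the file handles to the parent process through a queue, it fails (no exception is raised, but the queue stays empty).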

import multiprocessing, tempfile 

def worker(i): 
    my_data_object = [] 
    my_tmp_file = tempfile.NamedTemporaryFile('wb') 
    my_tmp_file.write(bytes('Hello world #{}'.format(i), 'utf-8')) 
    my_tmp_file.seek(0) 
    queue.put(my_tmp_file) 

queue = multiprocessing.Queue() 

print('Writing...') 
proc = [] 
for i in range(16): 
    proc.append(multiprocessing.Process(target = worker, args = (i,))) 
    proc[i].start() 
for p in proc: 
    p.join() 

print('Reading...') 
my_strings = [] 
while True: 
    try:
        tmp_file = queue.get_nowait()
    except:
        print('All data are read. Queue is now empty')
        break
    my_strings.append(tmp_file.read()) 
    tmp_file.close() 

print('Files content: ', my_strings) 
print('Successful termination') 

Does anyone know a solution?

Answer

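Keeping the file open seems to be what causes the problem. If you read the file and close it inside your worker function, and put only the contents on the queue, it works: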

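import tempfile
from multiprocessing import Process, Queue

def worker(i, queue):
    # Write, rewind and read back in the child; only the bytes go on the queue.
    my_tmp_file = tempfile.NamedTemporaryFile()
    my_tmp_file.write(bytes('Hello world #{}'.format(i), 'utf-8'))
    my_tmp_file.seek(0)
    queue.put(my_tmp_file.read())
    my_tmp_file.close()

if __name__ == '__main__':
    q = Queue()

    processes = [Process(target=worker, args=(i, q)) for i in range(16)]

    for p in processes:
        p.start()

    # Each worker puts exactly one item, so fetch one result per process.
    # get() blocks until an item arrives; qsize() is approximate and is
    # not implemented on every platform.
    for _ in processes:
        out = q.get()
        print(out)

    for p in processes:
        p.join()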

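If you put the file object itself on the queue and close it without reading it, you get a TypeError: cannot serialize '_io.FileIO' object, because the _io.FileIO object cannot be pickled.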
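The pickling failure can be checked directly, without any queue or child process. The following is a minimal sketch, relying on the fact that multiprocessing.Queue pickles every item before sending it. The helper name is_picklable is hypothetical and introduced here only for illustration. The exact wording of the error message differs between Python versions, so the sketch checks only the exception type.

```python
import pickle
import tempfile

def is_picklable(obj):
    """Return True if pickle can serialize obj (hypothetical helper)."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True

with tempfile.NamedTemporaryFile() as tmp:
    tmp.write(b'Hello world')
    tmp.seek(0)
    handle_ok = is_picklable(tmp)          # the open file object itself
    name_ok = is_picklable(tmp.name)       # its path, a plain str
    content_ok = is_picklable(tmp.read())  # its contents, plain bytes

print(handle_ok, name_ok, content_ok)
```

The path and the contents are ordinary values, which is why putting either of them on the queue works where the handle does not.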

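Depending on what you want to do, it may help to put the file's .name on the queue instead, create the file with delete set to False, and reopen it in the parent: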

import multiprocessing, tempfile
from queue import Empty

def worker(i, queue):
    # delete=False keeps the file on disk after the with block closes it.
    with tempfile.NamedTemporaryFile(delete=False) as my_tmp_file:
        my_tmp_file.write(bytes('Hello world #{}'.format(i), 'utf-8'))
        my_tmp_file.seek(0)
        # Only the path (a plain string) goes on the queue.
        queue.put(my_tmp_file.name)

if __name__ == '__main__':
    queue = multiprocessing.Queue()

    print('Writing...')
    proc = []
    for i in range(16):
        proc.append(multiprocessing.Process(target=worker, args=(i, queue)))
        proc[i].start()
    for p in proc:
        p.join()

    print('Reading...')
    my_strings = []
    while True:
        try:
            tmp_file = queue.get_nowait()
        except Empty:
            print('All data are read. Queue is now empty')
            break
        with open(tmp_file) as f:
            # Store the contents, not the file object (closed once the block ends).
            my_strings.append(f.read())

But you still have to reopen the file, so I am not sure whether this gains you anything.
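One thing to keep in mind with the path-based approach: delete=False leaves the temporary file on disk, so something has to remove it once the contents have been read. Below is a rough sketch of that cleanup on the parent side. Both helper names (write_temp, read_and_remove) are hypothetical, and write_temp merely stands in for the worker so the example runs in a single process.

```python
import os
import tempfile

def write_temp(text):
    """Stand-in for the worker: write text to a temp file, return its path."""
    with tempfile.NamedTemporaryFile('w', delete=False, encoding='utf-8') as tmp:
        tmp.write(text)
    return tmp.name

def read_and_remove(path):
    """Read the whole file, then delete it even if reading fails."""
    try:
        with open(path, encoding='utf-8') as f:
            return f.read()
    finally:
        os.remove(path)

path = write_temp('Hello world #0')
content = read_and_remove(path)
print(content, os.path.exists(path))
```

In the multiprocessing version, read_and_remove would be called on each path taken from the queue.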