2012-12-27 30 views
1

我嘗試apply_async到共用的反傳給任務多,但它不能與這樣的錯誤「RuntimeError:同步的對象只能進程之間通過繼承共享」。這是怎麼回事Python的多apply_async +價值

def processLine(lines, counter, mutex): 
    pass 

counter = multiprocessing.Value('i', 0) 
mutex = multiprocessing.Lock() 
pool = Pool(processes = 8) 
lines = [] 

for line in inputStream: 
    lines.append(line) 
    if len(lines) >= 5000: 
     #don't queue more than 1'000'000 lines 
     while counter.value > 1000000: 
       time.sleep(0.05) 
     mutex.acquire() 
     counter.value += len(lines) 
     mutex.release() 
     pool.apply_async(processLine, args=(lines, counter,), callback = collectResults) 
     lines = [] 

回答

0

我解決它在這樣的不優雅的方式

def processLine(lines): 
    pass 

def collectResults(result): 
    global counter 
    counter -= len(result) 

counter = 0 
pool = Pool(processes = 8) 
lines = [] 

for line in inputStream: 
    lines.append(line) 
    if len(lines) >= 5000: 
     #don't queue more than 1'000'000 lines 
     while counter.value > 1000000: 
      time.sleep(0.05) 
     counter.value += len(lines) 
     pool.apply_async(processLine, args=(lines), callback = collectResults) 
     lines = [] 
+0

我明白,這是一些最起碼的片段,但我擔心的是你的回調函數沒有做你的期望。由於'processLine'沒有返回值,所以你真的不應該調用'len(result)',因爲它沒有意義。如果你要回答你自己的問題,你應該保持完全獨立。 – Hooked

1

讓池處理調度:

for result in pool.imap(process_single_line, input_stream): 
    pass 

如果順序並不重要:

​​

pool.*map*() functi有chunksize論點,你可以改變,看看它是否影響你的情況下的性能。

如果你的代碼需要多條線路在單個呼叫傳遞:

from itertools import izip_longest 

chunks = izip_longest(*[iter(inputStream)]*5000, fillvalue='') # grouper recipe 
for result in pool.imap(process_lines, chunks): 
    pass 

一些替代品來限制排隊的項數是:

  • multiprocessing.Queue與設置最大大小(你不」在這種情況下需要一個池)。直到其它過程調用queue.get()
  • 手工執行使用多基元如條件或BoundedSemaphor生產者/消費者圖案的queue.put()將阻止達到最大大小時。

注意:每個值都有關聯的鎖,您不需要單獨的鎖。