如何:
import itertools
def grouper(n, iterable, fillvalue=None):
# Source: http://docs.python.org/library/itertools.html#recipes
"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
return itertools.izip_longest(*[iter(iterable)]*n,fillvalue=fillvalue)
def block_generator(file):
with open(file) as lines:
for line in lines:
if line == 'START':
block=list(itertools.takewhile(lambda x:x!='STOP',lines))
yield block
blocks=block_generator(file)
p=multiprocessing.Pool(4)
for chunk in grouper(100,blocks,fillvalue=''):
p.map(my_f,chunk)
使用grouper
將限制由p.map
消耗的文件的數量。因此整個文件不需要一次讀入內存(送入任務隊列)。
我要求上面,當你調用p.map(func,iterator)
,整個迭代器immediatedly消耗填寫任務隊列。池中的工作人員然後從隊列中獲取任務並同時處理這些工作。
如果你看裏面0和跟蹤通過定義,你會看到 的_handle_tasks
線程會從self._taskqueue
項目,並列舉了一次:
for i, task in enumerate(taskseq):
...
put(task)
的結論是,傳遞給p.map
迭代得到立即消耗。在下一個任務從隊列中獲得之前,不需要等待一個任務結束。
作爲進一步的佐證,如果運行此:
演示代碼:
import multiprocessing as mp
import time
import logging
def foo(x):
time.sleep(1)
return x*x
def blocks():
for x in range(1000):
if x%100==0:
logger.info('Got here')
yield x
logger=mp.log_to_stderr(logging.DEBUG)
logger.setLevel(logging.DEBUG)
pool=mp.Pool()
print pool.map(foo, blocks())
您將看到Got here
消息印刷10次,幾乎立即,再長的停頓由於time.sleep(1)
致電foo
。這清楚地表明迭代器在池進程完成任務之前很久就被完全消耗掉了。
如果你使用'block = list(itertools.takewhile(lambda x:x!='STOP',lines))''會發生什麼,所以你沒有一次運行多個迭代器? – agf
嗨@agf謝謝,它現在給出正確的結果。但速度很慢。就好像p.map想要首先讀取內存中的整個文件一樣。或類似的東西。但問題仍然存在,爲什麼? – gstar2002