2011-08-13 48 views
3

我有一個奇怪的問題。我有格式的文件:爲什麼在python map()和multiprocessing.Pool.map()有不同的答案?

START 
1 
2 
STOP 
lllllllll 
START 
3 
5 
6 
STOP 

和我想讀STARTSTOP之間的線塊,而使用my_f來處理每個塊。

​​

在我的主要功能中,我嘗試使用map()完成工作。有效。

blocks=block_generator(file) 
map(my_f,blocks) 

實際上會給我我想要的。但是,當我試圖用multiprocessing.Pool.map()同樣的事情,它給了我一個錯誤說takewhile()想帶2個參數,給出0

blocks=block_generator(file) 
    p=multiprocessing.Pool(4) 
    p.map(my_f,blocks) 

這是一個錯誤?

  1. 該文件有超過1000000個塊,每個塊少於100行。
  2. 我接受答案表單unubu。
  3. 但也許我會簡單地拆分文件,並使用我的原始腳本沒有多處理n實例來處理它們,然後貓結果在一起。這樣,只要腳本在小文件上工作,就永遠不會出錯。
+1

如果你使用'block = list(itertools.takewhile(lambda x:x!='STOP',lines))''會發生什麼,所以你沒有一次運行多個迭代器? – agf

+0

嗨@agf謝謝,它現在給出正確的結果。但速度很慢。就好像p.map想要首先讀取內存中的整個文件一樣。或類似的東西。但問題仍然存在,爲什麼? – gstar2002

回答

2

如何:

import itertools 

def grouper(n, iterable, fillvalue=None): 
    # Source: http://docs.python.org/library/itertools.html#recipes 
    "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx" 
    return itertools.izip_longest(*[iter(iterable)]*n,fillvalue=fillvalue) 

def block_generator(file): 
    with open(file) as lines: 
     for line in lines: 
      if line == 'START': 
       block=list(itertools.takewhile(lambda x:x!='STOP',lines)) 
       yield block 

blocks=block_generator(file) 
p=multiprocessing.Pool(4) 
for chunk in grouper(100,blocks,fillvalue=''): 
    p.map(my_f,chunk) 

使用grouper將限制由p.map消耗的文件的數量。因此整個文件不需要一次讀入內存(送入任務隊列)。


我要求上面,當你調用p.map(func,iterator),整個迭代器immediatedly消耗填寫任務隊列。池中的工作人員然後從隊列中獲取任務並同時處理這些工作。

如果你看裏面0​​和跟蹤通過定義,你會看到 的_handle_tasks線程會從self._taskqueue項目,並列舉了一次:

  for i, task in enumerate(taskseq): 
      ... 
      put(task) 

的結論是,傳遞給p.map迭代得到立即消耗。在下一個任務從隊列中獲得之前,不需要等待一個任務結束。

作爲進一步的佐證,如果運行此:

演示代碼:

import multiprocessing as mp 
import time 
import logging 

def foo(x): 
    time.sleep(1) 
    return x*x 

def blocks(): 
    for x in range(1000): 
     if x%100==0: 
      logger.info('Got here') 
     yield x 

logger=mp.log_to_stderr(logging.DEBUG) 
logger.setLevel(logging.DEBUG) 
pool=mp.Pool() 
print pool.map(foo, blocks()) 

您將看到Got here消息印刷10次,幾乎立即,再長的停頓由於time.sleep(1)致電foo。這清楚地表明迭代器在池進程完成任務之前很久就被完全消耗掉了。

+0

這沒有任何意義。這樣,你一次讀100個塊到內存中,然後並行地使用它們。如果沒有石斑魚,你只能在每個進程中將一個塊讀入內存,因此一次只能有4個塊。他的問題在於每個'list'調用正在消耗'taketime'的時​​間 - 你沒有做任何事情來加快速度。解決方案是不使用'list'。 – agf

+0

也許你是對的,它看起來像'Pool.map_async'做'list()'迭代。但是,這仍然假定文件中有很多相對較短的塊。如果文件中的塊數少於100個(相對較長),這根本不會改進,如果塊數低於100,則不會有太大改進。完全擺脫添加的list()(我只將其作爲診斷工具)適用於任意數量的塊。我看到你刪除了你的評論,我會離開這個。 – agf

+0

@agf:我添加了一些代碼,我認爲在工作進程完成(幾乎)完成任何任務之前,生成器已完全耗盡。關於我在代碼中使用的塊大小 - 這僅僅是一個例子。 OP可以將其改變爲任何對他的問題有意義的事情。他沒有告訴我們有多少個START/STOP塊,但從他的例子來看,它們看起來很短。由於讀取整個文件會導致內存問題,因此我推斷必須有很多短塊。 – unutbu

1

基本上,當你迭代文件時,就像你一樣,每次從文件中讀取新行時,都會將文件指針向前移動一行。

所以,當你

block=itertools.takewhile(lambda x:x!='STOP',lines) 

每次做的takewhile返回的迭代器lines得到一個新的項目,它移動文件指針。

for循環中推進已經循環的迭代器通常是不好的。但是,for循環暫時暫停在每個yield上,並且map用盡takewhile,然後繼續for循環,以便獲得所需的行爲。

當您同時運行for循環和takewhile時,文件指針快速移動到最後,並且出現錯誤。

試試這個,它應該比包裹快於takewhile一個list

from contextlib import closing 
from itertools import repeat 

def block_generator(filename): 
    with open(filename) as infile: 
     for pos in (infile.tell() for line in infile if line == 'START'): 
      yield pos 

def my_f_wrapper(pos, filename): 
    with open(filename) as infile: 
     infile.seek(pos) 
     block=itertools.takewhile(lambda x:x!='STOP', infile) 
     my_f(block) 

blocks = block_generator(filename) 
p.imap(my_f_wrapper, blocks, repeat(filename)) 

基本上,你希望每個my_f獨立的文件進行操作,所以你需要單獨打開文件每一個。

我不能想到一種方法,不需要將文件迭代兩次,一次由for循環和一次由takewhile組合在一起,同時仍然並行處理文件。在您的原始版本中,takewhile提高了for循環的文件指針,因此效率非常高。

如果您不是遍歷行,但只是字節,我建議使用mmap,但如果您使用的是文本行,它會使事情變得複雜得多。

編輯:另一種方法是有block_generator經過文件並找到STARTSTOP所有的位置,然後餵它們成對包裝。這樣,包裝將不必將行與STOP進行比較,它只需在文件上使用tell()以確保它不在STOP。我不確定這是否會更快。

+1

'p.imap(my_f_wrapper,blocks)'立即完全消耗'blocks'。我的答案後半部分的演示代碼證實了這一說法。如果有很多START/STOP塊,那麼這意味着在任務隊列中會有很多打開的文件對象在等待。我認爲這不是一個好主意,因爲操作系統會限制進程可以同時打開的文件描述符的數量。 – unutbu

+0

我實際上雖然這(根據您的信息,可迭代被消耗)迴應您的文章,並已移動文件打開到包裝。好的音符雖然。 – agf

+0

當CPU問題出現時,'multiprocessing'可以有效。 IO越多,'multiprocessing'變得越不有效。 'my_f_wrapper'增加每個工作進程必須執行的IO數量。文件不僅打開一次才能找到所有的搜索位置,每個START/STOP塊都會打開,查找,讀取和關閉文件。 – unutbu

相關問題