2017-03-03 149 views
2

我正面臨以下問題。我試圖並行化一個更新文件的函數,但由於OSError: [Errno 12] Cannot allocate memory,我無法啓動Pool()。我已經開始在服務器上四處查看了,這並不像我使用的是舊的,弱的/不是實際的內存。 見htopenter image description here 此外,free -m顯示我有足夠的可用RAM除了交換內存〜7GB: enter image description here 我試圖與之合作的文件並不大任。我將在下面粘貼我的代碼(和堆棧跟蹤),那裏的大小如下:Python多處理 - 調試OSError:[Errno 12]無法分配內存

predictionmatrix數據幀佔用了約ca. 80MB根據pandasdataframe.memory_usage() 文件geo.geojson是2MB

我該如何去調試呢?我可以檢查什麼以及如何?感謝您提供任何提示/技巧!

代碼:

def parallelUpdateJSON(paramMatch, predictionmatrix, data): 
    for feature in data['features']: 
     currentfeature = predictionmatrix[(predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch] 
     if (len(currentfeature) > 0): 
      feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}}) 
     else: 
      feature['properties'].update({"style": {"opacity": 0}}) 

def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix): 
    with open('geo.geojson') as f: 
     data = json.load(f) 
    paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict) 
    pool = Pool() 
    func = partial(parallelUpdateJSON, paramMatch, predictionmatrix) 
    pool.map(func, data) 
    pool.close() 
    pool.join() 

    with open('output.geojson', 'w') as outfile: 
     json.dump(data, outfile) 

堆棧跟蹤:

--------------------------------------------------------------------------- 
OSError         Traceback (most recent call last) 
<ipython-input-428-d6121ed2750b> in <module>() 
----> 1 writeGeoJSON(6, 15, baseline) 

<ipython-input-427-973b7a5a8acc> in writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix) 
    14  print("Start loop") 
    15  paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict) 
---> 16  pool = Pool(2) 
    17  func = partial(parallelUpdateJSON, paramMatch, predictionmatrix) 
    18  print(predictionmatrix.memory_usage()) 

/usr/lib/python3.5/multiprocessing/context.py in Pool(self, processes, initializer, initargs, maxtasksperchild) 
    116   from .pool import Pool 
    117   return Pool(processes, initializer, initargs, maxtasksperchild, 
--> 118      context=self.get_context()) 
    119 
    120  def RawValue(self, typecode_or_type, *args): 

/usr/lib/python3.5/multiprocessing/pool.py in __init__(self, processes, initializer, initargs, maxtasksperchild, context) 
    166   self._processes = processes 
    167   self._pool = [] 
--> 168   self._repopulate_pool() 
    169 
    170   self._worker_handler = threading.Thread(

/usr/lib/python3.5/multiprocessing/pool.py in _repopulate_pool(self) 
    231    w.name = w.name.replace('Process', 'PoolWorker') 
    232    w.daemon = True 
--> 233    w.start() 
    234    util.debug('added worker') 
    235 

/usr/lib/python3.5/multiprocessing/process.py in start(self) 
    103    'daemonic processes are not allowed to have children' 
    104   _cleanup() 
--> 105   self._popen = self._Popen(self) 
    106   self._sentinel = self._popen.sentinel 
    107   _children.add(self) 

/usr/lib/python3.5/multiprocessing/context.py in _Popen(process_obj) 
    265   def _Popen(process_obj): 
    266    from .popen_fork import Popen 
--> 267    return Popen(process_obj) 
    268 
    269  class SpawnProcess(process.BaseProcess): 

/usr/lib/python3.5/multiprocessing/popen_fork.py in __init__(self, process_obj) 
    18   sys.stderr.flush() 
    19   self.returncode = None 
---> 20   self._launch(process_obj) 
    21 
    22  def duplicate_for_child(self, fd): 

/usr/lib/python3.5/multiprocessing/popen_fork.py in _launch(self, process_obj) 
    65   code = 1 
    66   parent_r, child_w = os.pipe() 
---> 67   self.pid = os.fork() 
    68   if self.pid == 0: 
    69    try: 

OSError: [Errno 12] Cannot allocate memory 

UPDATE

據@ robyschek的解決方案,我已經更新了我的代碼:

global g_predictionmatrix 

def worker_init(predictionmatrix): 
    global g_predictionmatrix 
    g_predictionmatrix = predictionmatrix  

def parallelUpdateJSON(paramMatch, data_item): 
    for feature in data_item['features']: 
     currentfeature = predictionmatrix[(predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch] 
     if (len(currentfeature) > 0): 
      feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}}) 
     else: 
      feature['properties'].update({"style": {"opacity": 0}}) 

def use_the_pool(data, paramMatch, predictionmatrix): 
    pool = Pool(initializer=worker_init, initargs=(predictionmatrix,)) 
    func = partial(parallelUpdateJSON, paramMatch) 
    pool.map(func, data) 
    pool.close() 
    pool.join() 


def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix): 
    with open('geo.geojson') as f: 
     data = json.load(f) 
    paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict) 
    use_the_pool(data, paramMatch, predictionmatrix)  
    with open('trentino-grid.geojson', 'w') as outfile: 
     json.dump(data, outfile) 

我仍然遇到同樣的錯誤。另外,根據documentationmap()應該將我的data分成塊,所以我不認爲它應該複製我80MB的rownum時間。我可能是錯的,雖然... :) 另外我注意到,如果我使用較小的輸入(~11MB而不是80MB),我不會收到錯誤。所以我想我試圖使用太多的內存,但我無法想象它是如何從80MB到16GB的內存無法處理的。

+0

對不起,我是懶得看的堆棧跟蹤,並沒有注意到,在'OS出現錯誤.fork'。 另外,我看了一下多處理源,發現 我關於重複'predictionmatrix'的理論只與 相關,而'Pool.imap'只有很小的'chunksize',Pool.map默認不受影響。 我刪除了我的答案。 – robyschek

回答

1

我們已經有幾次了。根據我的系統管理員的說法,unix中存在一個「錯誤」,如果你的內存不足,會導致同樣的錯誤,如果你的進程達到最大文件描述符限制。

我們有文件描述符泄漏,並且引發錯誤[Errno 12]無法分配內存#012OSError。

所以,你應該看看你的腳本,並仔細檢查,如果這個問題沒有太多的FD的創造,而不是

3

使用multiprocessing.Pool時,啓動進程的默認方式是forkfork的問題是整個過程是重複的。 (see details here)。因此,如果你的主進程已經使用了大量的內存,這個內存將被複制,達到這個MemoryError。例如,如果您的主進程使用內存的2GB,並且使用8個子進程,則RAM中需要18GB

你應該嘗試使用不同的啓動方式,如'forkserver''spawn'

from multiprocessing import set_start_method, Pool 
set_start_method('forkserver') 

# You can then start your Pool without each process 
# cloning your entire memory 
pool = Pool() 
func = partial(parallelUpdateJSON, paramMatch, predictionmatrix) 
pool.map(func, data) 

這些方法避免重複你的Process的工作空間,但可以慢一點開始,你需要重新加載模塊,你正在使用。

+0

謝謝,我會研究這一點,但我不認爲100MB(甚至是2個演出)對於具有16GB可用RAM的系統應該處理得太多。此外,即使根據Python文檔,「pool = Pool()」方法也是多處理庫的方式。 –

+0

我澄清了我的答案。 start方法在這裏要求多處理使用與'fork'不同的方法來啓動子進程,這會導致'MemoryError'。 –

+0

實際上,目前使用的所有類UNIX操作系統在其內存管理中都有寫入時拷貝。這意味着進程之間共享*相同的內存頁面。只有當進程修改頁面中的任何內容時,它纔會獲得私人副本。 –