2013-03-29 59 views
6

這似乎是一個簡單的問題,但我無法讓我的頭在附近。使用多處理將數據寫入hdf文件

我有一個模擬運行在一個雙循環,並將結果寫入HDF文件。這個程序的簡化版本如下所示:

import tables as pt 

a = range(10) 
b = range(5) 

def Simulation(): 
    hdf = pt.openFile('simulation.h5',mode='w') 
    for ii in a: 
     print(ii) 
     hdf.createGroup('/','A%s'%ii) 
     for i in b: 
      hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i]) 
     hdf.close() 
    return 
Simulation() 

此代碼不正是我想要的,但因爲過程可能需要相當長的時間來運行我試圖用多模塊,並使用下面的代碼:

import multiprocessing 
import tables as pt 

a = range(10) 
b = range(5) 

def Simulation(ii): 
    hdf = pt.openFile('simulation.h5',mode='w') 
    print(ii) 
     hdf.createGroup('/','A%s'%ii) 
     for i in b: 
      hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i]) 
     hdf.close() 
    return 

if __name__ == '__main__': 
    jobs = [] 
    for ii in a: 
     p = multiprocessing.Process(target=Simulation, args=(ii,)) 
     jobs.append(p)  
     p.start() 

然而,這隻會將最後一次模擬打印到HDF文件,不知何故它會覆蓋所有其他組。

回答

10

每次以寫入(w)模式打開文件時,都會創建一個新文件 - 因此,如果該文件已存在,則文件內容會丟失。只有最後一個文件句柄才能成功寫入文件。即使您將其更改爲追加模式,也不應嘗試從多個進程寫入相同的文件 - 如果兩個進程嘗試同時寫入,則輸出會出現亂碼。

相反,都放輸出隊列中的工作進程,並且具有單個專用處理(或者子過程或主處理)從隊列中處理的輸出和寫入文件:


import multiprocessing as mp 
import tables as pt 


num_arrays = 100 
num_processes = mp.cpu_count() 
num_simulations = 1000 
sentinel = None 


def Simulation(inqueue, output): 
    for ii in iter(inqueue.get, sentinel): 
     output.put(('createGroup', ('/', 'A%s' % ii))) 
     for i in range(num_arrays): 
      output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i]))) 


def handle_output(output): 
    hdf = pt.openFile('simulation.h5', mode='w') 
    while True: 
     args = output.get() 
     if args: 
      method, args = args 
      getattr(hdf, method)(*args) 
     else: 
      break 
    hdf.close() 

if __name__ == '__main__': 
    output = mp.Queue() 
    inqueue = mp.Queue() 
    jobs = [] 
    proc = mp.Process(target=handle_output, args=(output,)) 
    proc.start() 
    for i in range(num_processes): 
     p = mp.Process(target=Simulation, args=(inqueue, output)) 
     jobs.append(p) 
     p.start() 
    for i in range(num_simulations): 
     inqueue.put(i) 
    for i in range(num_processes): 
     # Send the sentinal to tell Simulation to end 
     inqueue.put(sentinel) 
    for p in jobs: 
     p.join() 
    output.put(None) 
    proc.join() 

爲了便於比較,這裏是使用mp.Pool版本:

import multiprocessing as mp 
import tables as pt 


num_arrays = 100 
num_processes = mp.cpu_count() 
num_simulations = 1000 


def Simulation(ii): 
    result = [] 
    result.append(('createGroup', ('/', 'A%s' % ii))) 
    for i in range(num_arrays): 
     result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i]))) 
    return result 


def handle_output(result): 
    hdf = pt.openFile('simulation.h5', mode='a') 
    for args in result: 
     method, args = args 
     getattr(hdf, method)(*args) 
    hdf.close() 


if __name__ == '__main__': 
    # clear the file 
    hdf = pt.openFile('simulation.h5', mode='w') 
    hdf.close() 
    pool = mp.Pool(num_processes) 
    for i in range(num_simulations): 
     pool.apply_async(Simulation, (i,), callback=handle_output) 
    pool.close() 
    pool.join() 

它看起來更簡單嗎?但是有一個重要的區別。原始代碼使用output.put將參數發送到在其自己的子進程中運行的handle_outputhandle_output將從output隊列中取args並立即處理它們。使用上面的池代碼,Simulationresult中積累了大量的args,並且result不會發送到handle_output,直到Simulation返回後。

如果Simulation需要很長時間,將會有很長的等待期,而沒有任何內容正在寫入simulation.h5

+0

除了這個問題,我已經使用上面的代碼成功了,但現在我擴展了這個模擬,由a = range(1000)定義的for循環以及由b = range(100)定義的for循環。這種保護措施可以大量使用我的記憶。我有8個CPU和16 Gb RAM,但是當我運行該文件時(即使沒有真實的模擬),我的RAM使用率達到100%,這導致我的系統失速。 – user2143958

+0

我認爲我們需要將任務數量與子流程數量分開。這聽起來像你想要1000個任務,但可能不是1000個子進程。我會編輯帖子,提出一種可以做到的方式。 – unutbu

+0

是的你是對的,在前面的例子中,對於大的迭代,同樣大量的子進程被創建,堵塞了所有的內存。您編輯的文件完美無瑕!但僅僅爲了澄清,我也在嘗試使用Pool()函數,並且此函數似乎也很好,但當需要傳遞多個變量時,它會變得更加困難。通過Pool()函數選擇Process()函數的主要原因是什麼? – user2143958

相關問題