2014-02-16

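Python asynchronous data recorder

I have a simulation running inside a class "Simulation", and a class "DataRecorder" in charge of saving the data to disk (after many operations). Here is a simplified mock-up: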

class DataRecorder(object):
    """
    Fill in an internal buffer, and flush it on disk when it reaches
    a given data amount.
    """
    _f = r'n:\99-tmp\test_async\toto.txt'
    _events_buffer_limit = 10

    def __init__(self):
        # per-instance state (a class-level list would be shared by all instances)
        self._events_buffer = []
        self.flushing_cpt = 0
        with open(self._f, 'w') as fh:
            fh.write('new sim\n')

    def save_event(self, ix, val):
        """ append data to internal buffer, and flush it when limit is reached
        """
        self._events_buffer.append((ix, val))
        if len(self._events_buffer) >= self._events_buffer_limit:
            self._flush_events_buffer()
            self.flushing_cpt += 1

    def _flush_events_buffer(self):
        """ write bunch of data on disk """
        # here, in reality, deal with numpy arrays and HDF5 file
        if not self._events_buffer:
            return
        # one record per line, newline-terminated so successive flushes don't run together
        _s = ''.join(str(i) + '\n' for i in self._events_buffer)
        with open(self._f, 'a') as fh:
            fh.write(_s)
        self._events_buffer = []

    def stop_records(self):
        # write whatever is still in the buffer
        self._flush_events_buffer()


class Simulation(object):
    def __init__(self):
        self.dr = DataRecorder()

    def run(self, nb=10000):
        """ long-term simulation (could be 10min calculations generating about 1Gb of data) """
        for ix in range(nb):
            sol = ix * 3.14
            self.dr.save_event(ix, sol)
        self.dr.stop_records()


if __name__ == '__main__':
    sim = Simulation()
    sim.run()

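While this works fine, disk I/O is my current bottleneck: every time the buffer is full, DataRecorder halts the simulation for as long as it takes to dump the data to disk (an HDF5 file).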
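Before restructuring anything, it can help to measure how much wall-clock time is really spent in the flush. A minimal sketch for Python 3; the `timed` decorator, `io_stats` and `fake_flush` are hypothetical helpers, not part of the original code, and `fake_flush` only sleeps to stand in for a disk write:

```python
import functools
import time


def timed(stats, key):
    """Decorator that accumulates the wall-clock time spent in the wrapped function."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                # add this call's duration to the running total for `key`
                stats[key] = stats.get(key, 0.0) + time.perf_counter() - start
        return wrapper
    return decorator


io_stats = {}


@timed(io_stats, "flush")
def fake_flush():
    time.sleep(0.01)  # stands in for the disk write


for _ in range(3):
    fake_flush()
print("time spent flushing: %.3f s" % io_stats["flush"])
```

Applied to the real class, something like `DataRecorder._flush_events_buffer = timed(io_stats, "flush")(DataRecorder._flush_events_buffer)` before running the simulation would report the total time lost to flushing.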

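My goal is to turn DataRecorder into an asynchronous class that writes to disk in the background, letting the simulation keep running and filling the data buffer in the meantime.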
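One way to get this without multiprocessing is to keep buffering in the simulation and hand each full buffer to a background thread that does the writing. A minimal sketch for Python 3 (the module is `Queue` on Python 2), assuming plain-text output as in the mock-up; `ThreadedDataRecorder` is a hypothetical name. A thread only helps if the write call releases the GIL: that holds for ordinary file I/O, but whether it holds for your HDF5 library should be checked.

```python
import queue
import threading


class ThreadedDataRecorder(object):
    """Buffer events in memory; a background thread writes full buffers to disk."""

    def __init__(self, path, buffer_limit=10):
        self._path = path
        self._limit = buffer_limit
        self._buffer = []
        self._queue = queue.Queue()
        with open(self._path, "w") as fh:
            fh.write("new sim\n")
        # daemon thread: call stop_records() before exiting, or pending data is lost
        self._writer = threading.Thread(target=self._write_loop, daemon=True)
        self._writer.start()

    def save_event(self, ix, val):
        self._buffer.append((ix, val))
        if len(self._buffer) >= self._limit:
            # hand the full buffer to the writer thread and start a fresh one;
            # the simulation does not wait for the disk here
            self._queue.put(self._buffer)
            self._buffer = []

    def _write_loop(self):
        while True:
            chunk = self._queue.get()
            if chunk is None:  # sentinel: no more data
                break
            with open(self._path, "a") as fh:
                fh.write("".join(str(item) + "\n" for item in chunk))

    def stop_records(self):
        if self._buffer:
            self._queue.put(self._buffer)
            self._buffer = []
        self._queue.put(None)
        self._writer.join()  # wait until everything queued has been written
```

Because a single thread consumes a FIFO queue and appends to the file, records end up in the same order as the `save_event` calls.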

I am far from being a multiprocessing superhero. Here is my first, unsuccessful, attempt using a pool.

It was inspired by "Write data to disk in Python as a background process" and "Solving embarassingly parallel problems using Python multiprocessing":

import multiprocessing as mp


class MPDataRecorder(object):
    _f = r'n:\99-tmp\test_async\toto_mp.txt'
    _events_buffer = []
    _events_buffer_limit = 10
    flushing_cpt = 0
    numprocs = mp.cpu_count()

    def __init__(self):
        with open(self._f, 'w') as fh:
            fh.write('new sim')
        self.record = True
        self.pool = mp.Pool()
        self._watch_buffer()

    def save_event(self, ix, val):
        """ append data to internal buffer, and flush it when limit is reached
        """
        self._events_buffer.append((ix, val))

    def _flush_events_buffer(self):
        """ write bunch of data on disk """
        # here, in reality, deal with numpy arrays and HDF5 file
        buf = [str(i) for i in self._events_buffer]
        _s = '\n'.join(buf)
        with open(self._f, 'a') as fh:
            fh.write(_s)
        self._events_buffer = []

    def _watch_buffer(self):
        # here, in reality, deal with numpy arrays and HDF5 file
        # NOTE: apply_async has to pickle the bound method self._flush_events_buffer
        # to send it to a worker; and since this loop is entered from __init__ and
        # self.record is never set to False, it never ends and keeps queuing tasks.
        while self.record:
            self.pool.apply_async(self._flush_events_buffer)

    def stop_records(self):
        self.record = False
        self.pool.close()
        self.pool.join()
I also tried using a Queue.

This leads to the following traceback, and then to a memory error:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed   
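The error comes from apply_async having to pickle its callable to ship it to a worker process. On Python 2 a bound method such as self._flush_events_buffer cannot be pickled at all; on Python 3 pickling it would mean pickling self, including the Pool, which refuses to be pickled. A common workaround is to submit a module-level function together with plain data. A minimal sketch; `write_chunk` and `flush_with_pool` are hypothetical names:

```python
import multiprocessing as mp


def write_chunk(path, chunk):
    """Module-level function: pickled by reference, so it can be sent to a worker."""
    with open(path, "a") as fh:
        fh.write("".join(str(item) + "\n" for item in chunk))
    return len(chunk)


def flush_with_pool(pool, path, chunk):
    # Send the function plus plain data (a path and a list of tuples), never `self`.
    return pool.apply_async(write_chunk, (path, list(chunk)))


if __name__ == "__main__":  # required on Windows, where workers re-import this module
    import os
    import tempfile

    out = os.path.join(tempfile.mkdtemp(), "toto_mp.txt")
    pool = mp.Pool(processes=1)  # a single worker handles chunks in submission order
    results = [flush_with_pool(pool, out, [(i, i * 3.14)]) for i in range(5)]
    pool.close()
    pool.join()
    print("records written:", sum(r.get() for r in results))
```

With several workers appending to the same file, chunks could be written out of order, which is why the sketch uses one.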

Is there any way to wrap this asynchronous data-writer functionality in a generic class?
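One possible shape for such a generic class, sketched for Python 3 under the assumption that a background thread is enough (see the GIL caveat for HDF5): the class knows nothing about the file format and simply calls a user-supplied `write_fn(chunk)` for every submitted chunk. `AsyncWriter`, `submit` and `max_pending` are hypothetical names.

```python
import queue
import threading


class AsyncWriter(object):
    """Call write_fn(chunk) in a background thread for every submitted chunk."""

    def __init__(self, write_fn, max_pending=4):
        self._write_fn = write_fn
        # Bounded queue: if the writer falls behind, submit() blocks instead of
        # letting unwritten chunks pile up in memory.
        self._queue = queue.Queue(maxsize=max_pending)
        self._error = None
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            chunk = self._queue.get()
            if chunk is None:  # sentinel sent by close()
                break
            if self._error is None:
                try:
                    self._write_fn(chunk)
                except Exception as exc:
                    # remember the first failure, keep draining so submit() never hangs
                    self._error = exc

    def submit(self, chunk):
        self._queue.put(chunk)

    def close(self):
        self._queue.put(None)
        self._thread.join()
        if self._error is not None:
            raise self._error  # re-raise the writer's failure in the caller's thread

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False
```

The simulation would keep its own buffer and call `submit(buffer)` each time it fills, with `write_fn` doing the actual text or HDF5 write.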


This might be useful: http://stackoverflow.com/questions/21375509/in-a-pickle-how-to-serialise-legacy-objects-for-submission-to-a-python-multipro?lq=1 – jonrsharpe


What did you try with 'Queue'? What did it look like? –


Hi @卡里拉! I no longer have that first version, but I wrote it using this [post](http://stackoverflow.com/a/2364667/2069099). As soon as I have time, I'll try again in that direction. – Nic
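For reference, a minimal sketch of a Queue-based variant: one dedicated writer process fed through a multiprocessing.Queue. It is not taken from the linked post; `writer_loop` and `run_simulation` are hypothetical names. Only the queue, the file path and plain lists of tuples cross the process boundary, so nothing unpicklable has to be sent.

```python
import multiprocessing as mp


def writer_loop(q, path):
    """Append chunks taken from q to path until a None sentinel arrives."""
    with open(path, "a") as fh:
        while True:
            chunk = q.get()
            if chunk is None:
                break
            fh.write("".join(str(item) + "\n" for item in chunk))
            fh.flush()


def run_simulation(path, nb=100, limit=10):
    q = mp.Queue(maxsize=8)  # bounded: the simulation waits if the writer lags
    writer = mp.Process(target=writer_loop, args=(q, path))
    writer.start()
    buf = []
    for ix in range(nb):
        buf.append((ix, ix * 3.14))
        if len(buf) >= limit:
            q.put(buf)  # the list is pickled and sent to the writer process
            buf = []
    if buf:
        q.put(buf)
    q.put(None)  # tell the writer to finish
    writer.join()


if __name__ == "__main__":  # required on Windows
    import os
    import tempfile

    out = os.path.join(tempfile.mkdtemp(), "toto_mp.txt")
    run_simulation(out)
    with open(out) as fh:
        print("records written:", len(fh.read().splitlines()))
```

Because `writer_loop` only calls `q.get()`, it can be exercised in-process with an ordinary `queue.Queue` as well.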

Answer


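If disk I/O is your bottleneck, no amount of clever buffering will get around the fact that you would have to keep the entire output in memory. If the disk writer cannot keep up, how would it ever "catch up" with your simulation process?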
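The "keeping up" argument can be seen with a bounded buffer: once it is full, the producer has to wait for the consumer, so with limited memory the whole run cannot go faster than the writer. A minimal illustration with Python 3's `queue.Queue` and no consumer attached:

```python
import queue

# A bounded buffer between the simulation (producer) and the disk writer (consumer).
buf = queue.Queue(maxsize=2)
buf.put("chunk 0")
buf.put("chunk 1")
try:
    # Nothing is draining the queue, so a third put cannot succeed:
    # the producer is forced to wait (here it gives up after 0.1 s).
    buf.put("chunk 2", timeout=0.1)
    blocked = False
except queue.Full:
    blocked = True
print("producer blocked:", blocked)
```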

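However, if this is only a problem during some intensive "peak" periods, buffering can solve it. Before trying anything fancier, I would suggest starting with a very simple solution: two separate processes, with the output piped from one to the other. The simplest way to do this in Python is the subprocess module. A more elaborate solution could wrap this in a framework such as Parallel Python (but I cannot vouch for it, as I have never done more than play with it).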
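A minimal sketch of the two-process pipe with the subprocess module, assuming line-oriented text output. The inline writer program (`WRITER_SRC`) and `simulate_into_pipe` are hypothetical stand-ins for a separate writer script and for the simulation loop:

```python
import subprocess
import sys

# Hypothetical writer program: copies stdin line by line into the file named by
# its first argument. In practice this would be a script of its own.
WRITER_SRC = r"""
import sys
with open(sys.argv[1], "a") as fh:
    for line in sys.stdin:
        fh.write(line)
"""


def simulate_into_pipe(path, nb=1000):
    writer = subprocess.Popen(
        [sys.executable, "-c", WRITER_SRC, path],
        stdin=subprocess.PIPE,
        universal_newlines=True,  # text-mode pipe
    )
    for ix in range(nb):
        # The OS pipe buffer absorbs short bursts; if the writer falls behind,
        # this write blocks until it catches up.
        writer.stdin.write("%d %r\n" % (ix, ix * 3.14))
    writer.stdin.close()  # EOF tells the writer to finish
    return writer.wait()  # the writer's exit code
```

The pipe buffer is small, so this only smooths out short peaks, which matches the caveat about sustained throughput.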


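Sure! My point is the following: the pure computation takes 20 s to run, and writing to disk takes about 10 s more, for a total of 30 s. The thing is that I write all the intermediate data to disk (it does not fit in memory), so if the writing could be done in a parallel process, I could save those 10 s of overhead. – Nic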
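Taking the figures in this comment, and assuming the writes can fully overlap with the computation (the writer runs on its own core and keeps up), the best case works out roughly as follows:

```latex
\begin{aligned}
T_{\text{serial}} &= T_{\text{compute}} + T_{\text{write}} = 20\,\mathrm{s} + 10\,\mathrm{s} = 30\,\mathrm{s} \\
T_{\text{overlapped}} &\approx \max(T_{\text{compute}},\, T_{\text{write}}) + T_{\text{final flush}} = 20\,\mathrm{s} + T_{\text{final flush}}
\end{aligned}
```

So overlapping can hide at most the 10 s of writing; the last flush, issued after the computation ends, still has to be waited for.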