Python asynchronous data recorder

I have a simulation class, `Simulation`, and a class `DataRecorder` in charge of saving data to disk (called many times during a run). Here is a simplified mock-up:
class DataRecorder(object):
    """
    Fill an internal buffer, and flush it to disk when it reaches
    a given data amount.
    """
    _f = r'n:\99-tmp\test_async\toto.txt'
    _events_buffer = []
    _events_buffer_limit = 10
    flushing_cpt = 0

    def __init__(self):
        with open(self._f, 'w') as fh:
            fh.write('new sim')

    def save_event(self, ix, val):
        """ append data to internal buffer, and flush it when limit is reached
        """
        if len(self._events_buffer) > self._events_buffer_limit:
            self._flush_events_buffer()
            self.flushing_cpt += 1
        self._events_buffer.append((ix, val))

    def _flush_events_buffer(self):
        """ write a bunch of data on disk """
        # here, in reality, deal with numpy arrays and HDF5 file
        buf = [str(i) for i in self._events_buffer]
        _s = '\n'.join(buf)
        with open(self._f, 'a') as fh:
            fh.write(_s)
        self._events_buffer = []

    def stop_records(self):
        self._flush_events_buffer()


class Simulation(object):
    def __init__(self):
        self.dr = DataRecorder()

    def run(self, nb=10000):
        """ long-running simulation (could be 10 min of calculations
        generating about 1 GB of data) """
        for ix in range(nb):
            sol = ix * 3.14
            self.dr.save_event(ix, sol)
        self.dr.stop_records()


if __name__ == '__main__':
    sim = Simulation()
    sim.run()
While this works fine, disk I/O is currently my bottleneck: every time the buffer is full, `DataRecorder` stops the simulation while it dumps the data to disk (an HDF5 file).

My goal is to turn `DataRecorder` into an asynchronous class that writes to disk in the background, so the simulation can keep filling the data buffer while a flush is in progress.

I am far from being a multiprocessing superhero. Here is my first, failed attempt, using a pool, inspired by Write data to disk in Python as a background process and Solving embarassingly parallel problems using Python multiprocessing:
import multiprocessing as mp

class MPDataRecorder(object):
    _f = r'n:\99-tmp\test_async\toto_mp.txt'
    _events_buffer = []
    _events_buffer_limit = 10
    flushing_cpt = 0
    numprocs = mp.cpu_count()

    def __init__(self):
        with open(self._f, 'w') as fh:
            fh.write('new sim')
        self.record = True
        self.pool = mp.Pool()
        self._watch_buffer()

    def save_event(self, ix, val):
        """ append data to internal buffer, and flush it when limit is reached
        """
        self._events_buffer.append((ix, val))

    def _flush_events_buffer(self):
        """ write a bunch of data on disk """
        # here, in reality, deal with numpy arrays and HDF5 file
        buf = [str(i) for i in self._events_buffer]
        _s = '\n'.join(buf)
        with open(self._f, 'a') as fh:
            fh.write(_s)
        self._events_buffer = []

    def _watch_buffer(self):
        # here, in reality, deal with numpy arrays and HDF5 file
        while self.record:
            self.pool.apply_async(self._flush_events_buffer)

    def stop_records(self):
        self.record = False
        self.pool.close()
        self.pool.join()
Inspired by the same posts, I also tried a `Queue`. Both attempts lead to the following traceback, then a memory error:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
Is there any chance of wrapping such an asynchronous data-writer feature in a generic class?
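For reference, one generic shape for such a class is a background consumer thread draining a `queue.Queue` (a sketch with hypothetical names; `threading` can be sufficient here, since Python releases the GIL during file writes, and it sidesteps pickling entirely):

```python
import threading
import queue  # named Queue on Python 2


class AsyncDataRecorder(object):
    """Sketch of a generic asynchronous writer: producers enqueue events,
    a background thread drains the queue and appends them to disk."""

    _SENTINEL = object()  # private marker telling the worker to stop

    def __init__(self, path):
        self.path = path
        self.q = queue.Queue()
        open(path, 'w').close()  # start a fresh file
        self._worker = threading.Thread(target=self._drain)
        self._worker.daemon = True
        self._worker.start()

    def save_event(self, ix, val):
        # Cheap call: no disk access happens on the simulation's thread.
        self.q.put((ix, val))

    def _drain(self):
        while True:
            item = self.q.get()
            if item is self._SENTINEL:
                break
            with open(self.path, 'a') as fh:
                fh.write(str(item) + '\n')

    def stop_records(self):
        # The sentinel is queued after all events, so joining the worker
        # guarantees everything has been flushed to disk.
        self.q.put(self._SENTINEL)
        self._worker.join()
```

In a real HDF5 workload, `_drain` would batch items and write them as arrays rather than reopening the file per event; the queue-plus-worker structure stays the same.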
This might be useful: http://stackoverflow.com/questions/21375509/in-a-pickle-how-to-serialise-legacy-objects-for-submission-to-a-python-multipro?lq=1 – jonrsharpe

What did you try with `Queue`? What did it look like? –

Hi @卡里拉! I no longer have that first version of the code, but I used this [post](http://stackoverflow.com/a/2364667/2069099) to write it. I will retry in that direction as soon as I have time. – Nic