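Staggered data loading with multiprocessing.Queue sometimes causes items to be consumed out of order

I am writing a script to animate image data. I have a number of large image cubes (3D arrays). For each of these, I step through the frames in the cube, and once I get close to the end of it, I load the next cube and continue. Because each cube is large, there is a significant load time (about 5 seconds). I would like the animation to transition seamlessly between cubes (while also conserving memory), so I am staggering the load processes. I have made some progress towards a solution, but some issues persist.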
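The code below loads each data cube, splits it into frames and puts them into a multiprocessing.Queue. Once the number of frames in the queue falls below a certain threshold, the next load process is triggered, which loads another cube and unpacks it into the queue.
Have a look at the code below: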
import logging
import multiprocessing as mp
import time

import numpy as np

logger = mp.log_to_stderr(logging.INFO)


def data_loader(event, queue, **kw):
    '''loads data from 3D image cube'''
    event.wait()  # wait for trigger before loading
    logger.info('Loading data')
    time.sleep(3)  # pretend to take long to load the data
    n = 100
    # imaginary 3D image cube (increasing numbers so that we can track the data ordering)
    data = np.ones((n, 20, 20)) * np.arange(n)[:, None, None]
    logger.info('Adding data to queue')
    for d in data:
        queue.put(d)
    logger.info('Done adding to queue!')


def queue_monitor(queue, triggers, threshold=50, interval=5):
    '''
    Triggers the load events once the number of data in the queue falls below
    threshold, then doesn't trigger again until the interval has passed.
    Note: interval should be larger than data load time.
    '''
    while len(triggers):
        if queue.qsize() < threshold:
            logger.info('Triggering next load')
            triggers.pop(0).set()
            time.sleep(interval)


if __name__ == '__main__':
    logger.info("Starting")
    out_queue = mp.Queue()

    # Initialise the load processes
    nprocs, procs = 3, []
    triggers = [mp.Event() for _ in range(nprocs)]
    triggers[0].set()  # set the first process to trigger immediately
    for i, trigger in enumerate(triggers):
        p = mp.Process(name='data_loader %d' % i, target=data_loader,
                       args=(trigger, out_queue))
        procs.append(p)
    for p in procs:
        p.start()

    # Monitoring process
    qm = mp.Process(name='queue_monitor', target=queue_monitor,
                    args=(out_queue, triggers))
    qm.start()

    # consume data; get() blocks until a frame is available
    while True:
        d = out_queue.get()
        # `is None` rather than iter(out_queue.get, None): comparing a NumPy
        # array to None with == is elementwise in current NumPy and raises
        if d is None:
            break
        time.sleep(0.2)  # pretend to take some time to process/animate the data
        logger.info('data: %i' % d[0, 0])  # just to keep track of data ordering
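This works splendidly in some cases, but sometimes the order of the data gets scrambled after a new load process is triggered. I can't figure out why this happens; mp.Queue is supposed to keep items in order, right?! For example, running the code above as is does not preserve the correct order in the output queue, whereas changing the threshold to a lower value, e.g. 30, fixes the issue. So confused...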
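To pin down exactly where the ordering breaks, a small check along the following lines might help. This is only a sketch: it assumes each frame is tagged with a `(cube_index, frame_index)` pair when it is put on the queue (the code above puts bare arrays), and `first_out_of_order` is a hypothetical helper name, not part of `multiprocessing`.

```python
def first_out_of_order(tags):
    """Return the position of the first tag that does not follow its predecessor.

    `tags` is an iterable of (cube_index, frame_index) pairs in the order
    they were consumed. Returns None if the sequence is strictly increasing.
    """
    previous = None
    for position, tag in enumerate(tags):
        # Tuples compare lexicographically: cube index first, then frame index
        if previous is not None and tag <= previous:
            return position
        previous = tag
    return None


# Frames of cube 0 and cube 1 consumed in the intended order
in_order = [(0, 98), (0, 99), (1, 0), (1, 1)]
# Cube 1 starts arriving before cube 0 has been fully consumed
interleaved = [(0, 97), (1, 0), (0, 98), (1, 1)]

print(first_out_of_order(in_order))     # None
print(first_out_of_order(interleaved))  # 2
```

Logging the tags on the consumer side and passing them through such a check would show whether frames from two cubes are being interleaved, rather than frames within one cube being reordered.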
So the question: how does one correctly implement this staggered loading strategy with multiprocessing in Python?
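For context, the `multiprocessing` documentation notes that when several processes put objects on the same queue, the objects may be received out of order, since each process flushes its items to the underlying pipe from a background thread; only items put by the same process keep their relative order. One direction that would sidestep this, sketched here under the assumption that a single long-lived loader process is acceptable, is to let one process load the cubes one after another into a bounded queue, so that a blocking `put()` replaces the monitor. The names `load_cube`, `loader`, `consume` and `main`, the tiny cube size and the short sleep are all placeholders for illustration.

```python
import multiprocessing as mp
import time

import numpy as np


def load_cube(index, n_frames=20):
    """Hypothetical stand-in for the real loader: frames carry increasing labels."""
    time.sleep(0.05)  # pretend loading takes a while
    start = index * n_frames
    labels = np.arange(start, start + n_frames)
    return np.ones((n_frames, 4, 4)) * labels[:, None, None]


def loader(queue, n_cubes, n_frames=20):
    """Load cubes sequentially and feed their frames to the queue.

    With a bounded queue, put() blocks while the queue is full, so the next
    cube is only loaded once the consumer has nearly caught up.
    """
    for index in range(n_cubes):
        for frame in load_cube(index, n_frames):
            queue.put(frame)
    queue.put(None)  # sentinel: no more data


def consume(queue):
    """Collect the label of every frame until the sentinel arrives."""
    labels = []
    while True:
        frame = queue.get()
        if frame is None:  # identity check; == is elementwise on arrays
            break
        labels.append(int(frame[0, 0]))
    return labels


def main(n_cubes=3, n_frames=20, maxsize=10):
    queue = mp.Queue(maxsize=maxsize)  # at most maxsize frames are buffered
    proc = mp.Process(target=loader, args=(queue, n_cubes, n_frames))
    proc.start()
    labels = consume(queue)
    proc.join()
    return labels


if __name__ == '__main__':
    labels = main()
    print(labels == list(range(len(labels))))
```

The trade-off in this sketch is memory: the loader holds one full cube while it drains, plus up to `maxsize` frames in the queue. For the gap between cubes to stay hidden, `maxsize` would need to cover at least the load time divided by the time per frame (roughly 5 s / 0.2 s = 25 frames with the numbers above).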