2014-10-16 10 views
2

我正在寫一個劇本動畫圖像數據。我有一些大型圖像立方體(3D陣列)。對於其中的每一個,我逐步瀏覽每個立方體中的幀,一旦我接近它的結尾,就加載下一個立方體並繼續。由於每個立方體的尺寸很大,因此存在顯着的加載時間(約5秒)。我希望動畫能夠在多維數據集之間無縫地切換(同時也節省內存),所以我歪曲了加載過程。我在解決方案方面取得了一些進展,但仍然存在一些問題。交錯的數據加載與multiprocessing.Queue somtimes導致項目被消耗無序

下面的代碼加載每個數據立方體,將其拆分爲幀並將它們放入multiprocessing.Queue。一旦隊列中的幀數低於某個閾值,就會觸發下一個加載進程,加載另一個多維數據集並將其解壓縮到隊列中。

看看下面的代碼:

import numpy as np 
import multiprocessing as mp 
import logging 
logger = mp.log_to_stderr(logging.INFO) 
import time 

def data_loader(event, queue, **kw): 
    '''loads data from 3D image cube''' 
    event.wait()  #wait for trigger before loading 

    logger.info('Loading data') 
    time.sleep(3)      #pretend to take long to load the data 
    n = 100 
    data = np.ones((n,20,20))*np.arange(n)[:,None,None]   #imaginary 3D image cube (increasing numbers so that we can track the data ordering) 

    logger.info('Adding data to queue') 
    for d in data: 
     queue.put(d) 
    logger.info('Done adding to queue!') 


def queue_monitor(queue, triggers, threshold=50, interval=5): 
    ''' 
    Triggers the load events once the number of data in the queue falls below 
    threshold, then doesn't trigger again until the interval has passed. 
    Note: interval should be larger than data load time. 
    ''' 
    while len(triggers): 
     if queue.qsize() < threshold: 
      logger.info('Triggering next load') 
      triggers.pop(0).set() 
      time.sleep(interval)  


if __name__ == '__main__': 
    logger.info("Starting") 
    out_queue = mp.Queue() 

    #Initialise the load processes 
    nprocs, procs = 3, [] 
    triggers = [mp.Event() for _ in range(nprocs)] 
    triggers[0].set()   #set the first process to trigger immediately 
    for i, trigger in enumerate(triggers): 
     p = mp.Process(name='data_loader %d'%i, target=data_loader, 
         args=(trigger, out_queue)) 
     procs.append(p) 
    for p in procs: 
     p.start() 

    #Monitoring process 
    qm = mp.Process(name='queue_monitor', target=queue_monitor, 
        args=(out_queue, triggers)) 
    qm.start() 

    #consume data 
    while out_queue.empty(): 
     pass 
    else: 
     for d in iter(out_queue.get, None): 
      time.sleep(0.2) #pretend to take some time to process/animate the data 
      logger.info('data: %i' %d[0,0]) #just to keep track of data ordering 

這出色的作品在某些情況下,但有時一個新的裝載過程被觸發後的數據的順序被搞亂。我無法弄清楚爲什麼會發生這種情況--mp.Queue應該是正確的?!例如。按原樣運行上面的代碼將不會保留輸出隊列中的正確順序,但是,將閾值更改爲更低的值,例如。 30修復了這個問題。 *如此困惑...

所以問題:如何正確實現這種交錯的加載策略與multiprocessing在python中?

回答

1

這看起來像一個緩衝問題。在內部,multiprocessing.Queue使用緩衝區來臨時存儲已排隊的項目,並最終在後臺線程中將其刷新到Pipe。只有在衝突發生之後,這些項目纔會被髮送到其他進程。由於您要將大對象放在Queue上,因此會發生很多緩衝。這會導致加載過程實際上重疊,即使您的日誌記錄顯示一個過程在另一個過程開始之前完成。該文檔實際上有一個關於此方案的警告:

當一個對象被放在一個隊列中,對象是醃製和 後臺線程後醃製數據刷新到底層 管。這有一些令人驚訝的後果,但 不應該導致任何實際困難 - 如果他們真的打擾 你,那麼你可以改爲使用由經理創建的隊列。

  1. 把一個物體在一個空隊列後可能會有一個微小的延遲隊列的empty()方法返回False 和get_nowait()之前可以不提高Queue.Empty返回。
  2. 如果有多個進程將對象排入隊列中,則可能會在另一端無序地接收對象。但是,由相同過程入列的對象總是處於相對於彼此的預期的 的順序。

我建議做作爲文檔狀態,並使用multiprocessing.Manager來創建隊列:

m = mp.Manager() 
out_queue = m.Queue() 

,這將讓你避免這個問題完全。

另一種選擇是使用一個進程來完成所有的數據加載,並使其循環運行,其中event.wait()調用位於循環的頂部。

相關問題