2015-11-18 33 views
1

我正在使用Python multiprocessing.JoinableQueue類,我試圖對隊列施加一個大小限制。如果隊列已滿到此限制,則循環將會休眠,並在隊列中的空間釋放時嘗試重新添加任務,但似乎無法找到跟蹤隊列大小的可靠方法。確定Python中有多少項目JoinableQueue

我在考慮使用一些像這樣的邏輯,只有找出.qsize()功能我從Queue模塊期望不存在的:

from multiprocessing import JoinableQueue 
QUEUE_SIZE = 50 
QUEUE_WAIT = 900 
task_queue = JoinableQueue(QUEUE_SIZE) 
.... 
if QUEUE_SIZE is not 0: 
    # if QUEUE_SIZE is zero, there is no limit on the queue 
    while True: 
     # if the size of the queue equals our self-imposed limit, wait to try and add this task 
     if task_queue.qsize() == QUEUE_SIZE: 
      print 'task queue limit is met. task will be added when space clears' 
      time.sleep(QUEUE_WAIT) 
     else: 
      # add the task if we can 
      self.task_queue.put(path) 
      print 'task queued" task="%s"' % path) 
      break 

    else: 
     # if there's no limit just add the file_path 
     self.task_queue.put(file_path) 

是否有跟蹤有多少項目是目前優選的方式在JoinableQueue中,或者更好的方法來重新嘗試添加項目到隊列,如果他們不能立即添加?也許只是一個循環內的try/except/sleep?雖然這看起來不是最好的選擇。

任何將是極大的讚賞:)

+0

循環中的try/except/sleep可能是最合適的方式(儘管我必須承認,我之前沒有使用過JoinableQueue)。但是,跟蹤從多個線程訪問的東西的大小JoinableQueue所執行的任何鎖定似乎都有點棘手,但仍然必須處理「except」情況,因爲可能存在其他原因,爲什麼隊列不會接受您的項目。 –

回答

1

JoinableQueue應該有一個.full()方法,你應該能夠用來確定隊列是否有新項目的空間。使用full()而不是qsize()意味着您可以避免必須分別跟蹤隊列的最大大小。

但是,我會避免使用這個,因爲它會像.qsize()一樣不可靠。隊列讀取時可能會進行中等修改,因此無論如何你必須處理異常情況。在睡眠環內使用try....except可能是實現你想要嘗試的最清晰,最安全和最實用的方式。

結束語這一個輔助功能可以使代碼更容易(你必須修改這個處理參數func或者將它傳遞給try_until()之前包裹在一個無參數的拉姆達呼叫。

def try_until(func, max_tries, sleep_time): 
    for _ in range(0,max_tries): 
     try: 
      return func() 
     except: 
      sleep(sleep_time) 
    raise WellNamedException() 
+0

Perf等等,謝謝! 不幸的是,我知道'.qsize()'有點不可預測。我還將排隊的項目(這是文件路徑)輸入到MongoDB集合中,以避免在再次選中目錄時重新添加它們,所以也許我可以通過使用此選項檢查Mongo集合來跟蹤排隊的項目數量包裝器? – deadbits

相關問題