2013-01-22 46 views
7

我正在尋找一個可靠的實施方案,以便我逐步使用Queue通過一系列物品。只要隊列可用,就會立即拾起物品

這個想法是我想要使用一定數量的工作人員,這些工作人員將通過20多個數據庫密集型任務列表並返回結果。我希望Python從五個第一項開始,一旦完成一個任務,就開始隊列中的下一個任務。

這就是我目前的做法,沒有Threading

for key, v in self.sources.iteritems(): 
    # Do Stuff 

我想有一個類似的方法,但可能不必將列表分成五個子組。這樣它會自動選取列表中的下一個項目。我們的目標是確保如果一個數據庫正在放慢這個過程,它不會對整個應用程序產生負面影響。

回答

5

你可以自己實現,但是Python 3已經有了基於Executor的線程管理解決方案,你可以在Python 2.x中安裝the backported version。然後

你的代碼可能看起來像

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: 
    future_to_key = {} 
    for key, value in sources.items(): 
     future_to_idday[executor.submit(do_stuff, value)] = key 
    for future in concurrent.futures.as_completed(future_to_key): 
     key = future_to_key[future] 
     result = future.result() 
     # process result 
+0

感謝。我會試試看。忘了提及我使用的是2.x. – eandersson

3

如果使用python3,我建議併發期貨模塊。如果你沒有使用python3並且沒有連接到線程(與進程相關),那麼你可以嘗試multiprocessing.Pool(儘管它帶有一些注意事項,並且在我的應用程序中沒有正確關閉池的麻煩)。如果你必須使用線程,在python2中,你最終可能會自己編寫代碼 - 產生5個運行消費者函數的線程,並簡單地將調用(函數+ args)推入隊列中,供消費者反覆查找和處理它們。

+0

['multiprrocessing.dummy'也提供相同的接口](http://stackoverflow.com/a/14461365/4279)使用線程而不是進程 – jfs

1

你可以只使用STDLIB做到這一點:

#!/usr/bin/env python 
from multiprocessing.dummy import Pool # use threads 

def db_task(key_value): 
    try: 
     key, value = key_value 
     # compute result.. 
     return result, None 
    except Exception as e: 
     return None, e 

def main(): 
    pool = Pool(5) 
    for result, error in pool.imap_unordered(db_task, sources.items()): 
     if error is None: 
      print(result) 

if __name__=="__main__": 
    main() 
相關問題