2014-09-29 22 views

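How do I get the results from a thread pool in Python?

I have searched here for how to do threading in Python, but so far I haven't been able to get the answer I need. I'm not very familiar with the Queue and threading Python classes, so some of the answers I found here make no sense to me.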

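I want to create a pool of threads that I can give different tasks to and, when they have all finished, collect the result values and process them. So far I have tried the following, but I can't get the results. The code I've written is: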

from threading import Thread
from Queue import Queue

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.result = None
        self.start()
    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                self.result = func(*args, **kargs)
            except Exception, e:
                print e
            self.tasks.task_done()
    def get_result(self):
        return self.result

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        self.results = []
        for _ in range(num_threads):
            w = Worker(self.tasks)
            self.results.append(w.get_result())
    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))
    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()
    def get_results(self):
        return self.results

def foo(word, number):
    print word*number
    return number

words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1,2,3,4,5]
pool = ThreadPool(5)
for i in range(0, len(words)):
    pool.add_task(foo, words[i], numbers[i])

pool.wait_completion()
results = pool.get_results()
print results

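The output prints each word repeated the given number of times, but the results list is full of None values. So where should I store the return value of func?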

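Or is the easier way to pass a dictionary (or some other variable) into my function as an extra argument for storing the result and, after adding each task to the queue, append that result argument to a list of results: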

def foo(word, number, r): 
    print word*number 
    r[(word,number)] = number 
    return number 

words = ['hello', 'world', 'test', 'word', 'another test'] 
numbers = [1,2,3,4,5] 
pool = ThreadPool(5) 
results = [] 
for i in range(0, len(words)): 
    r = {} 
    pool.add_task(foo, words[i], numbers[i], r) 
    results.append(r) 
pool.wait_completion()  # wait for every task to finish before reading the results
print results

I would be very grateful for your help.

Answer


Python actually has a built-in thread pool you can use; it's just not well documented:

from multiprocessing.pool import ThreadPool 

def foo(word, number):
    print (word * number)
    return number

words = ['hello', 'world', 'test', 'word', 'another test'] 
numbers = [1,2,3,4,5] 
pool = ThreadPool(5) 
results = [] 
for i in range(0, len(words)): 
    results.append(pool.apply_async(foo, args=(words[i], numbers[i]))) 

pool.close() 
pool.join() 
results = [r.get() for r in results] 
print results 

Or (using map instead of apply_async):

from multiprocessing.pool import ThreadPool 

def foo(word, number): 
    print word*number 
    return number 

def starfoo(args): 
    """ 

    We need this because map only supports calling functions with one arg. 
    We need to pass two args, so we use this little wrapper function to 
    expand a zipped list of all our arguments. 

    """  
    return foo(*args) 

words = ['hello', 'world', 'test', 'word', 'another test'] 
numbers = [1,2,3,4,5] 
pool = ThreadPool(5) 
# We need to zip together the two lists because map only supports calling functions 
# with one argument. In Python 3.3+, you can use starmap instead. 
results = pool.map(starfoo, zip(words, numbers)) 
print results 

pool.close() 
pool.join() 
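
The comment in the code above mentions `starmap`. As a minimal sketch, assuming Python 3.3 or later (so `print` is a function), the same example can drop the `starfoo` wrapper, because `starmap` unpacks each tuple into separate arguments:

```python
from multiprocessing.pool import ThreadPool

def foo(word, number):
    print(word * number)
    return number

words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]

pool = ThreadPool(5)
# starmap unpacks each (word, number) tuple into foo's two positional arguments
results = pool.starmap(foo, zip(words, numbers))
pool.close()
pool.join()

print(results)
```

The printed words may appear in any order because the threads run concurrently, but `results` follows the order of the inputs.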

The second case is useful when the number of tasks is the same as the size of the pool, isn't it? – 2014-09-29 16:39:57


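It works fine with any number of tasks, and with a `Pool` that has any number of workers. `map` is useful if you want to run a function against every item in an iterable and get back the result of each call. If you have 5 workers handling an iterable of length 100, the `Pool` will call the function for all 100 items, but it will never run more than 5 threads at the same time. The output will be a list of length 100 containing the return values of all the function calls. – dano 2014-09-29 16:47:13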
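
The behaviour described in this comment can be checked with a small sketch. The `work` function and the `running`/`peak` counters are hypothetical names added for illustration; they only record how many calls overlap:

```python
import threading
import time
from multiprocessing.pool import ThreadPool

lock = threading.Lock()
running = 0   # number of calls executing right now
peak = 0      # highest value `running` has reached

def work(item):
    """Hypothetical task: track concurrency, sleep briefly, return the square."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.01)  # stand-in for real I/O-bound work
    with lock:
        running -= 1
    return item * item

pool = ThreadPool(5)
results = pool.map(work, range(100))  # 100 tasks shared among 5 worker threads
pool.close()
pool.join()

print(len(results))   # 100
print(results[:5])    # [0, 1, 4, 9, 16]
print(peak <= 5)      # True
```

All 100 items are processed and returned in input order, while the counter shows that no more than 5 calls ever ran at once.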


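@RafaelRios One more thing to note: because of the [GIL](https://wiki.python.org/moin/GlobalInterpreterLock), there is no performance benefit to using threads for CPU-bound work in Python. To get around that limitation, you need to use multiple processes via the [`multiprocessing`](https://docs.python.org/2.7/library/multiprocessing.html) module. For the examples above, you can make the switch by using `from multiprocessing import Pool` instead of `from multiprocessing.pool import ThreadPool`. Everything else stays the same. – dano 2014-09-29 16:50:02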
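
A rough sketch of that switch follows. The `square` and `run` helpers are hypothetical and exist only to show that the same `map` call works with either pool class. Two assumptions are worth flagging: the task function has to be defined at module level so worker processes can find it, and the process pool should be created under an `if __name__ == '__main__':` guard.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool

def square(n):
    """Hypothetical stand-in for CPU-bound work; defined at module level
    so that worker processes can look it up by name."""
    return n * n

def run(pool_cls, workers=4):
    """Run the same map call on either a thread pool or a process pool."""
    pool = pool_cls(workers)
    try:
        return pool.map(square, range(10))
    finally:
        pool.close()
        pool.join()

if __name__ == '__main__':
    # The guard keeps worker processes from re-running this block when they
    # import the module (needed on platforms that spawn instead of fork).
    print(run(ThreadPool))  # threads: fine for I/O-bound tasks
    print(run(Pool))        # processes: not limited by the GIL
```

Both calls return the same list. Only the pool class changes, which is the point made in the comment above.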
