2014-01-22 64 views
2

我寫了下面驗證的概念代碼:爲什麼ThreadPool的線程不能異步運行?

import time 
from multiprocessing.pool import ThreadPool 

class Maybe: 
    def __init__(self): 
     self._value = None 
     self._exists = False 
    def exists(self): 
     return self._exists 
    def value(self): 
     if not self.exists(): 
      raise ValueError("Maybe doesn't hold any value") 
     return self._value 
    def set(self, value): 
     self._value = value 
     self._exists = True 
    def unset(self): 
     self._value = None 
     self._exists = False 

class Future(object): 

    def __init__(self): 
     self._holder = Maybe() 
     self._handler = None 

    def _set(self, value): 
     self._holder.set(value) 
     self._invoke() 

    def _invoke(self): 
     if self._handler and self._holder.exists(): 
      self._handler(self._holder.value()) 

    def then(self, handler): 
     self._handler = handler 
     self._invoke() 


def fib(count): 
    f,s = 0,1 
    for i in xrange(count): 
     f,s =s,f+s 
    return s 

pool = ThreadPool(5) 

def test(fun, arg): 
    def print_fib(x): 
     print("fib => {0}, {1}\n".format(arg, len(str(x)))) 
    tb = time.time() 
    future = Future() 
    future.then(print_fib) 
    future._async_result = pool.apply_async(fun, [arg], callback=future._set) 
    ta = time.time() 
    print ("Time elapsed : {0}".format(ta - tb)) 
    return future 


x1=test(fib, 2029) 
x2=test(fib, 989999) 
x3=test(fib, 103) 
x4=test(fib, 38484) 
x5=test(fib, 20) 

time.sleep(3) 

而且我期待所有的通話異步運行。但他們似乎不是異步運行。例如,103參數的調用預計會以989999作爲參數之前完成。我沒有看到發生。即使是第一次通話,如果我在第二次通話中使用更大的參數,也需要更多時間

有人可以解釋發生了什麼事嗎?爲什麼他們不能異步運行?

+0

你會得到什麼輸出?你怎麼知道最先完成的是什麼? – user2357112

+0

@ user2357112:查看名爲'print_fib'的回調。這告訴我哪一個先完成,等等。 – Nawaz

+0

請將你得到的輸出粘貼到問題中。該計劃花費多長時間才能完成? –

回答

1

的問題是,ThreadPool將所有線程設置爲守護進程,即如果主線程退出所有線程將被終止。 time.sleep(3)是不夠的,你的過程完成其任務,這就是原因。請注意,您的「時間流逝」日誌不正確,因爲它會測量啓動線程所需的時間,而不是實際的任務。

你應該總是.join的主題。隨着ThreadPool,你可以做到這一點與這段代碼:

... 

x1=test(fib, 2029) 
x2=test(fib, 989999) 
x3=test(fib, 103) 
x4=test(fib, 38484) 
x5=test(fib, 20) 

pool.close() 
pool.join() 

而且將所有日誌print_fib功能,如預期它應該工作。

+0

*「請注意,您的」時間流逝「日誌不正確,因爲它會測量啓動線程所需的時間,而不是實際的任務。」*。是的,我知道,過去並不是真正的任務。它應該給我所需的時間來開啓一個線程。這是它的目的。 – Nawaz

+0

@Nawaz不夠公平。一旦添加'.close()'和'.join()',它仍然可以工作。 – freakish

+0

他們做什麼,特別是'close()'?我可以在寫完之後調用'test(fib,231)'? – Nawaz

1

線程同時運行。但fib運行得太快;所以很難說它們是否同時運行。 (除fib(989999)

嘗試以下代替(I移動經過的時間打印部分print_fib正確打印的經過時間。):

def test(fun, arg): 
    def print_fib(x): 
     print("fib => {0}, {1}\n".format(arg, len(str(x)))) 
     ta = time.time() 
     print ("Time elapsed : {0}".format(ta - tb)) 
    tb = time.time() 
    future = Future() 
    future.then(print_fib) 
    future._async_result = pool.apply_async(fun, [arg], callback=future._set) 
    return future 

x1 = test(time.sleep, 2) 
x2 = test(time.sleep, 5) 
x3 = test(time.sleep, 1) 
x4 = test(time.sleep, 4) 
x5 = test(time.sleep, 3) 

#time.sleep(10) 

x1._async_result.get() 
x2._async_result.get() 
x3._async_result.get() 
x4._async_result.get() 
x5._async_result.get() 

輸出:

fib => 1, 4 

Time elapsed : 1.00200009346 
fib => 2, 4 

Time elapsed : 2.00099992752 
fib => 3, 4 

Time elapsed : 3.00200009346 
fib => 4, 4 

Time elapsed : 4.00200009346 
fib => 5, 4 

Time elapsed : 5.00200009346 
相關問題