2016-04-19 26 views
0

我的單元測試中有一個幻像問題。 我使用包裝中的ThreadPool包裝stdoutstderr我的班級使用paramiko的功能。在創作過程中,我使用下面的代碼進行了一些真實生活測試,並且它工作得很好。但是在編寫針對該代碼的單元測試期間,我設法遇到了問題,ThreadPool的這種用法掛起。來自python的多處理的ThreadPool掛起

這部分掛出了95%的時間,並且有時會正常執行。

while not (self.__stdout_async_r.ready() and self.__stderr_async_r.ready()): 
    time.sleep(WAIT_FOR_DATA) 

我檢查調試過程中的價值觀和我發現,有時候有一個或其他條件設定爲結束,但對方不是。但是這兩個功能都已經完成,所以結果只是要求未來不會改變的狀態。

的再現(必要對這個問題的功能)代碼:

import time 
from multiprocessing.pool import ThreadPool 

class ExecResult(object): 
    def __init__(self, command=None, exit_status_func=None, 
       receive_stdout_func=None, receive_stderr_func=None, 
       connection=None): 
    self.connection = connection 
    self.stdout = None 
    self.stderr = None 
    self.ecode = None 
    self.ts_stop = None 
    self._exit_status_f = exit_status_func 
    self.result_available = False 
    self.__fetch_streams(receive_stdout_func, receive_stderr_func) 

    def wait_for_data(self): 
    WAIT_FOR_DATA = 0.1 

    if not self.result_available: 

     # Here it hangs out for 95 percent 
     while not (self.__stdout_async_r.ready() and self.__stderr_async_r.ready()): 
     time.sleep(WAIT_FOR_DATA) 

     self.result_available = True 
     self.ts_stop = time.time() 
     self.stdout = self.__stdout_async_r.get(timeout=2) 
     self.stderr = self.__stderr_async_r.get(timeout=2) 
     self.ecode = self._exit_status_f() 


    def __fetch_streams(self, stdout_func, stderr_func): 
    stdout_t = ThreadPool(processes=1) 
    stderr_t = ThreadPool(processes=1) 

    self.__stdout_async_r = stdout_t.apply_async(func=stdout_func) 
    self.__stderr_async_r = stderr_t.apply_async(func=stderr_func) 
    stdout_t.close() 
    stderr_t.close() 

def stderr(): 
    return "stderr" 

def stdout(): 
    return "stdout" 

def exit(): 
    return "0" 

# actual reproduction 
res = ExecResult(None, exit, stdout, stderr, None) 
res.wait_for_data() #if are data available get them or wait 
print res.stdout 
print res.stderr 
print res.ecode 

回答

1

因爲通常,我發現了一些時間花在罵後這個答案,做一些茶。

解決方案是close方法後補充一點:

stdout_t.join() 
stderr_t.join() 

因此,這是維修的部件作爲一個整體:

def __fetch_streams(self, stdout_func, stderr_func): 
    stdout_t = ThreadPool(processes=1) 
    stderr_t = ThreadPool(processes=1) 

    self.__stdout_async_r = stdout_t.apply_async(func=stdout_func) 
    self.__stderr_async_r = stderr_t.apply_async(func=stderr_func) 
    stdout_t.close() 
    stderr_t.close() 
    stdout_t.join() 
    stderr_t.join()