2009-12-11 89 views

回答

59

,我建議您在開始線程之前實例化一個Queue.Queue,並且把它作爲線程的args來一個:線程完成之前,它.put S中它作爲參數接收隊列結果。父母可以隨意.get.get_nowait

隊列一般都安排在Python線程同步和通信的最佳途徑:他們本質上是線程安全的,消息傳遞車輛 - 組織一般多任務的最佳途徑 - )

+2

'線程完成之前,它.puts它作爲一個argument'接收隊列中的結果,你的意思是,這將自動蟒蛇做什麼?如果不是(意思是設計小費),那麼你能否在答案中說清楚。 – n611x007 2013-11-11 12:21:21

+3

爲了專業化現有的功能,它很難看。隊列對於單個結果問題有很多不必要的開銷。更清楚而有效的子類'threading.Thread'和新的run()方法只是簡單地將結果存儲爲像'self.ret = ...'這樣的屬性(更爲舒服的是Thread的子類,它處理返回值/異常自定義的目標函數確實'threading.Thread'應該被擴展以提供開箱即用 - 因爲它將與舊的行爲「return None」兼容。) – kxr 2016-08-17 10:38:01

1

那麼,在Python線程模塊中,存在與鎖關聯的條件對象。一種方法acquire()將返回從底層方法返回的任何值。欲瞭解更多信息:Python Condition Objects

5

另一種方法!是將回調函數傳遞給線程。這提供了一種簡單,安全和靈活的方法,可以隨時從新線程向父節點返回一個值。

# A sample implementation 

import threading 
import time 

class MyThread(threading.Thread): 
    def __init__(self, cb): 
     threading.Thread.__init__(self) 
     self.callback = cb 

    def run(self): 
     for i in range(10): 
      self.callback(i) 
      time.sleep(1) 


# test 

import sys 

def count(x): 
    print x 
    sys.stdout.flush() 

t = MyThread(count) 
t.start() 
+8

問題在於回調仍然在子線程,而不是原始線程。 – babbageclunk 2009-12-11 12:58:43

+0

@wilberforce你能解釋一下它可能導致什麼問題嗎? – 2009-12-11 13:50:12

+4

好的。一個例子是,如果回調函數寫入父線程在線程運行時寫入的日誌文件。由於回調在子線程中運行,因此存在兩次寫入同時發生併發生衝突的風險 - 如果日誌記錄框架做了一些內部簿記,則可能會出現亂碼或交錯輸出,或者崩潰。使用一個線程安全的隊列並且有一個線程可以避免這種情況。這些問題可能是討厭的,因爲它們不是確定性的 - 它們可能只在生產中出現,並且可能難以再現。 – babbageclunk 2009-12-11 14:39:43

12

如果你調用join()等待線程完成,你可以簡單地附上結果線程實例本身,然後從join()方法返回後,主線程檢索。

另一方面,您不告訴我們您是如何打算髮現線程已完成並且結果可用。如果您已經有了這樣做的方法,那麼可能會指出您(如果您要告訴我們,請告訴我們)以獲得結果的最佳方式。

+0

*您可以簡單地將結果附加到Thread實例本身*如何將Thread實例傳遞到它運行的目標,以便目標可以將結果附加到此實例? – 2012-11-26 21:48:57

+1

Piotr Dobrogost,如果您不是爲您的實例創建Thread的子類,那麼您可以在目標可調用的末尾使用threading.current_thread()。我會說這有點醜,但亞歷克斯的方法總是更優雅。在某些情況下,這種方法更爲方便。 – 2012-11-28 16:41:14

+3

如果'join()'只會返回被調用的方法返回的內容,那就好了......看起來很愚蠢,而是返回'None'。 – ArtOfWarfare 2014-12-10 15:51:42

10

您應該傳遞一個Queue實例作爲參數,然後您應該將.put()返回對象放入隊列中。你可以通過queue.get()收集返回值,無論你放置什麼對象。

樣品:

queue = Queue.Queue() 
thread_ = threading.Thread(
       target=target_method, 
       name="Thread1", 
       args=[params, queue], 
       ) 
thread_.start() 
thread_.join() 
queue.get() 

def target_method(self, params, queue): 
""" 
Some operations right here 
""" 
your_return = "Whatever your object is" 
queue.put(your_return) 

使用多線程:

#Start all threads in thread pool 
    for thread in pool: 
     thread.start() 
     response = queue.get() 
     thread_results.append(response) 

#Kill all threads 
    for thread in pool: 
     thread.join() 

我用這個實現,它爲我的偉大工程。我希望你這樣做。

+1

你不是想念你的thread_.start()? – sadmicrowave 2013-09-13 02:48:05

+1

當然,我開始線我只想念把這一行在這裏:)感謝您的通知。 – 2013-09-13 08:50:33

+0

如果你有多個線程,這將如何? que.get()只爲我返回一個線程的結果? – ABros 2014-04-16 11:05:09

6

使用拉姆達來包裝你的目標線程的功能和使用隊列通過其返回值回父線程。 (您原來的目標函數仍然沒有額外的隊列參數不變。)

示例代碼:

import threading 
import queue 
def dosomething(param): 
    return param * 2 
que = queue.Queue() 
thr = threading.Thread(target = lambda q, arg : q.put(dosomething(arg)), args = (que, 2)) 
thr.start() 
thr.join() 
while not que.empty(): 
    print(que.get()) 

輸出:

4 
2

POC:

import random 
import threading 

class myThread(threading.Thread): 
    def __init__(self, arr): 
     threading.Thread.__init__(self) 
     self.arr = arr 
     self.ret = None 

    def run(self): 
     self.myJob(self.arr) 

    def join(self): 
     threading.Thread.join(self) 
     return self.ret 

    def myJob(self, arr): 
     self.ret = sorted(self.arr) 
     return 

#Call the main method if run from the command line. 
if __name__ == '__main__': 
    N = 100 

    arr = [ random.randint(0, 100) for x in range(N) ] 
    th = myThread(arr) 
    th.start() 
    sortedArr = th.join() 

    print "arr2: ", sortedArr 
3

您可以使用同步queue模塊。
考慮您需要檢查從數據庫中的用戶的相關信息與已知ID:

def check_infos(user_id, queue): 
    result = send_data(user_id) 
    queue.put(result) 

現在你可以讓你的數據是這樣的:

import queue, threading 
queued_request = queue.Queue() 
check_infos_thread = threading.Thread(target=check_infos, args=(user_id, queued_request)) 
check_infos_thread.start() 
final_result = queued_request.get() 
6

我很驚訝沒有人提到,你可以只通過它可變:

>>> thread_return={'success': False} 
>>> from threading import Thread 
>>> def task(thread_return): 
... thread_return['success'] = True 
... 
>>> Thread(target=task, args=(thread_return,)).start() 
>>> thread_return 
{'success': True} 

也許這有我不知道的重大問題。

+1

這完美的作品!如果有的話,我們真的很想聽聽有關這種方法遺漏的事情的一些看法。 – 2016-06-16 15:07:30

+0

工程。就其專業化現有功能而言,其醜陋 - 以及那些令人困惑的事情(可讀性) - 請參閱對第一個答案的評論。 – kxr 2016-08-17 10:44:09

+0

用多線程怎麼樣? – backslash112 2016-08-17 16:29:30

0

以下包裝函數將包裝現有函數並返回一個指向線程的對象(以便您可以調用start(),join()等)以及訪問/查看其最終返回值。

def threadwrap(func,args,kwargs): 
    class res(object): result=None 
    def inner(*args,**kwargs): 
    res.result=func(*args,**kwargs) 
    import threading 
    t = threading.Thread(target=inner,args=args,kwargs=kwargs) 
    res.thread=t 
    return res 

def myFun(v,debug=False): 
    import time 
    if debug: print "Debug mode ON" 
    time.sleep(5) 
    return v*2 

x=threadwrap(myFun,[11],{"debug":True}) 
x.thread.start() 
x.thread.join() 
print x.result 

它看起來OK,和threading.Thread類似乎很容易擴展(*)具有這種功能,所以我不知道爲什麼它已不存在。上述方法有缺陷嗎? (*)請注意,husanu對這個問題的回答完全是這樣的,子類threading.Thread產生了一個版本,join()給出了返回值。

1

基於jcomeau_ictx的建議。我遇到的最簡單的一個。這裏的要求是從服務器上運行的三個不同進程中獲得退出狀態staus,並在三個都成功時觸發另一個腳本。這似乎是工作的罰款

class myThread(threading.Thread): 
     def __init__(self,threadID,pipePath,resDict): 
      threading.Thread.__init__(self) 
      self.threadID=threadID 
      self.pipePath=pipePath 
      self.resDict=resDict 

     def run(self): 
      print "Starting thread %s " % (self.threadID) 
      if not os.path.exists(self.pipePath): 
      os.mkfifo(self.pipePath) 
      pipe_fd = os.open(self.pipePath, os.O_RDWR | os.O_NONBLOCK) 
      with os.fdopen(pipe_fd) as pipe: 
       while True: 
        try: 
        message = pipe.read() 
        if message: 
         print "Received: '%s'" % message 
         self.resDict['success']=message 
         break 
        except: 
         pass 

    tResSer={'success':'0'} 
    tResWeb={'success':'0'} 
    tResUisvc={'success':'0'} 


    threads = [] 

    pipePathSer='/tmp/path1' 
    pipePathWeb='/tmp/path2' 
    pipePathUisvc='/tmp/path3' 

    th1=myThread(1,pipePathSer,tResSer) 
    th2=myThread(2,pipePathWeb,tResWeb) 
    th3=myThread(3,pipePathUisvc,tResUisvc) 

    th1.start() 
    th2.start() 
    th3.start() 

    threads.append(th1) 
    threads.append(th2) 
    threads.append(th3) 

    for t in threads: 
     print t.join() 

    print "Res: tResSer %s tResWeb %s tResUisvc %s" % (tResSer,tResWeb,tResUisvc) 
    # The above statement prints updated values which can then be further processed