2013-07-10 286 views
12

要求是啓動五個線程,並且只在最快的線程中等待。所有五個線程都去尋找相同的數據5個方向,一個足以繼續控制流程。如何等待,直到在Python中完成第一個線程

實際上,我需要等待前兩個線程返回來驗證對方。但我猜如果我知道如何等待最快的。我可以想出如何等待第二快。

很多人都在談論join(timeout),但是您事先不知道哪一個要等待(哪一個需要提前申請join)。

回答

1

或者只是跟蹤列表中的所有完成的線程,並讓第二個線程完成處理任何應該完成的任務,Python列表是線程安全的。

finished_threads = [] 
event = threading.Event() 

def func(): 
    do_important_stuff() 

    thisthread = threading.current_thread() 
    finished_threads.append(thisthread) 
    if len(finished_threads) > 1 and finished_threads[1] == thisthread: 
     #yay we are number two! 
     event.set() 

for i in range(5): 
    threading.Thread(target=func).start() 

event.wait() 
+0

這不回答關於主線程等待,直到兩個線程位完成,然後繼續:代替你傳遞的所有剩餘活動到第二個線程來完成這可能不是什麼都想。 – Duncan

+1

true; handle_two_threads_done()應該可以在事件上設置。編輯。 –

+0

嗯,Python列表是線程安全嗎?真?我以爲需要使用Queue()來實現線程一致性! –

3

如果你有某種在你的線程處理循環的,下面的代碼將終止他們當一個終止使用threading.Event()

def my_thread(stop_event): 
    while not stop_event.is_set(): 
     # do stuff in a loop 

     # some check if stuff is complete 
     if stuff_complete: 
      stop_event.set() 
      break 

def run_threads(): 
    # create a thread event 
    a_stop_event = threading.Event() 

    # spawn the threads 
    for x in range(5): 
     t = threading.Thread(target=my_thread, args=[a_stop_event]) 
     t.start() 

    while not a_stop_event.is_set(): 
     # wait for an event 
     time.sleep(0.1) 

    print "At least one thread is done" 

如果你的過程是「便宜」或單請求 - 響應類型線程(即例如異步HTTP請求),那麼Duncan's answer是一種好方法。

13

使用隊列:建成後每個線程放在隊列中的結果,然後你只需要讀取結果的適當數量而忽略其餘:

#!python3.3 
import queue # For Python 2.x use 'import Queue as queue' 
import threading, time, random 

def func(id, result_queue): 
    print("Thread", id) 
    time.sleep(random.random() * 5) 
    result_queue.put((id, 'done')) 

def main(): 
    q = queue.Queue() 
    threads = [ threading.Thread(target=func, args=(i, q)) for i in range(5) ] 
    for th in threads: 
     th.daemon = True 
     th.start() 

    result1 = q.get() 
    result2 = q.get() 

    print("Second result: {}".format(result2)) 

if __name__=='__main__': 
    main() 

的文檔Queue.get()(不帶參數它相當於Queue.get(True, None)

Queue.get([嵌段[,超時]])

卸下並從 返回一個項目隊列。如果可選args塊爲true並且超時時間爲無(默認爲 ),則在必要時阻止,直到項目可用。如果超時時間爲 爲正數,則最多會阻止超時秒數,如果在該時間內沒有可用項目,則會引發空例外情況。否則 (塊爲假),返回一個項目,如果一個是立即可用的,否則 引發空異常(在這種情況下超時被忽略)。

+1

如果在執行'q.get()'時Queue爲空,這是否會引發'Empty'異常? – Michael

+2

@Michael,'q.get()'的默認值是做一個阻塞get,所以不會拋出異常,而是阻塞主線程,直到有可用結果。 – Duncan

1

鄧肯的方法可能是最好的,我會推薦。不過,我之前一直缺乏「等待下一個完成的線程完成」的說法,我對此感到有些惱火,所以我只是寫了這個文件來嘗試一下。似乎工作。只需使用MWThread代替threading.thread,即可獲得新的wait_for_thread功能。

全局變量有點klunky;另一種辦法是讓他們成爲班級變量。但是,如果這是隱藏在一個模塊(mwthread.py或其他),它應該沒問題。

#! /usr/bin/env python 

# Example of how to "wait for"/join whichever threads is/are done, 
# in (more or less) the order they're done. 

import threading 
from collections import deque 

_monitored_threads = [] 
_exited_threads = deque() 
_lock = threading.Lock() 
_cond = threading.Condition(_lock) 

class MWThread(threading.Thread): 
    """ 
    multi-wait-able thread, or monitored-wait-able thread 
    """ 
    def run(self): 
     tid = threading.current_thread() 
     try: 
      with _lock: 
       _monitored_threads.append(tid) 
      super(MWThread, self).run() 
     finally: 
      with _lock: 
       _monitored_threads.remove(tid) 
       _exited_threads.append(tid) 
       _cond.notifyAll() 

def wait_for_thread(timeout=None): 
    """ 
    Wait for some thread(s) to have finished, with optional 
    timeout. Return the first finished thread instance (which 
    is removed from the finished-threads queue). 

    If there are no unfinished threads this returns None 
    without waiting. 
    """ 
    with _cond: 
     if not _exited_threads and _monitored_threads: 
      _cond.wait(timeout) 
     if _exited_threads: 
      result = _exited_threads.popleft() 
     else: 
      result = None 
    return result 

def main(): 
    print 'testing this stuff' 
    def func(i): 
     import time, random 
     sleeptime = (random.random() * 2) + 1 
     print 'thread', i, 'starting - sleep for', sleeptime 
     time.sleep(sleeptime) 
     print 'thread', i, 'finished' 

    threads = [MWThread(target=func, args=(i,)) for i in range(3)] 
    for th in threads: 
     th.start() 
    i = 0 
    while i < 3: 
     print 'main: wait up to .5 sec' 
     th = wait_for_thread(.5) 
     if th: 
      print 'main: got', th 
      th.join() 
      i += 1 
     else: 
      print 'main: timeout' 
    print 'I think I collected them all' 
    print 'result of wait_for_thread():' 
    print wait_for_thread() 

if __name__ == '__main__': 
    main() 
相關問題