2015-05-08 32 views
3

我想使用2個線程遍歷列表。一個來自領先,另一個來自尾隨,並在每次迭代中將元素放在Queue中。但在將值放入Queue之前,我需要檢查Queue(當其中一個線程將該值放在Queue中時)的值是否存在,所以當發生這種情況時,我需要停止線程並返回遍歷值的列表爲每個線程。多線程檢查隊列中的成員並停止線程

這是我到目前爲止已經試過:

from Queue import Queue 
from threading import Thread, Event 

class ThreadWithReturnValue(Thread): 
    def __init__(self, group=None, target=None, name=None, 
       args=(), kwargs={}, Verbose=None): 
     Thread.__init__(self, group, target, name, args, kwargs, Verbose) 
     self._return = None 
    def run(self): 
     if self._Thread__target is not None: 
      self._return = self._Thread__target(*self._Thread__args, 
               **self._Thread__kwargs) 
    def join(self): 
     Thread.join(self) 
     return self._return 

main_path = Queue() 

def is_in_queue(x, q): 
    with q.mutex: 
     return x in q.queue 

def a(main_path,g,l=[]): 
    for i in g: 
    l.append(i) 
    print 'a' 
    if is_in_queue(i,main_path): 
     return l 
    main_path.put(i) 

def b(main_path,g,l=[]): 
    for i in g: 
    l.append(i) 
    print 'b' 
    if is_in_queue(i,main_path): 
     return l 
    main_path.put(i) 

g=['a','b','c','d','e','f','g','h','i','j','k','l'] 

t1 = ThreadWithReturnValue(target=a, args=(main_path,g)) 
t2 = ThreadWithReturnValue(target=b, args=(main_path,g[::-1])) 
t2.start() 
t1.start() 
# Wait for all produced items to be consumed 
print main_path.join() 

我用ThreadWithReturnValue,這將創建一個返回值自定義線程。

並在成員檢查我用下面的功能:

def is_in_queue(x, q): 
    with q.mutex: 
     return x in q.queue 

現在,如果我第一次啓動t1然後t2我會得到12 a然後一個b那麼它不會做任何事情,我需要手動終止python!

但是,如果我先運行t2然後t1我會得到以下結果:

b 
b 
b 
b 
ab 

ab 
b 

b 
b 
b 
a 
a 

所以我的問題是,爲什麼蟒蛇胎面在這種情況下,有什麼不同?以及如何終止線程並使它們彼此通信?

+0

看看這裏http://pymotw.com/2/multiprocessing/communication.html ...你更感興趣的是管理共享狀態 – OWADVL

+0

@OWADVL聲音有用,我會看到的!謝謝! – Kasramvd

+0

您是否有從列表兩端進行迭代的實際要求,或者您是否只是將其作爲劃分任務的一種方式? – 101

回答

2

在我們進入更大的問題,你不使用Queue.join權。

該函數的重點在於,將一堆物品添加到隊列中的生產者可以等到消費者或消費者完成所有這些項目的工作。這可以通過讓消費者在完成與get之間的每個項目的完成工作後致電task_done。一旦有多少task_done調用作爲put調用,隊列就完成了。你不是在任何地方做get,更不用說task_done,所以隊列無法完成。所以,這就是爲什麼在兩個線程完成後永遠阻塞。


這裏的第一個問題是你的線程在實際同步之外幾乎沒有工作。如果他們所做的唯一的事情是通過一個隊列進行戰鬥,那麼他們中只有一個能夠一次運行。

當然,這是在玩具問題常見,但你必須考慮自己真正的問題:

  • 如果你正在做大量的I/O工作(監聽套接字,等待用戶輸入,等等),線程效果很好。
  • 如果你正在做大量的CPU工作(計算素數),線程不能在Python中工作,因爲GIL,但過程。
  • 如果你實際上主要是處理同步單獨的任務,那麼任何一個都不會很好地工作(並且進程會變得更糟)。它可能仍然是簡單以線程方式思考,但它會是做事最慢的方式。你可能想看看協程; Greg Ewing有一個great demonstration關於如何使用yield from來使用協程來構建諸如調度程序或許多參與者模擬的東西。

接下來,正如我提到在你前面的問題,使得線程(或進程)共享狀態的有效工作需要持有儘可能短的時間儘可能的鎖。

所以,如果你不得不在一個鎖下搜索整個隊列,那最好是一個常量時間搜索,而不是線性時間搜索。這就是爲什麼我建議使用諸如OrderedSet配方而不是list的配方,比如stdlib的Queue.Queue中的配方。那麼這個功能:

def is_in_queue(x, q): 
    with q.mutex: 
     return x in q.queue 

...只阻塞隊列中的一小部分第二,足夠長的時間來查找表中的哈希值,而不是足夠長,每個元素在隊列中對比較x


最後,我試着解釋你的其他問題的競爭條件,但讓我再試一次。

您需要鎖定代碼中的每個完整的「事務」,而不是圍繞單個操作。

例如,如果你這樣做:

with queue locked: 
    see if x is in the queue 
if x was not in the queue: 
    with queue locked: 
     add x to the queue 

...那麼它總是可能的,則x不在隊列中,當您檢查,但在當你解鎖並重新鎖定它的時間,有人加入它。這正是爲什麼兩個線程都可能提前停下來的原因。

要解決這個問題,你需要把一個鎖周圍的整個事情:

with queue locked: 
    if x is not in the queue: 
     add x to the queue 

當然這正好直接對我說之前有關鎖定隊列儘可能短的時間儘可能。真的,這就是多線程技術簡單化的原因。編寫安全的代碼很容易,只要可能是必要的,就可以鎖定所有內容,但是隨後代碼只會使用單個內核,而其他所有線程都會被阻塞,等待鎖定。編寫快速代碼很簡單,只需儘可能簡短地鎖定所有內容,但這樣就不安全,並且會得到垃圾數據甚至崩潰。弄清楚需要做什麼事情,以及如何最大限度地減少這些事務內部的工作,以及如何處理多個鎖,這些鎖可能需要在不造成死鎖的情況下完成工作......這並不容易。

+0

非常感謝@abarnert花時間和完整的解釋!你澄清我的一些小姐的理解,其實我在多處理中很糟糕,我想我需要更多的學習! :) – Kasramvd

+1

@卡斯拉:有一個原因「共享內存多線程很難」是一個老生常談。每個人都可怕,因爲我們的直覺是錯誤的(至少在從語言級別設計到CPU微碼級別的系統上,以優化單一處理並且不能改變)。儘可能使用更高級別的抽象(消息傳遞,而不是共享內存,STM等),當不是時,您將逐漸感覺到什麼時候忽略了您的直覺並嚴格地完成了工作(和/或測試)會發生什麼,但是你仍然會做出難以調試的錯誤... – abarnert

+0

是的,在我的問題中有很多場景!我正在慢慢的工作,我想深深地學習它!正如你所說的「共享內存多線程很難」是一種陳詞濫調,對我來說也是如此,但我喜歡硬件,我會努力! – Kasramvd

2

幾件事情,我認爲可以改進:

  1. 由於GIL,你可能想使用multiprocessing(而不是threading)模塊。通常,CPython線程不會導致CPU密集型工作加速。 (根據問題的背景,multiprocessing也可能不會,但threading幾乎肯定不會。)
  2. 像您的is_inqueue這樣的函數可能會導致較高的爭用。

鎖定的時間似乎線性在需要遍歷的項目數:

def is_in_queue(x, q): 
    with q.mutex: 
     return x in q.queue 

所以,相反,你可能做到以下幾點。

使用multiprocessing與共享dict

from multiprocessing import Process, Manager 

manager = Manager() 
d = manager.dict() 

# Fn definitions and such 

p1 = Process(target=p1, args=(d,)) 
p2 = Process(target=p2, args=(d,)) 
每個函數內

,檢查這樣的項目:

def p1(d): 

    # Stuff 

    if 'foo' in d: 
     return