2014-10-03 52 views
1

我想找到一種機制來輕鬆報告Python線程的進度。例如,如果我的線程有一個計數器,我想知道計數器的價值,但重要的是,我只需要知道最新的價值,而不是每一個過去的價值。Python線程的易於刷新隊列

我認爲最簡單的解決方案是一個單一的值Queue,其中每當我在線索中新建一個值時,它就會用新值替換舊值。然後當我在主程序中做一個get時,它只會返回最新值。

因爲我不知道如何做到上述,而是我所做的是每個計數器值put排隊,當我get,我得到的所有值,直到沒有更多的,只是保持最後。但是,這看起來很不理想,因爲我正在填充我不在乎的數千個價值的隊列。

這裏是什麼,我現在做的一個例子:

from threading import Thread 
from Queue import Queue, Empty 
from time import sleep 

N = 1000 

def fast(q): 
    count = 0 
    while count<N: 
     sleep(.02) 
     count += 1 
     q.put(count) 

def slow(q): 
    while 1: 
     sleep(5) # sleep for a long time 
     # read last item in queue 
     val = None 
     while 1: # read all elements of queue, only saving last 
      try: 
       val = q.get(block=False) 
      except Empty: 
       break 
     print val # the last element read from the queue 
     if val==N: 
      break 

if __name__=="__main__": 
    q = Queue() 
    fast_thread = Thread(target=fast, args=(q,)) 
    fast_thread.start() 
    slow(q) 
    fast_thread.join() 

我的問題是,有沒有更好的方法嗎?

+1

IM不太清楚你爲什麼不直接使用共享變量?只要只有一個線程寫入和所有其他讀取,就不會有問題。 – 2014-10-03 20:55:39

+0

你能用'maxsize = 1'來做一個'Queue'嗎? – 2014-10-03 20:56:16

+1

我在@DonQuestion這裏。爲什麼不直接在'slow'內訪問'count'? – dano 2014-10-03 20:57:41

回答

3

只需使用全局變量和threading.Lock即可在作業期間保護它:

import threading 
from time import sleep 

N = 1000 
value = 0 
def fast(lock): 
    global value 
    count = 0 
    while count<N: 
     sleep(.02) 
     count += 1 
     with lock: 
      value = count 

def slow(): 
    while 1: 
     sleep(5) # sleep for a long time 
     print value # read current value 
     if value == N: 
      break 

if __name__=="__main__": 
    lock = threading.Lock() 
    fast_thread = threading.Thread(target=fast, args=(lock,)) 
    fast_thread.start() 
    slow() 
    fast_thread.join() 

收益率(像)

249 
498 
747 
997 
1000 

正如唐問題指出,如果只有一個線程修改value,然後 確實需要在fast功能沒有鎖。正如dano指出的那樣,如果您想要 確保slow中打印的值與 if-statement中使用的值相同,則需要在slow函數中鎖定。

有關需要鎖定的更多信息,請參閱Thread Synchronization Mechanisms in Python

+1

爲什麼在這個例子中需要鎖?只有一個線程(如op)實際寫入'value'。 – 2014-10-03 20:53:22

+1

我不確定在這裏需要一個鎖,除非你想確保'value'在你打印它的時間和'if value == N'的時間之間沒有變化。如果這就是你要做的行爲,那麼你實際上並沒有得到它,因爲你沒有獲得'slow'內的'lock'。 – dano 2014-10-03 20:56:54

+0

@DonQuestion:你是對的;如果只有一個線程寫入值,則不需要鎖。但是一般來說,如果多個線程可以分配給'value',那麼你需要一個鎖。 – unutbu 2014-10-03 20:57:55

3

只需使用一個deque,最大長度爲1.它將保持您的最新值。

因此,而不是:

q = Queue() 

使用:

from collections import deque 
q = deque(maxlen=1) 

從deque的閱讀,沒有get方法,所以你必須做一些事情,如:

val = None 
try: 
    val = q[0] 
except IndexError: 
    pass 
+1

'deque'線程安全嗎?我認爲我必須堅持隊列結構,因爲它們具有內置的適當鎖定機制。 – tom10 2014-10-03 20:48:50

+1

或'Queue(maxsize = 1)'。 'deque'不完全是線程安全的。 – msvalkon 2014-10-03 20:50:21

+0

@msvalkon:你的意思是它不完全是線程安全的?該文檔說:「Deques支持線程安全,高效的內存追加,並從雙方的任何一方彈出,在任一方向都具有大致相同的O(1)性能。」 – Gerrat 2014-10-03 21:10:17

3

在您的特殊情況下,您可能會過度複雜化問題。如果你的變量只是單個線程的某種進度消除器,並且只有這個線程實際上改變了這個變量,那麼只要所有其他線程只能讀取,使用一個共享對象來傳遞進度是完全安全的。

我想我們都會閱讀很多關於併發編程中共享狀態的種族條件和其他缺陷的警告,所以我們傾向於過時並添加更多的預防措施,然後有時需要。

你基本上可以共享一個預構建的字典:

thread_progress = dict.fromkeys(list_of_threads, progress_start_value) 

或手動:

thread_progress = {thread: progress_value, ...} 

沒有進一步的預防措施,只要沒有線程改變了字典密鑰。

這樣你就可以在一個字典中跟蹤多個線程的進度。一旦線程開始,唯一的條件是不改變字典。這意味着該字典必須包含全部線程之前第一個子線程啓動,否則你必須使用一個鎖,然後寫入字典。隨着「改變字典」我的意思是所有關於鑰匙的操作。您可以更改關鍵字的關聯值,因爲這處於下一級間接關係。

更新

根本問題是共享狀態。這在線性程序中已經是一個問題,但是併發的噩夢。

例如:想象一個全局(共享)變量sv和兩個函數G(ood)B(ad)在線性程序中。兩個函數都根據sv計算結果,但B無意中更改爲sv。現在你想知道爲什麼G沒有做它應該做的事情,儘管在你的函數G中沒有發現任何錯誤,即使在你測試它被隔離並且它是完美的。

現在想象一下在併發程序中使用兩個線程AB的相同場景。兩個線程都將共享狀態/變量sv加1。

無鎖(括號中的sv電流值):

sv = 0 
A reads sv (0) 
B reads sv (0) 
A inc sv (0) 
B inc sv (0) 
A writes sv (1) 
B writes sv (1) 
sv == 1 # should be 2! 

要找到問題的根源是一個純粹的噩夢!因爲它有時也會成功。通常A實際上會成功完成,在B甚至開始讀取sv之前,但現在您的問題似乎表現爲非確定性或不穩定,並且更難以找到。與我的線性例子相比,這兩個線程都是「好」的,但表現得不像是有意的。

與鎖定:

sv = 0 
l = lock (for access on sv) 
A tries to aquire lock for sv -> success (0) 
B tries to aquire lock for sv -> failure, blocked by A (0) 
A reads sv (0) 
B blocked (0) 
A inc sv (0) 
B blocked (0) 
A writes sv (1) 
B blocked (1) 
A releases lock on sv (1) 
B tries to aquire lock for sv -> success (1) 
... 
sv == 2 

我希望我的小例子來說明訪問共享狀態,並且 爲什麼通過鎖定進行寫操作(包括讀操作)的原子是必要的根本問題。

關於我的一個預初始化字典的建議是:這僅僅是一種預防措施,因爲兩個原因:

  1. ,如果你在遍歷線程一個for循環,循環可能引發 例外如果一個線程在仍處於循環中的同時向字典添加或刪除一個條目,因爲它現在不清楚下一個鍵 應該是什麼。

  2. 線程A讀取字典並被線程B中斷,線程B添加了 條目並結束。線程A恢復,但沒有字典 線程B發生了變化,並將前B與它自己的更改 一起寫回。線程Bs更改丟失。

順便提一下,由於原始類型的不變性,我提出的解決方案不適用於atm。但是這可以通過使它們變得可變來容易地解決,例如,通過將它們封裝到列表或特殊的進度對象中,或者甚至更簡單:給線程函數訪問thread_progress字典。

說明舉例:

t = Thread() 
progress = 0  # progress points to the object `0` 
dict[t] = progress # dict[t] points now to object `0` 
progress = 1  # progress points to object `1` 
dict[t]    # dict[t] still points to object `0` 

更好:

t = Thread() 
t.progress = 0   
dict[thread_id] = t 
t.progress = 1   
dict[thread_id].progress == 1 
+0

這一切都非常有趣,我很欣賞這個想法。我無法找到正確的文檔。據我所知,關鍵詞似乎是「原子」,因爲在Python中,原子操作是線程安全的。但是我發現很難獲得這方面的細節,例如,您對字典所做的要點,以及何時可以添加密鑰等。 – tom10 2014-10-06 02:21:59

+0

我做了更新,希望能夠解決您評論的問題。 – 2014-10-06 10:16:50