2009-07-15 53 views
34

我有一個多線程程序,我創建一個生成器函數,然後將其傳遞給新線程。我希望它在本質上是共享/全局的,因此每個線程都可以從生成器獲取下一個值。發電機線程安全嗎?

使用像這樣的生成器是否安全,還是會遇到從多個線程訪問共享生成器的問題/條件?

如果沒有,是否有更好的方法來解決這個問題?我需要一些能夠循環訪問列表併爲任何線程調用它的下一個值。

回答

49

它不是線程安全的;同時通話可能會交錯,並與局部變量混淆。

常用的方法是使用主從模式(現在稱PC中的農民工模式)。創建第三個線程來生成數據,並在主服務器和從服務器之間添加一個隊列,從服務器將從隊列中讀取數據,並且主服務器將向其寫入數據。標準隊列模塊提供必要的線程安全性,並安排阻塞主機,直到從機準備好讀取更多數據。

+7

Queue.Queue肯定爲+1,適用於組織線程系統(這是大多數情況下,絕對是此任務)的好方法。 – 2009-07-15 13:46:18

-7

這取決於你使用的Python實現。在CPython中,GIL對python對象進行線程安全的所有操作,因爲在任何給定的時間只有一個線程可以執行代碼。

http://en.wikipedia.org/wiki/Global_Interpreter_Lock

+1

「GIL對python對象進行線程安全操作」 - 嗯?所有的操作都不是原子的 – 2009-07-15 18:16:50

+6

這是很危險的誤導。 GIL只意味着Python代碼不會在多線程環境中破壞Python狀態:不能在字節碼操作中改變線程。 (例如,您可以修改共享字典而不會破壞它。)您仍然可以在任何兩個字節碼操作符之間更改線程。 – 2009-07-15 19:45:56

40

編輯以低於基準添加。

你可以用一個鎖包裹發電機。例如,

import threading 
class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

gen = [x*2 for x in [1,2,3,4]] 
g2 = LockedIterator(gen) 
print list(g2) 

鎖定需要我的系統上50毫秒,隊列中取出350毫秒。當你真的有一個隊列時,隊列是有用的;例如,如果您有傳入的HTTP請求,並且您想將它們排隊以供工作線程處理。 (這不適合Python迭代器模型 - 一旦迭代器用完了項目,就完成了。)如果你確實有一個迭代器,那麼LockedIterator是使線程安全的更快,更簡單的方法。

from datetime import datetime 
import threading 
num_worker_threads = 4 

class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

def test_locked(it): 
    it = LockedIterator(it) 
    def worker(): 
     try: 
      for i in it: 
       pass 
     except Exception, e: 
      print e 
      raise 

    threads = [] 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     threads.append(t) 
     t.start() 

    for t in threads: 
     t.join() 

def test_queue(it): 
    from Queue import Queue 
    def worker(): 
     try: 
      while True: 
       item = q.get() 
       q.task_done() 
     except Exception, e: 
      print e 
      raise 

    q = Queue() 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     t.setDaemon(True) 
     t.start() 

    t1 = datetime.now() 

    for item in it: 
     q.put(item) 

    q.join() 

start_time = datetime.now() 
it = [x*2 for x in range(1,10000)] 

test_locked(it) 
#test_queue(it) 
end_time = datetime.now() 
took = end_time-start_time 
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000)