我有一個多線程程序,我創建一個生成器函數,然後將其傳遞給新線程。我希望它在本質上是共享/全局的,因此每個線程都可以從生成器獲取下一個值。發電機線程安全嗎?
使用像這樣的生成器是否安全,還是會遇到從多個線程訪問共享生成器的問題/條件?
如果沒有,是否有更好的方法來解決這個問題?我需要一些能夠循環訪問列表併爲任何線程調用它的下一個值。
我有一個多線程程序,我創建一個生成器函數,然後將其傳遞給新線程。我希望它在本質上是共享/全局的,因此每個線程都可以從生成器獲取下一個值。發電機線程安全嗎?
使用像這樣的生成器是否安全,還是會遇到從多個線程訪問共享生成器的問題/條件?
如果沒有,是否有更好的方法來解決這個問題?我需要一些能夠循環訪問列表併爲任何線程調用它的下一個值。
它不是線程安全的;同時通話可能會交錯,並與局部變量混淆。
常用的方法是使用主從模式(現在稱PC中的農民工模式)。創建第三個線程來生成數據,並在主服務器和從服務器之間添加一個隊列,從服務器將從隊列中讀取數據,並且主服務器將向其寫入數據。標準隊列模塊提供必要的線程安全性,並安排阻塞主機,直到從機準備好讀取更多數據。
不,它們不是線程安全的。你可以找到關於發電機和多線程的有趣的信息:
這取決於你使用的Python實現。在CPython中,GIL對python對象進行線程安全的所有操作,因爲在任何給定的時間只有一個線程可以執行代碼。
「GIL對python對象進行線程安全操作」 - 嗯?所有的操作都不是原子的 – 2009-07-15 18:16:50
這是很危險的誤導。 GIL只意味着Python代碼不會在多線程環境中破壞Python狀態:不能在字節碼操作中改變線程。 (例如,您可以修改共享字典而不會破壞它。)您仍然可以在任何兩個字節碼操作符之間更改線程。 – 2009-07-15 19:45:56
編輯以低於基準添加。
你可以用一個鎖包裹發電機。例如,
import threading
class LockedIterator(object):
def __init__(self, it):
self.lock = threading.Lock()
self.it = it.__iter__()
def __iter__(self): return self
def next(self):
self.lock.acquire()
try:
return self.it.next()
finally:
self.lock.release()
gen = [x*2 for x in [1,2,3,4]]
g2 = LockedIterator(gen)
print list(g2)
鎖定需要我的系統上50毫秒,隊列中取出350毫秒。當你真的有一個隊列時,隊列是有用的;例如,如果您有傳入的HTTP請求,並且您想將它們排隊以供工作線程處理。 (這不適合Python迭代器模型 - 一旦迭代器用完了項目,就完成了。)如果你確實有一個迭代器,那麼LockedIterator是使線程安全的更快,更簡單的方法。
from datetime import datetime
import threading
num_worker_threads = 4
class LockedIterator(object):
def __init__(self, it):
self.lock = threading.Lock()
self.it = it.__iter__()
def __iter__(self): return self
def next(self):
self.lock.acquire()
try:
return self.it.next()
finally:
self.lock.release()
def test_locked(it):
it = LockedIterator(it)
def worker():
try:
for i in it:
pass
except Exception, e:
print e
raise
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
for t in threads:
t.join()
def test_queue(it):
from Queue import Queue
def worker():
try:
while True:
item = q.get()
q.task_done()
except Exception, e:
print e
raise
q = Queue()
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.setDaemon(True)
t.start()
t1 = datetime.now()
for item in it:
q.put(item)
q.join()
start_time = datetime.now()
it = [x*2 for x in range(1,10000)]
test_locked(it)
#test_queue(it)
end_time = datetime.now()
took = end_time-start_time
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000)
Queue.Queue肯定爲+1,適用於組織線程系統(這是大多數情況下,絕對是此任務)的好方法。 – 2009-07-15 13:46:18