一個現成的選項,可能會幫助這是twisted.internet.defer.DeferredSemaphore
。這是您可能已經知道的正常(計數)信號量的異步版本,如果您已經完成了多線程編程。
A(計數)信號是很多像互斥(鎖定)。但是,只有在相應的版本發佈之前,互斥鎖才能獲得一次,則可以配置(計數)信號量,以便在需要任何相應版本之前允許任意(但指定)數量的採集成功。
下面是使用DeferredSemaphore
來跑十個異步操作的例子,但同時運行最多三個人:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def main():
sem = DeferredSemaphore(3)
jobs = []
for i in range(10):
jobs.append(sem.run(async, i))
d = gatherResults(jobs)
d.addCallback(lambda ignored: reactor.stop())
reactor.run()
if __name__ == '__main__':
main()
DeferredSemaphore
也有明確的acquire
和release
的方法,但是run
方法是如此方便它幾乎總是你想要的。它調用acquire
方法,該方法返回Deferred
。對於第一個Deferred
,它添加了一個回調,它調用您傳入的函數(以及任何位置或關鍵字參數)。如果該函數返回Deferred
,那麼到第二個Deferred
將添加一個回調,該回調調用release
方法。
同步的情況下被處理,以及,通過立即調用release
。還通過允許它們傳播錯誤來處理錯誤,但要確保完成必要的release
以使DeferredSemaphore
處於一致狀態。傳遞給run
函數(或它返回Deferred
的結果)的結果成爲由run
返回的Deferred
的結果。
另一種可能的方法可能基於DeferredQueue
和cooperate
。 DeferredQueue
大多像普通隊列,但其get
方法返回Deferred
。如果在通話時隊列中沒有任何項目,則在添加項目之前Deferred
不會觸發。
下面是一個例子:
from random import randrange
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def assign(jobs):
# Create new jobs to be processed
jobs.put(randrange(10))
reactor.callLater(randrange(10), assign, jobs)
def worker(jobs):
while True:
yield jobs.get().addCallback(async)
def main():
jobs = DeferredQueue()
for i in range(10):
jobs.put(i)
assign(jobs)
for i in range(3):
cooperate(worker(jobs))
reactor.run()
if __name__ == '__main__':
main()
注意,async
工人功能是一樣的,從第一個例子中的一個。但是,這次還有一個worker
函數明確將作業從DeferredQueue
中刪除,並使用async
(將async
作爲對返回的Deferred
的回調)進行處理。 worker
發電機由cooperate
驅動,它在每個Deferred
產生火災後迭代它。然後,主循環啓動這些工作生成器中的三個,以便在任何給定時間進行三項工作。
這種方法涉及的代碼比DeferredSemaphore
方法多一點,但有一些好處可能會讓人感興趣。首先,cooperate
返回一個CooperativeTask
實例,其中有一些有用的方法,如pause
,resume
和其他一些其他實例。此外,分配給相同合作者的所有作業將在調度中彼此配合,以免重複事件循環(這是API的名稱)。在DeferredQueue
方面,還可以對待處理項目的數量設置限制,以避免完全超載服務器(例如,如果圖像處理器卡住並停止完成任務)。如果調用put
的代碼處理隊列溢出異常,則可以用此壓力嘗試停止接受新作業(可能將其分流到另一個服務器或警告管理員)。與DeferredSemaphore
做類似的事情是有點棘手,因爲沒有辦法限制有多少工作等待能夠獲得信號量。
很酷,我很欣賞這些想法。 迴應使用DeferredSemaphore的想法。如果需要完成不連續的批次工作,這將非常有用。如果一個批次有太多的工作要做,它只會同時完成一些工作,然後在所有工作完成時收集批次。這有缺點,直到整個批次完成正確後才返回結果?我認爲這個缺點通過使用DeferredQueue來解決...... – agartland 2010-05-19 21:50:33
DeferredQueue和合作的方法很聰明。就縮放處理器而言,它確實會給我更多的控制權。我甚至不認爲它一定更復雜。謝謝。 – agartland 2010-05-19 22:06:38