2010-05-18 51 views
11

Twisted(python)的優勢在於它的異步框架(我認爲)。我已經編寫了一個圖像處理服務器,通過透視經紀人接受請求。只要我一次餵它少於幾百張圖像,它就可以很好地工作。但是,有時候它幾乎在同一時間會出現數百幅圖像。因爲它試圖同時處理它們,服務器崩潰。將遠程調用隊列隊列到Python Twisted透視代理?

正如我想排隊的服務器上的remote_calls以便它僅處理的溶液〜100張圖像的時間。看起來這可能是Twisted已經做的事情,但我似乎無法找到它。關於如何開始實施這個的任何想法?一個正確的方向?謝謝!

回答

29

一個現成的選項,可能會幫助這是twisted.internet.defer.DeferredSemaphore。這是您可能已經知道的正常(計數)信號量的異步版本,如果您已經完成了多線程編程。

A(計數)信號是很多像互斥(鎖定)。但是,只有在相應的版本發佈之前,互斥鎖才能獲得一次,則可以配置(計數)信號量,以便在需要任何相應版本之前允許任意(但指定)數量的採集成功。

下面是使用DeferredSemaphore來跑十個異步操作的例子,但同時運行最多三個人:

from twisted.internet.defer import DeferredSemaphore, gatherResults 
from twisted.internet.task import deferLater 
from twisted.internet import reactor 


def async(n): 
    print 'Starting job', n 
    d = deferLater(reactor, n, lambda: None) 
    def cbFinished(ignored): 
     print 'Finishing job', n 
    d.addCallback(cbFinished) 
    return d 


def main(): 
    sem = DeferredSemaphore(3) 

    jobs = [] 
    for i in range(10): 
     jobs.append(sem.run(async, i)) 

    d = gatherResults(jobs) 
    d.addCallback(lambda ignored: reactor.stop()) 
    reactor.run() 


if __name__ == '__main__': 
    main() 

DeferredSemaphore也有明確的acquirerelease的方法,但是run方法是如此方便它幾乎總是你想要的。它調用acquire方法,該方法返回Deferred。對於第一個Deferred,它添加了一個回調,它調用您傳入的函數(以及任何位置或關鍵字參數)。如果該函數返回Deferred,那麼到第二個Deferred將添加一個回調,該回調調用release方法。

同步的情況下被處理,以及,通過立即調用release。還通過允許它們傳播錯誤來處理錯誤,但要確保完成必要的release以使DeferredSemaphore處於一致狀態。傳遞給run函數(或它返回Deferred的結果)的結果成爲由run返回的Deferred的結果。

另一種可能的方法可能基於DeferredQueuecooperateDeferredQueue大多像普通隊列,但其get方法返回Deferred。如果在通話時隊列中沒有任何項目,則在添加項目之前Deferred不會觸發。

下面是一個例子:

from random import randrange 

from twisted.internet.defer import DeferredQueue 
from twisted.internet.task import deferLater, cooperate 
from twisted.internet import reactor 


def async(n): 
    print 'Starting job', n 
    d = deferLater(reactor, n, lambda: None) 
    def cbFinished(ignored): 
     print 'Finishing job', n 
    d.addCallback(cbFinished) 
    return d 


def assign(jobs): 
    # Create new jobs to be processed 
    jobs.put(randrange(10)) 
    reactor.callLater(randrange(10), assign, jobs) 


def worker(jobs): 
    while True: 
     yield jobs.get().addCallback(async) 


def main(): 
    jobs = DeferredQueue() 

    for i in range(10): 
     jobs.put(i) 

    assign(jobs) 

    for i in range(3): 
     cooperate(worker(jobs)) 

    reactor.run() 


if __name__ == '__main__': 
    main() 

注意,async工人功能是一樣的,從第一個例子中的一個。但是,這次還有一個worker函數明確將作業從DeferredQueue中刪除,並使用async(將async作爲對返回的Deferred的回調)進行處理。 worker發電機由cooperate驅動,它在每個Deferred產生火災後迭代它。然後,主循環啓動這些工作生成器中的三個,以便在任何給定時間進行三項工作。

這種方法涉及的代碼比DeferredSemaphore方法多一點,但有一些好處可能會讓人感興趣。首先,cooperate返回一個CooperativeTask實例,其中有一些有用的方法,如pauseresume和其他一些其他實例。此外,分配給相同合作者的所有作業將在調度中彼此配合,以免重複事件循環(這是API的名稱)。在DeferredQueue方面,還可以對待處理項目的數量設置限制,以避免完全超載服務器(例如,如果圖像處理器卡住並停止完成任務)。如果調用put的代碼處理隊列溢出異常,則可以用此壓力嘗試停止接受新作業(可能將其分流到另一個服務器或警告管理員)。與DeferredSemaphore做類似的事情是有點棘手,因爲沒有辦法限制有多少工作等待能夠獲得信號量。

+0

很酷,我很欣賞這些想法。 迴應使用DeferredSemaphore的想法。如果需要完成不連續的批次工作,這將非常有用。如果一個批次有太多的工作要做,它只會同時完成一些工作,然後在所有工作完成時收集批次。這有缺點,直到整個批次完成正確後才返回結果?我認爲這個缺點通過使用DeferredQueue來解決...... – agartland 2010-05-19 21:50:33

+1

DeferredQueue和合作的方法很聰明。就縮放處理器而言,它確實會給我更多的控制權。我甚至不認爲它一定更復雜。謝謝。 – agartland 2010-05-19 22:06:38

-2

您可能還喜歡我寫的txRDQ(Resizable Dispatch Queue)。 Google它,它在LaunchPad上的tx集合中。對不起,我沒有更多的時間回覆 - 即將上臺。

特里