2013-12-09 23 views
2

回調我很新的使用芹菜,想知道如何TWSITED型多延遲迴調可以芹菜實施實施扭曲式的地方多遞延芹菜

MY倍捻代碼使用的角度經紀人,如下。我有一個處理器(服務器),它處理一些事件並返回結果。分派器(客戶端)打印使用延遲迴調返回的結果。

Handler.py(服務器)

from twisted.application import service, internet 
from twisted.internet import reactor, task 
from twisted.spread import pb 
from Dispatcher import Event 
from Dispatcher import CopyEvent 

class ReceiverEvent(pb.RemoteCopy, Event): 
    pass 
pb.setUnjellyableForClass(CopyEvent, ReceiverEvent) 


class Handler(pb.Root): 

def remote_eventEnqueue(self, pond): 
    d = task.deferLater(reactor,5,handle_event,sender=self) 
    return d 

def handle_event(sender): 
    print "Do Something" 
    return "did something" 

if __name__ == '__main__': 
    h=Handler() 
    reactor.listenTCP(8739, pb.PBServerFactory(h)) 
    reactor.run() 

現在Dispatcher.py(客戶端)

from twisted.spread import pb, jelly 
from twisted.python import log 
from twisted.internet import reactor 
from Event import Event 

class CopyEvent(Event, pb.Copyable): 
    pass 

class Dispatcher: 
    def __init__(self, event): 
     self.event = event 

    def dispatch_event(self, remote): 
     d = remote.callRemote("eventEnqueue", self.event) 
     d.addCallback(self.printMessage) 

    def printMessage(self, text): 
     print text 

def main(): 
    from Handler import CopyEvent 
    event = CopyEvent() 
    d = Dispatcher(event) 
    factory = pb.PBClientFactory() 
    reactor.connectTCP("localhost", 8739, factory) 
    deferred = factory.getRootObject() 
    deferred.addCallback(d.dispatch_event) 
    reactor.run() 

if __name__ == '__main__': 
    main() 

我試着在芹菜地執行這個。

Handler.py(服務器)

from celery import Celery 

app=Celery('tasks',backend='amqp',broker='amqp://[email protected]//') 

@app.task 

def handle_event(): 
    print "Do Something" 
    return "did something" 

Dispatcher.py(客戶端)

from Handler import handle_event 
from datetime import datetime 

def print_message(text): 
    print text 


t=handle_event.apply_async(countdown=10,link=print_message.s('Done')) ##HOWTO? 

我確切的問題是,一個人如何可以實現延遲迴調當地的功能,如芹菜print_message倍捻風格。當handle_Event方法完成時,它返回的結果是我想要的另一個回調方法(print_message),它是本地的

任何其他可能的設計工作流在Celery中做到這一點?

感謝

JR

回答

1

好了,終於想通了。這是不太可能直接在芹菜客戶端添加回調像扭曲的風格。但是芹菜支持任務監視功能,使客戶端監控不同類型的工人事件,並在其上添加回調。

一個簡單的任務監視器(Task_Monitor.py)將是這個樣子。

Task_Monitor.py

from celery import Celery 

def task_monitor(app): 
    state = app.events.State() 

    def announce_completed_tasks(event): 
     state.event(event) 
     task = state.tasks.get(event['uuid']) 

     print('TASK SUCCEEDED: %s[%s] %s' % (task.name, task.uuid, task.info(),)) 

    with app.connection() as connection: 
     recv = app.events.Receiver(connection, handlers={'task-succeeded': announce_completed_tasks}) 
     recv.capture(limit=None, timeout=None, wakeup=True) 

if __name__ == '__main__': 
    app = Celery(broker='amqp://[email protected]//') 
    task_monitor(app) 

Task_Monitor.py必須被運行作爲單獨的進程(客戶端)(詳細情況可以芹菜真實處理文檔http://docs.celeryproject.org/en/latest/userguide/monitoring.html#real-time-processing中找到)。除了芹菜應用(服務器端)需要使用

app.conf.CELERY_SEND_EVENTS = TRUE 

或使用-E選項,同時運行芹菜被配置

,以便它發送事件,以便爲要監測的工人。

0

我會建議使用鏈或爲Celery Canvas docs類似的機制之一。

實施例從文檔採取:

>>> from celery import chain 
>>> from proj.tasks import add, mul 

# (4 + 4) * 8 * 10 
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10)) 
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10) 
>>> res.apply_async() 
+0

感謝您的評論。客戶端和任務隊列被分開/遠程分發。我想在客戶端添加回調,當任務完成遠程,就像什麼是扭曲的支持。在上面的示例中,不是在成功完成添加時運行的遠程任務(而不是客戶端的回調函數)? – user1302884

+0

啊,你想在回調中返回結果。那麼,你需要在調度程序中運行,比如'gevent','twisted'或'tulip'。使用其中的一個,您可以生成一個greenlet,等待異步通過'.get()'返回。 –