2013-03-26 18 views
3

我有一個芹菜任務,當被調用時,它簡單地點燃了雙絞線內部一些並行代碼的執行。下面是一些示例(不運行的)代碼來說明:如何用反應堆中的並行代碼確認芹菜任務?

def run_task_in_reactor(): 
    # this takes a while to run 
    do_something() 
    do_something_more() 


@celery.task 
def run_task(): 
    print "Started reactor" 
    reactor.callFromThread(run_task_in_reactor) 

(爲簡單起見,請假設當被工作人員接收到任務的反應器已經在運行,我使用的信號@worker_process_init.connect開始我當工作人員出現時,反應堆在另一個線程中)

當我呼叫run_task.delay()時,任務完成得非常快(因爲它不會等待run_task_in_reactor()完成,只安排在反應堆中執行)。並且,當run_task_in_reactor()終於運行時,do_something()do_something_more()可以拋出異常,這將會不受注意。

使用pika從我的隊列中消耗,我可以使用do_something_more()中的ACK來讓工作人員通知正確完成任務,例如。然而,在芹菜裏面,這似乎不是可能的(或者至少我不知道如何實現相同的效果)

另外,我不能移除反應堆,因爲它是第三個我正在使用的第四方代碼。其他獲得同樣結果的方法也是值得讚賞的。

回答

0

改爲使用reactor.blockingCallFromThread

+0

請解釋在哪裏以及如何使用它,甚至包括問題的問題 – 2015-05-19 01:22:29

+0

@MauricioGracia在問題的代碼中,它實際上只是'reactor.blockingCallFromThread'而不是'reactor.callFromThread'。 – 2015-05-19 01:24:34