我有一個芹菜任務,當被調用時,它簡單地點燃了雙絞線內部一些並行代碼的執行。下面是一些示例(不運行的)代碼來說明:如何用反應堆中的並行代碼確認芹菜任務?
def run_task_in_reactor():
# this takes a while to run
do_something()
do_something_more()
@celery.task
def run_task():
print "Started reactor"
reactor.callFromThread(run_task_in_reactor)
(爲簡單起見,請假設當被工作人員接收到任務的反應器已經在運行,我使用的信號@worker_process_init.connect
開始我當工作人員出現時,反應堆在另一個線程中)
當我呼叫run_task.delay()
時,任務完成得非常快(因爲它不會等待run_task_in_reactor()
完成,只安排在反應堆中執行)。並且,當run_task_in_reactor()
終於運行時,do_something()
或do_something_more()
可以拋出異常,這將會不受注意。
使用pika
從我的隊列中消耗,我可以使用do_something_more()
中的ACK來讓工作人員通知正確完成任務,例如。然而,在芹菜裏面,這似乎不是可能的(或者至少我不知道如何實現相同的效果)
另外,我不能移除反應堆,因爲它是第三個我正在使用的第四方代碼。其他獲得同樣結果的方法也是值得讚賞的。
請解釋在哪裏以及如何使用它,甚至包括問題的問題 – 2015-05-19 01:22:29
@MauricioGracia在問題的代碼中,它實際上只是'reactor.blockingCallFromThread'而不是'reactor.callFromThread'。 – 2015-05-19 01:24:34