2014-02-17 123 views
5

我正在調用Django-Celery中任務的任務Python芹菜 - 如何在其他任務中調用芹菜任務

這是我的任務。

@shared_task 
def post_notification(data,url): 
    url = "http://posttestserver.com/data/?dir=praful" # when in production, remove this line. 
    headers = {'content-type': 'application/json'} 
    requests.post(url, data=json.dumps(data), headers=headers) 


@shared_task 
def shipment_server(data,notification_type): 
    notification_obj = Notification.objects.get(name = notification_type) 
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj) 

    for server in server_list: 
     task = post_notification.delay(data,server.server_id.url) 
     print task.status # it prints 'Nonetype' has no attribute id 

如何在任務中調用任務? 我在某處讀取它可以使用group,但我無法形成正確的語法。我該怎麼做?

我想這

for server in server_list: 
    task = group(post_notification.s(data, server.server_id.url))().get() 
    print task.status 

拋出警告說

TxIsolationWarning: Polling results w│                   
ith transaction isolation level repeatable-read within the same transacti│                   
on may give outdated results. Be sure to commit the transaction for each │                   
poll iteration.               │                   
    'Polling results with transaction isolation level ' 

不知道它是什麼!

我該如何解決我的問題?

+0

'結果= task.delay' /'task.apply_async'給出了'AsyncResult'對象。這支持輪詢'.status'屬性,每次訪問它將檢查任務的狀態。在發送任務後立即調用.state是沒有意義的,因爲工作人員沒有開始執行它。在你以後的例子中,你調用'task = ..... get()。status',這將不起作用,因爲你在任務的返回值上調用狀態,而不是結果(result.status vs result.get() 。狀態)。 – asksol

+0

最後,您不應該等待子任務的結果,因爲這可能會導致死鎖,您應該使用回調任務:'(post_notification.s()| do_sometihing_after_posted.s())。delay()'。請參閱http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks和http://docs.celeryproject.org/en/latest/userguide/canvas.html – asksol

回答

5

這應該工作:

celery.current_app.send_task('mymodel.tasks.mytask', args=[arg1, arg2, arg3]) 
+0

什麼是my_model和current_app? – PythonEnthusiast

+0

'current_app'是芹菜模塊的屬性。 'mymodel.tasks'是你的'tasks.py'的路徑。如有必要,更改它。 –

+0

所以,我應該這樣做'task = celery.current_app.send_task('mymodel.tasks.mytask',args = [arg1,arg2,arg3])' – PythonEnthusiast

0

您可以使用延時功能

from app.tasks import celery_add_task 
    celery_add_task.apply_async(args=[task_name]) 

調用從任務的任務......它會工作

+0

I已經試過 – PythonEnthusiast

+0

@ user1162512嘗試使用這個工作4 me –

2

你是對的,因爲在每一個任務你for循環將被覆蓋task變量。

你可以嘗試celery.group

from celery import group 

@shared_task 
def shipment_server(data,notification_type): 
    notification_obj = Notification.objects.get(name = notification_type) 
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj) 


    tasks = [post_notification.s(data, server.server_id.url) for server in server_list] 
    results = group(tasks)() 
    print results.get() # results.status() what ever you want