2015-06-23 38 views
2

我試圖運行一個芹菜(3.1.17)任務,在一個組中執行進一步的任務,但我總是遇到錯誤。這是我如何設置代碼:Python芹菜組() - TypeError:**之後的參數必須是一個映射,不會很長

from celery import task, group 

@task 
def daily_emails(): 

    [...] 

    all_tasks = [] 

    for chunk in range(0, users.count(), 1000): 
     some_users = users[chunk:chunk+1000] 
     all_tasks.append(write_email_bunch.subtask(some_users, execnum)) 

    job = group(all_tasks) 
    # result = job.apply_async() 
    # job.get() 
    result = job.delay() 
    print result 
    results = result.join() 
    print results 

    print "done writing email tasks" 
    count = sum(results) 
    print count 


@task 
def write_email_bunch(some_users, execnum): 

    [...] 

    return len(some_users) - skipped_email_count 

這是輸出:

<GroupResult: 3d766c85-21af-4ed0-90cb-a1ca2d281db1 [69527252-8468-4358-9328-144f727f372b, 6d03d86e-1b69-4f43-832e-bd27c4dfc092, 1d868d1b-b502-4672-9895-430089e9532e]> 
Traceback (most recent call last): 
    File "send_daily_emails.py", line 8, in <module> 
    daily_emails() 
    File "/var/www/virtualenvs/nt_dev/local/lib/python2.7/site-packages/celery/app/task.py", line 420, in __call__ 
    return self.run(*args, **kwargs) 
    File "/var/www/nt_dev/nt/apps/emails/tasks.py", line 124, in daily_emails 
    results = result.join() 
    File "/var/www/virtualenvs/nt_dev/local/lib/python2.7/site-packages/celery/result.py", line 642, in join 
    interval=interval, no_ack=no_ack, 
    File "/var/www/virtualenvs/nt_dev/local/lib/python2.7/site-packages/celery/result.py", line 870, in get 
    raise self.result 
TypeError: write_email_bunch() argument after ** must be a mapping, not long 

所以我得到一個GroupResult但不知何故林無法加入,或進一步處理它。 當我使用write_email_bunch.s(some_users,execnum)我得到這個異常:

File "/var/www/virtualenvs/nt_dev/local/lib/python2.7/site-packages/celery/result.py", line 870, in get 
    raise self.result 
TypeError: 'tuple' object is not callable 

我怎麼會等待完成所有的任務組事後繼續? job.get()給了我這樣的例外:

TypeError: get expected at least 1 arguments, got 0 

回答

3

subtask需要args來一個元組,kwargs和任務選項的字典所以它應該被稱爲是這樣的:

all_tasks.append(write_email_bunch.subtask((some_users, execnum))) 

記我們通過它包含參數的元組

另外,您不應該等待任務內的任務 - 這可能導致死鎖。在這種情況下,我認爲daily_emails不需要是芹菜任務 - 它可以是創建畫布對象並運行應用異步的常規函數​​。

def daily_emails(): 

    all_tasks = [] 

    for chunk in range(0, users.count(), 1000): 
     some_users = users[chunk:chunk+1000] 
     all_tasks.append(write_email_bunch.subtask(some_users, execnum)) 

    job = group(all_tasks) 
    result = job.apply_async() 
    return result.id 
+1

嗨scytale,感謝您的回答!這實際上是主要問題。非常感謝你解決! –

2

除了對方的回答,你可以在這裏使用chunkshttp://docs.celeryproject.org/en/latest/userguide/canvas.html#chunks

@app.task 
def daily_emails(): 
    return write_email.chunks(users, 1000).delay() 

@task 
def write_email(user): 
    [...] 

這可能是有益的做,如果手動在從數據庫一旦 獲得多個對象是很重要的。您還應該考慮模型對象將在此處序列化,以避免您只能發送pk並重新獲取任務中的模型,或發送您關心的字段(如電子郵件地址或發送該電子郵件所需的任何內容給用戶)。

+1

這就是整潔!非常感謝芹菜的偉大工作! –

相關問題