2014-10-08 75 views
0

我是新來的芹菜。我正在嘗試將一個分佈式任務與芹菜一起工作。芹菜添加任務隊列準備時

可以說,我有我的任務的單任務文件task.py

@celery.task 
def generate_sorts(params,meta_data_dict): 
    ''' 
    This will generate some sorts . 
    ''' 
    pass 

而且我做了以下一些分佈式處理的部分:

taskset = TaskSet(
    tasks.generate_sorts.subtask(args = (params, meta_data_dict)) 
    for meta_data_dict in chunk_generator) 

print "Dispatching tasks" 
taskset_result = taskset.apply_async() 

print "Waiting for results" 
results = taskset_result.join_native() 
print "Results:" 
#Process the results. 

現在chunk_generator基本上是一臺發電機模式轉到數據庫並獲取一些元數據。我現在的問題是這些任務在最終發送到任務隊列之前得到累積。我的生成器需要大約30分鐘的時間才能在任務實際添加到隊列之前獲取所有元數據。我知道那是TaskSet打算執行。我正在尋找TaskSet的替代方案,即我將能夠以分佈式方式執行下面的等效功能。

pool.imap_unordered(generate_sorts, chunk_generator) 

以上將在發生器產生結果後立即執行generate_sots。換句話說,有什麼東西可以替代TaskSet,我可以在生成器產生第一份工作後立即從生成器中添加它,而不是等到生成器獲取所有內容後才能最終開始做一些工作。

回答

1

你應該嘗試立即啓動它們,將所產生的的AsyncResult實例爲ResultSet

from celery.result import ResultSet 

result_set = ResultSet() 
for meta_data_dict in chunk_generator: 
    # Add the task to the queue immediately 
    result = task.generate_sorts.delay(params, meta_data_dict) 
    result_set.add(result) 

print "Waiting for results" 
results = result_set.join_native() 
print "Results:" 
# Process the results 
+0

這一定要幫我。非常感激 – Senthil 2014-10-10 17:54:19