0
我是新來的芹菜。我正在嘗試將一個分佈式任務與芹菜一起工作。芹菜添加任務隊列準備時
可以說,我有我的任務的單任務文件task.py
:
@celery.task
def generate_sorts(params,meta_data_dict):
'''
This will generate some sorts .
'''
pass
而且我做了以下一些分佈式處理的部分:
taskset = TaskSet(
tasks.generate_sorts.subtask(args = (params, meta_data_dict))
for meta_data_dict in chunk_generator)
print "Dispatching tasks"
taskset_result = taskset.apply_async()
print "Waiting for results"
results = taskset_result.join_native()
print "Results:"
#Process the results.
現在chunk_generator基本上是一臺發電機模式轉到數據庫並獲取一些元數據。我現在的問題是這些任務在最終發送到任務隊列之前得到累積。我的生成器需要大約30分鐘的時間才能在任務實際添加到隊列之前獲取所有元數據。我知道那是TaskSet
打算執行。我正在尋找TaskSet
的替代方案,即我將能夠以分佈式方式執行下面的等效功能。
pool.imap_unordered(generate_sorts, chunk_generator)
以上將在發生器產生結果後立即執行generate_sots。換句話說,有什麼東西可以替代TaskSet
,我可以在生成器產生第一份工作後立即從生成器中添加它,而不是等到生成器獲取所有內容後才能最終開始做一些工作。
這一定要幫我。非常感激 – Senthil 2014-10-10 17:54:19