2012-03-01 22 views
1

根據http://ask.github.com/celery/userguide/routing.html#manual-routing中發現的文檔,我可以將queue參數傳遞給apply_async,以便將任務路由到特定隊列。然而,使用taskset的時候,我得到我可以使用apply_async將自定義路由應用於TaskSet嗎?

TypeError at /some/path/ 
apply_async() got an unexpected keyword argument 'queue' 

給出的taskset的類下面的代碼https://github.com/ask/celery/blob/master/celery/task/sets.py#L122

def apply_async(self, connection=None, connect_timeout=None, 
     publisher=None, taskset_id=None): 
    """Apply taskset.""" 
    app = self.app 

    if app.conf.CELERY_ALWAYS_EAGER: 
     return self.apply(taskset_id=taskset_id) 

    with app.default_connection(connection, connect_timeout) as conn: 
     setid = taskset_id or uuid() 
     pub = publisher or self.Publisher(connection=conn) 
     try: 
      results = self._async_results(setid, pub) 
     finally: 
      if not publisher: # created by us. 
       pub.close() 

     return app.TaskSetResult(setid, results) 

我有任務的不確定數量,我需要在某些情況下,應用特殊的路由其中,被undestandable ,我該如何處理?不使用TaskSet?

回答

2

您可以使用subtasks選項參數

>>> from celery.task.sets import TaskSet 
>>> from tasks import add  
>>> 
>>> job = TaskSet(tasks=[add.subtask(args=(i, i),options={'queue':'celery'}) for i in range(10)]) 
>>> result = job.apply_async() 
>>> result.ready() 
True 
>>> job 
[tasks.add(0, 0), tasks.add(1, 1), tasks.add(2, 2), tasks.add(3, 3), tasks.add(4, 4), tasks.add(5, 5), tasks.add(6, 6), tasks.add(7, 7), tasks.add(8, 8), tasks.add(9, 9)] 
>>> 
相關問題