2011-10-06 77 views
3

我必須實現一個任務子類,如果代理未運行,那麼該子類會優雅地失敗 - 目前我正在使用RabbitMQ。 我很可能只是用try語句來捕獲異常:芹菜:連接錯誤中止任務

try: 
    Mytask.delay(arg1, arg2) 
except socket.error: 
    # Send an notice to an admin 
    pass 

,但我想創建任務的一個子類,就可以搞定。 我試過類似的東西:

class MyTask(Task): 
    ignore_result = True 

    def __call__(self, *args, **kwargs): 
     try: 
      return self.run(*args, **kwargs) 
     except socket.error: 
      # Send an notice to an admin 
      return None 

但是工作流程顯然是錯誤的。我想我需要注入可能的後端子類或故障策略。 你有什麼建議嗎?

回答

2

一個可能的解決方案,我想出了:

import socket 
from celery.decorators import task 
from celery.task import Task 
from celery.backends.base import BaseBackend 

UNDELIVERED = 'UNDELIVERED' 


class DummyBackend(BaseBackend): 
    """ 
    Dummy queue backend for undelivered messages (due to the broker being down). 
    """ 
    def store_result(self, *args, **kwargs): 
     pass 

    def get_status(self, *args, **kwargs): 
     return UNDELIVERED 

    def _dummy(self, *args, **kwargs): 
     return None 

    wait_for = get_result = get_traceback = _dummy 


class SafeTask(Task): 
    """ 
    A task not raising socket errors if the broker is down. 
    """ 
    abstract = True 
    on_broker_error = None 
    errbackend = DummyBackend 

    @classmethod 
    def apply_async(cls, *args, **kwargs): 
     try: 
      return super(SafeTask, cls).apply_async(*args, **kwargs) 
     except socket.error, err: 
      if cls.on_broker_error is not None: 
       cls.on_broker_error(err, cls, *args, **kwargs) 
      return cls.app.AsyncResult(None, backend=cls.errbackend(), 
       task_name=cls.name) 


def safetask(*args, **kwargs): 
    """ 
    Task factory returning safe tasks handling socket errors. 
    When a socket error occurs, the given callable *on_broker_error* 
    is called passing the exception object, the class of the task 
    and the original args and kwargs. 
    """ 
    if 'base' not in kwargs: 

     on_broker_error = kwargs.pop('on_broker_error', SafeTask.on_broker_error) 
     errbackend = kwargs.pop('errbackend', SafeTask.errbackend) 
     kwargs['base'] = type('SafeTask', (SafeTask,), { 
      'on_broker_error': staticmethod(on_broker_error), 
      'errbackend': errbackend, 
      'abstract': True, 
     }) 

    return task(*args, **kwargs) 

你既可以子類SafeTask或使用裝飾@safetask。 如果您能想到改進,請不要猶豫提供意見。