2013-02-13 52 views
0

我試圖讓一個工人一次只運行一個任務,然後關機。我已經得到了切斷部正常工作(一些背景這裏:celery trying shutdown worker by raising SystemExit in task_postrun signal but always hangs and the main process never exits),但是當它關​​閉時,我發現了一個錯誤:芹菜關閉從task_success處理程序不工作的工人

[2013-02-13 12:19:05,689: CRITICAL/MainProcess] Couldn't ack 1, reason:AttributeError("'NoneType' object has no attribute 'method_writer'",) 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/site-packages/kombu/transport/base.py", line 104, in ack_log_error 
    self.ack() 
    File "/usr/local/lib/python2.7/site-packages/kombu/transport/base.py", line 99, in ack 
    self.channel.basic_ack(self.delivery_tag) 
    File "/usr/local/lib/python2.7/site-packages/amqplib/client_0_8/channel.py", line 1742, in basic_ack 
    self._send_method((60, 80), args) 
    File "/usr/local/lib/python2.7/site-packages/amqplib/client_0_8/abstract_channel.py", line 75, in _send_method 
    self.connection.method_writer.write_method(self.channel_id, 
AttributeError: 'NoneType' object has no attribute 'method_writer' 

這究竟是爲什麼?它不僅沒有缺陷,而且還清除了隊列中剩下的所有其他任務(大問題)。

我該如何解決這個問題?





UPDATE

下面是一切更新堆棧跟蹤(PIP安裝-U海帶AMQP amqplib芹菜):

[2013-02-13 11:58:05,357: CRITICAL/MainProcess] Internal error: AttributeError("'NoneType' object has no attribute 'method_writer'",) 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/celery/worker/__init__.py", line 372, in process_task 
    req.execute_using_pool(self.pool) 
    File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 219, in execute_using_pool 
    timeout=task.time_limit) 
    File "/usr/local/lib/python2.7/dist-packages/celery/concurrency/base.py", line 137, in apply_async 
    **options) 
    File "/usr/local/lib/python2.7/dist-packages/celery/concurrency/base.py", line 27, in apply_target 
    callback(target(*args, **kwargs)) 
    File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 333, in on_success 
    self.acknowledge() 
    File "/usr/local/lib/python2.7/dist-packages/celery/worker/job.py", line 439, in acknowledge 
    self.on_ack(logger, self.connection_errors) 
    File "/usr/local/lib/python2.7/dist-packages/kombu/transport/base.py", line 98, in ack_log_error 
    self.ack() 
    File "/usr/local/lib/python2.7/dist-packages/kombu/transport/base.py", line 93, in ack 
    self.channel.basic_ack(self.delivery_tag) 
    File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1562, in basic_ack 
    self._send_method((60, 80), args) 
    File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 57, in _send_method 
    self.connection.method_writer.write_method(
AttributeError: 'NoneType' object has no attribute 'method_writer' 
+0

我升級了我能想到的所有相關的python lib,但仍然出現了一個錯誤(儘管略有不同的堆棧跟蹤)。查看更新的問題 – 2013-02-13 18:00:10

回答

0

在task_postrun退出不推薦,因爲task_postrun是在「任務體」錯誤處理之外執行的。

當一個任務調用sys.exit沒有明確定義時,會發生什麼, 實際上它取決於正在使用的池。

隨着多處理,子進程將被一個新進程所取代。 在其他池中,工作人員將關閉,但這可能會改變 ,以便它與多處理行爲一致。

調用任務體外的退出被視爲內部錯誤(崩潰)。

「任務體」無論是在task.__call__()

執行我想,也許這更好的解決辦法是使用自定義的執行 策略:

from celery.worker import strategy 
from functools import wraps 

@staticmethod 
def shutdown_after_strategy(task, app, consumer): 

    default_handler = strategy.default(task, app, consumer) 

    def _shutdown_to_exit_after(fun): 
     @wraps(fun) 
     def _inner(*args, **kwargs): 
      try: 
       return fun(*args, **kwargs) 
      finally: 
       raise SystemExit() 
     return _inner 
    return _decorate_to_exit_after(default_handler) 

@celery.task(Strategy=shutdown_after_strategy) 
def shutdown_after(): 
    print('will shutdown after this') 

這是不完全的美麗,但執行策略是優化 任務執行並且不容易擴展(工作人員通過緩存Task.Strategy「預編譯」每個任務類型的執行 路徑)

在Celery 3.1中,您可以使用「bootsteps」擴展工作人員和消費者,因此很可能會有一個漂亮的解決方案。