2014-02-09 20 views
13

CELERY_IMPORTSsettings.py中的模塊發生更改時,我可以自動重新加載芹菜自己。芹菜自動重新加載任何更改

我試圖給母模塊檢測即使在子模塊上的變化,但它沒有檢測到子模塊的變化。這讓我明白,檢測不是由芹菜遞歸完成的。我在文檔中搜索了它,但我沒有遇到任何對我的問題的迴應。

這真的很麻煩我把我項目的一切相關芹菜部分添加到CELERY_IMPORTS來檢測變化。

有沒有辦法告訴芹菜「當項目的任何地方有任何變化時自動重新加載自己」。

謝謝!

+0

'--autoreload'選項已被棄用,並且在新芹菜中無效。最好的方法是發送廣播消息來關閉代理,並在頂級域名(如supervisord)上自動重啓代理。我在啓動時使用遠程代理在生產環境中從Web應用程序下載程序包。 – Raffi

回答

16

您可以使用-I|--include手動添加其他模塊。將其與GNU工具(如findawk)結合使用,您將能夠找到所有.py文件幷包含它們。

$ celery -A app worker --autoreload --include=$(find . -name "*.py" -type f | awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=',' | sed 's/.$//') 

讓我們解釋一下:

find . -name "*.py" -type f 

find遞歸搜索包含.py所有文件。輸出看起來是這樣的:

./app.py 
./some_package/foopy 
./some_package/bar.py 

然後:

awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=',' 

這一行需要的find輸出作爲輸入,消除了./所有出現。然後它將所有/替換爲.。最後一個sub()刪除用空字符串替換.pyORS,取代所有換行符。這個輸出:

app,some_package.foo,some_package.bar, 

最後一個命令,sed刪除最後,

所以正在執行的命令看起來像:

$ celery -A app worker --autoreload --include=app,some_package.foo,some_package.bar 

如果你有一個virtualenv源裏,你可以通過添加-path .path_to_your_env -prune -o排除:

$ celery -A app worker --autoreload --include=$(find . -path .path_to_your_env -prune -o -name "*.py" -type f | awk '{sub("\./",""); gsub("/", "."); sub(".py",""); print}' ORS=',' | sed 's/.$//') 
+1

您可能想在* awk *部分中使用'gsub(「/」,「。」);'而不是'sub(「/」,「。」);'以替換所有的斜槓文件名? –

+0

如果你的某個模塊包含'* py'(即任何字符後跟'py'),這三個字符將從模塊名稱中剝離(在我的例子中,將「copy_thing」改爲「c_thing」)。修復它更新'sub(「。py」,「」)'到'sub(「\\。py $」)'。 –

+3

似乎從'4.0'版本,'--autoreload'功能[已被刪除](http://docs.celeryproject.org/en/latest/whatsnew-4.0.html?highlight=autoreload#features-removed缺乏資金):( –

2

OrangeTux的解決方案沒有奏效對我來說,所以我寫了一個小小的Python腳本來達到或多或少的一致。它使用inotify監視文件更改,並在檢測到IN_MODIFYIN_ATTRIBIN_DELETE時觸發芹菜重啓。

#!/usr/bin/env python 
"""Runs a celery worker, and reloads on a file change. Run as ./run_celery [directory]. If 
directory is not given, default to cwd.""" 
import os 
import sys 
import signal 
import time 

import multiprocessing 
import subprocess 
import threading 

import inotify.adapters 


CELERY_CMD = tuple("celery -A amcat.amcatcelery worker -l info -Q amcat".split()) 
CHANGE_EVENTS = ("IN_MODIFY", "IN_ATTRIB", "IN_DELETE") 
WATCH_EXTENSIONS = (".py",) 

def watch_tree(stop, path, event): 
    """ 
    @type stop: multiprocessing.Event 
    @type event: multiprocessing.Event 
    """ 
    path = os.path.abspath(path) 

    for e in inotify.adapters.InotifyTree(path).event_gen(): 
     if stop.is_set(): 
      break 

     if e is not None: 
      _, attrs, path, filename = e 

      if filename is None: 
       continue 

      if any(filename.endswith(ename) for ename in WATCH_EXTENSIONS): 
       continue 

      if any(ename in attrs for ename in CHANGE_EVENTS): 
       event.set() 


class Watcher(threading.Thread): 
    def __init__(self, path): 
     super(Watcher, self).__init__() 
     self.celery = subprocess.Popen(CELERY_CMD) 
     self.stop_event_wtree = multiprocessing.Event() 
     self.event_triggered_wtree = multiprocessing.Event() 
     self.wtree = multiprocessing.Process(target=watch_tree, args=(self.stop_event_wtree, path, self.event_triggered_wtree)) 
     self.wtree.start() 
     self.running = True 

    def run(self): 
     while self.running: 
      if self.event_triggered_wtree.is_set(): 
       self.event_triggered_wtree.clear() 
       self.restart_celery() 
      time.sleep(1) 

    def join(self, timeout=None): 
     self.running = False 
     self.stop_event_wtree.set() 
     self.celery.terminate() 
     self.wtree.join() 
     self.celery.wait() 
     super(Watcher, self).join(timeout=timeout) 

    def restart_celery(self): 
     self.celery.terminate() 
     self.celery.wait() 
     self.celery = subprocess.Popen(CELERY_CMD) 


if __name__ == '__main__': 
    watcher = Watcher(sys.argv[1] if len(sys.argv) > 1 else ".") 
    watcher.start() 

    signal.signal(signal.SIGINT, lambda signal, frame: watcher.join()) 
    signal.pause() 

您應該更改CELERY_CMD或任何其他全局變量。

0

芹菜--autoreload不起作用,它是deprecated

由於您使用的是django,因此您可以爲其編寫管理命令。 Django具有autoreload實用程序,runserver使用此實用程序在代碼更改時重新啓動WSGI服務器。

同樣的功能可以用來重新加載芹菜工人。創建一個名爲芹菜的單獨管理命令。編寫一個函數來殺死現有的工人並啓動一個新的工人。現在掛鉤這個函數來自動重載,如下所示。現在

import shlex 
import subprocess 

from django.core.management.base import BaseCommand 
from django.utils import autoreload 


def restart_celery(): 
    cmd = 'pkill celery' 
    subprocess.call(shlex.split(cmd)) 
    cmd = 'celery worker -l info -A foo' 
    subprocess.call(shlex.split(cmd)) 


class Command(BaseCommand): 

    def handle(self, *args, **options): 
     print('Starting celery worker with autoreload...') 
     autoreload.main(restart_celery) 

你可以運行python manage.py celery芹菜工人將自動重載代碼庫的變化的時候。

這僅用於開發目的,不用於生產。代碼取自我的other answer here