2015-04-26 88 views
1

我是芹菜新手。我見過的所有例子都是從命令行啓動一名芹菜工作者。例如:從多處理開始芹菜工

$ celery -A proj worker -l info 

我開始一個關於彈性beanstalk的項目,並認爲將worker設爲我的web應用程序的子進程會很好。我嘗試使用多處理,它似乎工作。我想知道這是一個好主意,還是可能有一些缺點。

import celery 
import multiprocessing 


class WorkerProcess(multiprocessing.Process): 
    def __init__(self): 
     super().__init__(name='celery_worker_process') 

    def run(self): 
     argv = [ 
      'worker', 
      '--loglevel=WARNING', 
      '--hostname=local', 
     ] 
     app.worker_main(argv) 


def start_celery(): 
    global worker_process 
    worker_process = WorkerProcess() 
    worker_process.start() 


def stop_celery(): 
    global worker_process 
    if worker_process: 
     worker_process.terminate() 
     worker_process = None 


worker_name = '[email protected]' 
worker_process = None 

app = celery.Celery() 
app.config_from_object('celery_app.celeryconfig') 
+0

有趣的是,這段代碼與應用程序和工作者的_same_ Celery實例一起工作。否則,例如創建工人命令行,似乎總是創建一個新的Celery實例。我不知道這是否是一個問題... – Jens

回答

2

似乎是個不錯的選擇,絕對不是唯一的選擇,但一個好的:)

你可能想看看(你可能已經這樣做)

一兩件事,是連接到自動縮放你的芹菜隊列的大小。所以你只能在隊列增長時擴大規模。

有效地,芹菜在內部做類似的事情,所以沒有太大的區別。我能想到的唯一障礙是對外部資源(例如數據庫連接)的處理,這可能是一個問題,但完全取決於您對Celery所做的工作。

1

如果有人感興趣,我確實已經使用運行Python 3.4的預配置AMI服務器來處理Elastic Beanstalk。運行Debian Jessie的基於Docker的服務器遇到了很多問題。也許,與端口重新映射有關。 Docker是一個黑盒子,我發現它很難使用和調試。幸運的是,AWS中的好人剛剛在2015年4月8日添加了非泊塢窗Python 3.4選項。

我做了大量搜索以獲得部署和工作。我看到很多沒有答案的問題。所以這裏是我非常簡單部署的python 3.4/flask/celery過程。

芹菜,你可以只是點子安裝。您需要使用config命令或container_command從配置文件安裝rabbitmq。我在我上傳的項目zip中使用了一個腳本,因此使用腳本需要container_command(在項目安裝之前發生常規eb config命令)。

[yourapproot]/ebextensions/05_install_rabbitmq.config:

container_commands: 
    01RunScript: 
    command: bash ./init_scripts/app_setup.sh 

[yourapproot] /init_scripts/app_setup.sh:

#!/usr/bin/env bash 

# Download and install Erlang 
yum install erlang 

# Download the latest RabbitMQ package using wget: 
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmq-server-3.5.1-1.noarch.rpm 

# Install rabbit 
rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc 
yum -y install rabbitmq-server-3.5.1-1.noarch.rpm 

# Start server 
/sbin/service rabbitmq-server start 

我正在做的燒瓶中的應用程序,所以我啓動所述第一請求之前工人:

@app.before_first_request 
def before_first_request(): 
    task_mgr.start_celery() 

的task_mgr創建芹菜應用對象(其我打電話芹菜,因爲燒瓶應用程序對象是應用程序)。對於一個簡單的任務管理器,這裏的公平很關鍵。任務預取有各種奇怪的行爲。這應該可能是默認的?

task_mgr/task_mgr.py:

import celery as celery_module 
import multiprocessing 


class WorkerProcess(multiprocessing.Process): 
    def __init__(self): 
     super().__init__(name='celery_worker_process') 

    def run(self): 
     argv = [ 
      'worker', 
      '--loglevel=WARNING', 
      '--hostname=local', 
      '-Ofair', 
     ] 
     celery.worker_main(argv) 


def start_celery(): 
    global worker_process 
    multiprocessing.set_start_method('fork') # 'spawn' seems to work also 
    worker_process = WorkerProcess() 
    worker_process.start() 


def stop_celery(): 
    global worker_process 
    if worker_process: 
     worker_process.terminate() 
     worker_process = None 


worker_name = '[email protected]' 
worker_process = None 

celery = celery_module.Celery() 
celery.config_from_object('task_mgr.celery_config') 

我的配置是非常簡單的,到目前爲止:

task_mgr/celery_config。潘岳:

BROKER_URL = 'amqp://' 
CELERY_RESULT_BACKEND = 'amqp://' 

CELERY_ACCEPT_CONTENT = ['json'] 
CELERY_TASK_SERIALIZER = 'json' # 'pickle' warning: can't use datetime in json 
CELERY_RESULT_SERIALIZER = 'json' # 'pickle' warning: can't use datetime in json 
CELERY_TASK_RESULT_EXPIRES = 18000 # Results hang around for 5 hours 

CELERYD_CONCURRENCY = 4 

然後你就可以把任務,無論你需要他們:

from task_mgr.task_mgr import celery 
import time 


@celery.task(bind=True) 
def error_task(self): 
    self.update_state(state='RUNNING') 
    time.sleep(10) 
    raise KeyError('im an error') 


@celery.task(bind=True) 
def long_task(self): 
    self.update_state(state='RUNNING') 
    time.sleep(20) 
    return 'long task finished' 


@celery.task(bind=True) 
def task_with_status(self, wait): 
    self.update_state(state='RUNNING') 
    for i in range(5): 
     time.sleep(wait) 
     self.update_state(
      state='PROGRESS', 
      meta={ 
       'current': i + 1, 
       'total': 5, 
       'status': 'progress', 
       'host': self.request.hostname, 
      } 
     ) 
    time.sleep(wait) 
    return 'finished with wait = ' + str(wait) 

我還留着一個任務隊列,以保持異步結果,所以我可以監視任務:

task_queue = [] 


def queue_task(task, *args): 
    async_result = task.apply_async(args) 
    task_queue.append(
     { 
      'task_name':task.__name__, 
      'task_args':args, 
      'async_result':async_result 
     } 
    ) 
    return async_result 


def get_tasks_info(): 
    tasks = [] 

    for task in task_queue: 
     task_name = task['task_name'] 
     task_args = task['task_args'] 
     async_result = task['async_result'] 
     task_id = async_result.id 
     task_state = async_result.state 
     task_result_info = async_result.info 
     task_result = async_result.result 
     tasks.append(
      { 
       'task_name': task_name, 
       'task_args': task_args, 
       'task_id': task_id, 
       'task_state': task_state, 
       'task_result.info': task_result_info, 
       'task_result': task_result, 
      } 
     ) 

    return tasks 

當然,開始你需要的任務:

from webapp.app import app 
from flask import url_for, render_template, redirect 
from webapp import tasks 
from task_mgr import task_mgr 


@app.route('/start_all_tasks') 
def start_all_tasks(): 
    task_mgr.queue_task(tasks.long_task) 
    task_mgr.queue_task(tasks.error_task) 
    for i in range(1, 9): 
     task_mgr.queue_task(tasks.task_with_status, i * 2) 

    return redirect(url_for('task_status')) 


@app.route('/task_status') 
def task_status(): 
    current_tasks = task_mgr.get_tasks_info() 
    return render_template(
     'parse/task_status.html', 
     tasks=current_tasks 
    ) 

就是這樣。讓我知道你是否需要任何幫助,儘管我的芹菜知識還相當有限。