2014-06-12 100 views
2

Celery task call hangs on delay and apply_async

I created a Celery application with the following directory structure (as given on the Celery website):

proj 
|-- celery.py 
|-- celery.pyc 
|-- __init__.py 
|-- __init__.pyc 
|-- tasks.py 
`-- tasks.pyc 

Here is celery.py:

from __future__ import absolute_import 

from celery import Celery 

app = Celery('proj', 
      broker='amqp://rabbitmquser:<my_password>@localhost:5672/localvhost', 
      #backend='amqp://', 
      include=['proj.tasks']) 

# Optional configuration, see the application user guide. 
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600, 
) 

if __name__ == '__main__': 
    app.start() 

Here are the contents of tasks.py:

from __future__ import absolute_import 

from proj.celery import app 


@app.task 
def add(x, y): 
    return x + y 


@app.task 
def mul(x, y): 
    return x * y 


@app.task 
def xsum(numbers): 
    return sum(numbers) 

I then started the Celery worker with the following command:

celery -A proj worker -l debug 

I think the worker is running fine, because it outputs the following:

[2014-06-12 21:25:02,326: DEBUG/MainProcess] | Worker: Preparing bootsteps. 
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: Building graph... 
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoscaler, Beat, Autoreloader, StateDB, Consumer} 
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Preparing bootsteps. 
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Building graph... 
[2014-06-12 21:25:02,334: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Agent, Heart, Gossip, event loop} 
[2014-06-12 21:25:02,335: WARNING/MainProcess] /home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/apps/worker.py:161: CDeprecationWarning: 
Starting from version 3.2 Celery will refuse to accept pickle by default. 

The pickle serializer is a security concern as it may give attackers 
the ability to execute any command. It's important to secure 
your broker from unauthorized access when using pickle, so we think 
that enabling pickle should require a deliberate action and not be 
the default choice. 

If you depend on pickle then you should set a setting to disable this 
warning and to be sure that everything will continue working 
when you upgrade to Celery 3.2:: 

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] 

You must only enable the serializers that you will actually use. 


    warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED)) 

-------------- celery@ansumanb-u12 v3.1.12 (Cipater) 
---- **** ----- 
--- * *** * -- Linux-3.5.0-25-generic-x86_64-with-Ubuntu-12.04-precise 
-- * - **** --- 
- ** ---------- [config] 
- ** ---------- .> app:   proj:0x1f46690 
- ** ---------- .> transport: amqp://rabbitmquser:**@localhost:5672/localvhost 
- ** ---------- .> results:  disabled 
- *** --- * --- .> concurrency: 4 (prefork) 
-- ******* ---- 
--- ***** ----- [queues] 
-------------- .> celery   exchange=celery(direct) key=celery 


[tasks] 
    . celery.backend_cleanup 
    . celery.chain 
    . celery.chord 
    . celery.chord_unlock 
    . celery.chunks 
    . celery.group 
    . celery.map 
    . celery.starmap 
    . proj.tasks.add 
    . proj.tasks.mul 
    . proj.tasks.xsum 

[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Hub 
[2014-06-12 21:25:02,336: DEBUG/MainProcess] ^-- substep ok 
[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Pool 
[2014-06-12 21:25:02,344: DEBUG/MainProcess] ^-- substep ok 
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Worker: Starting Consumer 
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Consumer: Starting Connection 

After starting the worker, I opened another terminal and ran the following in the Python interpreter:

>>> from proj.tasks import add 
>>> add(2,2) 
4 
>>> add.delay(2,3) 

The delay call hangs (same story with apply_async). When I press Ctrl+C to stop it, I get the following:

^CTraceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 453, in delay 
    return self.apply_async(args, kwargs) 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 555, in apply_async 
    **dict(self._get_exec_options(), **options) 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/base.py", line 352, in send_task 
    reply_to=reply_to or self.oid, **options 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/amqp.py", line 305, in publish_task 
    **kwargs 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 168, in publish 
    routing_key, mandatory, immediate, exchange, declare) 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 436, in _ensured 
    return fun(*args, **kwargs) 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 173, in _publish 
    channel = self.channel 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 190, in _get_channel 
    channel = self._channel = channel() 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 422, in __call__ 
    value = self.__value__ = self.__contract__() 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 205, in <lambda> 
    channel = ChannelPromise(lambda: connection.default_channel) 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 756, in default_channel 
    self.connection 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 741, in connection 
    self._connection = self._establish_connection() 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 696, in _establish_connection 
    conn = self.transport.establish_connection() 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 112, in establish_connection 
    conn = self.Connection(**opts) 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 171, in __init__ 
    (10, 10), # start 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait 
    self.channel_id, allowed_methods) 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 237, in _wait_method 
    self.method_reader.read_method() 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method 
    self._next_method() 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 107, in _next_method 
    frame_type, channel, payload = read_frame() 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 153, in read_frame 
    frame_type, channel, size = unpack('>BHI', read(7, True)) 
    File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 272, in _read 
    s = recv(n - len(rbuf)) 
KeyboardInterrupt 

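Any suggestions or comments would be much appreciated. I have gone through other links that talk about the size of the /var directory, but I think I have enough space.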

df -h

Filesystem  Size Used Avail Use% Mounted on 
/dev/sda3  283G 99G 170G 37% /
udev   1.9G 4.0K 1.9G 1% /dev 
tmpfs   388M 1.1M 387M 1% /run 
none   5.0M  0 5.0M 0% /run/lock 
none   1.9G 28M 1.9G 2% /run/shm 

I checked the RabbitMQ logs and found nothing there. The following is the output of rabbitmqctl status:

Status of node 'rabbit@ansumanb-u12' ... 
[{pid,12014}, 
{running_applications,[{rabbit,"RabbitMQ","3.3.2"}, 
         {os_mon,"CPO CXC 138 46","2.2.7"}, 
         {xmerl,"XML parser","1.2.10"}, 
         {mnesia,"MNESIA CXC 138 12","4.5"}, 
         {sasl,"SASL CXC 138 11","2.1.10"}, 
         {stdlib,"ERTS CXC 138 10","1.17.5"}, 
         {kernel,"ERTS CXC 138 10","2.14.5"}]}, 
{os,{unix,linux}}, 
{erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:4:4] [rq:4] [async-threads:30] [kernel-poll:true]\n"}, 
{memory,[{total,27919080}, 
      {connection_procs,2704}, 
      {queue_procs,5408}, 
      {plugins,0}, 
      {other_proc,9099992}, 
      {mnesia,63776}, 
      {mgmt_db,0}, 
      {msg_index,34080}, 
      {other_ets,784160}, 
      {binary,12144}, 
      {code,14685283}, 
      {atom,1367393}, 
      {other_system,1864140}]}, 
{alarms,[]}, 
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]}, 
{vm_memory_high_watermark,0.4}, 
{vm_memory_limit,1625165004}, 
{disk_free_limit,50000000}, 
{disk_free,181684699136}, 
{file_descriptors,[{total_limit,2}, 
        {total_used,0}, 
        {sockets_limit,0}, 
        {sockets_used,0}]}, 
{processes,[{limit,1048576},{used,127}]}, 
{run_queue,0}, 
{uptime,20072}] 
...done. 

The Celery version is 3.1.12.

I created the RabbitMQ virtual host and user with the following commands:

$ sudo rabbitmqctl add_user rabbitmquser <mypassword> 
$ sudo rabbitmqctl add_vhost localvhost 
$ sudo rabbitmqctl set_permissions -p localvhost rabbitmquser ".*" ".*" ".*" 

Thanks

+0

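It most likely hangs because it cannot reach your RabbitMQ server. 'ansumanb-u12' is the machine running both the Celery worker and RabbitMQ, right? If so, make sure the username and password in your Celery conf are correct, and that 'localhost:5672/localvhost' is correct (I wonder whether you need the '/localvhost'). You need to add 'CELERY_TASK_RESULT_EXPIRES = ['json']' to the Celery conf to get rid of the warning at Celery startup and the pickle security concern. –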

+0

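I will check the broker URL. How can I make sure the broker URL is correct? Is there a way to check the connection directly from the terminal? Yes, my machine ansumanb-u12 hosts both the Celery worker and RabbitMQ. I will add CELERY_TASK_RESULT_EXPIRES. Thanks. –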

+0

Oh shoot, sorry, that should be 'CELERY_ACCEPT_CONTENT', my mistake. You can drop into an ipython session and try to create the Celery object and start it: 'app = Celery('proj', broker='amqp://user:password@hostname//').start()' –
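On the question of checking the broker connection from a terminal: the traceback above stops inside the amqp library while it waits for the broker's first frame, which suggests the TCP connection opens but the AMQP handshake never starts. The following is a minimal Python 3 sketch, using only the standard library, that separates those two cases. The helper name probe_amqp and its status strings are hypothetical and not part of Celery or kombu; the host, port, and timeout are assumptions to adjust.

```python
import socket

# AMQP 0-9-1 protocol header that a client sends right after connecting.
AMQP_HEADER = b"AMQP\x00\x00\x09\x01"


def probe_amqp(host="localhost", port=5672, timeout=5.0):
    """Return a short status string describing how the broker port responds.

    Hypothetical helper for illustration; not part of Celery or kombu.
    """
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except OSError as exc:
        # Nothing is listening, or the host is unreachable.
        return "connect-failed: %s" % exc
    try:
        sock.sendall(AMQP_HEADER)
        reply = sock.recv(8)
    except socket.timeout:
        # TCP connected, but the broker never answered the handshake.
        return "no-reply"
    except OSError as exc:
        return "io-error: %s" % exc
    finally:
        sock.close()
    if not reply:
        return "closed-by-peer"
    if reply[:1] == b"\x01":
        # Frame type 1 is a method frame, as expected for Connection.Start.
        return "amqp-ok"
    return "unexpected-reply"


if __name__ == "__main__":
    print(probe_amqp("localhost", 5672))
```

A "no-reply" result would match the hang described in the question: the port accepts the connection, yet the broker sends nothing back. This only exercises the first step of the handshake and does not check the username, password, or virtual host.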

Answer

5

I made a huge mistake. My problem was that I tried to change something I did not understand. I am adding this for others' reference.

While installing RabbitMQ, I changed the default ulimit value from 1024 to 100 in /etc/default/rabbitmq-server.

I changed the value back to 1024, and the problem is now fixed.
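The rabbitmqctl status output quoted in the question is consistent with this: its file_descriptors entry reports a total_limit of 2 and a sockets_limit of 0. As a rough sketch, assuming the status text has been captured as a string (for example, copied from the terminal), the counters can be pulled out with a regular expression. The function name fd_limits is hypothetical, and the sample text is the excerpt from the question.

```python
import re

# Excerpt of the `rabbitmqctl status` output quoted in the question.
STATUS_TEXT = """
{file_descriptors,[{total_limit,2},
        {total_used,0},
        {sockets_limit,0},
        {sockets_used,0}]},
"""


def fd_limits(status_text):
    """Extract the file descriptor counters from `rabbitmqctl status` text.

    Hypothetical helper for illustration; it only pattern-matches the text.
    """
    keys = ("total_limit", "total_used", "sockets_limit", "sockets_used")
    found = {}
    for key in keys:
        match = re.search(r"\{%s,(\d+)\}" % key, status_text)
        if match:
            found[key] = int(match.group(1))
    return found


limits = fd_limits(STATUS_TEXT)
if limits.get("sockets_limit", 1) == 0:
    # A socket limit of zero means the broker reports no room for clients.
    print("broker reports no socket descriptors available:", limits)
```

Running the same check after restoring the limit would be expected to show a non-zero sockets_limit, although the exact numbers depend on the RabbitMQ version and the configured ulimit.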

+1

Somewhere the installation instructions say you should set it to unlimited; I think that has been the default on Debian systems since squeeze anyway. – Zarathustra