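Celery task hangs when called with delay or apply_async
I have created a Celery application with the following directory structure (as given on the Celery website):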
proj
|-- celery.py
|-- celery.pyc
|-- __init__.py
|-- __init__.pyc
|-- tasks.py
`-- tasks.pyc
The following is celery.py:
from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
             broker='amqp://rabbitmquser:<my_password>@localhost:5672/localvhost',
             #backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()
The following is the content of tasks.py:
from __future__ import absolute_import
from proj.celery import app
@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)
Now I start the Celery worker with the following command:
celery -A proj worker -l debug
I think the worker starts fine, because it prints the following output:
[2014-06-12 21:25:02,326: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: Building graph...
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoscaler, Beat, Autoreloader, StateDB, Consumer}
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Building graph...
[2014-06-12 21:25:02,334: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Agent, Heart, Gossip, event loop}
[2014-06-12 21:25:02,335: WARNING/MainProcess] /home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/apps/worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.
The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.
If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
You must only enable the serializers that you will actually use.
warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
-------------- celery@ansumanb-u12 v3.1.12 (Cipater)
---- **** -----
--- * *** * -- Linux-3.5.0-25-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x1f46690
- ** ---------- .> transport: amqp://rabbitmquser:**@localhost:5672/localvhost
- ** ---------- .> results: disabled
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. proj.tasks.add
. proj.tasks.mul
. proj.tasks.xsum
[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Hub
[2014-06-12 21:25:02,336: DEBUG/MainProcess] ^-- substep ok
[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Pool
[2014-06-12 21:25:02,344: DEBUG/MainProcess] ^-- substep ok
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Worker: Starting Consumer
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Consumer: Starting Connection
After starting the worker, I opened another terminal, started the Python interpreter, and ran the following:
>>> from proj.tasks import add
>>> add(2,2)
4
>>> add.delay(2,3)
The delay call hangs (same story with apply_async). When I press Ctrl+C to stop it, I get the following:
^CTraceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 453, in delay
return self.apply_async(args, kwargs)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 555, in apply_async
**dict(self._get_exec_options(), **options)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/base.py", line 352, in send_task
reply_to=reply_to or self.oid, **options
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/amqp.py", line 305, in publish_task
**kwargs
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 168, in publish
routing_key, mandatory, immediate, exchange, declare)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 436, in _ensured
return fun(*args, **kwargs)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 173, in _publish
channel = self.channel
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 190, in _get_channel
channel = self._channel = channel()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 422, in __call__
value = self.__value__ = self.__contract__()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 205, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 756, in default_channel
self.connection
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 741, in connection
self._connection = self._establish_connection()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 696, in _establish_connection
conn = self.transport.establish_connection()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 112, in establish_connection
conn = self.Connection(**opts)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 171, in __init__
(10, 10), # start
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
self.channel_id, allowed_methods)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 237, in _wait_method
self.method_reader.read_method()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method
self._next_method()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 107, in _next_method
frame_type, channel, payload = read_frame()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 153, in read_frame
frame_type, channel, size = unpack('>BHI', read(7, True))
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 272, in _read
s = recv(n - len(rbuf))
KeyboardInterrupt
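Any suggestions or comments would be greatly appreciated. I have gone through other links that talk about the size of the /var directory, but I think I have enough space.
df -h
Filesystem Size Used Avail Use% Mounted on
/dev/sda3 283G 99G 170G 37% /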
udev 1.9G 4.0K 1.9G 1% /dev
tmpfs 388M 1.1M 387M 1% /run
none 5.0M 0 5.0M 0% /run/lock
none 1.9G 28M 1.9G 2% /run/shm
I checked the RabbitMQ logs and did not find anything there. The following is the output of rabbitmqctl status:
Status of node 'rabbit@ansumanb-u12' ...
[{pid,12014},
{running_applications,[{rabbit,"RabbitMQ","3.3.2"},
{os_mon,"CPO CXC 138 46","2.2.7"},
{xmerl,"XML parser","1.2.10"},
{mnesia,"MNESIA CXC 138 12","4.5"},
{sasl,"SASL CXC 138 11","2.1.10"},
{stdlib,"ERTS CXC 138 10","1.17.5"},
{kernel,"ERTS CXC 138 10","2.14.5"}]},
{os,{unix,linux}},
{erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:4:4] [rq:4] [async-threads:30] [kernel-poll:true]\n"},
{memory,[{total,27919080},
{connection_procs,2704},
{queue_procs,5408},
{plugins,0},
{other_proc,9099992},
{mnesia,63776},
{mgmt_db,0},
{msg_index,34080},
{other_ets,784160},
{binary,12144},
{code,14685283},
{atom,1367393},
{other_system,1864140}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,1625165004},
{disk_free_limit,50000000},
{disk_free,181684699136},
{file_descriptors,[{total_limit,2},
{total_used,0},
{sockets_limit,0},
{sockets_used,0}]},
{processes,[{limit,1048576},{used,127}]},
{run_queue,0},
{uptime,20072}]
...done.
The Celery version is 3.1.12.
I created the RabbitMQ virtual host and user with the following commands:
$ sudo rabbitmqctl add_user rabbitmquser <mypassword>
$ sudo rabbitmqctl add_vhost localvhost
$ sudo rabbitmqctl set_permissions -p localvhost rabbitmquser ".*" ".*" ".*"
Thanks.
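It is most likely hanging because it cannot reach your RabbitMQ server. 'ansumanb-u12' is the machine running both the Celery worker and RabbitMQ, right? If so, make sure the username and password in your Celery conf are correct, and that 'localhost:5672/localvhost' is correct (I wonder whether you need the '/localvhost'). You will want to add 'CELERY_TASK_RESULT_EXPIRES = ['json']' to the Celery conf to get rid of the warning at Celery startup and the security concern with pickle.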
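I will check the broker URL. How can I make sure the broker URL is correct? Is there a way to check the connection directly from the terminal? Yes, my machine ansumanb-u12 runs both the Celery worker and RabbitMQ. I will add CELERY_TASK_RESULT_EXPIRES. Thanks.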
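On checking the connection from a terminal: the traceback above shows the client blocked in recv while waiting for the broker's first frame, so one rough check is to open a plain socket, send the AMQP 0-9-1 protocol header, and see whether anything comes back within a timeout. The sketch below is Python 3 and uses only the standard library; probe_amqp and its status strings are hypothetical names made up for this illustration, not part of Celery, kombu, or RabbitMQ. It does not log in, so it says nothing about the username, password, or vhost.

```python
import socket
import struct

# AMQP 0-9-1 protocol header: the first bytes a client sends after connecting.
AMQP_HEADER = b"AMQP\x00\x00\x09\x01"


def _recv_exact(sock, n):
    """Read up to n bytes, stopping early only if the peer closes."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            break
        buf += chunk
    return buf


def probe_amqp(host="localhost", port=5672, timeout=5.0):
    """Hypothetical helper: report how far the AMQP handshake gets."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except socket.timeout:
        return "connect timed out"
    except OSError as exc:
        return "connect failed: %s" % exc
    with sock:
        try:
            sock.sendall(AMQP_HEADER)
            # A frame header is 7 bytes: type (1), channel (2), size (4).
            header = _recv_exact(sock, 7)
        except socket.timeout:
            return "connected, but no reply to the protocol header"
        except OSError as exc:
            return "connection error: %s" % exc
    if len(header) < 7:
        return "connection closed by peer"
    frame_type, channel, _size = struct.unpack(">BHI", header)
    # The broker's first frame should be a method frame (type 1) on channel 0.
    if frame_type == 1 and channel == 0:
        return "broker replied with a method frame"
    return "unexpected reply: %r" % (header,)
```

Calling print(probe_amqp("localhost", 5672)) from a Python 3 prompt on the broker machine would then tell apart "port closed", "port open but silent" (which would match the hang in the traceback), and "broker starts the handshake".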
Oh shoot, sorry, it should be 'CELERY_ACCEPT_CONTENT', my mistake. You can drop into an ipython session and try to create the Celery object and start it: 'app = Celery('proj', broker='amqp://user:password@hostname//').start()'
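For the serializer warning, a minimal sketch of what the setting could look like, assuming a hypothetical configuration module named celeryconfig.py inside proj. CELERY_ACCEPT_CONTENT is the setting named in the startup warning; also setting the task and result serializers to JSON is an assumption that the tasks only pass JSON-serializable arguments, which holds for the add, mul, and xsum calls shown in the question.

```python
# celeryconfig.py (hypothetical module name, not part of the original project)

# Accept only JSON messages on the worker side.
CELERY_ACCEPT_CONTENT = ['json']

# Publish tasks and results as JSON too, so the client does not send
# pickle-encoded messages that the worker would then refuse.
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
```

Such a module could be loaded with app.config_from_object('proj.celeryconfig'), or the same three keys could be passed to the existing app.conf.update(...) call in celery.py.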