2016-10-20 44 views

socket.error: timed out (Celery & RabbitMQ running in Docker containers)

I'm trying to bring up the official Celery (with RabbitMQ) Docker containers:

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq 
docker run --link some-rabbit:rabbit --name some-celery -d celery 

I check the logs to make sure everything is fine:

# docker logs some-celery 

[2016-10-20 11:05:50,357: WARNING/MainProcess] /usr/local/lib/python3.5/site-packages/celery/apps/worker.py:161: CDeprecationWarning: 
Starting from version 3.2 Celery will refuse to accept pickle by default. 

The pickle serializer is a security concern as it may give attackers 
the ability to execute any command. It's important to secure 
your broker from unauthorized access when using pickle, so we think 
that enabling pickle should require a deliberate action and not be 
the default choice. 

If you depend on pickle then you should set a setting to disable this 
warning and to be sure that everything will continue working 
when you upgrade to Celery 3.2:: 

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] 

You must only enable the serializers that you will actually use. 


    warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED)) 
[2016-10-20 11:05:50,419: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbit:5672//: [Errno 111] Connection refused. 
Trying again in 2.00 seconds... 

[2016-10-20 11:05:52,430: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbit:5672//: [Errno 111] Connection refused. 
Trying again in 4.00 seconds... 

[2016-10-20 11:05:57,611: WARNING/MainProcess] celery[email protected] ready. 

Now, on the host, I use this tasks.py:

from celery import Celery 

app = Celery('tasks', backend='amqp', broker='amqp://guest:[email protected]/') 

@app.task(name='tasks.add') 
def add(x, y): 
    return x + y 

Then I run Python from the host command line and get:

>>> from tasks import add 
>>> res=add.delay(4,4) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/Library/Python/2.7/site-packages/celery/app/task.py", line 461, in delay 
    return self.apply_async(args, kwargs) 
    File "/Library/Python/2.7/site-packages/celery/app/task.py", line 573, in apply_async 
    **dict(self._get_exec_options(), **options) 
    File "/Library/Python/2.7/site-packages/celery/app/base.py", line 354, in send_task 
    reply_to=reply_to or self.oid, **options 
    File "/Library/Python/2.7/site-packages/celery/app/amqp.py", line 310, in publish_task 
    **kwargs 
    File "/Library/Python/2.7/site-packages/kombu/messaging.py", line 172, in publish 
    routing_key, mandatory, immediate, exchange, declare) 
    File "/Library/Python/2.7/site-packages/kombu/connection.py", line 470, in _ensured 
    interval_max) 
    File "/Library/Python/2.7/site-packages/kombu/connection.py", line 382, in ensure_connection 
    interval_start, interval_step, interval_max, callback) 
    File "/Library/Python/2.7/site-packages/kombu/utils/__init__.py", line 246, in retry_over_time 
    return fun(*args, **kwargs) 
    File "/Library/Python/2.7/site-packages/kombu/connection.py", line 250, in connect 
    return self.connection 
    File "/Library/Python/2.7/site-packages/kombu/connection.py", line 756, in connection 
    self._connection = self._establish_connection() 
    File "/Library/Python/2.7/site-packages/kombu/connection.py", line 711, in _establish_connection 
    conn = self.transport.establish_connection() 
    File "/Library/Python/2.7/site-packages/kombu/transport/pyamqp.py", line 116, in establish_connection 
    conn = self.Connection(**opts) 
    File "/Library/Python/2.7/site-packages/amqp/connection.py", line 165, in __init__ 
    self.transport = self.Transport(host, connect_timeout, ssl) 
    File "/Library/Python/2.7/site-packages/amqp/connection.py", line 186, in Transport 
    return create_transport(host, connect_timeout, ssl) 
    File "/Library/Python/2.7/site-packages/amqp/transport.py", line 299, in create_transport 
    return TCPTransport(host, connect_timeout) 
    File "/Library/Python/2.7/site-packages/amqp/transport.py", line 95, in __init__ 
    raise socket.error(last_err) 
socket.error: timed out 
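
A timeout at this point means the TCP connection to the broker address never completed, so it can help to test plain reachability before involving Celery at all. The following is a minimal sketch, assuming Python 3 and only the standard library; `can_connect` is a hypothetical helper name, and the host and port to probe are whatever tasks.py uses as its broker address.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        # create_connection resolves the host name and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


# Example (hypothetical values): probe the AMQP port on the local machine.
# print(can_connect("localhost", 5672))
```

If this returns False for the address in the broker URL, the problem is network reachability between the host and the container rather than anything in the Celery configuration.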

I saw this question, but the error there seems to be different.

Edit: I've added the RabbitMQ container logs (relevant part):

docker logs some-rabbit 


       RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc. 
    ## ##  Licensed under the MPL. See http://www.rabbitmq.com/ 
    ## ## 
    ########## Logs: tty 
    ###### ##  tty 
    ########## 
       Starting broker... 

=INFO REPORT==== 20-Oct-2016::10:22:41 === 
Starting RabbitMQ 3.6.5 on Erlang 19.0.7 
Copyright (C) 2007-2016 Pivotal Software, Inc. 
Licensed under the MPL. See http://www.rabbitmq.com/ 

=INFO REPORT==== 20-Oct-2016::10:22:41 === 
node   : [email protected] 
home dir  : /var/lib/rabbitmq 
config file(s) : /etc/rabbitmq/rabbitmq.config 
cookie hash : AlUJAQFic5TGBPlUjyyIOw== 
log   : tty 
sasl log  : tty 
database dir : /var/lib/rabbitmq/mnesia/[email protected] 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
Memory limit set to 799MB of 1999MB total. 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
Disk free limit set to 50MB 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
Limiting to approx 1048476 file handles (943626 sockets) 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
FHC read buffering: OFF 
FHC write buffering: ON 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
Database directory at /var/lib/rabbitmq/mnesia/[email protected] is empty. Initialising from scratch... 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
    application: mnesia 
    exited: stopped 
    type: temporary 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Priority queues enabled, real BQ is rabbit_variable_queue 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Adding vhost '/' 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Creating user 'guest' 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Setting user tags for user 'guest' to [administrator] 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Setting permissions for 'guest' in '/' to '.*', '.*', '.*' 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
msg_store_transient: using rabbit_msg_store_ets_index to provide index 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
msg_store_persistent: using rabbit_msg_store_ets_index to provide index 

=WARNING REPORT==== 20-Oct-2016::10:22:43 === 
msg_store_persistent: rebuilding indices from scratch 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
started TCP Listener on [::]:5672 
completed with 0 plugins. 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Server startup complete; 0 plugins started. 

=INFO REPORT==== 20-Oct-2016::10:22:47 === 
accepting AMQP connection <0.353.0> (172.17.0.3:41166 -> 172.17.0.2:5672) 

=INFO REPORT==== 20-Oct-2016::10:22:47 === 
accepting AMQP connection <0.360.0> (172.17.0.3:41168 -> 172.17.0.2:5672) 
+0

Can you access the RabbitMQ server directly? – Jason

+0

@Jason I've added the RabbitMQ logs to the question. – ItayB

+0

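I noticed that you are using RabbitMQ as the result backend. Does this still happen if you remove backend='amqp' from the Celery configuration? I wonder whether the task actually executes and it is saving the response to the backend that throws the exception. What happens if you add logging to the task? – Jason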

Answers

1

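Fixed. I followed this question. I changed the broker address in tasks.py to localhost (because the RabbitMQ port is published to the host, the broker behaves like a process listening on localhost):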

from celery import Celery 

app = Celery('tasks', backend='amqp', broker='amqp://[email protected]') 

@app.task(name='tasks.add') 
def add(x, y): 
    return x + y 
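
Because the right broker host depends on where the code runs (localhost from the host machine, the link alias `rabbit` from inside a linked container), one option is to build the URL from environment variables. This is a minimal sketch under stated assumptions: the variable names `BROKER_HOST`, `BROKER_PORT`, `BROKER_USER` and `BROKER_PASSWORD` and the helper `broker_url` are hypothetical, and the defaults assume RabbitMQ's stock guest account on the default port.

```python
import os


def broker_url(env=None):
    """Build an AMQP broker URL from environment variables (names are hypothetical)."""
    env = os.environ if env is None else env
    host = env.get("BROKER_HOST", "localhost")
    port = env.get("BROKER_PORT", "5672")
    user = env.get("BROKER_USER", "guest")
    password = env.get("BROKER_PASSWORD", "guest")
    # The trailing "//" selects the default "/" vhost.
    return "amqp://{}:{}@{}:{}//".format(user, password, host, port)
```

In tasks.py the result could then be passed as `broker=broker_url()`, so the same file works on the host and, with `BROKER_HOST=rabbit`, inside a linked container.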

Then I ran the containers:

docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management 

docker run -v /Users/user/hostpath:/home/user --link some-rabbit:rabbit --name some-celery -d celery 

and added a celeryconfig.py file to /Users/user/hostpath:

CELERY_IMPORTS = ('tasks',)  # trailing comma makes this a one-element tuple
CELERY_IGNORE_RESULT = False 
CELERY_RESULT_BACKEND = 'amqp' 
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] 

and this worked for me :)
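
The deprecation warning in the worker log advises enabling only the serializers that are actually used. A possible stricter variant of celeryconfig.py is sketched below, assuming the tasks only pass JSON-friendly values (numbers, strings, lists, dicts) and that the old-style Celery 3.1 setting names apply to the image in use:

```python
# celeryconfig.py variant: accept and produce JSON only (a sketch, not a drop-in requirement)
CELERY_IMPORTS = ('tasks',)
CELERY_IGNORE_RESULT = False
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['json']      # reject pickle, msgpack and yaml payloads
CELERY_TASK_SERIALIZER = 'json'       # how task messages are encoded
CELERY_RESULT_SERIALIZER = 'json'     # how results are encoded
```

With `add(4, 4)` the arguments and the result are plain integers, so JSON is sufficient; tasks that pass arbitrary Python objects would need a different choice.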

+1

Glad you found a solution! – Jason

+0

@Jason Thanks for your support ;-) – ItayB