2017-04-25 152 views
0

Celery:無法在 Python 中使用 apply_async 發送任務

​​

問題是:有時(其實相當頻繁)我無法從 Django shell 或從其他任務中使用 apply_async 啓動任務,程序就卡在這條命令上。

我嘗試運行代碼,如:

my_task.apply_async(
    args=[object_id], 
    queue='my_queue') 

然後它就一直卡住,沒有任何結果。我無法弄清楚爲什麼會發生這種情況。

如果我需要提供更詳細的數據,請告訴我。

socket(PF_INET6, SOCK_DGRAM, IPPROTO_IP) = 13 
connect(13, {sa_family=AF_INET6, sin6_port=htons(27026), inet_pton(AF_INET6, "4a67:6b8:0:f0e:7543:7rh8:a28:gtbc", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, 28) = 0 

getsockname(13, {sa_family=AF_INET6, sin6_port=htons(60033), inet_pton(AF_INET6, "2a45:6b8:0:f0e:8034:7gtd:a28:hy6b", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28]) = 0 
connect(13, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0 
connect(13, {sa_family=AF_INET, sin_port=htons(27026), sin_addr=inet_addr("86.223.186.12")}, 16) = 0 
getsockname(13, {sa_family=AF_INET6, sin6_port=htons(50388), inet_pton(AF_INET6, "::ffff:37.9.91.95", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28]) = 0 
close(13)        = 0 
socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP) = 13 
setsockopt(13, SOL_TCP, TCP_NODELAY, [1], 4) = 0 
fcntl(13, F_GETFL)      = 0x2 (flags O_RDWR) 
fcntl(13, F_SETFL, O_RDWR|O_NONBLOCK) = 0 
setsockopt(13, SOL_SOCKET, SO_KEEPALIVE, [0], 4) = 0 
connect(13, {sa_family=AF_INET6, sin6_port=htons(27026), inet_pton(AF_INET6, "2a45:6b8:0:f0e:8034:7gtd:a28:hy6b", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, 28) = -1 EINPROGRESS (Operation now in progress) 
poll([{fd=13, events=POLLOUT}], 1, 4000) = 1 ([{fd=13, revents=POLLOUT}]) 
getsockopt(13, SOL_SOCKET, SO_ERROR, [0], [4]) = 0 
fcntl(13, F_GETFL)      = 0x802 (flags O_RDWR|O_NONBLOCK) 
fcntl(13, F_SETFL, O_RDWR)    = 0 
sendto(13, ":\0\0\0\254\376\245\33\0\0\0\0\324\7\0\0\0\0\0\0admin.$cmd\0\0"..., 58, 0, NULL, 0) = 58 
recvfrom(13, "\344\1\0\0\255\351Ct\254\376\245\33\1\0\0\0", 16, 0, NULL, NULL) = 16 
recvfrom(13, "\10\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1\0\0\0\300\1\0\0\2setName"..., 468, 0, NULL, NULL) = 468 
open("/dev/urandom", O_RDONLY)   = 20 
read(20, "\3a\240\371\34\342\273", 7) = 7 
close(20)        = 0 
sendto(13, "\230\0\0\0\202\256\213C\0\0\0\0\324\7\0\0\0\0\0\0dogma.$cmd\0\0"..., 152, 0, NULL, 0) = 152 
recvfrom(13, "\267\0\0\0\274\351Ct\202\256\213C\1\0\0\0", 16, 0, NULL, NULL) = 16 
recvfrom(13, "\10\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1\0\0\0\223\0\0\0\20convers"..., 167, 0, NULL, NULL) = 167 
futex(0x11bb8c0, FUTEX_WAKE_PRIVATE, 1) = 1 
sendto(13, "\274\0\0\0Ih<\21\0\0\0\0\324\7\0\0\0\0\0\0dogma.$cmd\0\0"..., 188, 0, NULL, 0) = 188 
recvfrom(13, "|\0\0\0X\352CtIh<\21\1\0\0\0", 16, 0, NULL, NULL) = 16 
recvfrom(13, "\10\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1\0\0\0X\0\0\0\20convers"..., 108, 0, NULL, NULL) = 108 
sendto(13, "`\0\0\0\245\347\226\35\0\0\0\0\324\7\0\0\0\0\0\0dogma.$cmd\0\0"..., 96, 0, NULL, 0) = 96 
recvfrom(13, "^\0\0\0]\352Ct\245\347\226\35\1\0\0\0", 16, 0, NULL, NULL) = 16 
recvfrom(13, "\10\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1\0\0\0:\0\0\0\20convers"..., 78, 0, NULL, NULL) = 78 
sendto(13, "x\0\0\0\366&\373T\0\0\0\0\324\7\0\0\0\0\0\0dogma.$cmd\0\0"...,  120, 0, NULL, 0) = 120 
recvfrom(13, 

當我在 Django shell 中按 CTRL+C 中止任務發送時,會得到下面的回溯信息:

(上方的日誌是我在 Django shell 中調用 apply_async 時,用 strace 附加到該進程所得到的輸出,這裏只貼出最後幾行。)

/lib/python2.7/site-packages/test/my_tasks.py in 
send_task_to_celery(my_args): 
    850  my_task.apply_async(
    851   args=[my_args], 
--> 852   queue='my_queue' 
    853 ) 

/lib/python2.7/site-packages/celery/app/task.pyc in apply_async(self, args, kwargs, task_id, producer, link, link_error, **options) 
    563    self.name, args, kwargs, task_id=task_id, producer=producer, 
    564    link=link, link_error=link_error, result_cls=self.AsyncResult, 
--> 565    **dict(self._get_exec_options(), **options) 
    566  ) 
    567 

/lib/python2.7/site-packages/celery/app/base.pyc in send_task(self, name, args, kwargs, countdown, eta, task_id, producer, connection, router, result_cls, expires, publisher, link, link_error, add_to_parent, reply_to, **options) 
    352     task_id=task_id, expires=expires, 
    353     callbacks=maybe_list(link), errbacks=maybe_list(link_error), 
--> 354     reply_to=reply_to or self.oid, **options 
    355   ) 
    356   result = (result_cls or self.AsyncResult)(task_id) 

/lib/python2.7/site-packages/celery/app/amqp.pyc in publish_task(self, task_name, task_args, task_kwargs, countdown, eta, task_id, group_id, taskset_id, expires, exchange, exchange_type, event_dispatcher, retry, retry_policy, queue, now, retries, chord, callbacks, errbacks, routing_key, serializer, delivery_mode, compression, reply_to, time_limit, soft_time_limit, declare, headers, send_before_publish, before_receivers, send_after_publish, after_receivers, send_task_sent, sent_receivers, **kwargs) 
    308    correlation_id=task_id, 
    309    delivery_mode=delivery_mode, declare=declare, 
--> 310    **kwargs 
    311  ) 
    312 

/lib/python2.7/site-packages/kombu/messaging.pyc in publish(self, body, routing_key, delivery_mode, mandatory, immediate, priority, content_type, content_encoding, serializer, headers, compression, exchange, retry, retry_policy, declare, expiration, **properties) 
    170   return publish(body, priority, content_type, 
    171      content_encoding, headers, properties, 
--> 172      routing_key, mandatory, immediate, exchange, declare) 
    173 
    174  def _publish(self, body, priority, content_type, content_encoding, 

/lib/python2.7/site-packages/kombu/connection.pyc in _ensured(*args, **kwargs) 
    447    for retries in count(0): # for infinity 
    448     try: 
--> 449      return fun(*args, **kwargs) 
    450     except conn_errors as exc: 
    451      if got_connection and not has_modern_errors: 

/lib/python2.7/site-packages/kombu/messaging.pyc in _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare) 
    182   if declare: 
    183    maybe_declare = self.maybe_declare 
--> 184    [maybe_declare(entity) for entity in declare] 
    185   return channel.basic_publish(
    186    message, 

/lib/python2.7/site-packages/kombu/messaging.pyc in maybe_declare(self, entity, retry, **retry_policy) 
    109   during this session.""" 
    110   if entity: 
--> 111    return maybe_declare(entity, self.channel, retry, **retry_policy) 
    112 
    113  def publish(self, body, routing_key=None, delivery_mode=None, 

/lib/python2.7/site-packages/kombu/common.pyc in maybe_declare(entity, channel, retry, **retry_policy) 
    118   return _imaybe_declare(entity, declared, ident, 
    119        channel, **retry_policy) 
--> 120  return _maybe_declare(entity, declared, ident, channel) 
    121 
    122 

/lib/python2.7/site-packages/kombu/common.pyc in _maybe_declare(entity, declared, ident, channel) 
    125  if not channel.connection: 
    126   raise RecoverableConnectionError('channel disconnected') 
--> 127  entity.declare() 
    128  if declared is not None and ident: 
    129   declared.add(ident) 

/lib/python2.7/site-packages/kombu/entity.pyc in declare(self, nowait) 
    520   if self.exchange: 
    521    self.exchange.declare(nowait) 
--> 522   self.queue_declare(nowait, passive=False) 
    523 
    524   if self.exchange and self.exchange.name: 

/lib/python2.7/site-packages/kombu/entity.pyc in queue_declare(self, nowait, passive) 
    546           auto_delete=self.auto_delete, 
    547           arguments=self.queue_arguments, 
--> 548           nowait=nowait) 
    549   if not self.name: 
    550    self.name = ret[0] 
/lib/python2.7/site-packages/kombu/transport/virtual/__init__.pyc in queue_declare(self, queue, passive, **kwargs) 
    445   else: 
    446    self._new_queue(queue, **kwargs) 
--> 447   return queue_declare_ok_t(queue, self._size(queue), 0) 
    448 
    449  def queue_delete(self, queue, if_unused=False, if_empty=False, **kwargs): 

/lib/python2.7/site-packages/kombu/transport/mongodb.pyc in _size(self, queue) 
    122    return self.get_broadcast_cursor(queue).get_size() 
    123 
--> 124   return self.get_messages().find({'queue': queue}).count() 
    125 
    126  def _put(self, queue, message, **kwargs): 

/lib/python2.7/site-packages/pymongo/cursor.pyc in count(self, with_limit_and_skip) 
    706     cmd["skip"] = self.__skip 
    707 
--> 708   return self.__collection._count(cmd) 
    709 
    710  def distinct(self, key): 

/lib/python2.7/site-packages/pymongo/collection.pyc in _count(self, cmd) 
    1197         codec_options=self.codec_options._replace(
    1198          document_class=dict), 
-> 1199         read_concern=self.read_concern) 
    1200   if res.get("errmsg", "") == "ns missing": 
    1201    return 0 

/lib/python2.7/site-packages/pymongo/collection.pyc in _command(self, sock_info, command, slave_ok, read_preference, codec_options, check, allowable_errors, read_concern) 
    203         check, 
    204         allowable_errors, 
--> 205         read_concern=read_concern) 
    206 
    207  def __create(self, options): 

/lib/python2.7/site-packages/pymongo/pool.pyc in command(self, dbname, spec, slave_ok, read_preference, codec_options, check, allowable_errors, check_keys, read_concern) 
    216   # Catch socket.error, KeyboardInterrupt, etc. and close ourselves. 
    217   except BaseException as error: 
--> 218    self._raise_connection_failure(error) 
    219 
    220  def send_message(self, message, max_doc_size): 

/lib/python2.7/site-packages/pymongo/pool.pyc in _raise_connection_failure(self, error) 
    344    _raise_connection_failure(self.address, error) 
    345   else: 
--> 346    raise error 
    347 
    348  def __eq__(self, other): 
KeyboardInterrupt: 

看起來這個問題出在與 MongoDB 的連接上,而不是 Celery/Python 本身,對嗎?我該如何診斷這類問題?應該從哪裏查起? MongoDB 服務器版本:3.0.11

+0

你是不是把參數傳錯了?是否應該使用 my_task.apply_async(*[object_id], queue='my_queue') 而不是 args=[object_id]? – daemon24

回答

0

問題出在我的消息代理(broker)上——我使用的是 MongoDB,它寫入一個任務要花很長時間。除非迫不得已,否則不要把數據庫用作消息代理。