2015-10-13 40 views
2

凸出/芹菜:輪詢正在運行的任務爲結果

celery.py

from __future__ import absolute_import 

from kombu import Exchange, Queue 
from celery import Celery 

app = Celery('proj', 
      broker='redis://myredis.com', 
      backend='redis://myredis.com', 
      include=['proj.tasks']) 

a_exchange = Exchange('a_ex', type='topic') 

# Optional configuration, see the application user guide. 
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600, 
    CELERY_DISABLE_RATE_LIMITS=True, 
    CELERY_ROUTES = {"app.tasks.timeme": "a"} 
) 

if __name__ == '__main__': 
    app.start() 

tasks.py

from __future__ import absolute_import 

from proj.celery import app 
import time 

@app.task 
def timeme(ts): 
    print 'hi' 
    lat = time.time() - float(ts) 
    return (lat, time.time()) 

do_tasks.py

import proj.tasks 
import time 
import sys 

stime = time.time() 
running = [] 
while time.time() < stime + 15: 
    res = proj.tasks.timeme.apply_async(args=[time.time()], link=proj.tasks.timeme.s()) 
    running.append(res) 

    for res in running:  #<------------ this gets extremely slow if running gets big! 
      if res.ready() 
       print res.get() 

在上面的代碼,循環running並查看它是否準備就緒因爲running變得越來越大所以結果與否都需要很長時間。

在運行芹菜任務時是否有類似select.selectpoll/epoll

所以我可以做類似如下:

While running: 
    read, w, e = select.select([running], [], []) 
    print read.get() 
    running.remove(read) 
    break 

回答

1

一句話,沒有。

但是你可以得到你想要使用的是什麼celery.app.control.inspect

i = app.control.inspect() 
i.active() 
[{'worker1.example.com': 
    [{'name': 'tasks.sleeptask', 
     'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf', 
     'args': '(8,)', 
     'kwargs': '{}'}]}]