2016-10-11

I have a simple Raspberry Pi + Django + Celery + RabbitMQ setup used to send and receive data from Xbee radios while users interact with the web app. The RabbitMQ message queues pile up until the system crashes (all of the queues sit in the "Ready" state).

For the life of me I can't get control of RabbitMQ (or is it Celery?), and after only a day or so (sometimes longer) the whole system crashes from what looks like a memory leak.

My suspicion is that the queues are piling up and never being removed.

Below is what I see after only a few minutes of runtime: [screenshot of the RabbitMQ queue list]

It seems that all of the queues are in the "Ready" state.

Strangely, the workers do seem to receive the messages and execute the tasks.

The tasks are very small and shouldn't take more than 1 second. I have verified that the tasks execute to the last line and should be returning normally.

I'm no expert and don't really know what I'm looking at, so I'm not sure whether this is normal behavior and my problem lies elsewhere.

I have set everything up to run as daemons, but I get the same results even when running in development mode.

I've spent nearly four hours debugging with Google searches that only led me in circles; I haven't found any clarity.

[CONFIGS AND CODE]

In /etc/default/celeryd I have set the following:

CELERY_APP="MyApp" 
CELERYD_NODES="w1" 

# Python interpreter from environment. 
ENV_PYTHON="/home/pi/.virtualenvs/myappenv/bin/python" 

# Where to chdir at start. 
CELERYD_CHDIR="/home/pi/django_projects/MyApp" 

# Virtual Environment Setup 
ENV_MY="/home/pi/.virtualenvs/myappenv" 
CELERYD="$ENV_MY/bin/celeryd" 
CELERYD_MULTI="$ENV_PYTHON $CELERYD_CHDIR/manage.py celeryd_multi" 
CELERYCTL="$ENV_MY/bin/celeryctl" 

CELERYD_OPTS="--app=MyApp --concurrency=1 --loglevel=FATAL" 

CELERYD_LOG_FILE="/var/log/celery/%n.log" 
CELERYD_PID_FILE="/var/run/celery/%n.pid" 

CELERYD_USER="celery" 
CELERYD_GROUP="celery" 

tasks.py

@celery.task
def sendStatus(modelContext, ignore_result=True, *args, **kwargs):
    node = modelContext  # EndNodes.objects.get(node_addr_lg=node_addr_lg)
    # Check the age of the message; proceed with the status update only if it
    # is fresh, otherwise skip it.
    if current_task.request.eta is not None:
        now_date = datetime.now().replace(tzinfo=None)  # the time now
        eta_date = dateutil.parser.parse(current_task.request.eta).replace(tzinfo=None)  # when this was supposed to run, timezone stripped
        delta_seconds = (now_date - eta_date).total_seconds()  # seconds since this task was supposed to run
        if delta_seconds >= node.status_timeout:  # queued more than delta_seconds ago: too old to process
            return
    # The message is fresh, so process the contents and send status to the xbee.
    hostname = current_task.request.hostname  # the name/key in the schedule that might have related xbee sessions

    app = Celery('app')  # create a new instance of app (because documented methods didn't work)

    i = app.control.inspect()
    scheduled_tasks = i.scheduled()  # the schedule of tasks in the queue

    for task in scheduled_tasks[hostname]:  # iterate through each task
        xbee_session = ast.literal_eval(task['request']['kwargs'])  # the request kwargs (converts unicode to dict)
        if xbee_session['xbee_addr'] == node.node_addr_lg:  # session data for this device, set from the model's save override
            if xbee_session['type'] == 'STAT':  # we are responding with a status update, so look for status sessions
                app.control.revoke(task['request']['id'], terminate=True)  # redundant now that we are sending the update

    page_mode = chr(node.page_mode)  # the paging mode to set on the remote device
    xbee_global.tx(dest_addr_long=bytearray.fromhex(node.node_addr_lg),
                   frame_id='A',
                   dest_addr='\xFF\xFE',
                   data=page_mode)
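The staleness check at the top of the task can be isolated into a small helper; a minimal sketch (the function name is mine, and it uses the stdlib `datetime.fromisoformat` in place of `dateutil.parser.parse`):

```python
from datetime import datetime

def is_stale(eta_iso, status_timeout, now=None):
    """True if the task's ETA lies more than status_timeout seconds in the past."""
    if eta_iso is None:
        return False  # no ETA: the task runs immediately, so it is never stale
    now = now or datetime.utcnow()
    # Strip the timezone, as the task does, so both datetimes are naive UTC.
    eta = datetime.fromisoformat(eta_iso).replace(tzinfo=None)
    return (now - eta).total_seconds() >= status_timeout
```

With a `status_timeout` of, say, 10 seconds, an ETA 13 seconds in the past is considered stale while one 5 seconds in the past is not.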

Celery startup banner:

-------------- [email protected] v3.1.23 (Cipater) 
---- **** ----- 
--- * *** * -- Linux-4.4.11-v7+-armv7l-with-debian-8.0 
-- * - **** --- 
- ** ---------- [config] 
- ** ---------- .> app:   MyApp:0x762efe10 
- ** ---------- .> transport: amqp://guest:**@localhost:5672// 
- ** ---------- .> results:  amqp:// 
- *** --- * --- .> concurrency: 1 (prefork) 
-- ******* ---- 
--- ***** ----- [queues] 
-------------- .> celery   exchange=celery(direct) key=celery 


[tasks] 
    . MyApp.celery.debug_task 
    . clone_app.tasks.nodeInterval 
    . clone_app.tasks.nodePoll 
    . clone_app.tasks.nodeState 
    . clone_app.tasks.resetNetwork 
    . clone_app.tasks.sendData 
    . clone_app.tasks.sendStatus 

[2016-10-11 03:41:12,863: WARNING/Worker-1] Got signal worker_process_init for task id None 
[2016-10-11 03:41:12,913: WARNING/Worker-1] JUST OPENED 
[2016-10-11 03:41:12,915: WARNING/Worker-1] /dev/ttyUSB0 
[2016-10-11 03:41:12,948: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672// 
[2016-10-11 03:41:13,101: INFO/MainProcess] mingle: searching for neighbors 
[2016-10-11 03:41:14,206: INFO/MainProcess] mingle: all alone 
[2016-10-11 03:41:14,341: WARNING/MainProcess] [email protected] ready. 
[2016-10-11 03:41:16,223: WARNING/Worker-1] RAW DATA 
[2016-10-11 03:41:16,225: WARNING/Worker-1] {'source_addr_long': '\x00\x13\xa2\[email protected]\x89\xe9\xd7', 'rf_data': '...^%:STAT:`', 'source_addr': '[*', 'id': 'rx', 'options': '\x01'} 
[2016-10-11 03:41:16,458: INFO/MainProcess] Received task: clone_app.tasks.sendStatus[6e1a74ec-dca5-495f-a4fa-906a5c657b26] eta:[2016-10-11 03:41:17.307421+00:00] 

I can provide additional details if needed, and I appreciate any help resolving this problem.

Answer


Wow, almost immediately after posting my question I found this post, which completely resolved my issue.

As I suspected, ignore_result=True was required; I just wasn't sure where it belonged.

Now I don't see any queues, except perhaps briefly while the workers are executing tasks. :)


Here are the tasks.py changes:

# From
@celery.task
def sendStatus(modelContext, ignore_result=True, *args, **kwargs):
    # Some code here

# To
@celery.task(ignore_result=True)
def sendStatus(modelContext, *args, **kwargs):
    # Some code here
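If no task in the project ever consumes its return value, the same effect can also be applied globally instead of per task. A sketch of the relevant Celery 3.1 settings (placed in the Django settings module; the expiry value is illustrative):

```python
# Ignore results for all tasks, so the amqp result backend never creates
# a per-result queue in RabbitMQ.
CELERY_IGNORE_RESULT = True

# Belt and braces: if some tasks do keep results, expire them after this
# many seconds so result queues cannot grow without bound.
CELERY_TASK_RESULT_EXPIRES = 3600
```

The per-task decorator argument shown above is the more targeted fix; the global setting is useful when, as here, none of the results are ever read.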