我有一個簡單的Raspberry Pi + Django + Celery + Rabbitmq設置,用於在用戶與Web應用程序交互時從Xbee收音機發送和接收數據。Rabbitmq消息隊列堆積直到系統崩潰(所有隊列都「準備就緒」)
對於我的生活,我無法獲得Rabbitmq(或芹菜?)的控制權,只有一天(有時候會更長)整個系統因某種內存泄漏而崩潰。
我懷疑的是,隊列堆積,永遠不會被刪除。
似乎所有的隊列都在「就緒」狀態。
奇怪的是,似乎工人確實收到消息並執行任務。
該任務是非常小的,不應該超過1秒。 我已經驗證了執行到最後一行的任務,應該返回正常。
我不是專家,也不知道我在看什麼,所以我不確定這是否是正常行爲,我的問題在別處?
我已將所有設置爲以守護進程運行,但即使在開發模式下運行,我也會得到相同的結果。
我花了近四個小時的時間用Google搜索進行調試,發現它把我圈起來,我沒有找到清晰度。
[CONFIGS和代碼]
在/ ECT /默認/ celeryd我已經設置了下列內容:
CELERY_APP="MyApp"
CELERYD_NODES="w1"
# Python interpreter from environment.
ENV_PYTHON="/home/pi/.virtualenvs/myappenv/bin/python"
# Where to chdir at start.
CELERYD_CHDIR="/home/pi/django_projects/MyApp"
# Virtual Environment Setup
ENV_MY="/home/pi/.virtualenvs/myappenv"
CELERYD="$ENV_MY/bin/celeryd"
CELERYD_MULTI="$ENV_PYTHON $CELERYD_CHDIR/manage.py celeryd_multi"
CELERYCTL="$ENV_MY/bin/celeryctl"
CELERYD_OPTS="--app=MyApp --concurrency=1 --loglevel=FATAL"
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"
CELERYD_USER="celery"
CELERYD_GROUP="celery"
tasks.py
@celery.task
def sendStatus(modelContext, ignore_result=True, *args, **kwargs):
node = modelContext#EndNodes.objects.get(node_addr_lg=node_addr_lg)
#check age of message and proceed to send status update if it is fresh, otherwise we'll skip it
if not current_task.request.eta == None:
now_date = datetime.now().replace(tzinfo=None) #the time now
eta_date = dateutil.parser.parse(current_task.request.eta).replace(tzinfo=None)#the time this was supposed to run, remove timezone from message eta datetime
delta_seconds = (now_date - eta_date).total_seconds()#seconds from when this task was supposed to run
if delta_seconds >= node.status_timeout:#if the message was queued more than delta_seconds ago this message is too old to process
return
#now that we know the message is fresh we can proceed to process the contents and send status to xbee
hostname = current_task.request.hostname #the name/key in the schedule that might have related xbee sessions
app = Celery('app')#create a new instance of app (because documented methods didnt work)
i = app.control.inspect()
scheduled_tasks = i.scheduled()#the schedule of tasks in the queue
for task in scheduled_tasks[hostname]:#iterate through each task
xbee_session = ast.literal_eval(task['request']['kwargs'])#the request data in the message (converts unicode to dict)
if xbee_session['xbee_addr'] == node.node_addr_lg:#get any session data for this device that we may have set from model's save override
if xbee_session['type'] == 'STAT':#because we are responding with status update we look for status sessions
app.control.revoke(task['request']['id'], terminate=True)#revoke this task because it is redundant and we are sending update now
page_mode = chr(node.page_mode)#the paging mode to set on the remote device
xbee_global.tx(dest_addr_long=bytearray.fromhex(node.node_addr_lg),
frame_id='A',
dest_addr='\xFF\xFE',
data=page_mode)
芹菜飛濺:
-------------- [email protected] v3.1.23 (Cipater)
---- **** -----
--- * *** * -- Linux-4.4.11-v7+-armv7l-with-debian-8.0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: MyApp:0x762efe10
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: amqp://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. MyApp.celery.debug_task
. clone_app.tasks.nodeInterval
. clone_app.tasks.nodePoll
. clone_app.tasks.nodeState
. clone_app.tasks.resetNetwork
. clone_app.tasks.sendData
. clone_app.tasks.sendStatus
[2016-10-11 03:41:12,863: WARNING/Worker-1] Got signal worker_process_init for task id None
[2016-10-11 03:41:12,913: WARNING/Worker-1] JUST OPENED
[2016-10-11 03:41:12,915: WARNING/Worker-1] /dev/ttyUSB0
[2016-10-11 03:41:12,948: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2016-10-11 03:41:13,101: INFO/MainProcess] mingle: searching for neighbors
[2016-10-11 03:41:14,206: INFO/MainProcess] mingle: all alone
[2016-10-11 03:41:14,341: WARNING/MainProcess] [email protected] ready.
[2016-10-11 03:41:16,223: WARNING/Worker-1] RAW DATA
[2016-10-11 03:41:16,225: WARNING/Worker-1] {'source_addr_long': '\x00\x13\xa2\[email protected]\x89\xe9\xd7', 'rf_data': '...^%:STAT:`', 'source_addr': '[*', 'id': 'rx', 'options': '\x01'}
[2016-10-11 03:41:16,458: INFO/MainProcess] Received task: clone_app.tasks.sendStatus[6e1a74ec-dca5-495f-a4fa-906a5c657b26] eta:[2016-10-11 03:41:17.307421+00:00]
我可以提供addi如果需要,請提供詳細信息 並感謝您解決此問題的任何幫助。