0
from celery import Celery
from celery.worker.control import inspect_command
app = Celery('tasks', broker='pyamqp://[email protected]//')
@app.task
def add(x, y):
return x + y
@inspect_command
def current_prefetch_count(state):
return {'prefetch_count': state.consumer.qos.value}
我試圖運行與芹菜的代碼,但它給了我一個錯誤:芹菜從蟒蛇檢查任務
File"c:\python27\lib\site-packages\kombu\utils\imports.py",line56,insymbo
_by_name
typeError: inspect_command() takes exactly 0 arguments (1 given)
另外,我想定製inspect_command運行此sysfile.py
#!/usr/bin/env python
import platform
import celery
import os
import psutil
import json
def Speed_Test():
Speed_list = (os.popen("speedtest-cli --share --simple").read()).split("\n")
result = [Speed_list[-2].split(":")[-1]]
result.append(Speed_list[-3].split(":")[-1])
return result
def Sys_Info():
inner_Dict = {}
inner_Dict["CPU Model"]=platform.processor()
inner_Dict["No of CPU"]=psutil.cpu_count()
inner_Dict["Disk info"]=psutil.disk_usage('/')
inner_Dict["celery"] = (celery.__version__)
inner_Dict["Upload"] = Speed_Test()[0]
inner_Dict["Download"] = Speed_Test()[1]
return json.dumps(inner_Dict)
x = Sys_Info()
print x
怎麼樣讓接下來的事情在尋求感謝那些幫助了很多 –
另外我想custome inspect_command運行此sysfile.py –
調用SYS_INFO():這是另一個PY文件,得到的結果 整合SYS_INFO()功能納入他們自己的檢查命令 –