我正在以5個任務/秒的速率創建任務。我可以看到RabbitMQ消息傳入速率平均爲5.2/s,我有240個消費者分佈在4個虛擬機中(每個虛擬機60個),每個工作人員處理持續20秒的任務。 理論上我應該處理100K任務而不排隊。RabbitMQ未執行郵件
我看到大量的Unacked消息。如何擺脫Unacked消息或添加一個計時器來殺死他們,這是否意味着我的工作人員的問題?
How can I recover unacknowledged AMQP messages from other channels than my connection's own?
隊列標籤
準備UNACKED總的來電提供/獲取ACK
21884 960 22844 5.0/s的0.40/s的0.40 /秒
交易所 tab:stackoverflow direct D 5.0/s 5.0/s
這是我的celeryconfig文件。
CELERYD_CHDIR = settings.filepath
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "US/Eastern"
CELERY_ACCEPT_CONTENT = ['json', 'pickle', 'yaml']
CELERY_IGNORE_RESULT = True
CELERY_RESULT_BACKEND = "amqp"
CELERY_RESULT_PERSISTENT = True
BROKER_URL = 'amqp://stackoverflow:[email protected]:5672'
BROKER_CONNECTION_TIMEOUT = 15
BROKER_CONNECTION_MAX_RETRIES = 5
CELERY_DISABLE_RATE_LIMITS = True
CELERY_TASK_RESULT_EXPIRES = 7200
CELERY_IMPORTS = ("cc.modules.stackoverflow")
CELERY_DEFAULT_QUEUE = "default"
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('gold', Exchange('stackoverflow'), routing_key='stackoverflow.gold'),
Queue('silver', Exchange('stackoverflow'), routing_key='stackoverflow.silver'),
Queue('bronze', Exchange('stackoverflow'), routing_key='stackoverflow.bronze'),
)
CELERY_DEFAULT_EXCHANGE = "stackoverflow"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "default"
CELERY_TRACK_STARTED = True
CELERY_ROUTES = {
'process_call' : {'queue': 'gold', 'routing_key': 'stackoverflow.gold', 'exchange': 'stackoverflow',},
'process_recording': {'queue': 'silver', 'routing_key': 'stackoverflow.silver', 'exchange': 'stackoverflow',},
'process_campaign' : {'queue': 'bronze', 'routing_key': 'stackoverflow.bronze', 'exchange': 'stackoverflow',}
}