2016-10-01 52 views
0

我正在以5個任務/秒的速率創建任務。我可以看到RabbitMQ消息傳入速率平均爲5.2/s,我有240個消費者分佈在4個虛擬機中(每個虛擬機60個),每個工作人員處理持續20秒的任務。 理論上我應該處理100K任務而不排隊。RabbitMQ未執行郵件

我看到大量的Unacked消息。如何擺脫Unacked消息或添加一個計時器來殺死他們,這是否意味着我的工作人員的問題?

How can I recover unacknowledged AMQP messages from other channels than my connection's own?

隊列標籤

準備UNACKED總的來電提供/獲取ACK

21884 960 22844 5.0/s的0.40/s的0.40 /秒

enter image description here 交易所 tab:stackoverflow direct D 5.0/s 5.0/s

這是我的celeryconfig文件。

CELERYD_CHDIR = settings.filepath 
CELERY_ENABLE_UTC = True 
CELERY_TIMEZONE = "US/Eastern" 
CELERY_ACCEPT_CONTENT = ['json', 'pickle', 'yaml'] 
CELERY_IGNORE_RESULT = True 
CELERY_RESULT_BACKEND = "amqp" 
CELERY_RESULT_PERSISTENT = True 
BROKER_URL = 'amqp://stackoverflow:[email protected]:5672' 
BROKER_CONNECTION_TIMEOUT = 15 
BROKER_CONNECTION_MAX_RETRIES = 5 
CELERY_DISABLE_RATE_LIMITS = True 
CELERY_TASK_RESULT_EXPIRES = 7200 
CELERY_IMPORTS = ("cc.modules.stackoverflow") 


CELERY_DEFAULT_QUEUE = "default" 
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'), 
    Queue('gold', Exchange('stackoverflow'), routing_key='stackoverflow.gold'), 
    Queue('silver', Exchange('stackoverflow'), routing_key='stackoverflow.silver'), 
    Queue('bronze', Exchange('stackoverflow'), routing_key='stackoverflow.bronze'), 
) 
CELERY_DEFAULT_EXCHANGE = "stackoverflow" 
CELERY_DEFAULT_EXCHANGE_TYPE = "topic" 
CELERY_DEFAULT_ROUTING_KEY = "default" 
CELERY_TRACK_STARTED = True 

CELERY_ROUTES = { 
    'process_call'  : {'queue': 'gold', 'routing_key': 'stackoverflow.gold', 'exchange': 'stackoverflow',}, 
    'process_recording': {'queue': 'silver', 'routing_key': 'stackoverflow.silver', 'exchange': 'stackoverflow',}, 
    'process_campaign' : {'queue': 'bronze', 'routing_key': 'stackoverflow.bronze', 'exchange': 'stackoverflow',} 
} 

回答