2014-01-07 32 views
6

我在測試kombu的工作方式。我打算在幾個項目中取代pika。我看到kombu有很多文檔,但是使用我在文檔中發現的一些消息丟失了。這是代碼:使用kombu重試發佈消息的最佳方式是什麼?

from kombu import Connection, Producer 
conn = Connection('amqp://localhost:5672') 
def errback(exc, interval): 
    logger.error('Error: %r', exc, exc_info=1) 
    logger.info('Retry in %s seconds.', interval) 
producer = Producer(conn) 
publish = conn.ensure(producer, producer.publish, errback=errback, max_retries=3) 
for i in range(1, 200000): 
    publish({'hello': 'world'}, routing_key='test_queue') 
    time.sleep(0.001) 

當它的發佈我關閉連接好幾次,它使出版,但在隊列中大約有60000點的消息,所以有很多丟失的消息的。

我已經嘗試了不同的替代方案e.g:

publish({'hello': 'world'}, retry=True, mandatory=True, routing_key='hipri') 

謝謝!

回答

10

的問題是,在默認情況下海帶不使用「確認」,你必須使用:

 conn = Connection('amqp://localhost:5672', transport_options={'confirm_publish': True}) 

感謝

+0

尼斯分享此。 – flycee

+1

這不會做什麼強制性標誌在皮卡做。如果在RabbitMQ服務器上沒有聲明隊列,則消息將被髮送,不會被路由到任何隊列,因此將被「忽略」。製作人將不會收到任何信息不被路由。 – Sergey11g

相關問題