使用txamqp客戶端如下具體情況:如何重啓消費和消費被拒絕的消息
- 聲明所謂的 '消息' (類型=話題)
交換yield amqp.chan.exchange_declare(exchange='messaging', type='topic')
- 設置消費者
yield amqp.named_queue_declare(queue="submit.sm_all") yield amqp.chan.queue_bind(queue="submit.sm_all", exchange="messaging", routing_key="submit.sm.*") yield amqp.chan.basic_consume(queue="submit.sm_all", no_ack=False, consumer_tag='qtag')
- 發佈50消息
- 啓動消費者用回調重新消耗所有消費的消息
- 停止5秒後消費者:
- 啓動消費者:
- 停止後消費5秒:
- 消息被拒絕是這樣的:
for i in range(50): yield amqp.publish(exchange='messaging', routing_key="submit.sm.connector01", content=Content(str(i)))
queue = yield amqp.client.queue('qtag') queue.get().addCallback(self._callback_reject_and_requeue_all).addErrback(self._errback)
yield queue.close()
在這個階段,該隊列是仍然充滿了50條消息,因爲它們都被拒絕並重新排隊(回調被多次觸發)。
再次queue = yield amqp.client.queue('qtag') queue.get().addCallback(self._callback).addErrback(self._errback)
yield queue.close()
問題是,在步驟6中啓動使用者後,從未觸發回調,並且隊列中仍保留着50條消息。
注:
yield amqp.chan.basic_reject(delivery_tag=message.delivery_tag, requeue = 1)
你的意思是拒絕的郵件會去當前隊列?如果是這種情況,那麼當我再次用相同的隊列啓動我的消費者時,爲什麼它看不到被拒絕的消息? – 2014-11-07 17:27:33
我已經添加了關於yield語句的註釋。使用'return'而不是'yield'呢?它應該有所幫助。 – pinepain 2014-11-07 20:24:54
與yield沒有關係,它只是阻塞,直到我從調用中得到響應,我用它來簡化代碼並且不使用回調(amqp對象是一個使用扭曲框架的txamqp客戶端,基於事件的一個事件) – 2014-11-08 08:13:53