2014-11-06 95 views
0

使用txamqp客戶端如下具體情況:如何重啓消費和消費被拒絕的消息

  1. 聲明所謂的 '消息' (類型=話題)
yield amqp.chan.exchange_declare(exchange='messaging', type='topic') 
交換
  1. 設置消費者
yield amqp.named_queue_declare(queue="submit.sm_all") 
yield amqp.chan.queue_bind(queue="submit.sm_all", exchange="messaging", routing_key="submit.sm.*") 
yield amqp.chan.basic_consume(queue="submit.sm_all", no_ack=False, consumer_tag='qtag') 
  • 發佈50消息
  • for i in range(50): 
         yield amqp.publish(exchange='messaging', routing_key="submit.sm.connector01", 
          content=Content(str(i))) 
    
  • 啓動消費者用回調重新消耗所有消費的消息
  • queue = yield amqp.client.queue('qtag') 
    queue.get().addCallback(self._callback_reject_and_requeue_all).addErrback(self._errback) 
    
  • 停止5秒後消費者:
  • yield queue.close() 
    

    在這個階段,該隊列是仍然充滿了50條消息,因爲它們都被拒絕並重新排隊(回調被多次觸發)。

    再次
  • 啓動消費者:
  • queue = yield amqp.client.queue('qtag') 
    queue.get().addCallback(self._callback).addErrback(self._errback) 
    
  • 停止後消費5秒
  • yield queue.close() 
    

    問題是,在步驟6中啓動使用者後,從未觸發回調,並且隊列中仍保留着50條消息。

    注:

    • 消息被拒絕是這樣的:
    yield amqp.chan.basic_reject(delivery_tag=message.delivery_tag, requeue = 1) 
    

    回答

    0

    爲了乾淨地停止消費者(步驟5),basic_cancel必須使用:

    收率amqp.chan:

    5秒後
  • 停止消費者.basic_cancel(consumer_tag = 'qtag')

  • 開始再次消費者:

    收率amqp.chan.basic_consume(隊列= 「submit.sm_all」,NO_ACK =假,consumer_tag = 'qtag') 隊列=產量amqp.client.queue( 'qtag') queue.get()的addCallback(self._callback).addErrback(self._errback)

  • 0

    有沒有區別的消息是否已經被拒絕或沒有 - 它會駐留在頂部隊列,並且可以被任何消費者選擇(或者如果使用TTL或長度限制將會從隊列中移除,並且這樣的限制將會達到)。

    您不能只消耗先前拒絕的消息,而無法在服務器端定義它。事實上,你只能從隊列中消耗一個消息(它們是嚴格的FIFO隊列)。

    作爲一種解決方法,您可以設置Dead Letter Exchanges並拒絕帶有requeue=false的消息,然後根據DLX路由流程將它們移動到目標隊列。然後,您可以從那裏消費被拒絕的消息,但通常情況下,通常會拒絕消息與消耗的隊列相同,除非需要特殊的邏輯。

    你也可以重新發布你想要拒絕的地方,你想要的地方,它甚至聽起來有點生疏。

    P.S:

    注意,當你在函數中調用yield,它不運行函數體而是返回發生器對象

    +0

    你的意思是拒絕的郵件會去當前隊列?如果是這種情況,那麼當我再次用相同的隊列啓動我的消費者時,爲什麼它看不到被拒絕的消息? – 2014-11-07 17:27:33

    +0

    我已經添加了關於yield語句的註釋。使用'return'而不是'yield'呢?它應該有所幫助。 – pinepain 2014-11-07 20:24:54

    +0

    與yield沒有關係,它只是阻塞,直到我從調用中得到響應,我用它來簡化代碼並且不使用回調(amqp對象是一個使用扭曲框架的txamqp客戶端,基於事件的一個事件) – 2014-11-08 08:13:53