2013-07-25 17 views
0

我正在尋找任何幫助。我想修復insert_order_queue()函數,以便能夠重新發送郵件到RabbitMQ如果郵件實際上沒有傳送到服務器。代碼設計:重新發送消息給兔子從嘗試塊除外

這是我當前的代碼:

def insert_order_queue(self, msg): 
    ''' Insert message into the queue ''' 
    if msg: 
     msg_props = pika.BasicProperties(delivery_mode=conf.rabbit_msg_props_delivery_mode, 
             content_type=conf.rabbit_msg_props_content_type) 
     logger.info('Message : %s' % msg) 
     try: 
      self.channel.basic_publish(body=json.dumps(msg), 
             exchange=conf.rabbit_exchange_name, 
             properties=msg_props, 
             routing_key=conf.rabbit_exchange_routing_key) 
     except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error: 
      logger.error('AMQP Connection failed. Trying again... %s' % error) 
      self._connect() 
      return 
    else: 
     logger.error('Something wrong') 

這是我_connect()方法:

def _connect(self): 
    ''' Connecting to the RabbitMQ, and declare queue ''' 
    logger.info('Trying to connect to RabbitMQ') 
    while True: 
     try: 
      conn_broker = pika.BlockingConnection(
       pika.ConnectionParameters(
        host=conf.rabbit_server, 
        port=conf.rabbit_port, 
        virtual_host=conf.rabbit_vhost, 
        ssl=conf.rabbit_ssl, # do not set it to True if there is no ssl! 
        heartbeat_interval=conf.rabbit_heartbeat_interval, 
        credentials=pika.PlainCredentials(
         conf.rabbit_user, 
         conf.rabbit_pass))) 
      logger.info('Successfully connected to Rabbit at %s:%s' % (conf.rabbit_server, conf.rabbit_port)) 
      channel = conn_broker.channel() 
      # Don't dispatch a new message to a worker until it has processed and acknowledged the previous one 
      channel.basic_qos(prefetch_count=conf.rabbit_prefetch_count) 
      status = channel.queue_declare(queue=conf.rabbit_queue_name, 
              durable=conf.rabbit_queue_durable, 
              exclusive=conf.rabbit_queue_exclusive, 
              passive=conf.rabbit_queue_passive) 
      if status.method.message_count == 0: 
       logger.info("Queue empty") 
      else: 
       logger.info('Queue status: %s' % status)     
      channel.queue_bind(
       queue=conf.rabbit_queue_name, 
       exchange=conf.rabbit_exchange_name, 
       routing_key=conf.rabbit_exchange_routing_key) 
      return channel 
     except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error: 
      time.sleep(3) 
      logger.error('Exception while connecting to Rabbit %s' % error) 
     else: 
      break 

回答

1

有幾個方面的消息不能被 「交付」

最顯而易見的是,「與兔子的連接已關閉」,在這種情況下,您只需重新連接並重新發送(就重新連接而言,您已擁有大部分邏輯,只需重新發送即時消息智者)。

然後有幾個變化「沒有人正在聽這條消息」。這些可以通過basic_publish上的立即和強制標誌來處理。請參閱此處瞭解更多信息:http://bunnyamqp.wordpress.com/2009/08/21/amqp-basic-publish-immediate-versus-mandatory/

最後,您可以添加確認回調。鼠兔,您可以設置這個回調:

https://github.com/pika/pika/blob/master/pika/channel.py#L387

從這個回調中,你可以決定再次或無法發送郵件。

相關問題