2016-02-17 99 views
1

我已經安裝了該插件從這裏rabbitmq-delayed-message-exchange發送延遲的消息。我怎麼會使用RabbitMQ的延遲消息交換插件在RabbitMQ的發送延遲的消息?

我找不到在Python中使用它的任何幫助。我剛開始使用rabbitmq。

這是我一直想:

import pika 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
channel = connection.channel() 
channel.exchange_declare("test-x", type="x-delayed-message", arguments={"x-delayed-type":"direct"}) 
channel.queue_declare(queue='task_queue',durable=True) 
channel.queue_bind(queue="task_queue", exchange="test-x", routing_key="task_queue") 
channel.basic_publish(exchange='test-x',routing_key='task_queue',body='Hello World! Delayed',arguments={"x-delay":100}) 
print(" [x] Sent 'Hello World! Delayed'") 
connection.close() 

這裏是交易所上市:

sudo rabbitmqctl list_exchanges 
Listing exchanges ... 
amq.direct direct 
test-x x-delayed-message 
amq.fanout fanout 
amq.match headers 
amq.headers headers 
    direct 
amq.rabbitmq.trace topic 
amq.topic topic 
amq.rabbitmq.log topic 

我沒有一個好主意,我怎麼能延遲參數傳遞給basic_publish功能

任何幫助表示讚賞

回答

1

您需要添加x-delay標頭指向您的消息屬性並指定毫秒中的延遲值。試試這個:

channel.basic_publish(
    exchange='test-x', 
    routing_key='task_queue', 
    body='Hello World! Delayed', 
    properties=pika.BasicProperties(headers={"x-delay": 1000}) 
) 
+0

它確實有效。將現在檢查延遲。很棒 –

1

實際上你可以推遲的消息,而無需使用插件。使用QUEUE TTL - - 使用消息TTL 如果在隊列中的所有消息將被延遲固定的時間使用的隊列TTL兔隊列 消息可以以2種方式 被延遲。 如果每個消息必須通過不同的時間延遲使用消息TTL。 我已經用python3和pika模塊解釋了它。以毫秒爲單位 鼠BasicProperties參數「到期」具有要被設置在延遲隊列延遲的消息。 設定到期時間之後,發佈消息到delayed_queue(「不實際隊列,消費者正在等待消耗」),一旦在delayed_queue消息到期,消息將使用交換「amq.direct」

def delay_publish(self, messages, queue, headers=None, expiration=0): 
    """ 
    Connect to RabbitMQ and publish messages to the queue 
    Args: 
     queue (string): queue name 
     messages (list or single item): messages to publish to rabbit queue 
     expiration(int): TTL in milliseconds for message 
    """ 
    delay_queue = "".join([queue, "_delay"]) 
    logging.info('Publishing To Queue: {queue}'.format(queue=delay_queue)) 
    logging.info('Connecting to RabbitMQ: {host}'.format(
     host=self.rabbit_host)) 
    credentials = pika.PlainCredentials(
     RABBIT_MQ_USER, RABBIT_MQ_PASS) 
    parameters = pika.ConnectionParameters(
     rabbit_host, RABBIT_MQ_PORT, 
     RABBIT_MQ_VHOST, credentials, heartbeat_interval=0) 
    connection = pika.BlockingConnection(parameters) 

    channel = connection.channel() 
    channel.queue_declare(queue=queue, durable=True) 

    channel.queue_bind(exchange='amq.direct', 
         queue=queue) 
    delay_channel = connection.channel() 
    delay_channel.queue_declare(queue=delay_queue, durable=True, 
           arguments={ 
            'x-dead-letter-exchange': 'amq.direct', 
            'x-dead-letter-routing-key': queue 
           }) 

    properties = pika.BasicProperties(
     delivery_mode=2, headers=headers, expiration=str(expiration)) 

    if type(messages) not in (list, tuple): 
     messages = [messages] 

    try: 
     for message in messages: 
      try: 
       json_data = json.dumps(message) 
      except Exception as err: 
       logging.error(
        'Error Jsonify Payload: {err}, {payload}'.format(
         err=err, payload=repr(message)), exc_info=True 
       ) 
       if (type(message) is dict) and ('data' in message): 
        message['data'] = {} 
        message['error'] = 'Payload Invalid For JSON' 
        json_data = json.dumps(message) 
       else: 
        raise 

      try: 
       delay_channel.basic_publish(
        exchange='', routing_key=delay_queue, 
        body=json_data, properties=properties) 
      except Exception as err: 
       logging.error(
        'Error Publishing Data: {err}, {payload}'.format(
         err=err, payload=json_data), exc_info=True 
       ) 
       raise 

    except Exception: 
     raise 

    finally: 
     logging.info(
      'Done Publishing. Closing Connection to {queue}'.format(
       queue=delay_queue 
      ) 
     ) 
     connection.close() 
被路由到一個實際隊列
+0

不知道。雖然它很長,下次還會利用它。謝謝您的幫助。 –