2016-03-09 34 views
1

我正在我的Ubuntu工作站上運行一些測試。這些基準開始填充隊列,其運行非常緩慢:我們可以加快通過RabbitMQ發佈消息

import pika 
import datetime 

if __name__ == '__main__': 
    try: 
     connection = pika.BlockingConnection(pika.ConnectionParameters(
       host='localhost')) 
     channel = connection.channel() 

     channel.queue_declare(queue='hello_durable', durable=True) 
     started_at = datetime.datetime.now() 
     properties = pika.BasicProperties(delivery_mode=2) 
     for i in range(0, 100000): 
      channel.basic_publish(exchange='', 
            routing_key='hello', 
            body='Hello World!', 
            properties=properties) 
      if i%10000 == 0: 
       duration = datetime.datetime.now() - started_at 
       print(i, duration.total_seconds()) 
     print(" [x] Sent 'Hello World!'") 
     connection.close() 
     now = datetime.datetime.now() 
     duration = now - started_at 
     print(duration.total_seconds()) 
    except Exception as e: 
     print(e) 

發送10K消息需要超過30秒的時間。根據最高命令,工作站有12個內核,這些內核並不繁忙。有超過8Gb的空閒內存。隊列是否耐用無關緊要。

我們如何加快發送消息?

+0

發佈在多線程? – Gabriele

回答

0

從BlockingConnection切換到SelectConnection產生了巨大的差異,加速了該過程幾乎五十次。我需要做的就是修改the following tutorial:的示例,發佈消息循環:

import pika 

# Step #3 
def on_open(connection): 

    connection.channel(on_channel_open) 

# Step #4 
def on_channel_open(channel): 

    channel.basic_publish('test_exchange', 
          'test_routing_key', 
          'message body value', 
          pika.BasicProperties(content_type='text/plain', 
               delivery_mode=1)) 

    connection.close() 

# Step #1: Connect to RabbitMQ 
parameters = pika.URLParameters('amqp://guest:[email protected]:5672/%2F') 

connection = pika.SelectConnection(parameters=parameters, 
            on_open_callback=on_open) 

try: 

    # Step #2 - Block on the IOLoop 
    connection.ioloop.start() 

# Catch a Keyboard Interrupt to make sure that the connection is closed cleanly 
except KeyboardInterrupt: 

    # Gracefully close the connection 
    connection.close() 

    # Start the IOLoop again so Pika can communicate, it will stop on its own when the connection is closed 
    connection.ioloop.start() 
1

假設您不運行任何消費者These benchmarks start with populating a queue。 由於您只發布消息,因此rabbitmq切換爲流狀態。更確切地說,你的交易所和/或隊列進入流動狀態。從rabbitmq blog

引用這(大約)意味着客戶端是被速率限制;它會 要發佈速度更快,但服務器無法跟上

我敢肯定,如果你看夠近,你會看到消息的第一部分(在初始設置,與空隊列)速度很快,但發送速率在某個點急劇下降。

相關問題