2011-11-18 65 views
4

我需要能夠優雅地阻止在Pika ioloop中工作的消費者(工作人員)。工人應該在60秒後停下來。當前處理的消息應該完成。Pika ioloop異步套裝超時時間(RabbitMQ)

我試圖把一個connection.close()在回調函數內,但只停止當前線程而不是完整的ioloop。它給了一個可怕的錯誤輸出。

請參閱第16行,並在我的代碼如下:我用的(約鼠兔基本的例子ioloop http://pika.github.com/connecting.html#cps-example

from pika.adapters import SelectConnection 
    channel = None 
    def on_connected(connection): 
     connection.channel(on_channel_open) 

    def on_channel_open(new_channel): 
     global channel 
     channel = new_channel 
     channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False, callback=on_queue_declared) 

    def on_queue_declared(frame): 
     channel.basic_consume(handle_delivery, queue='test') 

    def handle_delivery(channel, method, header, body): 
     print body 

     # timer stuff which did NOT work 
     global start_time, timeout, connection 
     time_diff = time.time()-start_time 
     if time_diff > timeout: 
      #raise KeyboardInterrupt 
      connection.close() 

    timeout = 60 
    start_time = time.time() 

    connection = SelectConnection(parameters, on_connected) 

    try: 
     connection.ioloop.start() 
    except KeyboardInterrupt: 
     connection.close() 
     connection.ioloop.start() 

回答

9

您可以打開的連接上附加一個超時回調函數 這裏是。 。您例如額外的代碼

timeout = 60 

def on_timeout(): 
    global connection 
    connection.close() 

connection.add_timeout(timeout, on_timeout) 
-4

你可以嘗試使用:

connection.ioloop.stop()