2015-10-04 131 views
2

我有這段代碼,基本上它運行的是 channel.start_consuming()。 我希望它在一段時間後停止。Pika channel.stop_consuming不停止start_consuming循環

我認爲 channel.stop_consuming()是正確的方法:

def stop_consuming(self, consumer_tag=None): 
    """ Cancels all consumers, signalling the `start_consuming` loop to 
    exit. 

但它不工作:start_consuming()永遠不會結束(從執行不退出這個調用,「結束」從不打印)。

進口單元測試 進口鼠 進口線程 進口時間

_url = "amqp://user:[email protected]/aaa" 

class Consumer_test(unittest.TestCase): 

    def test_startConsuming(self): 

     def callback(channel, method, properties, body): 
      print("callback") 
      print(body) 

     def connectionTimeoutCallback(): 
      print("connecionClosedCallback") 

     def _closeChannel(channel_): 
      print("_closeChannel") 
      time.sleep(1) 
      print("close") 
      if channel_.is_open: 
       channel_.stop_consuming() 
       print("stop_cosuming") 
      else: 
       print("channel is closed") 
      #channel_.close() 

     params = pika.URLParameters(_url) 
     params.socket_timeout = 5 
     connection = pika.BlockingConnection(params) 
     #connection.add_timeout(2, connectionTimeoutCallback) 
     channel = connection.channel() 
     channel.basic_consume(callback, 
           queue='test', 
           no_ack=True) 

     t = threading.Thread(target=_closeChannel, args=[channel]) 
     t.start() 

     print("start_consuming") 
     channel.start_consuming() # start consuming (loop never ends) 
     connection.close() 
     print("end") 

connection.add_timeout解決我的問題,也許叫basic_cancel過,但我想用正確的方法。

感謝

注: 我無法迴應或本(pika, stop_consuming does not work)由於我的低信譽點添加註釋。

注2: 我認爲我沒有跨線程共享頻道或連接(派卡不支持這個),因爲我使用「channel_」作爲參數而不是「channel」實例(Am I錯誤?)。

+0

這裏回答了這裏: http://stackoverflow.com/questions/27624166/pika-stop-consuming-does-not-work –

回答

0

我有同樣的問題;因爲pika不是線程安全的。即連接和通道不能在線程間安全共享。

所以我用一個單獨的連接發送關機消息;然後停止使用callback函數中的原始頻道。