2017-04-12

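RabbitMQ pika: async consumer heartbeat issue after consumer cancel

Using RabbitMQ and pika (Python), I run a job queueing system that feeds tasks to nodes (asynchronous consumers). Each message defining a task is acknowledged only once the task has completed.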

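Sometimes I need to perform updates on these nodes, so I created an exit mode in which a node waits for its tasks to finish and then exits gracefully. After that I can do my maintenance work.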

So that a node in exit mode does not receive any more messages from RabbitMQ, I have it call the basic_cancel method before waiting for the jobs to finish.

The effect of this method is described in the pika documentation:

This method cancels a consumer. This does not affect already 
delivered messages, but it does mean the server will not send any more 
messages for that consumer. The client may receive an arbitrary number 
of messages in between sending the cancel method and receiving the 
cancel-ok reply. It may also be sent from the server to the client in 
the event of the consumer being unexpectedly cancelled (i.e. cancelled 
for any reason other than the server receiving the corresponding 
basic.cancel from the client). This allows clients to be notified of 
the loss of consumers due to events such as queue deletion. 

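So if you read "already delivered messages" as messages that have been received but not necessarily acknowledged, the tasks that exit mode is waiting on should not be requeued, even though the consumer node running them has cancelled itself from the queueing system.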

The stop function of my async consumer class (taken from the pika example) looks like this:

def stop(self): 
    """Cleanly shutdown the connection to RabbitMQ by stopping the consumer 
    with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok 
    will be invoked by pika, which will then close the channel and 
    connection. The IOLoop is started again because this method is invoked 
    when CTRL-C is pressed raising a KeyboardInterrupt exception. This 
    exception stops the IOLoop which needs to be running for pika to 
    communicate with RabbitMQ. All of the commands issued prior to starting 
    the IOLoop will be buffered but not processed. 

    """ 
    LOGGER.info('Stopping') 
    self._closing = True 
    self.stop_consuming() 
    LOGGER.info('Waiting for all running jobs to complete') 
    for index, thread in enumerate(self.threads):
        if thread.is_alive():
            thread.join()
            # also tried with a while loop that waits 10s as long as the
            # thread is still alive
            LOGGER.info('Thread {} has finished'.format(index))

    # also tried moving the call to stop consuming up to this point
    if self._connection is not None:
        self._connection.ioloop.start()
        LOGGER.info('Closing connection')
        self.close_connection()

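My problem is that once the consumer has been cancelled, the async consumer no longer seems to send heartbeats, even if I only perform the cancellation after the loop that waits for my tasks (threads) to finish.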

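I have read about the process_data_events function of BlockingConnection, but I could not find such a function for the async consumer. Is the ioloop of the SelectConnection class the equivalent for the async consumer?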

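Because a node in exit mode no longer sends heartbeats, the tasks it is currently running are requeued by RabbitMQ once the heartbeat timeout is reached. I would like to keep the heartbeat setting unchanged, because it is not a problem when I am not in exit mode (my heartbeat is about 100 seconds, while my tasks can take up to 2 hours to complete).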

Looking at the RabbitMQ logs, the heartbeat is indeed the cause:

=ERROR REPORT==== 12-Apr-2017::19:24:23 === 
closing AMQP connection (.....) : 
missed heartbeats from client, timeout: 100s 

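The only workaround I can think of is to acknowledge, when entering exit mode, the messages for the tasks that are still running, and hope those tasks don't fail...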

Is there any method on the channel or the connection that I can use to send heartbeats manually while waiting?

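Could the problem be that time.sleep() or thread.join() (from the Python threading package) block completely and prevent other threads from doing what they need to do? I use them in other applications and they don't seem to behave that way there.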
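On the narrower question of blocking: time.sleep() and thread.join() block only the thread that calls them. The sketch below does not use pika at all; run_demo and its "heartbeat" thread are hypothetical stand-ins that merely record timestamps while the main thread sits in join().

```python
import threading
import time


def run_demo(task_seconds=0.5, beat_interval=0.05):
    """Return how many beats were recorded while the caller was blocked in join()."""
    beats = []
    done = threading.Event()

    def heartbeat():
        # Stand-in for a heartbeat sender: it only records timestamps.
        while not done.is_set():
            beats.append(time.monotonic())
            done.wait(beat_interval)

    def task():
        # Stand-in for a long-running job.
        time.sleep(task_seconds)

    beater = threading.Thread(target=heartbeat, daemon=True)
    worker = threading.Thread(target=task)
    beater.start()
    worker.start()

    worker.join()  # blocks only the calling thread; the beater keeps running
    done.set()
    beater.join()
    return len(beats)


if __name__ == "__main__":
    print("beats recorded during join():", run_demo())
```

Other threads keep running during the join. The caveat is that if the thread being blocked is the same one that is supposed to drive the connection's I/O loop (in the stop function above, join() runs before ioloop.start()), then nothing services the connection during the wait.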

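Since the problem only appears in exit mode, I suppose something in the stop function makes the consumer stop sending heartbeats. But I have also tried (without any success) calling stop_consuming only after the loop that waits for the running tasks, so I can't see where the problem comes from.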

Thank you very much for your help!

Answer

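It turns out that the stop_consuming function calls basic_cancel asynchronously with a callback that calls channel.close(). This made my application stop interacting with RabbitMQ, and RabbitMQ requeued the unacked messages. I actually realized this because the threads that later tried to acknowledge their remaining tasks raised an error: the channel had been set to None by then, so there was no ack method anymore.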
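One way to avoid this is to cancel the consumer without closing the channel in the cancel callback, and to close only once the worker threads have finished. The sketch below is library-free and every name in it is hypothetical: DrainThenClose, cancel_consumer, close_channel and call_later are placeholders that would have to be bound to whatever cancel, close and timer methods your pika version actually provides.

```python
import threading


class DrainThenClose:
    """Hypothetical helper: cancel first, close only after the workers finish."""

    def __init__(self, workers, cancel_consumer, close_channel, call_later,
                 poll_seconds=1.0):
        self._workers = list(workers)
        self._cancel_consumer = cancel_consumer  # callable(on_cancelled)
        self._close_channel = close_channel      # callable()
        self._call_later = call_later            # callable(delay, callback)
        self._poll_seconds = poll_seconds
        self.closed = False

    def start(self):
        # Ask the broker to stop delivering. The channel stays open,
        # so running tasks can still acknowledge their messages.
        self._cancel_consumer(self._on_cancelled)

    def _on_cancelled(self, *_args):
        # Do NOT close the channel here; start polling the workers instead.
        self._poll()

    def _poll(self):
        if any(worker.is_alive() for worker in self._workers):
            # Hand control back to the event loop rather than join(),
            # so the loop is free to keep servicing the connection.
            self._call_later(self._poll_seconds, self._poll)
            return
        self._close_channel()
        self.closed = True
```

The design choice is that the wait happens as a repeated timer callback inside the event loop instead of as a blocking join() in front of it, and that closing the channel is moved from the cancel callback to the point where no worker is alive.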

Hope it helps someone!