2012-10-23 21 views
9

我想檢查一下消費者/工作者是否存在消費消息我即將發送。在Pika或RabbitMQ中,如何檢查是否有消費者目前正在消費?

如果沒有任何工人,我將開始一些工人(消費者和出版商都在一臺機器上),然後去發佈消息

如果有像connection.check_if_has_consumers一個功能,我會有點實現它這樣的 -

import pika 
import workers 

# code for publishing to worker queue 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
channel = connection.channel() 

# if there are no consumers running (would be nice to have such a function) 
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""): 
    # start the workers in other processes, using python's `multiprocessing` 
    workers.start_workers() 

# now, publish with no fear of your queues getting filled up 
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True) 
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin", 
          properties=pika.BasicProperties(delivery_mode=2)) 
connection.close() 

但我無法找到與check_if_has_consumers功能的任何功能鼠兔

有沒有辦法完成這個,使用pika?或者可以通過說話The Rabbit直接?

我不能完全肯定,但我真的覺得的RabbitMQ會知道使用者訂閱不同的隊列數量,因爲它派遣消息給他們,並接受的ACK

我只是開始了與RabbitMQ的 2小時前...任何幫助,歡迎...

這裏是workers.py代碼我寫的,如果它的任何幫助....

import multiprocessing 
import pika 


def start_workers(num=3): 
    """start workers as non-daemon processes""" 
    for i in xrange(num):  
     process = WorkerProcess() 
     process.start() 


class WorkerProcess(multiprocessing.Process): 
    """ 
    worker process that waits infinitly for task msgs and calls 
    the `callback` whenever it gets a msg 
    """ 
    def __init__(self): 
     multiprocessing.Process.__init__(self) 
     self.stop_working = multiprocessing.Event() 

    def run(self): 
     """ 
     worker method, open a channel through a pika connection and 
     start consuming 
     """ 
     connection = pika.BlockingConnection(
           pika.ConnectionParameters(host='localhost') 
        ) 
     channel = connection.channel() 
     channel.queue_declare(queue='worker_queue', auto_delete=False, 
                durable=True) 

     # don't give work to one worker guy until he's finished 
     channel.basic_qos(prefetch_count=1) 
     channel.basic_consume(callback, queue='worker_queue') 

     # do what `channel.start_consuming()` does but with stopping signal 
     while len(channel._consumers) and not self.stop_working.is_set(): 
      channel.transport.connection.process_data_events() 

     channel.stop_consuming() 
     connection.close() 
     return 0 

    def signal_exit(self): 
     """exit when finished with current loop""" 
     self.stop_working.set() 

    def exit(self): 
     """exit worker, blocks until worker is finished and dead""" 
     self.signal_exit() 
     while self.is_alive(): # checking `is_alive()` on zombies kills them 
      time.sleep(1) 

    def kill(self): 
     """kill now! should not use this, might create problems""" 
     self.terminate() 
     self.join() 


def callback(channel, method, properties, body): 
    """pika basic consume callback""" 
    print 'GOT:', body 
    # do some heavy lifting here 
    result = save_to_database(body) 
    print 'DONE:', result 
    channel.basic_ack(delivery_tag=method.delivery_tag) 

編輯:

我必須繼續前進所以這裏是一個解決辦法,我要帶,除非有更好的方法出現時,

所以,的RabbitMQ有這些HTTP management apis,他們的工作你已經打開了management plugin後和HTTP API頁面的中間有

/api/connections - 所有打開連接的列表。

/api/connections/name - 一個單獨的連接。刪除它將關閉連接。

所以,如果我連我的工人和我不同連接名/用戶主要生產兩個,我就可以檢查工人連接是開放的......(有可能是工人死亡時的問題...)

將等待更好的解決方案...

編輯:

剛剛發現這個在RabbitMQ的文檔,但是這將是哈克在Python做:

[email protected]:~$ sudo rabbitmqctl -p vhostname list_queues name consumers 
Listing queues ... 
worker_queue 0 
...done. 

,所以我可以做這樣的事情,

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'") 

hacky ...仍然希望pika有一些python功能可以做到這一點...

謝謝,

回答

7

我只是在考慮這一點。通過源和文檔閱讀後,我在channel.py跨越下面就來:

@property 
def consumer_tags(self): 
    """Property method that returns a list of currently active consumers 

    :rtype: list 

    """ 
    return self._consumers.keys() 

我自己的測試是成功的。我使用了以下通道對象self._channel:

if len(self._channel.consumer_tags) == 0: 
     LOGGER.info("Nobody is listening. I'll come back in a couple of minutes.") 
     ... 
0

我實際上發現這是在意外尋找一個不同的問題,但有一件事可能會幫助你在Basic_Publish函數,有一個參數「立即」,默認爲False。

你可以做的一個想法是將立即標誌設置爲真,這將要求消費者立即消費,而不是坐在隊列中。如果工作人員不能使用該消息,則會反彈一個錯誤,告訴您啓動另一個工作人員。

根據系統的吞吐量,這可能會產生大量額外的工人,或產卵工人來替換死亡的工人。對於前一個問題,您可以編寫一個類似管理員的系統,通過一個控制隊列簡單地跟蹤工作人員,在這裏您可以告訴像運行程序那樣的「Runner」,以殺死現在不再需要的工作進程。