2014-06-20 97 views
0

我需要建立一個能夠以編程方式控制的Kombu消費者。我見過的所有例子都只是告訴你使用ctrl-c來停止程序。Kombu消費者作爲節儉服務

我的主要應用程序是作爲一個扭曲Thrift服務運行,我想我可以以某種方式使用Twisted reactor來處理我的消費者內的eventloop,但我無法弄清楚如何。

這是我的消費類。 start_consuming()部分是好的,除了它是阻塞的,我不能從外部調用stop_consuming()。

from kombu import BrokerConnection, Exchange, eventloop, Queue, Consumer 


class DMS(): 
    __routing_key = None 
    __is_consuming = None 
    __message_counter = 0 

    def __init__(self, routing_key): 
     print 'server: __init__()' 
     self.__routing_key = routing_key 

    def __handle_message(self, body, message): 
     self.__message_counter += 1 

     # Print count every 10,000 messsages. 
     if (self.__message_counter % 10000) == 0: 
      print self.__message_counter 

    def start_consuming(self): 
     print 'server: start_consuming()' 
     self.__is_consuming = True 
     exchange = Exchange('raven-exchange', type='topic', durable=False) 
     queue = Queue(self.__routing_key, exchange, routing_key=self.__routing_key) 

     with BrokerConnection('amqp://guest:[email protected]:5672//') as connection: 
      with Consumer(connection, queue, callbacks=[self.__handle_message]) as consumer: 
       for _ in eventloop(connection): 

        if self.__is_consuming: 
         pass 
        else: 
         break 

       consumer.cancel() 
      connection.close() 

    def stop_consuming(self): 
     print 'server: stop_consuming()' 
     self.__is_consuming = False 

回答

0

的推薦方法路線儲蓄服務電話通過MQ系統是通過電話oneway,因爲這是通過MQ和MessageBus系統通信的最自然的方式。

struct Foo { 
    1: string whoa 
    2: i32 counter 
} 

service Whatever { 
    oneway void FooBar(1: Foo someData, 2:i32 moreData) 
} 

一個oneway呼叫是節儉RPC調用的一種特殊形式:顧名思義,將呼叫轉至只在一個方向。 oneway不使用返回值也不使用例外(實際上是返回值)。該調用只發送輸入參數,並不等待任何返回的值。

爲了建立雙向通信,客戶端需要實現一個類似的服務,旨在接收傳入的應答消息。在Thrift /contrib folder有一些樣本,具有0MQ,Rebus和Stomp。雖然他們沒有專門處理Python,但主要想法應該變得清晰。