2016-03-15 42 views
0

我正在把stomp消費者實作成一個函式庫。其他應用程序調用該函式庫時,應該能夠取得ActiveMQ中的數據。我的實作如下,但在返回frame.body時遇到問題:我無法從類別外部取得數據。Stomp消費者使用deferred.inlinecallback

import json
from socket import gethostname
from uuid import uuid1

from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
from stompest.config import StompConfig
from stompest.error import StompConnectionError, StompProtocolError
from stompest.protocol import StompSpec
from twisted.internet import defer

class Consumer(object):
    """Asynchronous STOMP consumer built on stompest and Twisted.

    Messages only arrive once the reactor is running, i.e. after ``run`` has
    already returned its Deferred, so they cannot be handed back as a return
    value of ``run``. To receive them outside the class, pass an
    ``on_message`` callable; it is invoked with every decoded message body.
    """

    def __init__(self, amq_uri, on_message=None):
        """Remember the broker URI and build the STOMP configuration.

        :param amq_uri: Broker URI passed to ``StompConfig``.
        :param on_message: Optional callable invoked as ``on_message(data)``
            for every message decoded by ``consume``. Defaults to ``None``
            (messages are only printed).
        """
        self.amq_uri = amq_uri
        self.hostname = gethostname()
        self.config = StompConfig(uri=self.amq_uri)
        self.on_message = on_message

    @defer.inlineCallbacks
    def run(self, in_queue):
        """Subscribe to ``in_queue`` and keep consuming until a clean disconnect.

        Whenever waiting on ``client.disconnected`` fails with a
        ``StompConnectionError`` or ``StompProtocolError``, the client is
        reconnected and re-subscribed. Errors raised while (re)connecting
        are not caught here and fail the returned Deferred.

        :param in_queue: Destination to subscribe to.
        :returns: A Deferred (via ``inlineCallbacks``) that fires once the
            wait on ``client.disconnected`` completes without an error.
        """
        client = Stomp(self.config)
        headers = {
            StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
            StompSpec.ID_HEADER: self.hostname,
            'activemq.prefetchSize': '1000',
        }

        while True:
            yield self._connect_and_subscribe(client, in_queue, headers)
            try:
                yield client.disconnected
            except (StompConnectionError, StompProtocolError):
                # The connection is gone in both cases, so a bare
                # re-subscribe would fail; go round again and reconnect.
                continue
            break

    @defer.inlineCallbacks
    def _connect_and_subscribe(self, client, in_queue, headers):
        """Connect ``client`` under a fresh client id and subscribe to ``in_queue``."""
        yield client.connect(headers=self._return_client_id())
        client.subscribe(
            in_queue,
            headers,
            listener=SubscriptionListener(self.consume),
        )

    def _return_client_id(self):
        """Return CONNECT headers carrying a unique ``client-id``.

        The id is the local hostname plus a freshly generated ``uuid1``.
        """
        return {'client-id': '{}-{}'.format(self.hostname, uuid1())}

    def consume(self, client, frame):
        """Decode one message frame and hand the result to ``on_message``.

        :param client: The STOMP client that received the frame (unused).
        :param frame: Received frame; ``frame.body`` must contain JSON.
        :returns: The decoded JSON payload.
        :raises ValueError: If ``frame.body`` is not valid JSON.
        """
        data = json.loads(frame.body)
        print('Received Message Type {}'.format(type(data)))
        print('Received Message {}'.format(data))
        if self.on_message is not None:
            self.on_message(data)
        return data


     # Call from another application
import json

from twisted.internet import reactor

amq_uri = 'tcp://localhost:61613'
in_queue = '/queue/test_queue'


def handle_message(data):
    """Receive one decoded message outside of the Consumer class."""
    print('data is from outside function {}'.format(data))


class ForwardingConsumer(Consumer):
    """Consumer that forwards every decoded message to ``handle_message``."""

    def consume(self, client, frame):
        """Run the base handling, then pass the decoded body on."""
        Consumer.consume(self, client, frame)
        handle_message(json.loads(frame.body))


c = ForwardingConsumer(amq_uri)
# run() only schedules the subscription; messages are delivered to
# handle_message once the reactor is running, so no data exists to read
# between these two calls.
c.run(in_queue)
reactor.run()

有人可以讓我知道我怎麼能做到這一點。 謝謝

回答

0

我找到了解決問題的方法。我改用同步的stomp庫,而不是異步的stomp庫。實作如下:

class Consumer(object):
    """Blocking STOMP consumer that fetches one message per ``run`` call."""

    def __init__(self, amq_uri):
        """Remember the broker URI and build the STOMP configuration.

        :param amq_uri: Broker URI passed to ``StompConfig``.
        """
        self.amq_uri = amq_uri
        self.hostname = gethostname()
        self.config = StompConfig(uri=self.amq_uri)

    def run(self, in_queue, return_dict):
        """Receive one frame from ``in_queue``, acknowledge it and return its body.

        The connection is always closed again, even when receiving or
        encoding fails; in that case the original exception propagates
        instead of being hidden behind an unbound local variable.

        :param in_queue: Destination to subscribe to.
        :param return_dict: Mutable mapping; the result is also stored under
            its ``'data'`` key.
        :returns: ``json.dumps`` of the received frame body.
        """
        # The file-level ``Stomp`` name is imported from stompest.async;
        # this method needs the blocking client.
        from stompest.sync import Stomp

        client = Stomp(self.config)
        headers = {
            StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
            StompSpec.ID_HEADER: self.hostname,
        }
        client.connect()
        try:
            client.subscribe(in_queue, headers)
            frame = client.receiveFrame()
            # Acknowledge as soon as a frame was received, matching the
            # original, which acked regardless of whether encoding worked.
            client.ack(frame)
            # NOTE(review): json.dumps re-encodes the body as a JSON string;
            # if the body already holds JSON, json.loads may be what was
            # intended -- confirm against callers before changing.
            data = json.dumps(frame.body)
        finally:
            client.disconnect()
        return_dict['data'] = data
        return data