0
我正在實施stomp消費者作爲圖書館。通過在其他應用程序中調用該庫,我應該能夠在ActiveMQ中獲取數據。我正在執行如下,但我在返回frame.body時遇到問題。我無法從課堂外檢索數據。Stomp消費者使用deferred.inlinecallback
from twisted.internet import defer
from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
from stompest.config import StompConfig
from socket import gethostname
from uuid import uuid1
import json
class Consumer(object):
def __init__(self, amq_uri):
self.amq_uri = amq_uri
self.hostname = gethostname()
self.config = StompConfig(uri=self.amq_uri)
@defer.inlineCallbacks
def run(self, in_queue):
client = yield Stomp(self.config)
headers = {
StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
StompSpec.ID_HEADER: self.hostname,
'activemq.prefetchSize': '1000',
}
yield client.connect(headers=self._return_client_id())
client.subscribe(
in_queue,
headers,
listener=SubscriptionListener(self.consume)
)
try:
client = yield client.disconnected
except StompConnectionError:
yield client.connect(headers=self._return_client_id())
client.subscribe(
in_queue,
headers,
listener=SubscriptionListener(self.consume)
)
while True:
try:
yield client.disconnected
except StompProtocolError:
pass
except StompConnectionError:
yield client.connect(headers=self._return_client_id())
client.subscribe(
in_queue,
headers,
listener=SubscriptionListener(self.consume)
)
def _return_client_id(self):
client_id = {}
client_id['client-id'] = gethostname() + '-' + str(uuid1())
return client_id
def consume(self, client, frame):
data = json.loads(frame.body)
print 'Received Message Type {}'.format(type(data))
print 'Received Message {}'.format(data)
## I want to return data here. I am able to print the frame.body here.
# Call from another application
import Queue
from twisted.internet import reactor
amq_uri = 'tcp://localhost:61613'
in_queue = '/queue/test_queue'
c = Consumer(amq_uri)
c.run(in_queue)
print "data is from outside function", data # Should be able to get the data which is returned by consume here
reactor.run()
有人可以讓我知道我怎麼能做到這一點。 謝謝