我有一個消費者,它監聽消息,如果消息流量超過消費者可處理的數量,我想啓動此消費者的另一個實例。對多個消費者的RPC調用
但我也希望能查詢從消費者(或多個)的信息,我的想法是,我可以使用RPC使用扇出交換,這樣所有的生產商獲取基於RPC請求從生產者信息呼叫。
我的問題首先是可能的,其次是合理的?
我有一個消費者,它監聽消息,如果消息流量超過消費者可處理的數量,我想啓動此消費者的另一個實例。對多個消費者的RPC調用
但我也希望能查詢從消費者(或多個)的信息,我的想法是,我可以使用RPC使用扇出交換,這樣所有的生產商獲取基於RPC請求從生產者信息呼叫。
我的問題首先是可能的,其次是合理的?
經過一番研究,似乎這是不可能的。如果你看看RabbitMQ.com上的教程,你會發現這個調用有一個id,據我瞭解,這個調用會被佔用。
我選擇了另一種方式,即讀取日誌文件和聚合數據。
如果問題是「是否可以發送一個RPC消息到多個服務器?」答案是肯定的。
當您構建一個RPC調用時,您會爲消息附加一個臨時隊列(通常在header.reply_to中,但您也可以使用內部消息字段)。這是RPC目標將發佈其答案的隊列。
當發送一個RPC到您可以接收臨時隊列多於一個消息中的單個服務器:這意味着一個RPC應答可以通過以下方式形成:
在這種情況下所產生的問題是
一個消息只是一些代碼來顯示你如何做到這一點(Python與皮卡庫)。注意,這遠非完美:最大的問題是,當你得到新的答案時,你應該重置超時時間。
def consume_rpc(self, queue, result_len=1, callback=None, timeout=None, raise_timeout=False):
if timeout is None:
timeout = self.rpc_timeout
result_list = []
def _callback(channel, method, header, body):
print "### Got 1/%s RPC result" %(result_len)
msg = self.encoder.decode(body)
result_dict = {}
result_dict.update(msg['content']['data'])
result_list.append(result_dict)
if callback is not None:
callback(msg)
if len(result_list) == result_len:
print "### All results are here: stopping RPC"
channel.stop_consuming()
def _outoftime():
self.channel.stop_consuming()
raise TimeoutError
if timeout != -1:
print "### Setting timeout %s seconds" %(timeout)
self.conn_broker.add_timeout(timeout, _outoftime)
self.channel.basic_consume(_callback, queue=queue, consumer_tag=queue)
if raise_timeout is True:
print "### Start consuming RPC with raise_timeout"
self.channel.start_consuming()
else:
try:
print "### Start consuming RPC without raise_timeout"
self.channel.start_consuming()
except TimeoutError:
pass
return result_list