2012-09-13 73 views
3

我有一個消費者,它監聽消息,如果消息流量超過消費者可處理的數量,我想啓動此消費者的另一個實例。對多個消費者的RPC調用

但我也希望能查詢從消費者(或多個)的信息,我的想法是,我可以使用RPC使用扇出交換,這樣所有的生產商獲取基於RPC請求從生產者信息呼叫。

我的問題首先是可能的,其次是合理的?

回答

1

經過一番研究,似乎這是不可能的。如果你看看RabbitMQ.com上的教程,你會發現這個調用有一個id,據我瞭解,這個調用會被佔用。

我選擇了另一種方式,即讀取日誌文件和聚合數據。

3

如果問題是「是否可以發送一個RPC消息到多個服務器?」答案是肯定的。

當您構建一個RPC調用時,您會爲消息附加一個臨時隊列(通常在header.reply_to中,但您也可以使用內部消息字段)。這是RPC目標將發佈其答案的隊列。

當發送一個RPC到您可以接收臨時隊列多於一個消息中的單個服務器:這意味着一個RPC應答可以通過以下方式形成:

  • 單個消息從單個源
  • 來自單個源的多個消息
  • 多於從幾個來源

在這種情況下所產生的問題是

一個消息
  • 你何時停止收聽?如果您知道RPC服務器的數量,您可以等到每個RPC服務器都向您發送答案,否則您必須實施某種形式的超時
  • 您需要跟蹤答案的來源嗎?您可以在消息中添加一些特殊字段以保存此信息。消息順序相同。

只是一些代碼來顯示你如何做到這一點(Python與皮卡庫)。注意,這遠非完美:最大的問題是,當你得到新的答案時,你應該重置超時時間。

def consume_rpc(self, queue, result_len=1, callback=None, timeout=None, raise_timeout=False): 
     if timeout is None: 
      timeout = self.rpc_timeout 

     result_list = [] 

     def _callback(channel, method, header, body): 
      print "### Got 1/%s RPC result" %(result_len) 
      msg = self.encoder.decode(body) 
      result_dict = {} 
      result_dict.update(msg['content']['data']) 
      result_list.append(result_dict) 

      if callback is not None: 
       callback(msg) 

      if len(result_list) == result_len: 
       print "### All results are here: stopping RPC" 
       channel.stop_consuming() 

     def _outoftime(): 
      self.channel.stop_consuming() 
      raise TimeoutError 

     if timeout != -1: 
      print "### Setting timeout %s seconds" %(timeout) 
      self.conn_broker.add_timeout(timeout, _outoftime) 

     self.channel.basic_consume(_callback, queue=queue, consumer_tag=queue) 

     if raise_timeout is True: 
      print "### Start consuming RPC with raise_timeout" 
      self.channel.start_consuming() 
     else: 
      try: 
       print "### Start consuming RPC without raise_timeout" 
       self.channel.start_consuming() 
      except TimeoutError: 
       pass 

     return result_list 
相關問題