2011-10-02 35 views
0

我正在實現一個Ruby中的小型節儉(0.6.0)服務器,以便向具有多個連接(多個客戶端)的另一個協議的代理角色發送到單個服務器。我希望能夠在服務器端保持每個客戶端的數據,並跟蹤處理函數的多個調用中的「會話」參數。在節儉處理程序函數中獲取對等地址

我目前使用Thrift::NonblockingServer作爲SimpleServer似乎不允許併發連接。

我知道如何使用TCPSocket::peeraddrThrift::NonblockingServer::IOManager::Worker::run它讀取幀創建一個臨時MemoryBufferTransport並傳遞作爲輸入/輸出協議下到處理器,使得它似乎是信息不從那裏傳下來的。

有沒有一個乾淨的方法來做到這一點? 我正在考慮重新定義上面提到的Thrift::NonblockingServer::IOManager::Worker::run以在其他參數中包含fd或其他ID以處理或增強原始實例,但是因爲我還必須擔心生成的一個紅寶石代碼層(process_*方法class Processor)似乎有點沉重。

我不知道以前是否有人做過這樣的事情。

謝謝!

p.s.這是類似的問題this C++ thrift question

回答

0

這是我如何去改變Thrift::NonblockingServer::IOManager::Worker::run來支持這一點。

幾個注意事項:

  • 正如我提到的問題,我不認爲這是乾淨的(如果不出意外,我將不得不監測未來節儉版本在這個功能的改變,這是基於0.6。 0)。
  • 這是寫入與紅寶石1.8工作(或我會得到非翻譯addr /端口see my other question
  • 我是一個紅寶石新手..我敢肯定我已經做了一些這種「錯誤的」 (例如,$connections@@connectionsConnEntry或差異類?)。
  • 我知道,Thread.current哈希線程本地存儲具有a namespace pollution issue

首先,在一些核心模塊:

module MyThriftExt 

    $connections={} 

    class ConnEntry 
    attr_reader :addr_info, :attr 

    def initialize(thrift_fd) 
     @addr_info=thrift_fd.handle.peeraddr 
     @attr={} 
    end 

    def self.PreHandle(fd) 
     $connections[fd]=ConnEntry.new(fd) unless $connections[fd].is_a? ConnEntry 
     # make the connection entry as short-term thread-local variable 
     # (cleared in postHandle) 
     Thread.current[:connEntry]=$connections[fd] 
    end 

    def self.PostHandle() 
     Thread.current[:connEntry]=nil 
    end 

    def to_s() 
     "#{addr_info}" 
    end end end 


module Thrift class NonblockingServer 
    class IOManager 
     alias :old_remove_connection :remove_connection 
     def remove_connection(fd) 
     $connections.delete fd 
     old_remove_connection(fd) 
     end 

     class Worker 
     # The following is verbatim from thrift 0.6.0 except for the two lines 
     # marked with "Added" 
     def run 
      loop do 
      cmd, *args = @queue.pop 
      case cmd 
      when :shutdown 
       @logger.debug "#{self} is shutting down, goodbye" 
       break 
      when :frame 
       fd, frame = args 
       begin 
       otrans = @transport_factory.get_transport(fd) 
       oprot = @protocol_factory.get_protocol(otrans) 
       membuf = MemoryBufferTransport.new(frame) 
       itrans = @transport_factory.get_transport(membuf) 
       iprot = @protocol_factory.get_protocol(itrans) 
       MyThriftExt::ConnEntry.PreHandle(fd) # <<== Added 
       @processor.process(iprot, oprot) 
       MyThriftExt::ConnEntry.PostHandle  # <<== Added 
       rescue => e 
       @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}" 
       end 
      end 
      end 
     end 
     end 
    end 
    end 
end 

然後在處理任何時候,你可以連接訪問Thread.current[:connEntry].addr_info具體數據或存儲任何關於Thread.current[:connEntry].attr哈希中的連接。

相關問題