2010-12-08 117 views
1

您好,我正在開發一個基於twisted的rpc服務器來服務幾個微控制器,這些微控制器使rpc調用到扭曲的jsonrpc服務器。但是應用程序還要求服務器隨時向每個微型計算機發送信息,所以問題在於如何防止來自微型計算機的遠程jsonrpc調用的響應與服務器jsonrpc請求混淆一個用戶。如何實現一個雙向jsonrpc + twisted服務器/客戶端

現在我得到的結果是微型計算機正在接收不良信息,因爲他們不知道來自套接字的netstring/json字符串是來自先前需求的響應還是來自服務器的新請求。

這裏是我的代碼:

from twisted.internet import reactor 
from txjsonrpc.netstring import jsonrpc 
import weakref 

creds = {'user1':'pass1','user2':'pass2','user3':'pass3'} 

class arduinoRPC(jsonrpc.JSONRPC): 
    def connectionMade(self): 
     pass 

    def jsonrpc_identify(self,username,password,mac): 
     """ Each client must be authenticated just after to be connected calling this rpc """ 
     if creds.has_key(username): 
      if creds[username] == password: 
       authenticated = True 
      else: 
       authenticated = False 
     else: 
      authenticated = False 

     if authenticated: 
      self.factory.clients.append(self) 
      self.factory.references[mac] = weakref.ref(self) 
      return {'results':'Authenticated as %s'%username,'error':None} 
     else: 
      self.transport.loseConnection() 

    def jsonrpc_sync_acq(self,data,f): 
     """Save into django table data acquired from sensors and send ack to gateway""" 
     if not (self in self.factory.clients): 
      self.transport.loseConnection() 
     print f 
     return {'results':'synced %s records'%len(data),'error':'null'} 

    def connectionLost(self, reason): 
     """ mac address is searched and all reference to self.factory.clientes are erased """ 
     for mac in self.factory.references.keys(): 
      if self.factory.references[mac]() == self: 
       print 'Connection closed - Mac address: %s'%mac 
       del self.factory.references[mac] 
       self.factory.clients.remove(self) 


class rpcfactory(jsonrpc.RPCFactory): 
    protocol = arduinoRPC 
    def __init__(self, maxLength=1024): 
     self.maxLength = maxLength 
     self.subHandlers = {} 
     self.clients = [] 
     self.references = {} 

""" Asynchronous remote calling to micros, simulating random calling from server """ 
import threading,time,random,netstring,json 
class asyncGatewayCalls(threading.Thread): 
    def __init__(self,rpcfactory): 
     threading.Thread.__init__(self) 
     self.rpcfactory = rpcfactory 
     """identifiers of each micro/client connected""" 
     self.remoteMacList = ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90'] 
    def run(self): 
     while True: 
      time.sleep(10) 
      while True: 
       """ call to any of three potential micros connected """ 
       mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))] 
       if self.rpcfactory.references.has_key(mac): 
        print 'Calling %s'%mac 
        proto = self.rpcfactory.references[mac]() 
        """ requesting echo from selected micro""" 
        dataToSend = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']})) 
        proto.transport.write(dataToSend) 
        break 

factory = rpcfactory(arduinoRPC) 

"""start thread caller""" 
r=asyncGatewayCalls(factory) 
r.start() 

reactor.listenTCP(7080, factory) 
print "Micros remote RPC server started" 
reactor.run() 

回答

2

你需要足夠的信息添加到每個郵件,以便收件人可以決定如何解釋它。您的要求聽起來與AMP非常相似,因此您可以使用AMP代替或使用與AMP相同的結構來識別您的消息。具體來說:

  • 在請求中,放置一個特定的密鑰 - 例如,AMP使用「_ask」來標識請求。它還爲這些提供了一個獨特的價值,它進一步確定了連接生命週期的請求。
  • 在迴應中,輸入了一個不同的鍵 - 例如,AMP使用「_answer」表示這一點。該值與請求響應中的「_ask」鍵值相匹配。

使用這種方法,您只需查看是否有「_ask」鍵或「_answer」鍵來確定您是否收到了新請求或對先前請求的響應。

在另一個主題上,您的asyncGatewayCalls類不應該是基於線程的。它沒有明顯的理由使用線程,並且這樣做也會濫用Twisted API,從而導致未定義的行爲。大多數Twisted API只能用於您調用reactor.run的線程。唯一的例外是reactor.callFromThread,您可以使用它從任何其他線程向反應器線程發送消息。 asyncGatewayCalls嘗試寫入傳輸,但這會導致緩衝區損壞或發送數據的任意延遲,或者更糟糕的情況。取而代之的是,你可以寫asyncGatewayCalls這樣的:

from twisted.internet.task import LoopingCall 

class asyncGatewayCalls(object): 
    def __init__(self, rpcfactory): 
     self.rpcfactory = rpcfactory 
     self.remoteMacList = [...] 

    def run(): 
     self._call = LoopingCall(self._pokeMicro) 
     return self._call.start(10) 

    def _pokeMicro(self): 
     while True: 
      mac = self.remoteMacList[...] 
      if mac in self.rpcfactory.references: 
       proto = ... 
       dataToSend = ... 
       proto.transport.write(dataToSend) 
       break 

factory = ... 
r = asyncGatewayCalls(factory) 
r.run() 

reactor.listenTCP(7080, factory) 
reactor.run() 

這給了你應該有相同的行爲,你打算爲原asyncGatewayCalls類單線程解決方案。爲了安排調用,它不是在線程中的循環中休眠,而是使用反應堆的調度API(通過更高級別的LoopingCall類,該類調度要重複調用的東西)以確保每10秒調用一次_pokeMicro

+0

是的你的權利,幾小時前,我正在閱讀線程API文檔(task.LoopingCall)後得出同樣的結論。我測試過,效果很好。感謝您的幫助 – Jaime 2010-12-10 11:27:48

相關問題