2016-02-19 65 views
0

我必須將數據從測試用例傳遞到模擬服務器。 這樣做的最好方法是什麼?將數據異步傳遞給Python服務器類

這是我迄今爲止

mock_server.py

class ThreadedUDPServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer): 
    pass 

class ThreadedUDPRequestHandler(SocketServer.BaseRequestHandler): 
    def __init__(self, request, client_address, server): 
     SocketServer.BaseRequestHandler.__init__(self,request,client_address,server) 

    def handle(self): 
     print server.data #this is where i need the data 

class server_wrap: 
    def __init__(self): 
     self.server = ThreadedUDPServer(("127.0.0.1",49555) , ThreadedUDPRequestHandler) 

    def set_data(self,data) 
     self.server.data = data 

    def start(self) 
     server_thread = threading.Thread(target=self.server.serve_forever()) 

    def stop(self) 
     self.server.shutdown() 

test_mock的.py

server_inst = server_wrap() 
server_inst.start() 
#code which sets the data and expects the handle method to print the data set 
server_inst.stop() 

,我有這個代碼的問題是,在執行在server_inst.start()處停止,服務器進入無限偵聽模式

,我曾嘗試其他解決方案,但沒有成功

  • 使用全局變量
  • 使用隊列
  • 開始mock_server.py 有自己主要

讓我瞭解其他可能的解決方案。在此先感謝

更新1:

使用單獨的線程將數據發送到套接字:

變化

test_mock.py

def test_set_data(data) 
    server_inst = server_wrap() 
    server_inst.set_data(data) 
    server_inst.start() 

if __name__ == "__main__": 
    thread = Thread(target=test_set_data, args=("foo_data)) 
    thread.setDaemon(True) 
    thread.start() 
    #test code which verifies if data set is same 
    #works so far, able to pass data 

    #problem starts now 
    thread = Thread(target=test_set_data, args=("bar_data)) 
    thread.setDaemon(True) 
    thread.start() 
    #says address already in use error 
    #Tried calling server.shuddown() in handle , but error persists. Also there is no thread.shop in threading.Thread object 

感謝

回答

1

服務器應該轉到聆聽模式。

你不需要server_inst.stop,直到所有的數據被髮送,並測試完成。也許在你測試拆除,或測試套件完成時。

要發送數據到服務器,並讓手柄選取它,您應該在anohter線程上打開一個套接字。然後通過此套接字將數據發送到服務器。

此代碼應該是這個樣子:

import socket 
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
sock.connect(("127.0.0.1",49555)) 
sock.send(... the data ...) 
received = sock.recv(1024) # the handle can send a response 
sock.close() 

添加功能在你的Django代碼,並在另一個線程運行。這個函數將打開套接字,連接,發送數據並獲得響應。您可以從一個視圖,中間件等

+0

感謝調用它,就會給它一個嘗試,並更新你 –

+0

我對着address_reuse_error由於不能夠關閉線程。我已經更新了這個問題來反映這一點。謝謝 –

+0

你什麼時候得到地址重用? –