0
我必須將數據從測試用例傳遞到模擬服務器。 這樣做的最好方法是什麼?將數據異步傳遞給Python服務器類
這是我迄今爲止
mock_server.py
class ThreadedUDPServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
pass
class ThreadedUDPRequestHandler(SocketServer.BaseRequestHandler):
def __init__(self, request, client_address, server):
SocketServer.BaseRequestHandler.__init__(self,request,client_address,server)
def handle(self):
print server.data #this is where i need the data
class server_wrap:
def __init__(self):
self.server = ThreadedUDPServer(("127.0.0.1",49555) , ThreadedUDPRequestHandler)
def set_data(self,data)
self.server.data = data
def start(self)
server_thread = threading.Thread(target=self.server.serve_forever())
def stop(self)
self.server.shutdown()
test_mock的.py
server_inst = server_wrap()
server_inst.start()
#code which sets the data and expects the handle method to print the data set
server_inst.stop()
,我有這個代碼的問題是,在執行在server_inst.start()處停止,服務器進入無限偵聽模式
,我曾嘗試其他解決方案,但沒有成功
- 使用全局變量
- 使用隊列
- 開始mock_server.py 有自己主要
讓我瞭解其他可能的解決方案。在此先感謝
更新1:
使用單獨的線程將數據發送到套接字:
變化
test_mock.py
def test_set_data(data)
server_inst = server_wrap()
server_inst.set_data(data)
server_inst.start()
if __name__ == "__main__":
thread = Thread(target=test_set_data, args=("foo_data))
thread.setDaemon(True)
thread.start()
#test code which verifies if data set is same
#works so far, able to pass data
#problem starts now
thread = Thread(target=test_set_data, args=("bar_data))
thread.setDaemon(True)
thread.start()
#says address already in use error
#Tried calling server.shuddown() in handle , but error persists. Also there is no thread.shop in threading.Thread object
感謝
感謝調用它,就會給它一個嘗試,並更新你 –
我對着address_reuse_error由於不能夠關閉線程。我已經更新了這個問題來反映這一點。謝謝 –
你什麼時候得到地址重用? –