我試圖測試一些在斷開連接後重新連接到服務器的代碼。這在測試之外很好地工作,但是在運行測試時它無法確認套接字已經斷開連接。如何在單元測試中終止套接字以重新連接測試
我使用的是GEVENT流服務器來模擬一個真實的聆聽服務器:
import gevent.server
from gevent import queue
class TestServer(gevent.server.StreamServer):
def __init__(self, *args, **kwargs):
super(TestServer, self).__init__(*args, **kwargs)
self.sockets = {}
def handle(self, socket, address):
self.sockets[address] = (socket, queue.Queue())
socket.sendall('testing the connection\r\n')
gevent.spawn(self.recv, address)
def recv(self, address):
socket = self.sockets[address][0]
queue = self.sockets[address][1]
print 'Connection accepted %s:%d' % address
try:
for data in socket.recv(1024):
queue.put(data)
except:
pass
def murder(self):
self.stop()
for sock in self.sockets.iteritems():
print sock
sock[1][0].shutdown(socket.SHUT_RDWR)
sock[1][0].close()
self.sockets = {}
def run_server():
test_server = TestServer(('127.0.0.1', 10666))
test_server.start()
return test_server
而且我的測試是這樣的:
def test_can_reconnect(self):
test_server = run_server()
client_config = {'host': '127.0.0.1', 'port': 10666}
client = Connection('test client', client_config, get_config())
client.connect()
assert client.socket_connected
test_server.murder()
#time.sleep(4) #tried sleeping. no dice.
assert not client.socket_connected
assert client.server_disconnect
test_server = run_server()
client.reconnect()
assert client.socket_connected
它未能在assert not client.socket_connected
。
我在recv期間檢測到「不是數據」。如果它是None,那麼我設置一些變量,以便其他代碼可以決定是否重新連接(如果是user_disconnect等,則不要重新連接)。這種行爲起作用,過去一直對我有用,直到現在我從來沒有試過對它進行測試。套接字連接和本地函數作用域有什麼奇怪的地方?就像停止服務器後連接仍然存在。
我想要測試的代碼是開放的:https://github.com/kyleterry/tenyks.git
如果你運行測試,你會看到一個我試圖修復失敗。
包裝是一個kludge。在Python中,有更多優雅的解決方案來替代外部對象,例如['unittest.mock.patch'](http://docs.python.org/dev/library/unittest.mock#id4)。 –