1
我有一個基於asyncio的類,我想單元測試。使用tornado.testing.AsyncTestCase
這個工作很好,很容易。但是,我的課程的一個特定方法使用asyncio.ensure_future
來安排另一種方法的執行。這永遠不會在AsyncTestCase
中完成,因爲默認測試運行器使用龍捲風KQueueIOLoop
事件循環,而不是asyncio事件循環。使用asyncio事件循環運行tornado.testing.AsyncTestCase
class TestSubject:
def foo(self):
asyncio.ensure_future(self.bar())
async def bar(self):
pass
class TestSubjectTest(AsyncTestCase):
def test_foo(self):
t = TestSubject()
# here be somewhat involved setup with MagicMock and self.stop
t.foo()
self.wait()
$ python -m tornado.testing baz.testsubject_test
...
[E 160627 17:48:22 testing:731] FAIL
[E 160627 17:48:22 base_events:1090] Task was destroyed but it is pending!
task: <Task pending coro=<TestSubject.bar() running at ...>>
.../asyncio/base_events.py:362: RuntimeWarning: coroutine 'TestSubject.bar' was never awaited
如何使用不同的事件循環運行,以確保我的任務測試將實際執行?或者,我如何使我的實現事件獨立於循環並交叉兼容?