每PEP-492我想實現一個異步迭代器,這樣我可以做例如實現一個異步迭代器
async for foo in bar:
...
下面是一個簡單的例子,一個類似的文檔,用實例和異步迭代的一個非常基本的測試:
import pytest
class TestImplementation:
def __aiter__(self):
return self
async def __anext__(self):
raise StopAsyncIteration
@pytest.mark.asyncio # note use of pytest-asyncio marker
async def test_async_for():
async for _ in TestImplementation():
pass
然而,當我執行我的測試套件,我見:
=================================== FAILURES ===================================
________________________________ test_async_for ________________________________
@pytest.mark.asyncio
async def test_async_for():
> async for _ in TestImplementation():
E TypeError: 'async for' received an invalid object from __aiter__: TestImplementation
...: TypeError
===================== 1 failed, ... passed in 2.89 seconds ======================
爲什麼我的TestImplementation
看起來無效?至於我可以告訴它符合協議:
- 的對象必須實現一個
__aiter__
方法...返回異步迭代器對象。- 異步迭代器對象必須實現一個
__anext__
方法...返回一個等待狀態。- 停止迭代
__anext__
必須引發StopAsyncIteration
異常。
這與Python的最新發布的版本(3.5.1),py.test
(2.9.2)和pytest-asyncio
(0.4.1)失敗。
經過近3年,你終於[問了一個問題](http://stackoverflow.com/users/3001761/jonrsharpe?tab=questions)。榮譽 –
@BhargavRao我感覺像2,076:1是正確的; o) – jonrsharpe
適用於pytest 2.9.2。你使用什麼版本? –