Per PEP-492 Sto tentando di implementare un iteratore asincrono, tale che posso fare ad es.Implementazione di un iteratore asincrono
async for foo in bar:
...
Ecco un esempio banale, simile a quello nella documentazione, con un test di base di esemplificazione e l'iterazione async:
import pytest
class TestImplementation:
def __aiter__(self):
return self
async def __anext__(self):
raise StopAsyncIteration
@pytest.mark.asyncio # note use of pytest-asyncio marker
async def test_async_for():
async for _ in TestImplementation():
pass
Tuttavia, quando eseguo la mia suite di test, vedo :
=================================== FAILURES ===================================
________________________________ test_async_for ________________________________
@pytest.mark.asyncio
async def test_async_for():
> async for _ in TestImplementation():
E TypeError: 'async for' received an invalid object from __aiter__: TestImplementation
...: TypeError
===================== 1 failed, ... passed in 2.89 seconds ======================
Perché il mio TestImplementation
sembra essere valida? Per quanto posso dire che incontra il protocollo:
- Un oggetto deve implementare un metodo
__aiter__
... restituisce un oggetto iteratore asincrono.- Un oggetto iteratore asincrono deve implementare un metodo
__anext__
... restituendo un aspetto attendibile.- Per interrompere l'iterazione
__anext__
è necessario generare un'eccezioneStopAsyncIteration
.
Questo sta fallendo con le ultime versioni rilasciate di Python (3.5.1), py.test
(2.9.2) e pytest-asyncio
(0.4.1).
Dopo quasi 3 anni, hai finalmente [fatto una domanda] (http://stackoverflow.com/users/3001761/jonrsharpe?tab=questions). Complimenti –
@BhargavRao Mi sono sentito come 2.076: 1 riguardava la parte destra; o) – jonrsharpe
Funziona per me con pytest 2.9.2. Che versione stai usando? –