Sto cercando di imparare come (in modo idiomatico) utilizzare Python 3.4 asyncio
. Il mio più grande ostacolo è come "coroutines" a catena che consumano continuamente dati, aggiornano lo stato con esso e consentono a tale stato di essere utilizzato da un'altra coroutine.Come collego asyncio.coroutines che produce e consuma continuamente dati?
Il comportamento osservabile che mi aspetto da questo programma di esempio è semplicemente di segnalare periodicamente la somma dei numeri ricevuti da un sottoprocesso. La segnalazione dovrebbe avvenire all'incirca allo stesso tasso dell'oggetto Source
che riceve i numeri dal sottoprocesso. Il blocco dell'IO nella funzione di segnalazione non deve bloccare la lettura dal sottoprocesso. Se la funzione di reporting si blocca più a lungo di un'iterazione di lettura dal sottoprocesso, non mi interessa se salta in avanti o segnala un gruppo in una sola volta; ma ci dovrebbero essere circa il numero di iterazioni di reporter()
in quanto ci sono di expect_exact()
per un periodo di tempo abbastanza lungo.
#!/usr/bin/python3
import asyncio
import pexpect
class Source:
def __init__(self):
self.flag = asyncio.Event()
self.sum = 0
def start(self):
self.flag.set()
def stop(self):
self.flag.clear()
@asyncio.coroutine
def run(self):
yield from self.flag.wait()
p = pexpect.spawn(
"python -c "
"'import random, time\n"
"while True: print(random.choice((-1, 1))); time.sleep(0.5)'")
while self.flag.is_set():
yield from p.expect_exact('\n', async=True)
self.sum += int(p.before)
p.terminate()
@asyncio.coroutine
def reporter(source):
while True:
# Something like:
new_sum = yield from source # ???
print("New sum is: {:d}".format(new_sum))
# Potentially some other blocking operation
yield from limited_throughput.write(new_sum)
def main():
loop = asyncio.get_event_loop()
source = Source()
loop.call_later(1, source.start)
loop.call_later(11, source.stop)
# Again, not sure what goes here...
asyncio.async(reporter(source))
loop.run_until_complete(source.run())
loop.close()
if __name__ == '__main__':
main()
Questo esempio richiede pexpect
essere installato da Git; si potrebbe altrettanto facilmente sostituire con run()
:
@asyncio.coroutine
def run(self):
yield from self.flag.wait()
while self.flag.is_set():
value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
self.sum += value
Ma il vero sottoprocesso mi interessa deve essere eseguito in un pty
, che a mio avviso significa che il sottoprocesso quadro trasporti/protocollo fornito in asyncio
non sarà sufficiente per questo. Il punto è che la sorgente dell'attività asincrona è una coroutine che può essere utilizzata con yield from
.
Si noti che la funzione reporter()
in questo esempio non è un codice valido; il mio problema è che non so cosa dovrebbe andare lì dentro. Idealmente, mi piacerebbe mantenere il codice reporter()
separato da run()
; il punto di questo esercizio è vedere come calcolare i programmi più complessi in unità più piccole di codice utilizzando i componenti in asyncio
.
C'è un modo per strutturare questo tipo di comportamento con il modulo asyncio
?