2015-03-29 8 views
5

Sto cercando di imparare come (in modo idiomatico) utilizzare Python 3.4 asyncio. Il mio più grande ostacolo è come "coroutines" a catena che consumano continuamente dati, aggiornano lo stato con esso e consentono a tale stato di essere utilizzato da un'altra coroutine.Come collego asyncio.coroutines che produce e consuma continuamente dati?

Il comportamento osservabile che mi aspetto da questo programma di esempio è semplicemente di segnalare periodicamente la somma dei numeri ricevuti da un sottoprocesso. La segnalazione dovrebbe avvenire all'incirca allo stesso tasso dell'oggetto Source che riceve i numeri dal sottoprocesso. Il blocco dell'IO nella funzione di segnalazione non deve bloccare la lettura dal sottoprocesso. Se la funzione di reporting si blocca più a lungo di un'iterazione di lettura dal sottoprocesso, non mi interessa se salta in avanti o segnala un gruppo in una sola volta; ma ci dovrebbero essere circa il numero di iterazioni di reporter() in quanto ci sono di expect_exact() per un periodo di tempo abbastanza lungo.

#!/usr/bin/python3 
import asyncio 
import pexpect 

class Source: 

    def __init__(self): 
     self.flag = asyncio.Event() 
     self.sum = 0 

    def start(self): 
     self.flag.set() 

    def stop(self): 
     self.flag.clear() 

    @asyncio.coroutine 
    def run(self): 
     yield from self.flag.wait() 

     p = pexpect.spawn(
      "python -c " 
      "'import random, time\n" 
      "while True: print(random.choice((-1, 1))); time.sleep(0.5)'") 

     while self.flag.is_set(): 
      yield from p.expect_exact('\n', async=True) 
      self.sum += int(p.before) 

     p.terminate() 

@asyncio.coroutine 
def reporter(source): 
    while True: 
     # Something like: 
     new_sum = yield from source # ??? 
     print("New sum is: {:d}".format(new_sum)) 
     # Potentially some other blocking operation 
     yield from limited_throughput.write(new_sum) 

def main(): 
    loop = asyncio.get_event_loop() 

    source = Source() 
    loop.call_later(1, source.start) 
    loop.call_later(11, source.stop) 

    # Again, not sure what goes here... 
    asyncio.async(reporter(source)) 

    loop.run_until_complete(source.run()) 
    loop.close() 

if __name__ == '__main__': 
    main() 

Questo esempio richiede pexpect essere installato da Git; si potrebbe altrettanto facilmente sostituire con run():

@asyncio.coroutine 
def run(self): 
    yield from self.flag.wait() 

    while self.flag.is_set(): 
     value = yield from asyncio.sleep(0.5, random.choice((-1, 1))) 
     self.sum += value 

Ma il vero sottoprocesso mi interessa deve essere eseguito in un pty, che a mio avviso significa che il sottoprocesso quadro trasporti/protocollo fornito in asyncio non sarà sufficiente per questo. Il punto è che la sorgente dell'attività asincrona è una coroutine che può essere utilizzata con yield from.

Si noti che la funzione reporter() in questo esempio non è un codice valido; il mio problema è che non so cosa dovrebbe andare lì dentro. Idealmente, mi piacerebbe mantenere il codice reporter() separato da run(); il punto di questo esercizio è vedere come calcolare i programmi più complessi in unità più piccole di codice utilizzando i componenti in asyncio.

C'è un modo per strutturare questo tipo di comportamento con il modulo asyncio?

risposta

5

Le primitive di blocco e le code in asyncio forniscono alcuni meccanismi per farlo.

Condizioni

Il asyncio.Condition() fornisce un modo per ricevere una notifica di una condizione. Usalo quando non importa se lasci cadere alcuni eventi.

class Source: 

    def __init__(self): 
     self.flag = asyncio.Event() 
     self.sum = 0 

     # For consumers 
     self.ready = asyncio.Condition() 

    def start(self): 
     self.flag.set() 

    def stop(self): 
     self.flag.clear() 

    @asyncio.coroutine 
    def run(self): 
     yield from self.flag.wait() 

     p = pexpect.spawn(
      "python -c " 
      "'import random, time\n" 
      "while True: print(random.choice((-1, 1))); time.sleep(0.5)'") 

     while self.flag.is_set(): 
      yield from p.expect_exact('\n', async=True) 
      self.sum += int(p.before) 
      with (yield from self.ready): 
       self.ready.notify_all() # Or just notify() depending on situation 

     p.terminate() 

    @asyncio.coroutine 
    def read(self): 
     with (yield from self.ready): 
      yield from self.ready.wait() 
      return self.sum 


@asyncio.coroutine 
def reporter(source): 
    while True: 
     # Something like: 
     new_sum = yield from source.read() 
     print("New sum is: {:d}".format(new_sum)) 
     # Other potentially blocking stuff in here 

code

Il asyncio.Queue() consente di mettere i dati in una coda (sia LIFO o FIFO) e avere qualcos'altro leggere da esso. Usalo se vuoi assolutamente rispondere a tutti gli eventi, anche se il tuo cliente rimane indietro (in tempo). Nota che se limiti la dimensione della coda, il tuo produttore alla fine bloccherà se il tuo consumatore è abbastanza lento.

Nota che questo ci consente di convertire sum anche in una variabile locale.

#!/usr/bin/python3 
import asyncio 
import pexpect 

class Source: 

    def __init__(self): 
     self.flag = asyncio.Event() 
     # NOTE: self.sum removed! 

     # For consumers 
     self.output = asyncio.Queue() 

    def start(self): 
     self.flag.set() 

    def stop(self): 
     self.flag.clear() 

    @asyncio.coroutine 
    def run(self): 
     yield from self.flag.wait() 

     sum = 0 

     p = pexpect.spawn(
      "python -c " 
      "'import random, time\n" 
      "while True: print(random.choice((-1, 1))); time.sleep(0.5)'") 

     while self.flag.is_set(): 
      yield from p.expect_exact('\n', async=True) 
      sum += int(p.before) 
      yield from self.output.put(sum) 

     p.terminate() 

    @asyncio.coroutine 
    def read(self): 
     return (yield from self.output.get()) 

@asyncio.coroutine 
def reporter(source): 
    while True: 
     # Something like: 
     new_sum = yield from source.read() 
     print("New sum is: {:d}".format(new_sum)) 
     # Other potentially blocking stuff here 

Nota che Python 3.4.4 aggiunge task_done() e join() metodi per la Queue, per consentire di terminare con garbo l'elaborazione di tutto quando si conosce il consumatore è finito (se del caso).

Problemi correlati