2015-02-19 16 views
5

Utilizzando asyncio un coroutine può essere eseguita con un timeout in modo che viene cancellato dopo il timeout:pitone asyncio forza timeout

@asyncio.coroutine 
def coro(): 
    yield from asyncio.sleep(10) 

loop = asyncio.get_event_loop() 
loop.run_until_complete(asyncio.wait_for(coro(), 5)) 

L'esempio precedente funziona come previsto (timeout dopo 5 secondi).

Tuttavia, quando la coroutine non utilizza asyncio.sleep() (o altre coroutine asyncio), non sembra che vada in timeout. Esempio:

@asyncio.coroutine 
def coro(): 
    import time 
    time.sleep(10) 

loop = asyncio.get_event_loop() 
loop.run_until_complete(asyncio.wait_for(coro(), 1)) 

Questo richiede più di 10 secondi per eseguire perché il time.sleep(10) non viene annullata. È possibile far valere l'annullamento della coroutine in tal caso?

Se asyncio dovrebbe essere usato per risolvere questo, come potrei farlo?

risposta

12

No, non è possibile interrompere una coroutine a meno che non restituisca il controllo al ciclo degli eventi, il che significa che deve essere all'interno di una chiamata yield from. asyncio è a thread singolo, quindi quando si blocca la chiamata time.sleep(10) nel secondo esempio, non è possibile eseguire il ciclo degli eventi. Ciò significa che quando scade il timeout impostato usando wait_for, il ciclo degli eventi non sarà in grado di intervenire su di esso. Il ciclo degli eventi non ha la possibilità di eseguire nuovamente fino alle uscite coro, a quel punto è troppo tardi.

Ecco perché, in generale, è necessario evitare sempre le chiamate di blocco che non sono asincroni; ogni volta che una chiamata si blocca senza cedere al ciclo degli eventi, non può eseguire nient'altro nel tuo programma, che probabilmente non è quello che vuoi. Se avete veramente bisogno di fare una lunga, operazione il blocco, si dovrebbe cercare di utilizzare BaseEventLoop.run_in_executor per eseguirlo in un pool di thread o un processo, che eviterà il blocco del ciclo di eventi:

import asyncio 
import time 
from concurrent.futures import ProcessPoolExecutor 

@asyncio.coroutine 
def coro(loop): 
    ex = ProcessPoolExecutor(2) 
    yield from loop.run_in_executor(ex, time.sleep, 10) # This can be interrupted. 

loop = asyncio.get_event_loop() 
loop.run_until_complete(asyncio.wait_for(coro(loop), 1)) 
+0

Un altro utile esempio qui: https://github.com/calebmadrigal/asyncio-examples/blob/master/run_in_executor.py – shrx

1

Thx @dano per la risposta. Se l'esecuzione di un coroutine non è un requisito difficile, ecco un rielaborato, più compatta versione

import asyncio, time, concurrent 

timeout = 0.5 
loop = asyncio.get_event_loop() 
future = asyncio.wait_for(loop.run_in_executor(None, time.sleep, 2), timeout) 
try: 
    loop.run_until_complete(future) 
    print('Thx for letting me sleep') 
except concurrent.futures.TimeoutError: 
    print('I need more sleep !') 

Per i curiosi, un po 'di debug nel mio Python 3.5.2 ha mostrato che il passaggio None come un risultato esecutore nella creazione di un _default_executor, come segue:

# _MAX_WORKERS = 5 
self._default_executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS) 
-1

Gli esempi che ho visto per il timeout sono molto banali. Data la realtà, la mia app è un po 'più complessa. La sequenza è:

  1. Quando un client si connette al server, che il server di creare un'altra connessione al server interno
  2. Quando la connessione server interno è ok, attendere che il client per inviare i dati. Sulla base di questi dati possiamo fare una query al server interno.
  3. Quando ci sono dati da inviare al server interno, inviarlo. Poiché il server interno a volte non risponde abbastanza velocemente, avvolgere questa richiesta in un timeout.
  4. Se i tempi di funzionamento fuori, crollano tutte le connessioni per segnalare il cliente circa l'errore

Per realizzare tutto quanto sopra, pur mantenendo l'evento del ciclo in esecuzione, il codice risultante contiene seguente codice:

def connection_made(self, transport): 
    self.client_lock_coro = self.client_lock.acquire() 
    asyncio.ensure_future(self.client_lock_coro).add_done_callback(self._got_client_lock) 

def _got_client_lock(self, task): 
    task.result() # True at this point, but call there will trigger any exceptions 
    coro = self.loop.create_connection(lambda: ClientProtocol(self), 
              self.connect_info[0], self.connect_info[1]) 
    asyncio.ensure_future(asyncio.wait_for(coro, 
              self.client_connect_timeout 
              )).add_done_callback(self.connected_server) 

def connected_server(self, task): 
    transport, client_object = task.result() 
    self.client_transport = transport 
    self.client_lock.release() 

def data_received(self, data_in): 
    asyncio.ensure_future(self.send_to_real_server(message, self.client_send_timeout)) 

def send_to_real_server(self, message, timeout=5.0): 
    yield from self.client_lock.acquire() 
    asyncio.ensure_future(asyncio.wait_for(self._send_to_real_server(message), 
                timeout, loop=self.loop) 
           ).add_done_callback(self.sent_to_real_server) 

@asyncio.coroutine 
def _send_to_real_server(self, message): 
    self.client_transport.write(message) 

def sent_to_real_server(self, task): 
    task.result() 
    self.client_lock.release() 
+0

Questa risposta non sembra rispondere alla domanda reale, anche io non penso che sia d'aiuto . (Da qui il downvote.) Imo troppe cose non correlate sono fatte nel codice e la gestione effettiva del timeout non è dimostrata chiaramente. Spero che questo feedback aiuti. – siebz0r

+0

Grazie per il tuo feedback. La vera domanda è che Coroutine può essere eseguita con un timeout, come fa il mio codice. Come ho affermato nella mia risposta, non vi è alcun codice da trovare nell'intero Internet in cui coroutine viene eseguita con timeout * senza * utilizzando 'loop.run_until_complete()', ecco perché l'ho postato. Dato anche il vincolo, il numero di metodi/funzioni sembra essere obbligatorio. Sentiti libero di fornire un codice più ottimizzato. –