2015-08-24 10 views
6

Come esempio semplice, considerare l'equivalente di rete di/dev/zero, di seguito. (. O più realisticamente, solo un web server l'invio di un file di grandi dimensioni)Come rilevare errori di scrittura in asyncio?

Se un cliente si disconnette presto, si ottiene una raffica di messaggi di log:

WARNING:asyncio:socket.send() raised exception. 

Ma io non sto trovando alcun modo per la cattura ha detto eccezione. L'ipotetico server continua a leggere i gigabyte dal disco e li invia a un socket morto, senza alcuno sforzo da parte del client, e ti sei procurato un attacco DoS.

L'unica cosa che ho trovato dai documenti è di cedere da una lettura, con una stringa vuota che indica la chiusura. Ma non va bene qui perché un normale client non invierà nulla, bloccando il ciclo di scrittura.

Qual è il modo corretto per rilevare le scritture non riuscite o essere avvisati che la connessione TCP è stata chiusa, con l'API dello streaming o altro?

Codice: API basata

from asyncio import * 
import logging 

@coroutine 
def client_handler(reader, writer): 
    while True: 
     writer.write(bytes(1)) 
     yield from writer.drain() 

logging.basicConfig(level=logging.INFO) 
loop = get_event_loop() 
coro = start_server(client_handler, '', 12345) 
server = loop.run_until_complete(coro) 
loop.run_forever() 
+1

** Aggiornamento: ** Questo è ora [corretto] (https://github.com/python/asyncio/pull/280) a monte nella libreria 'asyncio'. –

risposta

2

Questo è un po 'strano, ma si può effettivamente consentire un'eccezione per raggiungere il client_handler coroutine, costringendola a cedere il controllo al ciclo di eventi per un'iterazione:

import asyncio 
import logging 

@asyncio.coroutine 
def client_handler(reader, writer): 
    while True: 
     writer.write(bytes(1)) 
     yield # Yield to the event loop 
     yield from writer.drain() 

logging.basicConfig(level=logging.INFO) 
loop = asyncio.get_event_loop() 
coro = asyncio.start_server(client_handler, '', 12345) 
server = loop.run_until_complete(coro) 
loop.run_forever() 

Se lo faccio, ho questa uscita quando uccido la connessione client:

ERROR:asyncio:Task exception was never retrieved 
future: <Task finished coro=<client_handler() done, defined at aio.py:4> exception=ConnectionResetError(104, 'Connection reset by peer')> 
Traceback (most recent call last): 
    File "/usr/lib/python3.4/asyncio/tasks.py", line 238, in _step 
    result = next(coro) 
    File "aio.py", line 9, in client_handler 
    yield from writer.drain() 
    File "/usr/lib/python3.4/asyncio/streams.py", line 301, in drain 
    raise exc 
    File "/usr/lib/python3.4/asyncio/selector_events.py", line 700, in write 
    n = self._sock.send(data) 
ConnectionResetError: [Errno 104] Connection reset by peer 

sono davvero non del tutto sicuro perché è necessario far in modo esplicito il ciclo degli eventi ottenere il controllo per l'eccezione per passare - non farlo avere tempo al momento di scavare in esso. Presumo che un po 'abbia bisogno di essere capovolto per indicare che la connessione è caduta e chiamare yield from writer.drain() (che può causare un cortocircuito attraverso il ciclo degli eventi) in un ciclo impedisce che ciò accada, ma non sono sicuro. Se avrò la possibilità di indagare, aggiornerò la risposta con tali informazioni.

1

Il flusso non ha una richiamata è possibile specificare per quando la connessione è chiusa. Ma l'API protocollo fa, in modo da utilizzare al posto: https://docs.python.org/3/library/asyncio-protocol.html#connection-callbacks

+0

Non è la risposta che speravo, ma ho detto "o altrimenti", quindi immagino sia giusto. Per me, un avvertimento significa che sto facendo qualcosa di sbagliato. Ma stai dicendo che non esiste un modo "giusto" per farlo con i flussi? – jwelsh

4

Ho scavato nella sorgente asyncio per espandere la risposta dano's sul motivo per cui le eccezioni non vengono generate senza passare esplicitamente il controllo al ciclo degli eventi. Ecco cosa ho trovato.

La chiamata yield from wirter.drain() dà il controllo alla coroutine StreamWriter.drain. Questa coroutine checks per e solleva qualsiasi eccezione che il StreamReaderProtocol impostato su StreamReader. Ma dal momento che abbiamo passato il controllo su drain, il protocollo non ha ancora avuto la possibilità di impostare l'eccezione. drain quindi assegna il controllo alla coroutine FlowControlMixin._drain_helper. Questa coroutine ritorna immediatamente perché alcuni flag ancora non sono stati ancora impostati e il controllo termina con la coroutine che ha chiamato yield from wirter.drain().

E così abbiamo completato il cerchio senza dare il controllo al ciclo degli eventi per consentirgli di gestire altre coroutine e presentare le eccezioni a writer.drain().

yield prima di un drain() offre al trasporto/protocollo la possibilità di impostare i flag e le eccezioni appropriati.

Ecco un mock up di quello che sta succedendo, con tutte le chiamate nidificate crollato:

import asyncio as aio 

def set_exception(ctx, exc): 
    ctx["exc"] = exc 

@aio.coroutine 
def drain(ctx): 
    if ctx["exc"] is not None: 
    raise ctx["exc"] 

    return 

@aio.coroutine 
def client_handler(ctx): 
    i = 0 
    while True: 
    i += 1 
    print("write", i) 
    # yield # Uncommenting this allows the loop.call_later call to be scheduled. 
    yield from drain(ctx) 

CTX = {"exc": None} 

loop = aio.get_event_loop() 
# Set the exception in 5 seconds 
loop.call_later(5, set_exception, CTX, Exception("connection lost")) 
loop.run_until_complete(client_handler(CTX)) 
loop.close() 

Questo dovrebbe probabilmente fissata a monte nelle API Streams dai asyncio sviluppatori.

+0

Grazie per questa spiegazione. Ho avuto difficoltà con una presa remota che chiude forzatamente la connessione e non ero sicuro di cosa fare. Prendo l'eccezione e aggiungo la logica per affrontarlo. Sembra che dovrei usare una callback di connessione. Ma volevo usare l'API Stream per evitare i callback! – LeslieK

Problemi correlati