2016-01-27 11 views
7

Mi sto abituando ad asyncio e trovo la gestione delle attività abbastanza piacevole, ma può essere difficile mescolare le librerie asincrone con le librerie io tradizionali. Il problema che sto affrontando attualmente è come decodificare correttamente uno StreamReader asincrono.Asyncio decodifica utf-8 con StreamReader

La soluzione più semplice è quella di read() blocchi di stringhe di byte e quindi decodificare ogni blocco, vedere il codice riportato di seguito. (Nel mio programma, non vorrei stampare ogni pezzo, ma decodificarlo in una stringa e mando in un altro metodo per l'elaborazione):

import asyncio 
import aiohttp 

async def get_data(port): 
    url = 'http://localhost:{}/'.format(port) 
    r = await aiohttp.get(url) 
    stream = r.content 
    while not stream.at_eof(): 
     data = await stream.read(4) 
     print(data.decode('utf-8')) 

Questo funziona bene, fino a quando v'è una caratteri UTF-8, che è diviso tra troppi pezzi. Ad esempio se la risposta è b'M\xc3\xa4dchen mit Bi\xc3\x9f\n', quindi lettura blocchi di 3 funzioneranno, ma blocchi di 4 non (come \xc3 e \x9f sono in diversi pezzi e decodificare il pezzo termina con \xc3 genererà il seguente errore:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data

Ho cercato soluzioni appropriate a questo problema e, almeno nel mondo dei blocchi, sembra sia io.TextIOWrapper o codecs.treamReaderWriter (le cui differenze sono discusse in PEP 0400). Tuttavia, entrambi si basano su tipici flussi bloccanti

Ho trascorso 30 minuti Sto cercando esempi con asyncio e continuando a trovare la mia soluzione decode(). Qualcuno sa di una soluzione migliore o questa è una caratteristica mancante nell'asyncio di Python?

Per riferimento, ecco i risultati dell'utilizzo dei due decodificatori "standard" con flussi asincroni.

Utilizzo del lettore di flusso codec:

r = yield from aiohttp.get(url) 
decoder = codecs.getreader('utf-8') 
stream = decoder(r.content) 

eccezione:

File "echo_client.py", line 13, in get_data 
    data = yield from stream.read(4) 
File "/usr/lib/python3.5/codecs.py", line 497, in read 
    data = self.bytebuffer + newdata 
TypeError: can't concat bytes to generator 

(chiama lettura() direttamente, anziché yield from o await it)

Ho anche provato avvolgendo flusso con io.TextIOWrapper:

stream = TextIOWrapper(r.content) 

Ma che conduce al seguente:

File "echo_client.py", line 10, in get_data 
    stream = TextIOWrapper(r.content) 
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable' 

P.S. Se si desidera un caso di test esemplificativo, consultare this gist. Puoi eseguirlo con python3.5 per riprodurre l'errore. Se si modifica la dimensione del blocco da 4 a 3 (o 30), funzionerà correttamente.

EDIT

La risposta accettata risolto questo come un fascino. Grazie!Se qualcun altro ha questo problema, qui è una semplice classe wrapper che ho fatto per gestire la decodifica su uno StreamReader:

import codecs 

class DecodingStreamReader: 
    def __init__(self, stream, encoding='utf-8', errors='strict'): 
     self.stream = stream 
     self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors) 

    async def read(self, n=-1): 
     data = await self.stream.read(n) 
     if isinstance(data, (bytes, bytearray)): 
      data = self.decoder.decode(data) 
     return data 

    def at_eof(self): 
     return self.stream.at_eof() 
+2

E prima che qualcuno chieda perché non carico solo l'intera risposta in memoria, considera socket web o feed keep alive (come il feed _changes di couchdb in modalità continua). Voglio analizzare tutti i dati in arrivo, mentre arrivano, senza attendere (possibilmente minuti) che la connessione HTTP si chiuda. –

risposta

3

È possibile utilizzare un IncrementalDecoder:

Utf8Decoder = codecs.getincrementaldecoder('utf-8') 

Con il vostro esempio:

decoder = Utf8Decoder(error='strict') 
while not stream.at_eof(): 
    data = await stream.read(4) 
    print(decoder.decode(data), end='') 

uscita:

Mädchen mit Biß 
+1

Ottimo! L'ho provato e funziona perfettamente. Grazie per la risposta rapida. –