2015-10-22 12 views
5

Sto utilizzando akka-http per effettuare una richiesta a un servizio http che restituisce una risposta chunked. Questo è come il bit rilevante del codice simile:concatenazione risposta chunked akka-http

val httpRequest: HttpRequest = //build the request 
val request = Http().singleRequest(httpRequest) 
request.flatMap { response => 
    response.entity.dataBytes.runForeach { chunk => 
     println("-----") 
     println(chunk.utf8String) 
    } 
} 

e l'output prodotto nella linea di comando sembra qualcosa di simile:

----- 
{"data": 
----- 
"some text"} 

----- 
{"data": 
----- 
"this is a longer 
----- 
text"} 

----- 
{"data": "txt"} 

----- 
... 

Il pezzo logica dei dati - un JSON in questo caso le estremità con un simbolo di fine riga \r\n, ma il problema è che il json non si adatta sempre in un singolo chunk di risposta http come chiaramente visibile nell'esempio sopra.

La mia domanda è: come concatenare i dati in chunk in entrata in full jsons in modo che il tipo di contenitore risultante rimarrebbe o Source[Out,M1] o Flow[In,Out,M2]? Mi piacerebbe seguire l'idealizzazione di akka-stream.

UPDATE: Vale la pena ricordare anche, che la risposta è infinita e l'aggregazione deve essere fatto in tempo reale

risposta

3

trovato una soluzione:

val request: HttpRequest = //build the request 
request.flatMap { response => 
    response.entity.dataBytes.scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String) 
     .filter(_.contains("\r\n")) 
     .runForeach { json => 
      println("-----") 
      println(json) 
     } 
} 
+0

Che cosa fa esattamente la scansione delle funzioni? Non c'è documentazione a riguardo. Puoi spiegare per favore? – MaatDeamon

+0

@MaatDeamon In realtà c'è: "Simile al fold ma non è un'operazione terminale, emette il suo valore corrente che inizia da zero e poi applica il valore corrente e successivo alla funzione data f, emettendo il successivo valore corrente." (Http://doc.akka.io/api/akka-stream-and-http-experimental/1.0/index.html#akka.stream.scaladsl.Source). Il modo in cui lo capisco è come una piega, ma può essere applicato a un flusso continuo. Senza di essa questa soluzione non funzionerebbe mai. – Caballero

+0

Inoltre, il chunking della risposta viene gestito automaticamente? Quello che voglio dire è che il tuo callback è valido per ogni chunked? "essere chiamato" – MaatDeamon

0

Il akka stream documentation ha una voce nel libro di cucina per questo molto problema: "Analizzare le righe da un flusso di ByteString". La loro soluzione è abbastanza dettagliata ma può anche gestire la situazione in cui un singolo blocco può contenere più righe. Questo sembra più robusto in quanto la dimensione del blocco potrebbe essere abbastanza grande da gestire più messaggi JSON.

+0

Link aggiornato ad Akka 2.4: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-cookbook.html#Parsing_lines_from_a_stream_of_ByteStrings – akauppi

+0

akka-http conterrà presto anche il supporto specifico per il frame per JSON. Un'anteprima anticipata di questo supporto può essere vista nei progetti di esempio di @ktoso. Ecco un collegamento diretto al codice di frammentazione JSON pertinente: https://github.com/ktoso/scaladays-berlin-akka-streams/blob/master/src/main/scala/akka/http/scaladsl/server/JsonEntityStreaming. Scala –

0
response.entity.dataBytes 
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8096)) 
.mapAsyncUnordered(Runtime.getRuntime.availableProcessors()) { data => 
    if (response.status == OK) { 
    val event: Future[Event] = Unmarshal(data).to[Event] 
    event.foreach(x => log.debug("Received event: {}.", x)) 
    event.map(Right(_)) 
    } else { 
    Future.successful(data.utf8String) 
     .map(Left(_)) 
    } 
} 

L'unico requisito è conoscere la dimensione massima di un record. Se si inizia con qualcosa di piccolo, il comportamento predefinito è fallire se il record è maggiore del limite. Puoi impostarlo per troncare invece di fallire, ma la parte di un JSON non ha senso.