Sto utilizzando akka-http
per effettuare una richiesta a un servizio http che restituisce una risposta chunked. Questo è come il bit rilevante del codice simile:concatenazione risposta chunked akka-http
val httpRequest: HttpRequest = //build the request
val request = Http().singleRequest(httpRequest)
request.flatMap { response =>
response.entity.dataBytes.runForeach { chunk =>
println("-----")
println(chunk.utf8String)
}
}
e l'output prodotto nella linea di comando sembra qualcosa di simile:
-----
{"data":
-----
"some text"}
-----
{"data":
-----
"this is a longer
-----
text"}
-----
{"data": "txt"}
-----
...
Il pezzo logica dei dati - un JSON in questo caso le estremità con un simbolo di fine riga \r\n
, ma il problema è che il json non si adatta sempre in un singolo chunk di risposta http come chiaramente visibile nell'esempio sopra.
La mia domanda è: come concatenare i dati in chunk in entrata in full jsons in modo che il tipo di contenitore risultante rimarrebbe o Source[Out,M1]
o Flow[In,Out,M2]
? Mi piacerebbe seguire l'idealizzazione di akka-stream
.
UPDATE: Vale la pena ricordare anche, che la risposta è infinita e l'aggregazione deve essere fatto in tempo reale
Che cosa fa esattamente la scansione delle funzioni? Non c'è documentazione a riguardo. Puoi spiegare per favore? – MaatDeamon
@MaatDeamon In realtà c'è: "Simile al fold ma non è un'operazione terminale, emette il suo valore corrente che inizia da zero e poi applica il valore corrente e successivo alla funzione data f, emettendo il successivo valore corrente." (Http://doc.akka.io/api/akka-stream-and-http-experimental/1.0/index.html#akka.stream.scaladsl.Source). Il modo in cui lo capisco è come una piega, ma può essere applicato a un flusso continuo. Senza di essa questa soluzione non funzionerebbe mai. – Caballero
Inoltre, il chunking della risposta viene gestito automaticamente? Quello che voglio dire è che il tuo callback è valido per ogni chunked? "essere chiamato" – MaatDeamon