2015-07-10 6 views
10

Sto cercando di capire come lavorare con lo streaming fluido. Io uso slick 3.0.0 con driver postgresQual è il modo giusto per lavorare con i risultati di streaming 3.03 di Slick e Postgresql?

La situazione è la seguente: il server deve dare sequenze di dati client suddivisi in blocchi limitati per dimensione (in byte). Così, ho scritto seguente query chiazza di petrolio:

val sequences = TableQuery[Sequences] 
def find(userId: Long, timestamp: Long) = sequences.filter(s ⇒ s.userId === userId && s.timestamp > timestamp).sortBy(_.timestamp.asc).result 
val seq = db.stream(find(0L, 0L)) 

ho combinato ss con Akka-stream Source, ha scritto su misura PushPullStage, che i limiti di dimensione dei dati (in byte) e termina a monte quando raggiunge limite di dimensione. Funziona bene. Il problema è - quando guardo in Postgres tronchi, vedo domanda del genere select * from sequences where user_id = 0 and timestamp > 0 order by timestamp;

Così, a prima vista sembra essere molto (e inutile) interrogazione del database in corso, utilizzare solo pochi byte in ogni query . Qual è il modo giusto per eseguire lo streaming con Slick in modo da ridurre al minimo l'interrogazione del database e utilizzare al meglio i dati trasferiti in ogni query?

risposta

11

Il "modo giusto" per fare streaming con Slick e Postgres include tre cose:

  1. necessario utilizzare db.stream()

  2. necessario disattivare autoCommit in JDBC-driver. Un modo è quello di eseguire la query in una transazione mediante il suffisso .transactionally.

  3. È necessario impostare fetchSize in modo che sia diverso da 0 altrimenti postgres sposterà l'intero risultato sul client in una volta sola.

Es:

DB.stream(
    find(0L, 0L) 
    .transactionally 
    .withStatementParameters(fetchSize = 1000) 
).foreach(println) 

Link utili:

https://github.com/slick/slick/issues/1038

https://github.com/slick/slick/issues/809

+0

Grazie per la risposta, molto disponibile. Ero confuso riguardo l '"addendum": non è il contesto di esecuzione che gestisce db mantenuto separatamente in AsyncExecutor? – JimN

+0

Buono a sapersi! Informazioni sull'addendum: Sì, hai ragione. Dovrebbe essere il default. Sto rimuovendo l'addendum dato che confonde più di quanto possa aiutare e con il senno di poi credo che abbia davvero a che fare con la meccanica della contropressione. Il mio consumatore è stato molto più veloce della rete, quindi ha iniziato a elaborare i futures con la stessa rapidità con cui sono arrivati ​​i risultati era una soluzione più adatta. – Rikard

+0

Esiste un equivalente per il caso di MySQL? – matanster

Problemi correlati