2016-07-12 23 views
10

Ho utilizzato con successo FileIO per trasmettere il contenuto di un file, calcolare alcune trasformazioni per ogni riga e aggregare/ridurre i risultati.Un modo corretto per arrestare i flussi di Akka a condizione

Ora ho un caso d'uso piuttosto specifico, in cui vorrei interrompere il flusso quando viene raggiunta una condizione, in modo che non sia necessario leggere l'intero file ma il processo termina il prima possibile. Qual è il modo consigliato per raggiungere questo obiettivo?

+2

Se la condizione è basata sul contenuto del flusso, 'Source.takewhile' (http://doc.akka.io/api/akka/2.4.8/index.html#[email protected] (p: Out => Boolean): FlowOps.this.Repr [Out]) dovrebbe funzionare. – devkat

risposta

16

Se la condizione di arresto è "sulla parte esterna del torrente"

C'è un avanzato building-block chiamato KillSwitch che si potrebbe usare per fare questo: http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html Il flusso sarebbe venga chiuso una volta che il kill switch è notificato.

Ha metodi come abort(reason)/shutdown ecc, vedi qui per la sua API: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html

Documentazione di riferimento è qui: http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala

esempio d'uso potrebbe essere:

val countingSrc = Source(Stream.from(1)).delay(1.second, 
    DelayOverflowStrategy.backpressure) 
val lastSnk = Sink.last[Int] 

val (killSwitch, last) = countingSrc 
    .viaMat(KillSwitches.single)(Keep.right) 
    .toMat(lastSnk)(Keep.both) 
    .run() 

doSomethingElse() 

killSwitch.shutdown() 

Await.result(last, 1.second) shouldBe 2 

Se la condizione di arresto è all'interno dello stream

È possibile utilizzare lo stato takeWhile per esprimere qualsiasi condizione, anche se a volte lo take o lo limit può essere anche sufficiente "prendere 10 lenze".

Se la logica è molto avanzata, si potrebbe costruire una prova speciale che gestisce la logica speciale utilizzando statefulMapConcat che permette di esprimere letteralmente qualsiasi cosa - così si potrebbe completare il flusso ogni volta che si vuole "dal di dentro".

+0

Cosa devo fare se devo attivare e disattivare lo streaming continuo in modo che la mia chiamata a funzione di riduzione emetta i dati aggregati (raggruppati) in Sink? Posso usare takeWhile (p) e swithc p true e false? – zt1983811

Problemi correlati