2016-04-17 13 views
5

Avere uno stream con flussi personalizzati e ad un certo punto voglio dividere il flusso e disporre di due alternative di gestione dei dati che si uniranno di nuovo in seguito.Flussi alternativi basati su condizioni per stream akka

E.g.

    -> F3 -> F6 
Src -> F1 -> F2    > Merge -> Sink 
        -> F4 -> F5 

F2 dovrebbe avere una condizione di dire se i dati contiene formato A allora dovrebbe andare a scorrere F3, altrimenti andare al F4.

Per quanto posso vedere, ogni flusso può avere solo una porta in ciascuna direzione (o due se bidi) - quindi, come posso supportare tale flusso?

risposta

11

È possibile utilizzare Broadcast per suddividere lo stream, quindi sarà possibile utilizzare filter o collect su ciascuno degli stream per filtrare i dati richiesti.

val split = builder.add(Broadcast[Int](2)) 

Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink 
        -> filterCondB -> F4 -> F5 -> Merge 

Inoltre, v'è Partition fase che gestisce il numero di porte di uscita e la funzione di mappa dal valore di numero di porta f: T => Int.

val portMapper(value: T): Int = value match { 
    case CondA => 0 
    case CondB => 1 
} 

val split = builder.add(Partition[T](2, portMapper)) 

Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink 
      split -> F4 -> F5 -> Merge 

Forse c'è un modo più semplice.

+0

Grazie, mi hai salvato la giornata. Possiamo creare val filterCondA = Flow [Int] .filter() –

Problemi correlati