2015-02-05 10 views
12

Supponendo che disponga di un Java IntStream, è possibile convertirlo in un IntStream con somme cumulative? Ad esempio, uno stream che inizia con [4, 2, 6, ...] deve essere convertito in [4, 6, 12, ...].Calcolo stato stream: somme cumulative

Più in generale, come si dovrebbe procedere all'implementazione di operazioni di flusso stateful? Ci si sente come questo dovrebbe essere possibile:

Con l'ovvia limitazione che questo funziona solo sui flussi sequenziali. Tuttavia, Stream.map richiede esplicitamente una funzione mappa senza stato. Ho ragione nel mancare un'operazione Stream.statefulMap o Stream.cumulative o manca il punto degli stream Java?

Confronta ad esempio per Haskell, dove la funzione scanl1 risolve esattamente questo esempio:

scanl1 (+) [1 2 3 4] = [1 3 6 10] 
+2

Gli stream sono progettati appositamente per supportare solo operazioni parallelizzabili, il che non è il caso di scanl. –

+0

Gli stream hanno più senso se si pretende di essere in 'java.util.concurrent.stream' invece di' java.util.stream'. –

+6

Nota che se la tua sorgente è una matrice, puoi semplicemente usare ['Arrays.parallelPrefix (array, Integer :: sum);'] (http://docs.oracle.com/javase/8/docs/api/java /util/Arrays.html#parallelPrefix-int:A-java.util.function.IntBinaryOperator-) ... – Holger

risposta

3

È possibile eseguire questa operazione con un numero atomico. Per esempio:

import java.util.concurrent.atomic.AtomicLong; 
import java.util.stream.IntStream; 
import java.util.stream.LongStream; 

public class Accumulator { 
    public static LongStream toCumulativeSumStream(IntStream ints){ 
     AtomicLong sum = new AtomicLong(0); 
     return ints.sequential().mapToLong(sum::addAndGet); 
    } 

    public static void main(String[] args){ 
     LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5)); 
     sums.forEachOrdered(System.out::println); 
    } 
} 

Questo uscite:

1 
3 
6 
10 

ho usato un lungo per memorizzare le somme, perché è del tutto possibile che due interi aggiungere fino a ben oltre Integer.MAX_VALUE, e una lunga ha meno di una possibilità di overflow.

+2

Ho trovato questa risposta interessante e inaspettata. Potresti rispondere ad alcune domande a riguardo? Perché hai usato AtomicReference piuttosto che AtomicInteger - con addAndGet? Ma ancora più importante, perché questo cambia il fatto che se il flusso è stato reso parallelo non c'è alcuna garanzia dell'ordine in cui si verifica l'accumulo? AtomicReference modifica in qualche modo il comportamento dei flussi? In tal caso, puoi indicare un tutorial o una documentazione su questo? Grazie. – sprinter

+2

Solo un supplemento alla domanda precedente, ho pensato di provare da solo con un IntStream parallelo. Non funziona. Quindi questa non è una buona risposta. – sprinter

+2

@sprinter Sì, questo non funziona su flussi paralleli, ma poiché l'operazione non può essere parallelizzata in ogni caso, si può semplicemente chiamare .sequential() prima di eseguirlo. –

5

E 'possibile fare con un collettore che quindi crea un nuovo flusso:

class Accumulator { 
    public static void accept(List<Integer> list, Integer value) { 
     list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1))); 
    } 

    public static List<Integer> combine(List<Integer> list1, List<Integer> list2) { 
     int total = list1.get(list1.size() - 1); 
     list2.stream().map(n -> n + total).forEach(list1::add); 
     return list1; 
    } 
} 

Questo è usato come:

myIntStream.parallel() 
    .collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine) 
    .stream(); 

Si spera che tu possa vedere che l'importante attributo di questo collector è che anche se il flusso è parallelo come le istanze Accumulator sono combinate regola i totali.

Questo ovviamente non è efficiente come un'operazione di mappa perché raccoglie l'intero flusso e quindi produce un nuovo flusso. Ma questo non è solo un dettaglio di implementazione: è una funzione necessaria del fatto che i flussi sono destinati ad essere potenzialmente elaborati contemporaneamente.

L'ho provato con IntStream.range(0, 10000).parallel() e funziona correttamente.

+0

Devo dire che questa è un'ottima risposta. Potrebbe non essere la risposta perfetta per questa domanda, ma per l'accumulazione parallela. – Jatin

+1

@Jatin grazie. In seguito mi sono reso conto che potrebbe essere un po 'più semplice di questo: il totale parziale non è realmente necessario perché è solo l'ultimo elemento della lista. Aggiornerò per mostrarti cosa intendo: fammi sapere cosa ne pensi. – sprinter

+0

Questo è più leggibile. Grazie. Non pensavo alle linee parallele, che tu potessi avere dei blocchi e poi uno li può unire. grande. – Jatin