2015-06-04 22 views
50

Ho un file di grandi dimensioni che contiene un elenco di elementi.Java 8 Stream con elaborazione in batch

Vorrei creare un gruppo di elementi, effettuare una richiesta HTTP con questo batch (tutti gli elementi sono necessari come parametri nella richiesta HTTP). Posso farlo molto facilmente con un ciclo for, ma come amante di Java 8, voglio provare a scrivere questo con il framework Stream di Java 8 (e sfruttare i vantaggi dell'elaborazione lenta).

Esempio:

List<String> batch = new ArrayList<>(BATCH_SIZE); 
for (int i = 0; i < data.size(); i++) { 
    batch.add(data.get(i)); 
    if (batch.size() == BATCH_SIZE) process(batch); 
} 

if (batch.size() > 0) process(batch); 

voglio fare qualcosa di una lunga linea di lazyFileStream.group(500).map(processBatch).collect(toList())

Quale sarebbe il modo migliore per farlo?

+3

Potrebbe fornire un esempio di ciò che è necessario utilizzare il ciclo 'for'? – Edd

+0

Non riesco a capire come eseguire il raggruppamento, mi spiace, ma [File # righe] (https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html # lines-java.nio.file.Path-java.nio.charset.Charset-) leggerà pigramente il contenuto del file. – Toby

+0

quindi hai praticamente bisogno di un inverso di 'flatMap' (+ un'aggiunta flatMap per comprimere nuovamente i flussi)? Non penso che qualcosa del genere esista come metodo conveniente nella libreria standard.O dovrai trovare una lib di terze parti o scrivere la tua basata su spliterator e/o un collector che emette un flusso di stream – the8472

risposta

12

Si potrebbe fare con jOOλ, una biblioteca che si estende Java 8 flussi per, sequenziali casi d'uso a thread singolo flusso:

Seq.seq(lazyFileStream)    // Seq<String> 
    .zipWithIndex()     // Seq<Tuple2<String, Long>> 
    .groupBy(tuple -> tuple.v2/500) // Map<Long, List<String>> 
    .forEach((index, batch) -> { 
     process(batch); 
    }); 

Dietro le quinte, zipWithIndex() è solo:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) { 
    final Iterator<T> it = stream.iterator(); 

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> { 
     long index; 

     @Override 
     public boolean hasNext() { 
      return it.hasNext(); 
     } 

     @Override 
     public Tuple2<T, Long> next() { 
      return tuple(it.next(), index++); 
     } 
    } 

    return seq(new ZipWithIndex()); 
} 

... mentre groupBy() è convenienza API per:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) { 
    return collect(Collectors.groupingBy(classifier)); 
} 

(Disclaimer: io lavoro per la società dietro jOOλ)

+0

Wow. Questo è ESATTAMENTE quello che sto cercando. Il nostro sistema normalmente elabora i flussi di dati in sequenza, quindi sarebbe opportuno passare a Java 8. –

+8

Si noti che questa soluzione memorizza inutilmente l'intero flusso di input sulla 'Mappa' intermedia (a differenza, ad esempio, della soluzione Ben Manes) –

25

puro Java-8 applicazione è anche possibile:

int BATCH = 500; 
IntStream.range(0, (data.size()+BATCH-1)/BATCH) 
     .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH))) 
     .forEach(batch -> process(batch)); 

Si noti che diversamente Jool può funzionare bene in parallelo (a condizione che il vostro data è un lista di accesso casuale).

+0

cosa se i tuoi dati sono in realtà un flusso? (diciamo linee in un file, o anche dalla rete). –

+4

@OmryYadan, la domanda riguardava l'input da 'List' (si veda' data.size() ',' data.get() 'nella domanda). Sto rispondendo alla domanda posta. Se hai un'altra domanda, chiedi invece (anche se penso che la domanda stream sia stata già chiesta). –

+0

Come elaborare i batch in parallelo? –

6

Si potrebbe anche dare un'occhiata a cyclops-react, io sono l'autore di questa libreria. Implementa l'interfaccia jOOλ (e per estensione JDK 8 Stream), ma a differenza di JDK 8 Parallel Streams si concentra sulle operazioni asincrone (come il blocco potenziale delle chiamate I/O asincrone). JDK Parallel Streams, al contrario, si concentra sul parallelismo dei dati per le operazioni legate alla CPU. Funziona gestendo gli aggregati delle attività basate su Future sotto il cofano, ma presenta una API Stream standard estesa agli utenti finali.

Questo codice di esempio può aiutare a iniziare

LazyFutureStream.parallelCommonBuilder() 
       .react(data) 
       .grouped(BATCH_SIZE)     
       .map(this::process) 
       .run(); 

C'è un tutorial on batching here

E un more general Tutorial here

Per utilizzare il proprio pool di thread (che è probabilmente più appropriato per il blocco I/O), è possibile iniziare l'elaborazione con

 LazyReact reactor = new LazyReact(40); 

    reactor.react(data) 
      .grouped(BATCH_SIZE)     
      .map(this::process) 
      .run(); 
7

È inoltre possibile utilizzare RxJava:

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch)); 

o

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList(); 

o

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList(); 
60

Per completezza, ecco una soluzione Guava.

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process); 

Nella questione della collezione è disponibile in modo un flusso non è necessario e può essere scritto come,

Iterables.partition(data, batchSize).forEach(this::process); 
+0

Questo sembra più facile e più leggibile per me. Grazie per la condivisione! – grinch

+4

'Lists.partition' è un'altra variante che avrei dovuto menzionare. –

19

puro Java 8 soluzione:

possiamo creare un collezionista personalizzato per farlo elegantemente, che accetta un batch size e un Consumer per elaborare ogni lotto:

import java.util.ArrayList; 
import java.util.Collections; 
import java.util.List; 
import java.util.Set; 
import java.util.function.*; 
import java.util.stream.Collector; 

import static java.util.Objects.requireNonNull; 


/** 
* Collects elements in the stream and calls the supplied batch processor 
* after the configured batch size is reached. 
* 
* In case of a parallel stream, the batch processor may be called with 
* elements less than the batch size. 
* 
* The elements are not kept in memory, and the final result will be an 
* empty list. 
* 
* @param <T> Type of the elements being collected 
*/ 
class BatchCollector<T> implements Collector<T, List<T>, List<T>> { 

    private final int batchSize; 
    private final Consumer<List<T>> batchProcessor; 


    /** 
    * Constructs the batch collector 
    * 
    * @param batchSize the batch size after which the batchProcessor should be called 
    * @param batchProcessor the batch processor which accepts batches of records to process 
    */ 
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) { 
     batchProcessor = requireNonNull(batchProcessor); 

     this.batchSize = batchSize; 
     this.batchProcessor = batchProcessor; 
    } 

    public Supplier<List<T>> supplier() { 
     return ArrayList::new; 
    } 

    public BiConsumer<List<T>, T> accumulator() { 
     return (ts, t) -> { 
      ts.add(t); 
      if (ts.size() >= batchSize) { 
       batchProcessor.accept(ts); 
       ts.clear(); 
      } 
     }; 
    } 

    public BinaryOperator<List<T>> combiner() { 
     return (ts, ots) -> { 
      // process each parallel list without checking for batch size 
      // avoids adding all elements of one to another 
      // can be modified if a strict batching mode is required 
      batchProcessor.accept(ts); 
      batchProcessor.accept(ots); 
      return Collections.emptyList(); 
     }; 
    } 

    public Function<List<T>, List<T>> finisher() { 
     return ts -> { 
      batchProcessor.accept(ts); 
      return Collections.emptyList(); 
     }; 
    } 

    public Set<Characteristics> characteristics() { 
     return Collections.emptySet(); 
    } 
} 

Opzionalmente quindi creare una classe di utilità di supporto:

import java.util.List; 
import java.util.function.Consumer; 
import java.util.stream.Collector; 

public class StreamUtils { 

    /** 
    * Creates a new batch collector 
    * @param batchSize the batch size after which the batchProcessor should be called 
    * @param batchProcessor the batch processor which accepts batches of records to process 
    * @param <T> the type of elements being processed 
    * @return a batch collector instance 
    */ 
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) { 
     return new BatchCollector<T>(batchSize, batchProcessor); 
    } 
} 

Esempio utilizzo:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 
List<Integer> output = new ArrayList<>(); 

int batchSize = 3; 
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs); 

input.stream() 
    .collect(StreamUtils.batchCollector(batchSize, batchProcessor)); 

ho postato il mio codice su GitHub così, se qualcuno vuole dare un'occhiata:

Link to Github

4

Ho scritto uno Spliterator personalizzato per scenari come questo. Riempirà gli elenchi di una determinata dimensione dal flusso di input. Il vantaggio di questo approccio è che eseguirà l'elaborazione lenta e funzionerà con altre funzioni di streaming.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) { 
    return batchSize <= 0 
     ? Stream.of(stream.collect(Collectors.toList())) 
     : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel()); 
} 

private static class BatchSpliterator<E> implements Spliterator<List<E>> { 

    private final Spliterator<E> base; 
    private final int batchSize; 

    public BatchSpliterator(Spliterator<E> base, int batchSize) { 
     this.base = base; 
     this.batchSize = batchSize; 
    } 

    @Override 
    public boolean tryAdvance(Consumer<? super List<E>> action) { 
     final List<E> batch = new ArrayList<>(batchSize); 
     for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++) 
      ; 
     if (batch.isEmpty()) 
      return false; 
     action.accept(batch); 
     return true; 
    } 

    @Override 
    public Spliterator<List<E>> trySplit() { 
     if (base.estimateSize() <= batchSize) 
      return null; 
     final Spliterator<E> splitBase = this.base.trySplit(); 
     return splitBase == null ? null 
       : new BatchSpliterator<>(splitBase, batchSize); 
    } 

    @Override 
    public long estimateSize() { 
     final double baseSize = base.estimateSize(); 
     return baseSize == 0 ? 0 
       : (long) Math.ceil(baseSize/(double) batchSize); 
    } 

    @Override 
    public int characteristics() { 
     return base.characteristics(); 
    } 

} 
4

Abbiamo avuto un problema simile da risolvere. Volevamo creare un flusso che fosse più grande della memoria di sistema (iterando attraverso tutti gli oggetti in un database) e rendere casuale l'ordine nel miglior modo possibile - pensavamo che sarebbe stato ok per bufferizzare 10.000 elementi e renderli casuali.

L'obiettivo era una funzione che includeva un flusso.

delle soluzioni proposte qui, sembra che ci siano una serie di opzioni:

  • utilizzare vari non-Java 8 librerie aggiuntive
  • iniziare con qualcosa che non è un flusso - per esempio un elenco
  • accesso casuale avere un flusso che può essere diviso facilmente in spliterator

Il nostro istinto era in origine per utilizzare una collezione personalizzata, ma questo significava l'abbandono di streaming. La soluzione di raccolta personalizzata sopra è molto buona e l'abbiamo quasi utilizzata.

Ecco una soluzione che imbroglia utilizzando il fatto che Stream s può dare un Iterator che si può usare come una via di fuga per farti fare qualcosa in più che i flussi non supportano. Lo Iterator viene riconvertito in uno stream utilizzando un altro frammento di stregoneria Java 8 StreamSupport.

/** 
* An iterator which returns batches of items taken from another iterator 
*/ 
public class BatchingIterator<T> implements Iterator<List<T>> { 
    /** 
    * Given a stream, convert it to a stream of batches no greater than the 
    * batchSize. 
    * @param originalStream to convert 
    * @param batchSize maximum size of a batch 
    * @param <T> type of items in the stream 
    * @return a stream of batches taken sequentially from the original stream 
    */ 
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) { 
     return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize)); 
    } 

    private static <T> Stream<T> asStream(Iterator<T> iterator) { 
     return StreamSupport.stream(
      Spliterators.spliteratorUnknownSize(iterator,ORDERED), 
      false); 
    } 

    private int batchSize; 
    private List<T> currentBatch; 
    private Iterator<T> sourceIterator; 

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) { 
     this.batchSize = batchSize; 
     this.sourceIterator = sourceIterator; 
    } 

    @Override 
    public boolean hasNext() { 
     prepareNextBatch(); 
     return currentBatch!=null && !currentBatch.isEmpty(); 
    } 

    @Override 
    public List<T> next() { 
     return currentBatch; 
    } 

    private void prepareNextBatch() { 
     currentBatch = new ArrayList<>(batchSize); 
     while (sourceIterator.hasNext() && currentBatch.size() < batchSize) { 
      currentBatch.add(sourceIterator.next()); 
     } 
    } 
} 

Un semplice esempio di utilizzo di questo sarebbe simile a questa:

@Test 
public void getsBatches() { 
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) 
     .forEach(System.out::println); 
} 

Le stampe di cui sopra

[A, B, C] 
[D, E, F] 

Per il nostro caso d'uso, abbiamo voluto mischiare le partite e poi tenerli come un flusso - assomigliava a questo:

@Test 
public void howScramblingCouldBeDone() { 
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) 
     // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one 
     .map(list -> { 
      Collections.shuffle(list); return list; }) 
     .flatMap(List::stream) 
     .forEach(System.out::println); 
} 

Emette una cosa del genere (è randomizzato, in modo diverso ogni volta)

A 
C 
B 
E 
D 
F 

L'ingrediente segreto è che c'è sempre un flusso, in modo da poter sia operare su un flusso di lotti, o di fare qualcosa per ogni lotto e quindi flatMap torna a uno stream. Ancora meglio, tutto quanto sopra funziona solo come finale forEach o collect o altre espressioni di terminazione PULL i dati attraverso il flusso.

Si scopre che iterator è un tipo speciale di che termina l'operazione su uno stream e non causa l'esecuzione dell'intero stream e viene in memoria! Grazie ai ragazzi di Java 8 per un design brillante!

+0

Ed è molto positivo che si effettui un'iterazione completa su ogni batch quando viene raccolto e persistito in un 'Elenco': non è possibile rinviare l'iterazione degli elementi all'interno del batch perché il consumatore potrebbe voler saltare un intero batch e se non lo ha fatto Non consumare gli elementi, quindi non salteranno molto lontano. (Ho implementato uno di questi in C#, sebbene fosse sostanzialmente più semplice.) – ErikE

0

Semplice esempio utilizzando Spliterator

// read file into stream, try-with-resources 
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) { 
     //skip header 
     Spliterator<String> split = stream.skip(1).spliterator(); 
     Chunker<String> chunker = new Chunker<String>(); 
     while(true) {    
      boolean more = split.tryAdvance(chunker::doSomething); 
      if (!more) { 
       break; 
      } 
     }   
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
} 

static class Chunker<T> { 
    int ct = 0; 
    public void doSomething(T line) { 
     System.out.println(ct++ + " " + line.toString()); 
     if (ct % 100 == 0) { 
      System.out.println("====================chunk=====================");    
     }   
    }  
} 

risposta di Bruce è più completo, ma ero alla ricerca di qualcosa di veloce e sporco di elaborare un gruppo di file.