2015-02-21 17 views
10

Sto lavorando per migrare da Rx Java a Java 8 lambda. Un esempio che non riesco a trovare è un modo per bufferizzare le richieste. Ad esempio, in Rx Java, posso dire quanto segue.Java 8 lambda api

Observable.create(getIterator()).buffer(20, 1000, TimeUnit. MILLISECONDS).doOnNext(list -> doWrite(list)); 

Dove siamo tampone 20 elementi in un elenco o timeout a 1000 millisecondi, che accade mai prima.

Gli osservabili in RX sono osservabili in stile "push", dove gli stream utilizzano un tiro java. Sarebbe possibile implementare la mia stessa operazione di mappatura nei flussi o l'impossibilità di emettere causa problemi in quanto il doOnNext deve eseguire il polling dell'elemento precedente?

+2

http://stackoverflow.com/questions/ 27944784/java-8-stream-utilities-per-input-dati/27.950.637 27.950.637 # – Misha

risposta

2

Un modo per farlo sarebbe utilizzare BlockingQueue e Guava. Utilizzando Queues.drain, è possibile creare un Collection che è possibile chiamare stream() e apportare le proprie trasformazioni. Ecco un link: Guava Queues.drain

Ed ecco un esempio veloce:

public void transform(BlockingQueue<Something> input) 
{ 
    List<Something> buffer = new ArrayList<>(20); 
    Queues.drain(input, buffer, 20, 1000, TimeUnit.MILLISECONDS); 
    doWrite(buffer); 
} 
1

simple-react ha operatori simili, ma non questo esatto uno. È abbastanza estensibile, quindi dovrebbe essere possibile scriverne uno tuo. Con l'avvertenza che non ho scritto questo in un IDE o provato, più o meno un buffer per dimensione con operatore timeout per semplice reagire sarebbe simile a questa

import com.aol.simple.react.async.Queue; 
    import com.aol.simple.react.stream.traits.LazyFutureStream; 
    import com.aol.simple.react.async.Queue.ClosedQueueException; 
    import com.aol.simple.react.util.SimpleTimer; 
    import java.util.concurrent.TimeUnit; 

    static LazyFutureStream batchBySizeAndTime(LazyFutureStream stream,int size,long time, TimeUnit unit) { 
    Queue queue = stream.toQueue(); 
    Function<Supplier<U>, Supplier<Collection<U>>> fn = s -> { 
     return() -> { 
      SimpleTimer timer = new SimpleTimer(); 
      List<U> list = new ArrayList<>(); 
      try { 
       do { 
        if(list.size()==size()) 
         return list; 
        list.add(s.get()); 
       } while (timer.getElapsedNanoseconds()<unit.toNanos(time)); 
      } catch (ClosedQueueException e) { 

       throw new ClosedQueueException(list); 
      } 
      return list; 
     }; 
    }; 
    return stream.fromStream(queue.streamBatch(stream.getSubscription(), fn)); 
}