2014-11-14 16 views
6

Ho un osservabile che produce dati da un flusso veloce da un cursore di database. Sto cercando di limitare l'output su un tasso di x articoli al secondo. Finora ho usato Callstack blocco come descritto la documentazione:Osservabili limitanti la velocità

observable.map(f -> { 
ratelimiter.acquire(); // configured limiter to only allow 
}); 

Questo sta lavorando bene, ma solo per curiosità c'è un modo migliore per gestire questa situazione con contropressione?

Tks

+0

Vuoi 'delay' o' throttleFirst (throttleLast) '? Quest'ultimo lascerà cadere gli oggetti se riceve oggetti troppo velocemente. – zsxwing

risposta

2

Si potrebbe provare a utilizzare rx.Observable#onBackpressureBuffer() combinato con un abbonato personalizzato che sarà periodicamente richiedere n articoli per secondo. Ma, si sarebbe limitato a difficile un secondo campionamento,.

Nota.subscribeOn() e .toBlocking() è solo per rendere il metodo principale non uscire immediatamente.

public class BackpressureTest { 

    public static void main(final String[] args) { 
    Observable.range(1, 1000) 
     .compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it 
     .lift(allowPerSecond(3)) // via operator using custom subscriber request n items per second 
     .subscribeOn(Schedulers.computation()) 
     .toBlocking() 
     .subscribe(System.out::println); 
    } 

    private static <T> Observable.Operator<T, T> allowPerSecond(final int n) { 
    return upstream -> periodicallyRequestingSubscriber(upstream, n); 
    } 

    private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) { 
    return new Subscriber<T>() { 

     @Override 
     public void onStart() { 
     request(0); // request 0 so that source stops emitting 
     Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items 
     } 

     @Override 
     public void onCompleted() { 
     upstream.onCompleted(); 
     } 

     @Override 
     public void onError(final Throwable e) { 
     upstream.onError(e); 
     } 

     @Override 
     public void onNext(final T integer) { 
     upstream.onNext(integer); 
     } 
    }; 
    } 
} 
0

La risposta da @michalsamek sembra corretta, sebbene la retropressione funzioni solo per Flowables. Ho corretto il suo abbonato, in modo che faccia ciò che è richiesto.

C'è stato anche un piccolo problema quando lo si utilizza a raffiche in momenti diversi.

private static <T> FlowableOperator<T, T> allowPerMillis(int millis) { 
    return observer -> new PeriodicallyRequestingSubscriber<>(observer, millis); 
} 


Observable.range(1, 100) 
    .observeOn(Schedulers.io()) 
    .toFlowable(BackpressureStrategy.BUFFER) 
    .compose(Flowable::onBackpressureBuffer) 
    .lift(allowPerMillis(200)) 
    .subscribe(value -> System.out.println(System.currentTimeMillis() % 10_000 + " : " + value)); 



public class PeriodicallyRequestingSubscriber<T> implements Subscriber<T> { 

    private final Subscriber<T> upstream; 

    private final int millis; 

    // If there hasn't been a request for a long time, do not flood 
    private final AtomicBoolean shouldRequest = new AtomicBoolean(true); 

    public PeriodicallyRequestingSubscriber(Subscriber<T> upstream, int millis) { 
     this.upstream = upstream; 
     this.millis = millis; 
    } 

    @Override 
    public void onSubscribe(Subscription subscription) { 
     Observable 
       .interval(millis, TimeUnit.MILLISECONDS) 
       .subscribe(x -> { 
        if (shouldRequest.getAndSet(false)) 
         subscription.request(1); 
       }); 
} 

@Override 
public void onNext(T t) { 
    shouldRequest.set(true); 
    upstream.onNext(t); 
} 

@Override 
public void onError(Throwable throwable) { 
    upstream.onError(throwable); 
} 

@Override 
public void onComplete() { 
    upstream.onComplete(); 
} 
} 
Problemi correlati