Si potrebbe provare a utilizzare rx.Observable#onBackpressureBuffer()
combinato con un abbonato personalizzato che sarà periodicamente richiedere n
articoli per secondo. Ma, si sarebbe limitato a difficile un secondo campionamento,.
Nota.subscribeOn()
e .toBlocking()
è solo per rendere il metodo principale non uscire immediatamente.
public class BackpressureTest {
public static void main(final String[] args) {
Observable.range(1, 1000)
.compose(Observable::onBackpressureBuffer) // consume source immediately, but buffer it
.lift(allowPerSecond(3)) // via operator using custom subscriber request n items per second
.subscribeOn(Schedulers.computation())
.toBlocking()
.subscribe(System.out::println);
}
private static <T> Observable.Operator<T, T> allowPerSecond(final int n) {
return upstream -> periodicallyRequestingSubscriber(upstream, n);
}
private static <T> Subscriber<T> periodicallyRequestingSubscriber(final Subscriber<T> upstream, final int n) {
return new Subscriber<T>() {
@Override
public void onStart() {
request(0); // request 0 so that source stops emitting
Observable.interval(1, SECONDS).subscribe(x -> request(n)); // every second request n items
}
@Override
public void onCompleted() {
upstream.onCompleted();
}
@Override
public void onError(final Throwable e) {
upstream.onError(e);
}
@Override
public void onNext(final T integer) {
upstream.onNext(integer);
}
};
}
}
Vuoi 'delay' o' throttleFirst (throttleLast) '? Quest'ultimo lascerà cadere gli oggetti se riceve oggetti troppo velocemente. – zsxwing