Sono quasi venduto a RxJava, che è un compagno perfetto di Retrofit, ma sto lottando in un modello comune durante la migrazione del mio codice: per risparmiare larghezza di banda, mi piacerebbe pigramente recupera oggetti (impaginati) dal mio webservice quando necessario, mentre la mia listview (o recyclerview) sta scorrendo usando la programmazione reattiva.Recupero pigro di oggetti impaginati usando RxJava
Il mio codice precedente stava facendo perfettamente il lavoro, ma la programmazione reattiva sembra valere la pena.
Ascolto di ListView/recyclerview scorrimento (e altri animali noioso) non è la preoccupazione e ottenere un osservabile è facile con Retrofit:
@GET("/api/messages")
Observable<List<Message>> getMessages(@Path("offset") int offset, @Path("limit") int limit);
non riesco proprio a capire il modello da utilizzare nella programmazione reattiva.
L'operatore Concat
sembra un buon punto di partenza, insieme a ConnectableObservable
a un certo punto per rinviare l'emissione e forse flatMap
, ma come?
EDIT:
Ecco il mio attuale soluzione (naive):
public interface Paged<T> {
boolean isLoading();
void cancel();
void next(int count);
void next(int count, Scheduler scheduler);
Observable<List<T>> asObservable();
boolean hasCompleted();
int position();
}
E la mia implementazione utilizzando un soggetto:
public abstract class SimplePaged<T> implements Paged<T> {
final PublishSubject<List<T>> subject = PublishSubject.create();
private volatile boolean loading;
private volatile int offset;
private Subscription subscription;
@Override
public boolean isLoading() {
return loading;
}
@Override
public synchronized void cancel() {
if(subscription != null && !subscription.isUnsubscribed())
subscription.unsubscribe();
if(!hasCompleted())
subject.onCompleted();
subscription = null;
loading = false;
}
@Override
public void next(int count) {
next(count, null);
}
@Override
public synchronized void next(int count, Scheduler scheduler) {
if (isLoading())
throw new IllegalStateException("you can't call next() before onNext()");
if(hasCompleted())
throw new IllegalStateException("you can't call next() after onCompleted()");
loading = true;
Observable<List<T>> obs = onNextPage(offset, count).single();
if(scheduler != null)
obs = obs.subscribeOn(scheduler); // BEWARE! onNext/onError/onComplete will happens on that scheduler!
subscription = obs.subscribe(this::onNext, this::onError, this::onComplete);
}
@Override
public Observable<List<T>> asObservable() {
return subject.asObservable();
}
@Override
public boolean hasCompleted() {
return subject.hasCompleted();
}
@Override
public int position() {
return offset;
}
/* Warning: functions below may be called from another thread */
protected synchronized void onNext(List<T> items) {
if (items != null)
offset += items.size();
loading = false;
if (items == null || items.size() == 0)
subject.onCompleted();
else
subject.onNext(items);
}
protected synchronized void onError(Throwable t) {
loading = false;
subject.onError(t);
}
protected synchronized void onComplete() {
loading = false;
}
abstract protected Observable<List<T>> onNextPage(int offset, int count);
}
Mi aspetto un osservabile che emetta eventi quando i nuovi messaggi devono essere recuperati, cioè il fondo della pagina viene raggiunto. Puoi quindi iscrivere una funzione a questo osservabile per recuperare i messaggi su ogni evento e aggiungerli alla pagina. – Calavoow
@ nono240: prima di leggere la risposta di lopar mi sono ritrovato con qualcosa di "concettualmente" simile a quello che hai fatto (avendo fondamentalmente uno stato di caricamento). Btw: Penso che puoi sostituire i diversi "loading = false" con un "finallyDo" centralizzato (http://reactivex.io/documentation/operators/do.html) –
@ nono240 inoltre: l'operatore "withLatestFrom" (https: // github.com/ReactiveX/RxJava/releases/tag/v1.0.7) è stato recentemente rilasciato come sperimentale e forse può essere utile in questo caso, indagherò più avanti (sono nuovo alla programmazione Rx quindi vediamo, forse non ha senso !) –