2015-02-10 7 views
8

Sono quasi venduto a RxJava, che è un compagno perfetto di Retrofit, ma sto lottando in un modello comune durante la migrazione del mio codice: per risparmiare larghezza di banda, mi piacerebbe pigramente recupera oggetti (impaginati) dal mio webservice quando necessario, mentre la mia listview (o recyclerview) sta scorrendo usando la programmazione reattiva.Recupero pigro di oggetti impaginati usando RxJava

Il mio codice precedente stava facendo perfettamente il lavoro, ma la programmazione reattiva sembra valere la pena.

Ascolto di ListView/recyclerview scorrimento (e altri animali noioso) non è la preoccupazione e ottenere un osservabile è facile con Retrofit:

@GET("/api/messages") 
Observable<List<Message>> getMessages(@Path("offset") int offset, @Path("limit") int limit); 

non riesco proprio a capire il modello da utilizzare nella programmazione reattiva.

L'operatore Concat sembra un buon punto di partenza, insieme a ConnectableObservable a un certo punto per rinviare l'emissione e forse flatMap, ma come?

EDIT:

Ecco il mio attuale soluzione (naive):

public interface Paged<T> { 
    boolean isLoading(); 
    void cancel(); 
    void next(int count); 
    void next(int count, Scheduler scheduler); 
    Observable<List<T>> asObservable(); 
    boolean hasCompleted(); 
    int position(); 
} 

E la mia implementazione utilizzando un soggetto:

public abstract class SimplePaged<T> implements Paged<T> { 

    final PublishSubject<List<T>> subject = PublishSubject.create(); 
    private volatile boolean loading; 
    private volatile int offset; 
    private Subscription subscription; 

    @Override 
    public boolean isLoading() { 
     return loading; 
    } 

    @Override 
    public synchronized void cancel() { 
     if(subscription != null && !subscription.isUnsubscribed()) 
      subscription.unsubscribe(); 

     if(!hasCompleted()) 
      subject.onCompleted(); 

     subscription = null; 
     loading = false; 
    } 

    @Override 
    public void next(int count) { 
     next(count, null); 
    } 

    @Override 
    public synchronized void next(int count, Scheduler scheduler) { 
     if (isLoading()) 
      throw new IllegalStateException("you can't call next() before onNext()"); 

     if(hasCompleted()) 
      throw new IllegalStateException("you can't call next() after onCompleted()"); 

     loading = true; 

     Observable<List<T>> obs = onNextPage(offset, count).single(); 

     if(scheduler != null) 
      obs = obs.subscribeOn(scheduler); // BEWARE! onNext/onError/onComplete will happens on that scheduler! 

     subscription = obs.subscribe(this::onNext, this::onError, this::onComplete); 
    } 

    @Override 
    public Observable<List<T>> asObservable() { 
     return subject.asObservable(); 
    } 

    @Override 
    public boolean hasCompleted() { 
     return subject.hasCompleted(); 
    } 

    @Override 
    public int position() { 
     return offset; 
    } 

    /* Warning: functions below may be called from another thread */ 
    protected synchronized void onNext(List<T> items) { 
     if (items != null) 
      offset += items.size(); 

     loading = false; 

     if (items == null || items.size() == 0) 
      subject.onCompleted(); 
     else 
      subject.onNext(items); 
    } 

    protected synchronized void onError(Throwable t) { 
     loading = false; 
     subject.onError(t); 
    } 

    protected synchronized void onComplete() { 
     loading = false; 
    } 

    abstract protected Observable<List<T>> onNextPage(int offset, int count); 

} 
+0

Mi aspetto un osservabile che emetta eventi quando i nuovi messaggi devono essere recuperati, cioè il fondo della pagina viene raggiunto. Puoi quindi iscrivere una funzione a questo osservabile per recuperare i messaggi su ogni evento e aggiungerli alla pagina. – Calavoow

+0

@ nono240: prima di leggere la risposta di lopar mi sono ritrovato con qualcosa di "concettualmente" simile a quello che hai fatto (avendo fondamentalmente uno stato di caricamento). Btw: Penso che puoi sostituire i diversi "loading = false" con un "finallyDo" centralizzato (http://reactivex.io/documentation/operators/do.html) –

+0

@ nono240 inoltre: l'operatore "withLatestFrom" (https: // github.com/ReactiveX/RxJava/releases/tag/v1.0.7) è stato recentemente rilasciato come sperimentale e forse può essere utile in questo caso, indagherò più avanti (sono nuovo alla programmazione Rx quindi vediamo, forse non ha senso !) –

risposta

5

Ecco uno su alcuni possibili modi per gestire reattiva paging. Supponiamo di avere un metodo getNextPageTrigger che restituisce uno Observable emette qualche oggetto evento quando il listener di scorrimento (o qualsiasi altro input) desidera che venga caricata una nuova pagina. Nella vita reale probabilmente avrebbe l'operatore debounce, ma in aggiunta ci assicureremo di attivarlo solo dopo aver caricato l'ultima pagina.

Definiamo anche un metodo per scartare i messaggi dalla loro lista:

Observable<Message> getPage(final int page) { 
    return service.getMessages(page * PAGE_SIZE, PAGE_SIZE) 
     .flatMap(messageList -> Observable.from(messageList)); 
} 

allora si può fare la logica recupero effettivo:

// Start with the first page. 
getPage(0) 
    // Add on each incremental future page. 
    .concatWith(Observable.range(1, Integer.MAX_VALUE) 
     // Uses a little trick to get the next page to wait for a signal to load. 
     // By ignoring all actual elements emitted and casting, the trigger must 
     // complete before the actual page request will be made. 
     .concatMap(page -> getNextPageTrigger().limit(1) 
      .ignoreElements() 
      .cast(Message.class) 
      .concatWith(getPage(page))) // Then subscribe, etc.. 

Questo manca ancora un paio di cose potenzialmente importanti:

1 - Questo ovviamente non sa quando interrompere il recupero di pagine aggiuntive, il che significa che una volta che raggiunge la fine, a seconda di cosa restituisce il server, potrebbe o k eep colpendo errori o risultati vuoti ogni volta che viene attivato lo scorrimento. L'approccio alla risoluzione di questo dipende da come segnali al cliente che non ci sono più pagine da caricare.

2 - Se sono necessari tentativi di errore, suggerirei di esaminare l'operatore retryWhen. In caso contrario, gli errori di rete comuni potrebbero causare la propagazione di un errore nel caricamento di una pagina.

+0

Ho finito per fare qualcosa del genere (vedi la mia modifica), ma non sono soddisfatto al 100%. –

+0

Mi sono appena perso il fatto che la soluzione non consente di recuperare le pagine "su richiesta" quando necessario durante lo scorrimento del contenuto. –

+0

Questo è ciò che 'getNextPageTrigger()' è per, in realtà. è qualsiasi cosa osservabile si pompi lì, che potrebbe essere attivata dall'utente o potrebbe essere un 'Subject' alimentato da un ascoltatore di scroll. Fondamentalmente qualunque cosa tu voglia che sia. :) – lopar

Problemi correlati