2016-04-11 15 views
12

Sto sviluppando un'applicazione per Android con sincronizzazione dei dati in background. Attualmente sto usando RxJava per pubblicare alcuni dati sul server a intervalli regolari. Oltre a questo, vorrei fornire all'utente un pulsante "force sync" che attiverà la sincronizzazione immediatamente. So come usare Observable.interval() per inviare dati in intervalli di tempo regolari e vorrei sapere come usare Observalbe.just() per forzare quello che è forzato, ma mi piacerebbe metterli in coda se succede così che uno viene attivato mentre il precedente ancora piste.Attività di accodamento con RxJava in Android

Quindi, prendiamo esempio quando 1min è intervallo di sincronizzazione automatica, e diciamo che la sincronizzazione dura 40 secondi (ho esagerato qui solo per rendere più facile il punto). Ora, se per caso, l'utente preme il pulsante "force" quando l'automatico è ancora in esecuzione (o viceversa - i trigger automatici quando quello forzato è ancora in esecuzione), mi piacerebbe accodare la seconda richiesta di sincronizzazione per andare proprio come il primo finisce.

Ho disegnare questa immagine che potrebbe mettere un po 'di più la prospettiva ad esso:

enter image description here

Come si può vedere, l'automatica viene attivata (da alcuni Observable.interval()), e nel bel mezzo della sincronizzazione, l'utente preme il pulsante "force". Ora vogliamo aspettare che la prima richiesta finisca e poi ricominciare per la richiesta forzata. A un certo punto, mentre la richiesta forzata era in esecuzione, la nuova richiesta automatica è stata attivata di nuovo che l'ha appena aggiunta alla coda. Dopo che l'ultimo fu terminato dalla coda, tutto si fermò, e poi l'automatico fu programmato di nuovo poco dopo.

Spero che qualcuno possa indicarmi come correggere l'operatore. Ho provato con Observable.combineLatest(), ma l'elenco delle code è stato inviato all'inizio e quando ho aggiunto la nuova sincronizzazione alla coda non è proseguita quando l'operazione precedente è stata completata.

Ogni aiuto è molto apprezzato, Darko

risposta

11

È possibile farlo attraverso la fusione il timer con il tasto clic Observable/Subject, utilizzare l'effetto messa in coda di onBackpressureBuffer e concatMap la trasformazione in esso che fa in modo che gestisce uno Al tempo.

PublishSubject<Long> subject = PublishSubject.create(); 

Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS); 

periodic.mergeWith(subject) 
.onBackpressureBuffer() 
.concatMap(new Func1<Long, Observable<Integer>>() { 
    @Override 
    public Observable<Integer> call(Long v) { 
     // simulates the task to run 
     return Observable.just(1) 
       .delay(300, TimeUnit.MILLISECONDS); 
    } 
} 
).subscribe(System.out::println, Throwable::printStackTrace); 

Thread.sleep(1100); 
// user clicks a button 
subject.onNext(-1L); 

Thread.sleep(800); 
+0

Ok, questa è una buona direzione, ma ho un paio di domande ... Che succede con la funzione anonima? Non sto utilizzando Java 8 perché devo sviluppare per il livello API 16. Inoltre, quale linea simula il pulsante premi? –

+0

Il lambda indica dove dovrebbe avvenire la sincronizzazione, in una forma Osservabile. Il subject.onNext() è dove l'utente fa clic su un pulsante. – akarnokd

+0

Aggiungendo un'altra domanda se puoi aiutarmi. Così l'ho proposto come [@akarnokd] (http://stackoverflow.com/users/61158/akarnokd) suggerito e tutto funziona perfettamente, proprio quello di cui ho bisogno. Ora sto cercando di ottenere la chiamata 'onComplete()' dopo che la catena è finita. Se guardi l'immagine qui sopra, la prima catena terminerebbe dopo il 3 ° processo di sincronizzazione. Questo perché forse mi piacerebbe aggiornare l'interfaccia utente dopo che la catena è finita piuttosto che dopo ogni sincronizzazione. –

3

Anche se v'è una risposta accettata con una buona soluzione vorrei condividere un'altra opzione per farlo usando Scheduler e SingleThreadExecutor

public static void main(String[] args) throws Exception { 
    System.out.println(" init "); 
    Observable<Long> numberObservable = 
      Observable.interval(700, TimeUnit.MILLISECONDS).take(10); 

    final Subject subject = PublishSubject.create(); 

    Executor executor = Executors.newSingleThreadExecutor(); 
    Scheduler scheduler = Schedulers.from(executor); 
    numberObservable.observeOn(scheduler).subscribe(subject); 

    subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"), 
        onCompleteFunc("subscriber 1")); 

    Thread.sleep(800); 
    //simulate action 
    executor.execute(new Runnable() { 
     @Override 
     public void run() { 
      subject.onNext(333l); 
     } 
    }); 

    Thread.sleep(5000); 
} 

static Action1<Long> onNextFunc(final String who) { 
    return new Action1<Long>() { 
     public void call(Long x) { 
      System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName() 
        + " -- " + System.currentTimeMillis()); 
      try { 
       //simulate some work 
       Thread.sleep(500); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    }; 
} 

static Action1<Throwable> onErrorFunc(final String who) { 
    return new Action1<Throwable>() { 
     public void call(Throwable t) { 
      t.printStackTrace(); 
     } 
    }; 
} 

static Action0 onCompleteFunc(final String who) { 
    return new Action0() { 
     public void call() { 
      System.out.println(who + " complete"); 
     } 
    }; 
}