2014-09-03 30 views
7

Ho due osservabili (denominati A e B per semplicità) e un abbonato. Quindi, il Sottoscrittore sottoscrive A e se c'è un errore su A allora B (che è il fallback) entra in gioco. Ora, ogni volta che A colpisce un errore B viene chiamato fine, tuttavia A chiama onComplete() sull'abbonato, quindi B risposta non raggiunge mai l'abbonato anche se l'esecuzione B ha esito positivo.RxJava onErrorResumeNext()

È questo il comportamento normale? Ho pensato aErrorResumeNext() dovrebbe continuare il flusso e informare l'abbonato una volta completato come indicato nella documentazione (https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#onerrorresumenext).

Questa è la struttura complessiva di quello che sto facendo (omessa diversi codice di "noioso"):

public Observable<ModelA> observeGetAPI(){ 
    return retrofitAPI.getObservableAPI1() 
      .flatMap(observableApi1Response -> { 
       ModelA model = new ModelA(); 

       model.setApi1Response(observableApi1Response); 

       return retrofitAPI.getObservableAPI2() 
         .map(observableApi2Response -> { 
          // Blah blah blah... 
          return model; 
         }) 
         .onErrorResumeNext(observeGetAPIFallback(model)) 
         .subscribeOn(Schedulers.newThread()) 
      }) 
      .onErrorReturn(throwable -> { 
       // Blah blah blah... 
       return model; 
      }) 
      .subscribeOn(Schedulers.newThread()); 
} 

private Observable<ModelA> observeGetAPIFallback(ModelA model){ 
    return retrofitAPI.getObservableAPI3().map(observableApi3Response -> { 
     // Blah blah blah... 
     return model; 
    }).onErrorReturn(throwable -> { 
     // Blah blah blah... 
     return model; 
    }) 
    .subscribeOn(Schedulers.immediate()); 
} 

Subscription subscription; 
subscription = observeGetAPI.subscribe(ModelA -> { 
    // IF THERE'S AN ERROR WE NEVER GET B RESPONSE HERE... 
}, throwable ->{ 
    // WE NEVER GET HERE... onErrorResumeNext() 
}, 
() -> { // IN CASE OF AN ERROR WE GET STRAIGHT HERE, MEANWHILE, B GETS EXECUTED } 
); 

Tutte le idee che sto facendo male?

Grazie!

EDIT: Ecco una cronologia approssimativa di ciò che sta succedendo:

---> HTTP GET REQUEST B 
<--- HTTP 200 REQUEST B RESPONSE (SUCCESS) 

---> HTTP GET REQUEST A 
<--- HTTP 200 REQUEST A RESPONSE (FAILURE!) 

---> HTTP GET FALLBACK A 
** onComplete() called! ---> Subscriber never gets fallback response since onComplete() gets called before time. 
<--- HTTP 200 FALLBACK A RESPONSE (SUCCESS) 

Ed ecco un link ad un semplice schema che ho fatto, che rappresentano quello che voglio che accada: Diagram

+0

La sequenza temporale mostra HTTP 200 per la risposta di errore. C'è qualche altro modo per segnalare un errore da getObservableAPI2()? Inoltre, puoi specificare quali richieste API corrispondono all'output della timeline? Sembra getObservableAPI1-> REQUEST B, getObservableAPI2-> RICHIEDI A, getObservableAPI3-> FALLBACK A ma voglio solo assicurarmi. – kjones

+0

Sì, in realtà anche se la risposta è 200, alcuni dati potrebbero risultare nulli, quindi lancio e errore in questi scenari. E sì, questa è la relazione delle richieste della timeline, modificheremo la domanda al più presto per far corrispondere la richiesta della timeline come tua. – mradzinski

+0

La tua logica sembra sana. Dovresti ricevere la risposta di fallback prima di onComplete. Puoi rimuovere tutte le chiamate subscribeOn() e vedere cosa succede. Non dovrebbero essere necessari poiché Retrofit esegue comunque le richieste sul proprio pool di thread. – kjones

risposta

5

La Rx le chiamate utilizzate in seguito dovrebbero simulare ciò che si sta facendo con Retrofit.

fallbackObservable = 
     Observable 
       .create(new Observable.OnSubscribe<String>() { 
        @Override 
        public void call(Subscriber<? super String> subscriber) { 
         logger.v("emitting A Fallback"); 
         subscriber.onNext("A Fallback"); 
         subscriber.onCompleted(); 
        } 
       }) 
       .delay(1, TimeUnit.SECONDS) 
       .onErrorReturn(new Func1<Throwable, String>() { 
        @Override 
        public String call(Throwable throwable) { 
         logger.v("emitting Fallback Error"); 
         return "Fallback Error"; 
        } 
       }) 
       .subscribeOn(Schedulers.immediate()); 

stringObservable = 
     Observable 
       .create(new Observable.OnSubscribe<String>() { 
        @Override 
        public void call(Subscriber<? super String> subscriber) { 
         logger.v("emitting B"); 
         subscriber.onNext("B"); 
         subscriber.onCompleted(); 
        } 
       }) 
       .delay(1, TimeUnit.SECONDS) 
       .flatMap(new Func1<String, Observable<String>>() { 
        @Override 
        public Observable<String> call(String s) { 
         logger.v("flatMapping B"); 
         return Observable 
           .create(new Observable.OnSubscribe<String>() { 
            @Override 
            public void call(Subscriber<? super String> subscriber) { 
             logger.v("emitting A"); 
             subscriber.onNext("A"); 
             subscriber.onCompleted(); 
            } 
           }) 
           .delay(1, TimeUnit.SECONDS) 
           .map(new Func1<String, String>() { 
            @Override 
            public String call(String s) { 
             logger.v("A completes but contains invalid data - throwing error"); 
             throw new NotImplementedException("YUCK!"); 
            } 
           }) 
           .onErrorResumeNext(fallbackObservable) 
           .subscribeOn(Schedulers.newThread()); 
        } 
       }) 
       .onErrorReturn(new Func1<Throwable, String>() { 
        @Override 
        public String call(Throwable throwable) { 
         logger.v("emitting Return Error"); 
         return "Return Error"; 
        } 
       }) 
       .subscribeOn(Schedulers.newThread()); 

subscription = stringObservable.subscribe(
     new Action1<String>() { 
      @Override 
      public void call(String s) { 
       logger.v("onNext " + s); 
      } 
     }, 
     new Action1<Throwable>() { 
      @Override 
      public void call(Throwable throwable) { 
       logger.v("onError"); 
      } 
     }, 
     new Action0() { 
      @Override 
      public void call() { 
       logger.v("onCompleted"); 
      } 
     }); 

L'uscita dalle dichiarazioni di log è:

 
RxNewThreadScheduler-1 emitting B 
RxComputationThreadPool-1 flatMapping B 
RxNewThreadScheduler-2 emitting A 
RxComputationThreadPool-2 A completes but contains invalid data - throwing error 
RxComputationThreadPool-2 emitting A Fallback 
RxComputationThreadPool-1 onNext A Fallback 
RxComputationThreadPool-1 onCompleted 

Questo mi sembra quello che stai cercando, ma forse mi manca qualcosa.