2014-07-15 13 views
7

Ho problemi con il concatenamento di osservabili utilizzando il supporto RxJava di retrofit. Probabilmente fraintendo come usarlo, altrimenti potrebbe esserci un bug in retrofit. Spero che qualcuno qui possa aiutarmi a capire cosa sta succedendo. Modifica: Sto utilizzando il MockRestAdapter per queste risposte - questo potrebbe essere rilevante in quanto vedo che le implementazioni di RxSupport differiscono leggermente.Concatenazione di servizi di retrofit con supporto RxJava

Questa è un'app bancaria falsa. Sta provando a fare un trasferimento e, una volta completato il trasferimento, dovrebbe fare una richiesta di account per aggiornare i valori dell'account. Questo è fondamentalmente solo una scusa per me per provare flatMap. Il codice seguente, purtroppo, non funziona, non abbonati mai ricevere la notifica:

Caso 1: concatenamento di due osservabili retrofit-prodotta

Il servizio di trasferimento (nota: restituisce un retrofit-prodotta osservabile):

@FormUrlEncoded @POST("/user/transactions/") 
public Observable<TransferResponse> transfer(@Field("session_id") String sessionId, 
              @Field("from_account_number") String fromAccountNumber, 
              @Field("to_account_number") String toAccountNumber, 
              @Field("amount") String amount); 

Il servizio di conto (nota: restituisce un retrofit-prodotta osservabile):

@FormUrlEncoded @POST("/user/accounts") 
public Observable<List<Account>> getAccounts(@Field("session_id") String sessionId); 

Catene due osservabili retrofit-prodotta insieme:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount) 
      .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() { 
       @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) { 
        return accountsService.getAccounts(session.getSessionId()); 
       } 
      }) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()); 

Caso 2: creare il mio osservabili e concatenamento con un retrofit-prodotto uno

se ignoro il supporto integrato Rx nel retrofit per la " chiamata piatta mappata, funziona perfettamente! Tutti gli abbonati ricevono una notifica. Vedi sotto:

Il nuovo servizio di conti (nota: non produce un osservabile):

@FormUrlEncoded @POST("/user/accounts") 
public List<Account> getAccountsBlocking(@Field("session_id") String sessionId); 

creare la mia osservabile ed emettono le voci me:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount) 
      .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() { 
       @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) { 
        return Observable.create(new Observable.OnSubscribe<List<Account>>() { 
         @Override public void call(Subscriber<? super List<Account>> subscriber) { 
          List<Account> accounts = accountsService.getAccountsBlocking(session.getSessionId()); 
          subscriber.onNext(accounts); 
          subscriber.onCompleted(); 
         } 
        }); 
       } 
      }) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()); 

Qualsiasi aiuto sarebbe molto apprezzato!

+0

Quale versione di retrofit e rxjava stai usando? –

risposta

5

La risposta è sì, si dovrebbe essere in grado di collegare osservabili da Retrofit. Sembra esserci un bug nel MockRestAdapter $ MockRxSupport: createMockObservable classe privata. Il modo in cui viene eseguita la pianificazione rispetto alla sottoscrizione dell'abbonato all'osservabile sembra errato. La sottoscrizione all'osservabile viene dopo nel thread HttpExecutor stesso viene avviato. Credo che il flusso originale che proviene dal tuo thread Schedulers.io() sia stato completato e cancellato prima che il mockHandler.invokeSync restituito a Observable possa essere sottoscritto. Speriamo che questa spiegazione abbia un qualche senso se date un'occhiata al codice nel modulo retrofit-mock.

Per ovviare al problema con il codice corrente quando si utilizza solo il retrofit-mock, è possibile sostituire l'Executor interno predefinito con un'implementazione ImmediateExecutor. Ciò consentirebbe almeno quando testare i mock per avere un flusso di thread singolo che verrebbe fornito dal tuo Schedulers.io.

// ImmediateExecutor.java 
public class ImmediateExecutor implements Executor { 
    @Override 
    public void execute(Runnable command) { 
     command.run(); 
    } 
} 

// Create your RestAdapter with your ImmdiateExecutor 
RestAdapter adapter = new RestAdapter.Builder() 
      .setEndpoint(endpoint) 
      .setExecutors(new ImmediateExecutor(), null) 
      .build(); 

Per quanto riguarda fissare il problema alla fonte è possibile includere anche il progetto retrofit-finto come sorgente nel progetto e modificare il MockRestAdapter $ MockRxSupport: Metodo createMockObservable usando il codice seguente. Ho testato il tuo caso d'uso e risolve il problema.

--- MockRestAdapter.java $ MockRxSupport ----

Observable createMockObservable(final MockHandler mockHandler, final RestMethodInfo methodInfo, 
     final RequestInterceptor interceptor, final Object[] args) { 
     return Observable.create(new Observable.OnSubscribe<Object>() { 
     @Override public void call(final Subscriber<? super Object> subscriber) { 
      try { 
      if (subscriber.isUnsubscribed()) return; 
      Observable observable = 
       (Observable) mockHandler.invokeSync(methodInfo, interceptor, args); 

      observable.subscribeOn(Schedulers.from(httpExecutor)); 

      //noinspection unchecked 
      observable.subscribe(subscriber); 

      } catch (RetrofitError e) { 
      subscriber.onError(errorHandler.handleError(e)); 
      } catch (Throwable e) { 
      subscriber.onError(e); 
      } 
     } 
     }); 
    } 

Creato un problema con il progetto Retrofit here, vedremo se lo accettano.

+0

Questo ha senso, me lo stavo chiedendo anch'io, ma dal momento che sto imparando sia Retrofit che Rx allo stesso tempo ho pensato che stavo sbagliando. Grazie per aver archiviato il bug, lo seguirò per vedere cosa succede lì. –

+0

Nessun problema, felice di poterti aiutare. –

Problemi correlati