2015-05-12 18 views
7

Desidero eseguire 2 chiamate di rete in modo asincrono: sto utilizzando Retrofit + RxJava per ottenere ciò. Questa logica proviene da una semplice classe Runner per testare la soluzione. NOTA: Questo riguarda principalmente RxJava sul lato server.Uso corretto di Retrofit + RxJava's combineLatest

Il mio codice presenta come di seguito:

public static void main(String[] args) throws Exception { 
    Api api = ...; 

    Observable.combineLatest(
     api.getStates(), 
     api.getCmsContent(), 
     new Func2<List<States>, CmsContent, String>() { 
     @Override public String call(List<State> states, CmsContent content) { 
      ... 
      return "PLACEHOLDER"; 
     } 
     }) 
     .observeOn(Schedulers.immediate()) 
     .subscribeOn(Schedulers.immediate()) 
     .subscribe(new Observer<String>() { 
     @Override public void onCompleted() { 
      System.out.println("COMPLETED"); 
     } 

     @Override public void onError(Throwable e) { 
      System.out.println("ERROR: " + e.getMessage()); 
     } 

     @Override public void onNext(String s) { 
      // I don't care what's returned here 
     } 
     }); 
} 

tre domande:

  1. È Observable.combineLatest l'operatore migliore da utilizzare quando si desidera eseguire più REST chiama in modo asincrono e procedere quando tutte le chiamate hanno finito ?
  2. L'implementazione Func2 attualmente restituisce String. Dopo l'esecuzione delle 2 chiamate API, elaborerò i risultati nel metodo Func2#call(). Non mi interessa cosa viene restituito - ci deve essere un modo migliore per gestire questo, però - ho ragione?
  3. Le chiamate API sono eseguite correttamente con il codice sopra. Ma il metodo main non completa con il corretto Process finished with exit code 0 quando eseguo il programma. Cosa potrebbe causare il blocco del codice?

AGGIORNAMENTO - 2015-05-14

Sulla base della raccomandazione, ho cambiato la logica per la seguente:

public static void main(String[] args) throws Exception { 
    Api api = ...; 

    Observable.zip(
     api.getStates(), 
     api.getCmsContent(), 
     new Func2<List<States>, CmsContent, Boolean>() { 
     @Override public Boolean call(List<State> states, CmsContent content) { 
      // process data 
      return true; 
     } 
     }) 
     .subscribeOn(Schedulers.io()) 
     .toBlocking() 
     .first(); 
} 

Questo appare come la soluzione ero cercando. Lo userò per un po 'di tempo per vedere se mi imbatto in problemi.

risposta

1

1) No, il migliore è utilizzare zip(). Combina l'ultimo è buono se uno dei due (o più) apis restituisce risultati "più lenti" diversi/ha l'essenza del caching.

2) Fun2 facilita la fusione dei risultati. È meglio (per quanto riguarda l'architettura) elaborare il risultato in onNext() o onError(). È possibile utilizzare una semplice classe Pair<T,Y> per passare i risultati da Func2 a onNext().

3) Niente di sbagliato. Il risultato come detto dovrebbe essere elaborato in onNext() e non nel onComplete. Secondo Retrofit's source code i risultati vengono passati solo (ovviamente coretto) nel onNext().

Spero che questi aiuti.

5

1) Se si sa che si avrà un singolo valore su entrambi i percorsi, è pari a zip.

2) Cosa ti piacerebbe fare? Otterrai la coppia di valori nel tuo Func2 e se non ti interessa davvero cosa viaggia con onNext, restituisci un valore del tuo abbinamento.

3) Schedulers.immediate() non è un vero programmatore in un certo senso ed è abbastanza incline a scenari di deadlock dello stesso pool. Non hai davvero bisogno di usarlo. Se si desidera bloccare il thread principale fino al termine del lavoro asincrono, utilizzare toBlocking().first() ad esempio.

+0

Grazie, ho aggiornato il mio esempio di codice con i tuoi consigli. – Kasa

1

mi rendo conto che sto un anno di ritardo su questo, ma la modifica pubblicato dal PO su 2015/05/14 non soddisfa la sua esigenza originale:

voglio eseguire 2 chiamate di rete asincrono

  1. osservabili getStates e getCmsContent non verrà eseguito in concomitanza a meno che non si abbonano singolarmente su thread separati. Questo è un punto cruciale omesso nel suo post e nessuno della risposta precedente lo ha definito.

    Observable.fromCallable(() -> doStuff()) 
        .subscribeOn(Schedulers.computation()); 
    

Come detto @akarnokd, nel caso in cui entrambi i flussi hanno valori singoli, zip e combineLatest comportarsi in modo simile. La funzione di unione si bloccherà fino al ritorno di entrambi getStates e getCmsContent ma, come mostrato sopra, ognuno di essi viene eseguito contemporaneamente su thread separati.

  1. Un'altra soluzione dipende dalla capacità di fondere la List<States> e CmsContent man mano che arrivano. Dato il suo codice, c'è chiaramente un "titolare di dati" di qualche tipo (non mostrato) perché quello che sta restituendo è un Boolean, non i dati uniti. Di seguito, forEach viene eseguito contemporaneamente.

    Observable.just(api.getStates(), api.getCmsContent()) 
    // subscribe on separate thread as shown previously 
    .flatMap(this::buildObservable) 
    .toBlocking() 
    // executes concurrently 
    .forEach(item -> { 
        // merge into "data holder"   
    }); 
    

Naturalmente, questo codice presenta il problema di non essere fortemente tipizzato in modo che sia una scelta da fare.