2016-03-11 25 views
20

Ho un Observable<<List<Foo>> getFoo() creato da un servizio di riconversione e dopo aver chiamato il metodo .getFoo(), ho bisogno di condividerlo con più abbonati. Chiamando il metodo .share(), la chiamata di rete può essere rieseguita. Replay Operator non funziona neanche. So che una potenziale soluzione potrebbe essere .cache(), ma non so perché questo comportamento è causato.Singolo osservabile con più abbonati

Retrofit retrofit = new Retrofit.Builder() 
      .baseUrl(API_URL) 
      .addConverterFactory(GsonConverterFactory.create()) 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build(); 

    // Create an instance of our GitHub API interface. 

    // Create a call instance for looking up Retrofit contributors. 

    Observable<List<Contributor>> testObservable = retrofit 
      .create(GitHub.class) 
      .contributors("square", "retrofit") 
      .share(); 


    Subscription subscription1 = testObservable 
    .subscribe(new Subscriber<List<Contributor>>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable throwable) { 

     } 

     @Override 
     public void onNext(List<Contributor> contributors) { 
      System.out.println(contributors); 
     } 
    }); 

    Subscription subscription2 = testObservable 
      .subscribe(new Subscriber<List<Contributor>>() { 
       @Override 
       public void onCompleted() { 

       } 

       @Override 
       public void onError(Throwable throwable) { 

       } 

       @Override 
       public void onNext(List<Contributor> contributors) { 
        System.out.println(contributors + " -> 2"); 
       } 
      }); 

    subscription1.unsubscribe(); 
    subscription2.unsubscribe(); 

Il codice sopra riportato può riprodurre il comportamento sopra descritto. È possibile eseguire il debug e verificare che gli elenchi ricevuti appartengano a un diverso indirizzo di memoria.

Ho anche considerato ConnectableObservables come una potenziale soluzione, ma questo mi richiede di portare l'osservabile originale in giro e chiamare .connect() ogni volta che voglio aggiungere un nuovo Sottoscrittore.

Questo tipo di comportamento con .share() funzionava correttamente fino a Retrofit 1.9. Ha smesso di funzionare su Retrofit 2 - beta. Non l'ho ancora testato con la Retrofit 2 Release Version, rilasciata alcune ore fa.

EDIT: 01/02/2017

Per i lettori futuri, ho scritto un articolo che spiega here di più sul caso!

risposta

23

Sembra che tu stia (implicitamente) trasmettendo il tuo ConnectedObservable restituito da .share() a un normale Observable. Potresti voler leggere la differenza tra osservabili caldi e freddi.

Prova

ConnectedObservable<List<Contributor>> testObservable = retrofit 
     .create(GitHub.class) 
     .contributors("square", "retrofit") 
     .share(); 

Subscription subscription1 = testObservable 
    .subscribe(new Subscriber<List<Contributor>>() { 
    @Override 
    public void onCompleted() { 

    } 

    @Override 
    public void onError(Throwable throwable) { 

    } 

    @Override 
    public void onNext(List<Contributor> contributors) { 
     System.out.println(contributors); 
    } 
}); 

Subscription subscription2 = testObservable 
     .subscribe(new Subscriber<List<Contributor>>() { 
      @Override 
      public void onCompleted() { 

      } 

      @Override 
      public void onError(Throwable throwable) { 

      } 

      @Override 
      public void onNext(List<Contributor> contributors) { 
       System.out.println(contributors + " -> 2"); 
      } 
     }); 

testObservable.connect(); 
subscription1.unsubscribe(); 
subscription2.unsubscribe(); 

Edit: Non è necessario chiamare connect() ogni volta che si desidera un nuovo abbonamento è sufficiente per avviare il osservabile. Suppongo che si potrebbe usare replay() per assicurarsi che tutti gli abbonati successivi ottengono tutti gli articoli prodotti

ConnectedObservable<List<Contributor>> testObservable = retrofit 
     .create(GitHub.class) 
     .contributors("square", "retrofit") 
     .share() 
     .replay() 
+0

Grazie per la risposta. Il fatto è che voglio davvero evitare di chiamare ogni volta. Sei sicuro che l'operatore di riproduzione funzionerà correttamente con questo caso d'uso? – Pavlos

+0

In realtà l'ho provato e ha funzionato. Grazie per il vostro tempo e la vostra risposta. Solo per il problema, ho letto la differenza tra gli osservabili caldi e freddi, ma non è riuscito a riprodurli con le chiamate di rete con Retrofit. Se ho usato un Observable.just() l'operatore di condivisione stava lavorando abbastanza bene. – Pavlos

30

Dopo aver controllato indietro con RxJava sviluppatore Dávid Karnok vorrei proporre una spiegazione completa di ciò che stava succedendo qui.

share() è definito come publish().refCount(), i. e. la fonte Observable viene prima trasformata in publish() per ma invece di dover chiamare "connect()" manualmente "quella parte viene gestita da refCount(). In particolare, refCount chiamerà connect() sullo ConnectableObservable quando riceve la prima sottoscrizione; poi, finché c'è almeno un abbonato rimarrà iscritto; e, infine, quando il numero di abbonati scende a 0 annullerà la sottoscrizione verso l'alto. Con coldObservables, come quelli restituiti da Retrofit, questo interromperà qualsiasi calcolo in esecuzione.

Se dopo uno di questi cicli arriva un altro abbonato, refCount chiamerà nuovamente connect e quindi innescherà un nuovo abbonamento alla sorgente osservabile. In questo caso, attiverà un'altra richiesta di rete.

Ora, questo di solito non è diventato evidente con Retrofit 1 (e in effetti qualsiasi versione precedente a this commit), poiché queste versioni precedenti di Retrofit per impostazione predefinita spostavano tutte le richieste di rete su un altro thread.Questo di solito significava che tutte le tue chiamate subscribe() si sarebbero verificate mentre la prima richiesta/Observable era ancora in esecuzione e pertanto i nuovi Subscriber s verrebbero semplicemente aggiunti allo refCount e pertanto non attiverebbero richieste aggiuntive/Observables.

Le versioni più recenti di Retrofit, tuttavia, non spostano di default il lavoro su un altro thread: è necessario farlo esplicitamente chiamando, ad esempio, subscribeOn(Schedulers.io()). Se non lo fai, tutto rimarrà sul thread corrente, il che significa che il secondo subscribe() si verificherà solo dopo che il primo Observable ha chiamato onCompleted e quindi dopo tutto lo Subscribers hanno annullato l'iscrizione e tutto viene spento. Ora, come abbiamo visto nel primo paragrafo, quando viene chiamato il secondo subscribe(), share() non ha altra scelta che causare un altro Subscription all'origine osservabile e attivare un'altra richiesta di rete.

Quindi, per tornare al comportamento a cui sei abituato da Retrofit 1, è sufficiente aggiungere subscribeOn(Schedulers.io()).

Ciò dovrebbe comportare l'esecuzione solo della richiesta di rete, la maggior parte delle volte. In linea di principio, tuttavia, è possibile ottenere più richieste (e sempre con Retrofit 1), ma solo se le richieste di rete sono estremamente veloci e/o le chiamate subscribe() si verificano con notevole ritardo, in modo che, di nuovo, la prima richiesta sia terminato quando si verifica il secondo subscribe().

Pertanto, Dávid suggerisce di utilizzare cache() (ma ha gli svantaggi che hai citato) o replay().autoConnect(). Secondo questi release notes, autoConnect funziona come solo la prima metà di refCount, o più precisamente, è

simile nel comportamento a refcount(), tranne che non scollega quando gli abbonati sono persi.

Ciò significa la richiesta sarà attivato solo quando il primo subscribe() accade ma poi tutti i successivi Subscriber s riceveranno tutti gli elementi emessi, indipendentemente dal fatto che ci fosse, in qualsiasi momento tra, 0 abbonati.

+1

Grazie per la tua spiegazione estesa. Ci penserò sicuramente, anche se ora sono abbastanza sicuro di avere alcune soluzioni comode per questo problema :) – Pavlos

+2

Great! Volevo solo aggiungere questa spiegazione perché 'replay',' share', 'publish', ecc. Sono abbastanza complicati e non fa male avere spiegazioni dettagliate dei casi limite. –

+2

Ottima risposta davvero :) –

Problemi correlati