2016-04-23 13 views
7

Ho il seguente problema:RxJava condividere le emissioni di un 'osservabile tra più abbonati

Ho un osservabile che sta facendo un certo lavoro, ma altri osservabili bisogno l'uscita di quella osservabile al lavoro. Ho provato a iscriversi più volte sullo stesso osservabile, ma all'interno del registro vedo che l'osservabile originale viene avviato più volte.

thats il mio questo è osservabili creare l'oggetto:

Observable.create((Observable.OnSubscribe<Api>) subscriber -> { 
      if (mApi == null) { 
       //do some work 
      } 
      subscriber.onNext(mApi); 
      subscriber.unsubscribe(); 
     }) 

thats il mio osservabile che ha bisogno l'oggetto

loadApi().flatMap(api -> api....())); 

sto usando

.subscribeOn(Schedulers.io()) observable.observeOn(AndroidSchedulers.mainThread()) 
       .unsubscribeOn(Schedulers.io() 

su tutti osservabili.

risposta

12

Non sono sicuro di aver capito correttamente la tua domanda, ma immagino che tu stia cercando un modo per condividere le emissioni di un osservabile tra più abbonati. Ci sono diversi modi per farlo. Per uno, è possibile utilizzare un Connectable Observable in questo modo:

ConnectableObservable<Integer> obs = Observable.range(1,3).publish(); 
obs.subscribe(item -> System.out.println("Sub A: " + item)); 
obs.subscribe(item -> System.out.println("Sub B: " + item)); 
obs.connect(); //Now the source observable starts emitting items 

uscita:

Sub A: 1 
Sub B: 1 
Sub A: 2 
Sub B: 2 
Sub A: 3 
Sub B: 3 

In alternativa, è possibile utilizzare un PublishSubject:

PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject 
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject 
subject.subscribe(item -> System.out.println("Sub B: " + item));  
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable 

uscita:

Sub A: 1 
Sub B: 1 
Sub A: 2 
Sub B: 2 
Sub A: 3 
Sub B: 3 

Entrambi o Se questi esempi sono a thread singolo, è possibile aggiungere facilmente le chiamate observOn o subscirbeOn per renderli asincroni.

+0

sì che voglio il primo risultato della osservabile su ogni iscriversi – H3x0n

+0

cosa dovrei farlo quando non so con quale frequenza il mio osservabile sarà iscritto e quando l'abbonato dovrebbe ottenere il risultato prima che tutti siano abbonati – H3x0n

+0

è praticamente risposto in [questa domanda] (http://stackoverflow.com/questions/35951942/single-observable-with-multiple-subscribers) – JohnWowUs

1

Prima di tutto utilizzare Observable.create è complicato e facile da sbagliare. Hai bisogno di qualcosa di simile a

Observable.create(subscriber -> { 
     if (mApi == null) { 
      //do some work 
     } 
     if (!subscriber.isUnsubscribed()) { 
      subscriber.onNext(mApi); 
      subscriber.onCompleted(); 
      // Not subscriber.unsubscribe(); 
     }    
}) 

È possibile utilizzare

ConnectableObservable<Integer> obs = Observable.just(1).replay(1).autoConnect(); 

Tutti gli abbonati successive dovrebbe ottenere l'elemento singolo emessa

obs.subscribe(item -> System.out.println("Sub 1 " + item)); 
obs.subscribe(item -> System.out.println("Sub 2 " + item)); 
obs.subscribe(item -> System.out.println("Sub 3 " + item)); 
obs.subscribe(item -> System.out.println("Sub 4 " + item)); 
+0

ho provato il tuo, ma l'on subscribe non viene chiamato. Il mio osservabile è pubblico statico finale. – H3x0n

+0

Hai provato il tuo osservabile originale con .publish(). Replay(). Autoconnect()? – JohnWowUs

+0

quando uso solo .share() funziona. – H3x0n

Problemi correlati