2016-03-12 13 views
5

Voglio generare un Observable in tempo reale dai risultati di un elenco di Futures.Osservabile da Futures - onNext da più thread

Nel caso più semplice, si supponga di avere un elenco di futuri che sto utilizzando con Future.sequence e sto semplicemente monitorando i loro progressi con un Observable che mi dice ogni volta che uno ha completato. Fondamentalmente lo faccio in questo modo:

def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = { 
    Observable[String](observer => { 
     val loudFutures: List[Future[Int]] = futs.map(f => { 
      f onComplete { 
       case Success(a) => observer.onNext(s"just did $a more") 
       case Failure(e) => observer.onError(e) 
      } 
      f 
     }) 
     Future.sequence(loudFutures) onComplete { 
      case Success(_) => observer.onCompleted() 
      case Failure(e) => observer.onError(e) 
     } 
    }) 
    } 

Questo funziona perfettamente nel mio ambiente di test. Ma ho appena letto che onNext non dovrebbe essere chiamato da thread diversi, almeno senza fare attenzione che non ci siano chiamate sovrapposte. Qual è il modo consigliato per risolvere questo problema? Sembra che molti usi reali di Observables richiederebbero il onNext per essere chiamati dal codice asincrono come questo, ma non riesco a trovare un esempio simile nei documenti.

+0

non sono sicuro se c'è una risposta migliore, ma è possibile garantire che le chiamate 'onNext' sono gestiti dallo stesso thread se si utilizza ad esempio un unico contesto di esecuzione filettata (' ExecutionContext.fromExecutor (Esecutori. newSingleThreadExecutor()) ') per eseguire quei callback' onComplete's. – Kolmar

+0

Puoi riferirti a quale articolo, riguardo 'onNext', ti stai riferendo? Questo caso d'uso va perfettamente bene, dal mio punto di vista. – mavarazy

+0

@mavarazy: Gran parte della documentazione che ho trovato su questo è piuttosto poco chiaro ma [questo] (http://reactivex.io/documentation/operators/serialize.html) parla dell'uso di 'serialize()' per evitare due sovrapposizioni 'onNext()' chiama e [this] (https://github.com/ReactiveX/RxJava/wiki/Subject) ti avvisa di non chiamare 'onNext()' da più thread - almeno se stai usando un Soggetto. E tutti gli esempi ufficiali Rx che ho trovato sono a thread singolo. – thund

risposta

1

The Observable Contract

osservabili rilascia le notifiche agli osservatori in serie (non in parallelo). Possono emettere queste notifiche da diversi thread, ma deve esserci una relazione formale prima-avvenuta tra le notifiche .

Date un'occhiata a Serialize

E 'possibile per un' osservabile per richiamare i metodi propri osservatori in modo asincrono, forse da diversi thread. Ciò potrebbe rendere tale un Observable violento il contratto Observable, in quanto potrebbe provare a inviare una notifica OnCompleted o OnError prima di una delle sue notifiche OnNext oppure potrebbe eseguire una notifica OnNext da due thread contemporaneamente. È possibile forzare tale Osservabile a essere ben educato e sincrono applicando l'operatore Serialize su di esso.

+0

Grazie. Quando ho guardato velocemente la pagina Serialize prima di essere scoraggiato dal fatto che la sezione Rx-scala dice semplicemente TBD e non fornisce la sintassi effettiva. Non ero sicuro se fosse mai stato implementato. Tuttavia, la seguente sintassi compila: 'Observable [String] (observer => {...}). Serialize', e sembra funzionare bene. Non posso dire onestamente se '.serialize' fa qualcosa - per questo proverò a costruire alcuni test di stress più severi. Sono anche sorpreso che non sia l'implementazione di default di "Observable". – thund