Voglio generare un Observable
in tempo reale dai risultati di un elenco di Futures
.Osservabile da Futures - onNext da più thread
Nel caso più semplice, si supponga di avere un elenco di futuri che sto utilizzando con Future.sequence
e sto semplicemente monitorando i loro progressi con un Observable
che mi dice ogni volta che uno ha completato. Fondamentalmente lo faccio in questo modo:
def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
Observable[String](observer => {
val loudFutures: List[Future[Int]] = futs.map(f => {
f onComplete {
case Success(a) => observer.onNext(s"just did $a more")
case Failure(e) => observer.onError(e)
}
f
})
Future.sequence(loudFutures) onComplete {
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
})
}
Questo funziona perfettamente nel mio ambiente di test. Ma ho appena letto che onNext
non dovrebbe essere chiamato da thread diversi, almeno senza fare attenzione che non ci siano chiamate sovrapposte. Qual è il modo consigliato per risolvere questo problema? Sembra che molti usi reali di Observables
richiederebbero il onNext
per essere chiamati dal codice asincrono come questo, ma non riesco a trovare un esempio simile nei documenti.
non sono sicuro se c'è una risposta migliore, ma è possibile garantire che le chiamate 'onNext' sono gestiti dallo stesso thread se si utilizza ad esempio un unico contesto di esecuzione filettata (' ExecutionContext.fromExecutor (Esecutori. newSingleThreadExecutor()) ') per eseguire quei callback' onComplete's. – Kolmar
Puoi riferirti a quale articolo, riguardo 'onNext', ti stai riferendo? Questo caso d'uso va perfettamente bene, dal mio punto di vista. – mavarazy
@mavarazy: Gran parte della documentazione che ho trovato su questo è piuttosto poco chiaro ma [questo] (http://reactivex.io/documentation/operators/serialize.html) parla dell'uso di 'serialize()' per evitare due sovrapposizioni 'onNext()' chiama e [this] (https://github.com/ReactiveX/RxJava/wiki/Subject) ti avvisa di non chiamare 'onNext()' da più thread - almeno se stai usando un Soggetto. E tutti gli esempi ufficiali Rx che ho trovato sono a thread singolo. – thund