2014-06-18 17 views
7

Sto provando a creare un campione usando rxjava. L'esempio dovrebbe orchestrare un ReactiveWareService e un ReactiveReviewService retruning di un composito WareAndReview.Come attendere async Osservabile per il completamento

ReactiveWareService 
     public Observable<Ware> findWares() { 
     return Observable.from(wareService.findWares()); 
    } 

ReactiveReviewService: reviewService.findReviewsByItem does a ThreadSleep to simulate a latency! 

public Observable<Review> findReviewsByItem(final String item) { 
return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> { 
    try { 
     List<Review> reviews = reviewService.findReviewsByItem(item); 
     reviews.forEach(observer::onNext); 
     observer.onCompleted(); 
    } catch (Exception e) { 
     observer.onError(e); 
    } 
})); 
} 

public List<WareAndReview> findWaresWithReviews() throws RuntimeException { 
final List<WareAndReview> wareAndReviews = new ArrayList<>(); 

wareService.findWares() 
    .map(WareAndReview::new) 
.subscribe(wr -> { 
     wareAndReviews.add(wr); 
     //Async!!!! 
     reviewService.findReviewsByItem(wr.getWare().getItem()) 
      .subscribe(wr::addReview, 
       throwable -> System.out.println("Error while trying to find reviews for " + wr) 
      ); 
    } 
); 

//TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion! 
try { 
    Thread.sleep(3000); 
} catch (InterruptedException e) {} 

return wareAndReviews; 
} 

Tenuto conto del fatto che non voglio tornare un'osservabile, come posso aspettare per asincrona osservabili (findReviewsByItem) per completare?

risposta

-3

Un altro modo sarebbe quello di dichiarare un CountdownLatch prima di iniziare. Quindi chiama countDown() su quel latch nel tuo onCompleted(). È quindi possibile sostituire Thread.sleep() con await() su tale latch.

12

La maggior parte del vostro esempio può essere riscritta con gli operatori RxJava standard che funzionano bene insieme:

public class Example { 

    Scheduler scheduler = Schedulers.from(executor); 

    public Observable<Review> findReviewsByItem(final String item) { 
     return Observable.just(item) 
       .subscribeOn(scheduler) 
       .flatMapIterable(reviewService::findReviewsByItem); 
    } 
    public List<WareAndReview> findWaresWithReviews() { 
     return wareService 
       .findWares() 
       .map(WareAndReview::new) 
       .flatMap(wr -> { 
        return reviewService 
          .findReviewsByItem(wr.getWare().getItem()) 
          .doOnNext(wr::addReview) 
          .lastOrDefault(null) 
          .map(v -> wr); 
       }) 
       .toList() 
       .toBlocking() 
       .first(); 
    } 
} 

Ogni volta che si desidera comporre servizi come questo, pensare a flatMap prima. Non è necessario bloccare per ogni sotto-Osservabile ma solo alla fine con toBlocking() se davvero necessario.

Problemi correlati