2016-04-08 18 views
5

Ho la classe seguente (semplificata):Come testare che osservabile utilizza gli scheduler corretti in RxJava?

public class Usecase<T> { 
    private final Observable<T> get; 
    private final Scheduler observeScheduler; 

    public Usecase(Observable<T> get, Scheduler observeScheduler) { 
     this.get = get; 
     this.observeScheduler = observeScheduler; 
    } 

    public Observable<T> execute() { 
     return get.subscribeOn(Schedulers.io()).observeOn(observeScheduler); 
    } 
} 

e sto scrivendo unit test per esso. Come posso verificare che subscribeOn e observeOn siano stati chiamati con valori corretti?

cerco il seguente:

Observable<String> observable = mock(Observable.class); 
    Usecase<String> usecase = new Usecase(observable, Schedulers.computation()); 
    usecase.execute(); 

    verify(observable).subscribeOn(Schedulers.computation()); // should fail here, but passes 
    verify(observable).observeOn(Schedulers.computation()); // should pass, but fails: Missing method call for verify(mock) here 

È possibile che questo fallisce (credo) perché subscribeOn e observeOn sono final metodi. Quindi potrebbe esserci un altro modo per garantire che l'osservabile usi gli scheduler corretti?

risposta

3

There is a way per accedere indirettamente a entrambi i thread su cui un osservabile sta operando e viene osservato, il che significa che è possibile verificare che lo Observable utilizza gli scheduler corretti.

Siamo limitati a verificare i thread per nome. Fortunatamente, i thread utilizzati da Schedulers.io() sono denominati con un prefisso coerente a cui è possibile confrontare. Ecco il (pieno?) Elenco delle utilità di pianificazione che hanno prefissi unici per riferimento:

  • Schedulers.io() - RxCachedThreadScheduler
  • Schedulers.newThread()-RxNewThreadScheduler
  • Schedulers.computation()-RxComputationThreadPool

Per verificare un Osservabile era sottoscritto al sul filo IO:

// Calling Thread.currentThread() inside Observer.OnSubscribe will return the 
// thread the Observable is running on (Schedulers.io() in our case) 
Observable<String> obs = Observable.create((Subscriber<? super String> s) -> { 
    s.onNext(Thread.currentThread().getName()); 
    s.onCompleted(); 
}) 

// Schedule the Observable 
Usecase usecase = new Usecase(obs, Schedulers.immediate()); 
Observable usecaseObservable = usecase.execute(); 

// Verify the Observable emitted the name of the IO thread 
String subscribingThread = usecaseObservable.toBlocking().first(); 
assertThat(subscribingThread).startsWith("RxCachedThreadScheduler"); 

Per verificare un'osservabile era osservato sul filo calcolo, è possibile utilizzare per accedere TestSubscriber#getLastSeenThread l'ultimo filo utilizzato per l'osservazione.

TestSubscriber<Object> subscriber = TestSubscriber.create(); 
UseCase usecase = new UseCase(Observable.empty(), Schedulers.computation()) 
usecase.execute().subscribe(subscriber); 

// The observable runs asynchronously, so wait for it to complete 
subscriber.awaitTerminalEvent(); 
subscriber.assertNoErrors(); 

// Verify the observable was observed on the computation thread 
String observingThread = subscriber.getLastSeenThread().getName(); 
assertThat(observingThread).startsWith("RxComputationThreadPool"); 

Nessun librerie di terze parti o di scherno sono necessari, anche se sto usando AssertJ per la fluente startsWith asserzione.

2

L'applicazione di operatori su Observable restituisce un nuovo Observable in modo che qualsiasi applicazione operatore successiva si verifichi su un oggetto diverso. Dovresti seguire il grafico della composizione in un Osservabile per scoprire quali operatori sono stati applicati e con quali parametri.

Questo non è supportato in RxJava e si deve fare affidamento su dettagli interni e riflessione.

Generalmente, non è possibile assumere nulla sulla posizione da cui gli eventi arriveranno, ma è possibile applicare observeOn per assicurarsi che raggiungano il thread della scelta.

Problemi correlati