2015-10-27 15 views
14

Sto usando RxAndroid per le operazioni di streaming. Nel mio caso d'uso reale, sto recuperando un elenco dal server (usando Retrofit). Utilizzo gli scheduler per eseguire il lavoro su un thread in background e ottenere l'emissione finale sul thread (principale) dell'interfaccia utente Android.Process Observable on background thread

Questo funziona bene per la chiamata di rete, tuttavia mi sono reso conto che i miei operatori dopo la chiamata di rete non utilizzano il thread in background, ma ricevono il richiamo sul thread principale.

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .subscribe(integer1 -> {}); 

Come posso essere sicuro che tutte le operazioni siano eseguite su un thread in background?

risposta

39

TL; DR: spostamento observeOn(AndroidSchedulers.mainThread()) inferiore a filter(...).


subscribeOn(...) è usato per designare sul quale il filo Observable sarà iniziare operante su. Le chiamate successive a subscribeOn verranno ignorate.

Quindi, se si dovesse scrivere la seguente, tutto verrebbe eseguito su Schedulers.newThread():

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .subscribe(integer1 -> { doSomething(integer1); }); 

Ora, naturalmente, questo non è ciò che si vuole: si vuole doSomething sul thread principale .
Qui è dove observeOn entra in vigore. Tutte le azioni dopoobserveOn vengono eseguite su tale schedulatore. Pertanto, nel tuo esempio, filter viene eseguito sul thread principale.

Invece, spostare observeOn fino a poco prima subscribe:

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(integer1 -> { doSomething(integer1) }); 

Ora, filter accadrà sul 'nuovo thread', e doSomething sul thread principale.


spingersi oltre, è possibile utilizzare observeOn più volte:

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(Schedulers.computation()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(integer1 -> { doSomething(integer1) }); 

In questo caso, il recupero avverrà in un nuovo filo, il filtraggio a un filo di calcolo, e doSomething sulla filo principale.

Checkout ReactiveX - SubscribeOn operator per la documentazione ufficiale.

+0

Grazie per la risposta dettagliata! C'è un modo per definire gli scheduler all'inizio della catena di chiamata del metodo (come nell'OP)? – WonderCsabo

+0

Gli osservabili seguono [Decorator Pattern] (https://en.wikipedia.org/wiki/Decorator_pattern), quindi in base alla progettazione, ciò non è possibile. 'observOn' (e gli altri metodi) non restituiscono la stessa istanza' Observable' come fa il builder pattern, ma restituiscono invece un _new_ 'Observable'' wrapping 'il' vecchio ''Observable'. – nhaarman

+0

Lo capisco. Questo è un peccato per il caso, perché ciò significa che devo aggiungere la chiamata 'observOn()' a ogni stream che scrivo? :( – WonderCsabo