2015-08-24 14 views
5

Ho difficoltà a comprendere RX. Nel seguente caso, è necessario annullare l'iscrizione? C'è un modo per annullare automaticamente l'iscrizione dopo l'esecuzione della funzione "call"?È necessario annullare l'iscrizione

Observable.create(new Observable.OnSubscribe<NumberInfo>() { 
     @Override 
     public void call(Subscriber<? super NumberInfo> subscriber) { 
      try { 
       // Store data to db 
      } catch (Exception e) { 
       Log.e(TAG, "Downloaded numberInfo was not added to cache.", e); 
      } 
     } 
    }).subscribeOn(Schedulers.newThread()) 
    .subscribe(); 

Non voglio osservare per qualsiasi risultato è per questo che ho omesso la classica .observeOn(AndroidSchedulers.mainThread())

Thx per la spiegazione.

+0

'Non voglio osservare per qualsiasi result' - allora non bisogno di 'Observable' in primo luogo. Che problema stai cercando di risolvere esattamente? –

+0

@Dmitry Zaitsev Mi serve per sostituire asyncTask. La seconda opzione potrebbe essere quella di eseguire lo snippet in un thread ma anche l'osservabile può gestirlo. – zatziky

+1

Vorrei prendere in considerazione l'utilizzo di estensioni Async per RxJava - hanno già implementato tutte queste cose, quindi puoi semplicemente chiamare 'start (yourTask) .subscribe()' –

risposta

5

In base al contratto Rx, quando Observable attiva onCompleted, l'annullamento Observer si interrompe. Nel tuo caso, il contratto non è rispettato perché non c'è il numero subscriber.onCompleted() nel tuo codice.

Se avete solo bisogno di qualcosa come "Fuoco e dimenticare", si potrebbe provare solo:

Schedulers.io().createWorker().schedule(new Action0() { 
    @Override 
    public void call() { 
     try { 
      // Store data to db 
     } catch (Exception e) { 
      Log.e(TAG, "Downloaded numberInfo was not added to cache.", e); 
     } 
    } 
}); 

eseguirà sul I/O Scheduler e il vostro thread dell'interfaccia utente è sicuro.

IMO si dovrebbe sempre avere un valore di ritorno. Il routing Store data to db ha sicuramente un valore di ritorno, come un long che specifica il numero di riga o un boolean che indica il successo. Avendo questo approccio, è possibile creare un metodo corretto:

public Observable<Long> storeToDb(final SomethingToStore storeMe) { 
    return Observable 
      .create(new Observable.OnSubscribe<Long>() { 
       @Override 
       public void call(Subscriber<? super Long> subscriber) { 
        long row = syncStore(storeMe); 

        if (row == -1) { 
         subscriber.onError(new Throwable("Cannot store " + storeMe.toString + " to DB.")); 
        } 

        subscriber.onNext(row); 
        subscriber.onCompleted(); 
       } 
      }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); 
} 

e si potrebbe usare in questo modo:

storeToDb(storeThis) 
     .subscribe(new Observer<Long>() { 
      @Override 
      public void onCompleted() { 

      } 

      @Override 
      public void onError(Throwable e) { 
       Log.e("STORING", "Something went south: " + e.getMessage()); 
      } 

      @Override 
      public void onNext(Long row) { 
       Log.d("STORING", "Everything has been stored as record number: " + row); 
      } 
     }); 
4

Quando Observable è completo, RxJava si annulla automaticamente. È necessario chiamare subscriber.onComplete() per eseguire la cancellazione automatica.