2015-03-16 18 views
9

Ho una situazione in cui sto creando un Osservabile contenente i risultati di un database. Sto quindi applicando una serie di filtri a loro. Quindi ho un abbonato che sta registrando i risultati. Può accadere che nessun elemento si faccia strada attraverso i filtri. La mia logica aziendale afferma che questo non è un errore. Tuttavia, quando ciò accade il mio onError viene chiamato e contiene la seguente eccezione: java.util.NoSuchElementException: Sequence contains no elementsUtilizzo corretto vuoto Osservabile in RxJava

È la pratica accettata solo rilevare quel tipo di eccezione e ignorarlo? O c'è un modo migliore per gestire questo?

La versione è 1.0.0.

Ecco un semplice caso di test che espone ciò che sto vedendo. Sembra essere correlato all'aver filtrato tutti gli eventi prima di raggiungere una mappa e ridurre.

@Test 
public void test() 
{ 

    Integer values[] = new Integer[]{1, 2, 3, 4, 5}; 

    Observable.from(values).filter(new Func1<Integer, Boolean>() 
    { 
     @Override 
     public Boolean call(Integer integer) 
     { 
      if (integer < 0) 
       return true; 
      else 
       return false; 
     } 
    }).map(new Func1<Integer, String>() 
    { 
     @Override 
     public String call(Integer integer) 
     { 
      return String.valueOf(integer); 
     } 
    }).reduce(new Func2<String, String, String>() 
    { 
     @Override 
     public String call(String s, String s2) 
     { 
      return s + "," + s2; 
     } 
    }) 

      .subscribe(new Action1<String>() 
      { 
       @Override 
       public void call(String s) 
       { 
        System.out.println(s); 
       } 
      }); 
} 

perché sto usando un abbonato di sicurezza, getta inizialmente un OnErrorNotImplementedException che avvolge la seguente eccezione:

java.util.NoSuchElementException: Sequence contains no elements 
    at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82) 
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140) 
    at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73) 
    at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45) 
    at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59) 
    at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121) 
    at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43) 
    at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42) 
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79) 
    at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147) 
    at rx.Subscriber.setProducer(Subscriber.java:139) 
    at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139) 
    at rx.Subscriber.setProducer(Subscriber.java:133) 
    at rx.Subscriber.setProducer(Subscriber.java:133) 
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47) 
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33) 
    at rx.Observable$1.call(Observable.java:144) 
    at rx.Observable$1.call(Observable.java:136) 
    at rx.Observable$1.call(Observable.java:144) 
    at rx.Observable$1.call(Observable.java:136) 
    at rx.Observable$1.call(Observable.java:144) 
    at rx.Observable$1.call(Observable.java:136) 
    at rx.Observable$1.call(Observable.java:144) 
    at rx.Observable$1.call(Observable.java:136) 
    at rx.Observable$1.call(Observable.java:144) 
    at rx.Observable$1.call(Observable.java:136) 
    at rx.Observable.subscribe(Observable.java:7284) 

Sulla base della risposta da @davem sotto, ho creato un nuovo banco di prova:

@Test 
public void testFromBlockingAndSingle() 
{ 

    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

    List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>() 
    { 
     @Override 
     public Boolean call(Integer integer) 
     { 
      if (integer < 0) 
       return true; 
      else 
       return false; 
     } 
    }).map(new Func1<Integer, String>() 
    { 
     @Override 
     public String call(Integer integer) 
     { 
      return String.valueOf(integer); 
     } 
    }).reduce(new Func2<String, String, String>() 
    { 
     @Override 
     public String call(String s, String s2) 
     { 
      return s + "," + s2; 
     } 
    }).toList().toBlocking().single(); 

    System.out.println("Test: " + results + " Size: " + results.size()); 

} 

E questo i risultati dei test nel seguente comportamento:

Quando l'ingresso è:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

Poi i risultati (come previsto) sono:

Test: [-2,-1] Size: 1 

E quando l'ingresso è:

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5}; 

Poi il risultato è il seguente stack trace :

java.util.NoSuchElementException: Sequence contains no elements 
at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82) 
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140) 
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73) 
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45) 
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59) 
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121) 
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43) 
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42) 
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79) 
at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147) 
at rx.Subscriber.setProducer(Subscriber.java:139) 
at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139) 
at rx.Subscriber.setProducer(Subscriber.java:133) 
at rx.Subscriber.setProducer(Subscriber.java:133) 
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47) 
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable.subscribe(Observable.java:7284) 
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441) 
at rx.observables.BlockingObservable.single(BlockingObservable.java:340) 
at EmptyTest2.test(EmptyTest2.java:19) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) 
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) 
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) 
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) 
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) 
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) 
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) 
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:309) 
at org.junit.runner.JUnitCore.run(JUnitCore.java:160) 
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74) 
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211) 
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) 

Quindi sembra che il problema sia sicuramente con l'uso della funzione di riduzione. Vedere il seguente caso di test in grado di gestire entrambe le situazioni:

@Test 
public void testNoReduce() 
{ 

    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

    List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>() 
    { 
     @Override 
     public Boolean call(Integer integer) 
     { 
      if (integer < 0) 
       return true; 
      else 
       return false; 
     } 
    }).map(new Func1<Integer, String>() 
    { 
     @Override 
     public String call(Integer integer) 
     { 
      return String.valueOf(integer); 
     } 
    }).toList().toBlocking().first(); 

    Iterator<String> itr = results.iterator(); 
    StringBuilder b = new StringBuilder(); 

    while (itr.hasNext()) 
    { 
     b.append(itr.next()); 

     if (itr.hasNext()) 
      b.append(","); 
    } 

    System.out.println("Test NoReduce: " + b); 

} 

con la seguente digitazione:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

ottengo i seguenti risultati che ci si aspetta:

Test NoReduce: -2,-1 

E con la seguente digitazione :

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5}; 

ottengo il seguente output che si prevede:

Test NoReduce: 

Quindi, a meno che io sono qualcosa di completamente equivoco, l'unico modo per gestire davvero un elemento pari a zero osservabili che i risultati di un filtro quando seguita da una mappa e ridurre è implementare la logica di riduzione al di fuori della catena osservabile. Sei d'accordo con questa affermazione?


Soluzione Finale

Ecco la mia soluzione finale dopo l'implementazione di quanto sia Tomáš Dvořák e David Motten suggerito. Penso che questa soluzione sia ragionevole.

@Test 
public void testWithToList() 
{ 

    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

    Observable.from(values).filter(new Func1<Integer, Boolean>() 
    { 
     @Override 
     public Boolean call(Integer integer) 
     { 
      if (integer < 0) 
       return true; 
      else 
       return false; 
     } 
    }).toList().map(new Func1<List<Integer>, String>() 
    { 
     @Override 
     public String call(List<Integer> integers) 
     { 
      Iterator<Integer> intItr = integers.iterator(); 
      StringBuilder b = new StringBuilder(); 

      while (intItr.hasNext()) 
      { 
       b.append(intItr.next()); 

       if (intItr.hasNext()) 
       { 
        b.append(","); 
       } 
      } 

      return b.toString(); 
     } 
    }).subscribe(new Action1<String>() 
    { 
     @Override 
     public void call(String s) 
     { 
      System.out.println("With a toList: " + s); 
     } 
    }); 

} 

Ecco come si comporta questo test quando vengono forniti i seguenti input.

Quando in un flusso che avrà alcuni valori passano attraverso i filtri:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

Il risultato è:

With a toList: -2,-1 

Quando dato un flusso che non avrà alcun valore passano attraverso i filtri :

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5}; 

Il risultato è:

With a toList: <empty string> 
+0

si prega di inviare un certo codice, in modo da non dover dedurre dalla analisi dello stack. –

+0

Hai ragione, non sono riuscito ad aggiungere un caso di test. Ho aggiunto uno semplice. – babernathy

risposta

13

Ora dopo l'aggiornamento l'errore è abbastanza ovvio. Reduce in RxJava non riuscirà con uno IllegalArgumentException se l'osservabile che sta riducendo è vuoto, esattamente come specificato (http://reactivex.io/documentation/operators/reduce.html).

Nella programmazione funzionale, di solito ci sono due operatori generici che aggregano una raccolta in un singolo valore, fold e reduce. Nella terminologia accettata, fold accetta un valore di accumulatore iniziale e una funzione che accetta un accumulatore in esecuzione e un valore dalla raccolta e produce un altro valore di accumulatore. Un esempio in pseudocodice:

[1, 2, 3, 4].fold(0, (accumulator, value) => accumulator + value)

inizia con 0, ed eventualmente aggiungere 1, 2, 3, 4 all'accumulatore esecuzione, infine cedevole 10, la somma dei valori.

Ridurre è molto simile, solo che non prende il valore dell'accumulatore iniziale in modo esplicito, utilizza il primo valore come accumulatore iniziale e quindi accumula tutti i valori rimanenti. Questo ha senso se tu sei per es. cercando un valore minimo o massimo.

[1, 2, 3, 4].reduce((accumulator, value) => min(accumulator, value))

Guardando piega e ridurre modo diverso, probabilmente userai fold ogni volta che il valore aggregato avrà senso anche sulla raccolta vuota (come, in sum, 0 ha un senso), e reduce altrimenti (minimum marche non ha senso su una collezione vuota, e reduce non funzionerà su tale raccolta, nel tuo caso lanciando un'eccezione).

Si sta facendo un'aggregazione simile, intercalando una raccolta di stringhe con una virgola per produrre una singola stringa. Questa è una situazione un po 'più difficile.Probabilmente ha senso su una collezione vuota (probabilmente ti aspetti una stringa vuota), ma d'altra parte, se inizi con un accumulatore vuoto, nel risultato avrai una virgola in più di quanto ti aspetti. La soluzione corretta consiste nel controllare, sia che la raccolta sia vuota per prima, sia nel restituire una stringa di sicurezza per una raccolta vuota, oppure fare una copia reduce in una raccolta non vuota. Probabilmente osserverai che spesso non vuoi una stringa vuota nel caso di raccolta vuoto, ma qualcosa come "la raccolta è vuota" potrebbe essere più appropriato, assicurandoti così ulteriormente che questa soluzione sia quella pulita.

Btw, sto usando la parola collezione qui invece di osservabile liberamente, solo per scopi didattici. Inoltre, in RxJava, sia fold sia reduce sono chiamati uguali, reduce, solo che ci sono due versioni di quel metodo, una che prende solo un parametro, gli altri due parametri.

Come per la tua domanda finale: non devi lasciare la catena Osservabile. Basta usare toList(), come suggerisce David Motten.

.filter(...) 
.toList() 
.map(listOfValues => listOfValues.intersperse(", ")) 

dove intersperse potrebbero essere attuate in termini di reduce, se non è già una funzione di libreria (è abbastanza comune).

collection.intersperse(separator) = 
    if (collection.isEmpty()) 
     "" 
    else 
     collection.reduce(accumulator, element => accumulator + separator + element) 
+0

Grazie per la spiegazione dettagliata. Questo ha aiutato molto. Ho aggiornato la domanda con la mia soluzione finale. – babernathy

9

Il motivo è che stai utilizzando toBlocking().single() su un flusso vuoto. Se si prevede che 0 o 1 valori dello stream possano essere eseguiti su toList().toBlocking().single(), esaminare i valori nell'elenco (che potrebbe essere vuoto ma non provocherà l'eccezione che si sta ottenendo).

+0

Buon lavoro, deducendolo solo dalla traccia dello stack e assolutamente senza codice :) +1 per te, -1 per la domanda. Solo, non intendi controllare il risultato di 'toBlocking.toList' invece di' toList.toBlocking.single'? –

+0

Grazie. 'toBlocking.toList' non fa parte dell'API 1.0.8 ma potrebbe usare' toBlocking.toIterable'. 'toBlocking.singleOrDefault' è un'altra possibilità ma senza dettagli la versione' toList' la copre. –

+0

Vedo, non ho letto i documenti e ho pensato che l'originale 'tolist 'era un errore, non ero nemmeno a conoscenza dell'esistenza. Probabilmente userò '.take (1) .toBlocking.singleOrDefault' se cercassi solo il primo elemento per risparmiare risorse, ma come dici tu, chissà cosa è appropriato per questo caso particolare. –

4

È possibile utilizzare ridurre con valore iniziale versione .reduce(0, (x,y) -> x+y) dovrebbe funzionare come operatore piega spiegato da Tomáš Dvořák

Problemi correlati