2016-03-26 9 views
7

Sto cercando di capire come gestire gli errori durante la mappatura degli elementi all'interno di un flusso.Come gestire l'errore durante l'esecuzione di Flux.map()

Per esempio, sto parsing di una stringa CSV in uno dei miei POJO di business:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock)); 

Alcune di queste linee potrebbe contenere degli errori, così che cosa ottengo nel registro è:

reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001)) 
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999)) 
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo) 
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo 

ho letto nella API alcuni metodi di gestione errori, ma la maggior parte denominati nel restituire un "valore di errore" o utilizzando un ripiego Flux, come questo:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff); 

Tuttavia, l'utilizzo di questo con il mio myflux significa che l'intero flusso viene elaborato di nuovo.

Quindi, esiste un modo per gestire gli errori durante l'elaborazione di particolari elementi (Ie li ignora/li registra) e continua a elaborare il resto del flusso?

UPDATE con @akarnokd soluzione

public Flux<StockQuotation> getQuotes(List<String> tickers) 
{ 
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers) 
    // Get each set of quotes in a separate thread 
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s))) 
    // Convert each list of raw quotes string in a new Flux<String> 
    .flatMap(list -> Flux.fromIterable(list)) 
    // Convert the string to POJOs 
    .flatMap(x -> { 
      try { 
       return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));  
      } 
      catch (IllegalArgumentException ex){ 
       System.out.println("Error decoding stock quotation: " + x); 
       return Flux.empty(); 
      } 
    }); 

    return processingFlux; 
} 

Questo funziona come un fascino, tuttavia, come si può vedere il codice è meno elegante di prima. L'API Flux non ha alcun metodo per fare ciò che fa questo codice?

retry(...) 
retryWhen(...) 
onErrorResumeWith(...) 
onErrorReturn(...) 

risposta

5

È necessario flatMap invece che ti permette di tornare una sequenza vuota se il trattamento non è riuscita:

myflux.flatMap(v -> { 
    try { 
     return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock)); 
    } catch (IllegalArgumentException ex) { 
     return Flux.empty(); 
    } 
}); 
+0

Funziona grande (andando accettare questa risposta), ma mi piacerebbe sapere se questo può essere fatto con la API. In caso contrario, aprirò una richiesta di funzionalità. Grazie! – Victor

+0

Questa è l'API standard di fatto per eseguire tale comportamento. Gli errori sono eventi terminali e devi trasformarli in qualcos'altro in lambda per evitare la fine. – akarnokd

+0

Ok. Ho proposto la creazione di un nuovo metodo per gestire i singoli fallimenti (magari pubblicando quei guasti come un flusso "lettera morta"?). Forse questo potrebbe essere utile ... – Victor

Problemi correlati