2015-05-07 11 views
5

Sto creando più flussi a cui devo accedere in parallelo (o eventualmente parallelo). So come provare le risorse quando la quantità di risorse è fissata in fase di compilazione, ma cosa succede se la quantità di risorse è determinata da un parametro?Come chiudere correttamente una quantità variabile di flussi?

ho qualcosa di simile a questo:

private static void foo(String path, String... files) throws IOException { 
    @SuppressWarnings("unchecked") 
    Stream<String>[] streams = new Stream[files.length]; 

    try { 
     for (int i = 0; i < files.length; i++) { 
      final String file = files[i]; 
      streams[i] = Files.lines(Paths.get(path, file)) 
       .onClose(() -> System.out.println("Closed " + file)); 
     } 

     // do something with streams 
     Stream.of(streams) 
      .parallel() 
      .flatMap(x -> x) 
      .distinct() 
      .sorted() 
      .limit(10) 
      .forEach(System.out::println); 
    } 
    finally { 
     for (Stream<String> s : streams) { 
      if (s != null) { 
       s.close(); 
      } 
     } 
    } 
} 
+2

stai chiedendo se c'è una prova, con le risorse che avrebbe gestito la situazione? La risposta è no, ma quello che hai va bene. – Kayaman

+0

Un'alternativa sarebbe quella di spostare l'apertura dei flussi nelle operazioni parallele, con ciascuno che deve gestire solo un flusso. – biziclop

+2

Sì, anche se c'è un problema: il 'close()' potrebbe in teoria (anche se improbabile in pratica) lanciare un 'UncheckedIOException', quindi dovresti probabilmente racchiudere' s.close() 'in un' try {s.close(); } catch (Exception ex) {// quash o log} '. –

risposta

5

si potrebbe scrivere un composito AutoCloseable per la gestione di una quantità dinamica di AutoCloseable:

import java.util.ArrayList; 
import java.util.List; 

public class CompositeAutoclosable<T extends AutoCloseable> implements AutoCloseable { 
    private final List<T> components= new ArrayList<>(); 

    public void addComponent(T component) { components.add(component); } 

    public List<T> getComponents() { return components; } 

    @Override 
    public void close() throws Exception { 
     Exception e = null; 
     for (T component : components) { 
      try { component.close(); } 
      catch (Exception closeException) { 
       if (e == null) { e = closeException; } 
       else { e.addSuppressed(closeException); } 
      } 
     } 
     if (e != null) { throw e; } 
    } 
} 

e si potrebbe usare nel metodo:

private static void foo(String path, String... files) throws Exception { 
    try (CompositeAutoclosable<Stream<String>> streams 
      = new CompositeAutoclosable<Stream<String>>()) { 
     for (int i = 0; i < files.length; i++) { 
      final String file = files[i]; 
      streams.addComponent(Files.lines(Paths.get(path, file)) 
       .onClose(() -> System.out.println("Closed " + file))); 
     } 
     streams.getComponents().stream() 
      .parallel() 
      .flatMap(x -> x) 
      .distinct() 
      .sorted() 
      .limit(10) 
      .forEach(System.out::println); 
    } 
} 
+0

Anche se non mi piace creare classi di utility personalizzate, almeno funziona. La ragione per cui non mi piace? Quando altri sviluppatori che lavorano su (diverse parti del) hanno un bisogno simile, potrebbero non conoscere la classe di utilità e progettare la propria soluzione. –

+0

Questo potrebbe essere risolto dalla comunicazione tra gli sviluppatori. "Ieri ho avuto questo problema quindi ho scritto un'utilità ..." – gontard

+2

Se si cattura un 'closeException' e c'è già un'eccezione precedente dovresti usare [' addSuppressed'] (http://docs.oracle.com/javase/ 8/docs/api/java/lang/Throwable.html # addSuppressed-java.lang.Throwable-) invece di sovrascrivere l'eccezione precedente ... – Holger

1

Il documentation of Stream.flatMap dice:

Ogni flusso mappato viene chiuso dopo che il suo contenuto è stato inserito in questo flusso.

In altre parole, per la chiusura ordinaria dei flussi, non è necessaria alcuna azione aggiuntiva. Tuttavia, dal momento che i flussi solo processati sono chiusi, non si deve creare i flussi con entusiasmo senza sapere se sono successivamente elaborati dal torrente:

private static void foo(String path, String... files) throws IOException { 
    Arrays.stream(files).flatMap(file-> { 
       try { return Files.lines(Paths.get(path, file)) 
        .onClose(() -> System.out.println("Closed " + file)); } 
       catch(IOException ex) { throw new UncheckedIOException(ex); } }) 
      .parallel() 
      .distinct() 
      .sorted() 
      .limit(10) 
      .forEachOrdered(System.out::println); 
} 

Con la creazione di sotto-correnti all'interno flatMap, è garantito che ciascuno è solo creato se lo stream sta per elaborarlo. Pertanto, questa soluzione chiuderà tutti i flussi secondari anche senza avere l'esterno Stream all'interno di un'istruzione try-with-resource. Lo svantaggio di questa soluzione è che non eseguirà la pulizia come l'istruzione try-with-resource nel caso eccezionale e persino il flusso esterno in un'istruzione try-with-resource non lo risolverà.

Considero questa eccezione mancante-sicurezza un difetto nell'implementazione Stream che dovrebbe essere corretta.

Provare a risolvere questo problema nel codice per ottenere un comportamento di chiusura sicuro, che si comporta come un'istruzione try-with-resource, ovvero non garantisce solo la chiusura di tutte le risorse, ma anche l'assenza di eccezioni generate da invocazioni close , è complicato. Essa implica ripercorrere ciò Files.lines(…) fa aggiungendo internamente e poi i pezzi mancanti:

static Closeable join(Closeable a, Closeable b) { 
    return a==null? b:()->{ try(Closeable c=a) { b.close(); }}; 
} 
private static void foo(String path, String... files) throws IOException { 
    Closeable[] cl={ null }; 
    try(Closeable c=()->{ if(cl[0]!=null) cl[0].close(); }) { 
     Stream.Builder<Stream<String>> b=Stream.builder(); 
     for(String file: files) { 
      BufferedReader br=Files.newBufferedReader(Paths.get(path, file)); 
      cl[0]=join(cl[0], br); 
      b.add(br.lines()); 
     } 
     b.build() 
     .parallel() 
     .flatMap(Function.identity()) 
     .distinct() 
     .sorted() 
     .limit(10) 
     .forEachOrdered(System.out::println); 
    } 
} 

questo modo si chiude tutte le risorse, indipendentemente dal fatto che e dove eccezioni accaduto, durante la lavorazione flusso, la chiusura o l'inizializzazione (si chiuderà tutto risorsa che sono stati assegnati finora).

Se si desidera verificare il comportamento, è possibile utilizzare il seguente codice:

private static void foo2(String path, String... files) throws IOException { 
    Closeable[] cl={ null }; 
    try(Closeable c=()->{ if(cl[0]!=null) cl[0].close(); }) { 
     Stream.Builder<Stream<String>> b=Stream.builder(); 
     for(String file: files) { 
      if(Math.random()>0.7) throw new RuntimeException("simulated opening failure"); 
      BufferedReader br=Files.newBufferedReader(Paths.get(path, file)); 
      System.out.println("opened "+file); 
      br=new BufferedReader(br) { 
       @Override 
       public void close() throws IOException { 
        System.out.println("closing "+file); 
        super.close(); 
        throw new RuntimeException("simulated closing "+file+" failure"); 
       } 
      }; 
      cl[0]=join(cl[0], br); 
      b.add(br.lines()); 
     } 
     b.build() 
     .parallel() 
     .flatMap(Function.identity()) 
     .distinct() 
     .sorted() 
     .limit(10) 
     .forEachOrdered(System.out::println); 
     throw new RuntimeException("simulated processing failure"); 
    } 
} 
+0

Mentre sono sicuro che la tua soluzione funziona, è molto difficile leggere e vedere cosa succede in ogni istruzione. Ogni sviluppatore dovrebbe preferire codice di facile lettura su codice ottimizzato e/o intelligente, o causerà problemi di manutenzione o bug. –

+0

@Mark Jeronimus: beh, la mia soluzione preferita è il mio primo snippet di codice (potresti formattarlo più bello) e sperare che gli sviluppatori di JRE risolvano la sicurezza delle eccezioni mancanti. Penso che * sia * una soluzione leggibile e ho annunciato che la risoluzione del problema nel codice dell'applicazione sarà complicata. Ecco perché penso che dovrebbe essere gestito all'interno dell'implementazione di 'Stream'. Ovviamente, il codice potrebbe essere rielaborato in più metodi per essere più facile da leggere e avere un'unica soluzione di chiusura multi-flusso riutilizzabile. – Holger

+0

Sei sicuro che la tua prima soluzione legge i file in parallelo come il codice originale nella domanda? Penso che 'parallelo' dovrebbe essere applicato prima di' flatMap'. –

Problemi correlati