Il documentation of Stream.flatMap
dice:
Ogni flusso mappato viene chiuso dopo che il suo contenuto è stato inserito in questo flusso.
In altre parole, per la chiusura ordinaria dei flussi, non è necessaria alcuna azione aggiuntiva. Tuttavia, dal momento che i flussi solo processati sono chiusi, non si deve creare i flussi con entusiasmo senza sapere se sono successivamente elaborati dal torrente:
private static void foo(String path, String... files) throws IOException {
Arrays.stream(files).flatMap(file-> {
try { return Files.lines(Paths.get(path, file))
.onClose(() -> System.out.println("Closed " + file)); }
catch(IOException ex) { throw new UncheckedIOException(ex); } })
.parallel()
.distinct()
.sorted()
.limit(10)
.forEachOrdered(System.out::println);
}
Con la creazione di sotto-correnti all'interno flatMap
, è garantito che ciascuno è solo creato se lo stream sta per elaborarlo. Pertanto, questa soluzione chiuderà tutti i flussi secondari anche senza avere l'esterno Stream
all'interno di un'istruzione try-with-resource. Lo svantaggio di questa soluzione è che non eseguirà la pulizia come l'istruzione try-with-resource nel caso eccezionale e persino il flusso esterno in un'istruzione try-with-resource non lo risolverà.
Considero questa eccezione mancante-sicurezza un difetto nell'implementazione Stream
che dovrebbe essere corretta.
Provare a risolvere questo problema nel codice per ottenere un comportamento di chiusura sicuro, che si comporta come un'istruzione try-with-resource, ovvero non garantisce solo la chiusura di tutte le risorse, ma anche l'assenza di eccezioni generate da invocazioni close
, è complicato. Essa implica ripercorrere ciò Files.lines(…)
fa aggiungendo internamente e poi i pezzi mancanti:
static Closeable join(Closeable a, Closeable b) {
return a==null? b:()->{ try(Closeable c=a) { b.close(); }};
}
private static void foo(String path, String... files) throws IOException {
Closeable[] cl={ null };
try(Closeable c=()->{ if(cl[0]!=null) cl[0].close(); }) {
Stream.Builder<Stream<String>> b=Stream.builder();
for(String file: files) {
BufferedReader br=Files.newBufferedReader(Paths.get(path, file));
cl[0]=join(cl[0], br);
b.add(br.lines());
}
b.build()
.parallel()
.flatMap(Function.identity())
.distinct()
.sorted()
.limit(10)
.forEachOrdered(System.out::println);
}
}
questo modo si chiude tutte le risorse, indipendentemente dal fatto che e dove eccezioni accaduto, durante la lavorazione flusso, la chiusura o l'inizializzazione (si chiuderà tutto risorsa che sono stati assegnati finora).
Se si desidera verificare il comportamento, è possibile utilizzare il seguente codice:
private static void foo2(String path, String... files) throws IOException {
Closeable[] cl={ null };
try(Closeable c=()->{ if(cl[0]!=null) cl[0].close(); }) {
Stream.Builder<Stream<String>> b=Stream.builder();
for(String file: files) {
if(Math.random()>0.7) throw new RuntimeException("simulated opening failure");
BufferedReader br=Files.newBufferedReader(Paths.get(path, file));
System.out.println("opened "+file);
br=new BufferedReader(br) {
@Override
public void close() throws IOException {
System.out.println("closing "+file);
super.close();
throw new RuntimeException("simulated closing "+file+" failure");
}
};
cl[0]=join(cl[0], br);
b.add(br.lines());
}
b.build()
.parallel()
.flatMap(Function.identity())
.distinct()
.sorted()
.limit(10)
.forEachOrdered(System.out::println);
throw new RuntimeException("simulated processing failure");
}
}
stai chiedendo se c'è una prova, con le risorse che avrebbe gestito la situazione? La risposta è no, ma quello che hai va bene. – Kayaman
Un'alternativa sarebbe quella di spostare l'apertura dei flussi nelle operazioni parallele, con ciascuno che deve gestire solo un flusso. – biziclop
Sì, anche se c'è un problema: il 'close()' potrebbe in teoria (anche se improbabile in pratica) lanciare un 'UncheckedIOException', quindi dovresti probabilmente racchiudere' s.close() 'in un' try {s.close(); } catch (Exception ex) {// quash o log} '. –