2015-04-28 11 views
8

Voglio avere qualcosa di simile a Collectors.maxBy(), un raccoglitore che ottiene gli elementi in cima a una collezione (maxBy ne riceve uno solo).Come implementare un servizio di raccolta thread-safe?

Ho uno stream di Possibility oggetti che possono essere valutati con un metodo Integer score(Possibility).

Per prima cosa ho provato:

List<Possibity> possibilities = getPossibilityStream() 
    .parallel() 
    .collect(Collectors.toList()); 

if(!possibilities.isEmpty()) { 
    int bestScore = possibilities.stream() 
     .mapToInt(p -> score(p)) 
     .max() 
     .getAsInt(); 
    possibilities = possibilities.stream() 
     .filter(p -> score(p)==bestScore) 
     .collect(Collectors.toList()); 
} 

Ma farlo, scruto la collezione tre volte. Una volta per costruirlo, una seconda volta per ottenere il punteggio più alto, e una terza volta per filtrarlo e questo non è ottimale. Inoltre il numero di possibilità potrebbe essere enorme (> 10).

Il modo migliore dovrebbe essere quello di ottenere direttamente le migliori possibilità nel primo raccoglitore, ma non sembra esserci alcun compilatore incorporato per fare una cosa del genere.

Così ho implementato il mio Collector:

public class BestCollector<E> implements Collector<E, List<E>, List<E>> { 

    private final Comparator<E> comparator; 

    private final Class<? extends List> listImpl ; 

    public BestCollector(Comparator<E> comparator, Class<? extends List> listImpl) { 
     this.comparator = comparator; 
     this.listImpl = listImpl; 
    } 

    public BestCollector(Comparator<E> comparator) { 
     this.comparator= comparator; 
     listImpl = ArrayList.class; 
    } 

    @Override 
    public Supplier<List<E>> supplier() { 
     return() -> { 
      try { 
       return listImpl.newInstance(); 
      } catch (InstantiationException | IllegalAccessException ex) { 
       throw new RuntimeException(ex); 
      } 
     }; 
    } 

    @Override 
    public BiConsumer<List<E>, E> accumulator() { 
     return (list, e) -> { 
      if (list.isEmpty()) { 
       list.add(e); 
      } else { 
       final int comparison = comparator.compare(list.get(0), e); 
       if (comparison == 0) { 
        list.add(e); 
       } else if (comparison < 0) { 
        list.clear(); 
        list.add(e); 
       } 
      } 
     }; 
    } 

    @Override 
    public BinaryOperator<List<E>> combiner() { 
     return (l1, l2) -> { 
      final int comparison = comparator.compare(l1.get(0), l2.get(0)); 
      if (comparison == 0) { 
       l1.addAll(l2); 
       return l1; 
      } else if (comparison < 0) { 
       return l2; 
      } else { 
       return l1; 
      } 
     }; 
    } 

    @Override 
    public Function<List<E>, List<E>> finisher() { 
     return Function.identity(); 
    } 

    @Override 
    public Set<Characteristics> characteristics() { 
     return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT, Characteristics.UNORDERED); 
    } 
} 

E poi:

List<Possibity> possibilities = getPossibilityStream() 
    .parallel() 
    .collect(new BestCollector<Possibility>((p1, p2) -> score(p1).compareTo(score(p2))); 

E che fa il lavoro in una modalità sequenziale (senza il .parallel()), ma in modo parallelo Ci sono alcune eccezioni occasionalmente in due punti:

  • A java.lang.IndexOutOfBoundsException Index: 0, Size: 0 nella linea:

    final int comparison = comparator.compare(list.get(0), e); 
    

del accumulator() metodo

comprendo succede quando un list.clear() è chiamato tra list.isEmpty() e list.get(0).

  • A java.lang.NullPointerException nel metodo punteggio (possibilità) perché la possibilità è null. Anche in questo caso la stessa linea è coinvolto:

    final int comparison = comparator.compare(list.get(0), e); 
    

Non capisco come list.get(0) potrebbe tornare null ...

In modo parallelo, a volte list.get(0) solleva un IndexOutOfBoundsException e, a volte ritornano null.

Capisco che il mio codice non è thread-safe così ho provato diverse soluzioni:

  • Aggiungi synchronized in tutti i metodi di BestCollector: public synchronized …
  • Utilizzare una raccolta thread-safe, invece di ArrayList: java.util.concurrent.CopyOnWriteArrayList
  • Aggiungi synchronized e utilizza CopyOnWriteArrayList allo stesso tempo
  • Rimuovi Characteristics.CONCURRENT dal Set<Characteristics> del characteristics() metodo di

    @Override 
    public Set<Characteristics> characteristics() { 
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED); 
    } 
    

Ma io non so se il Characteristics.CONCURRENT è qui per indicare che il mio codice è thread-safe o che il mio codice sarà utilizzato in un processo di concorrenza.

Ma nessuna di queste soluzioni risolve il problema.


Infatti quando viene rimosso CONCURRENT su caratteristiche v'è, a volte, un java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 ma in linea:

final int comparison = comparator.compare(l1.get(0), l2.get(0)); 

del metodo combiner().

Tuttavia, le eccezioni sollevate dal metodo accumulator() sembrano non verificarsi più.


@ La risposta di Holger è giusta.

La soluzione completa è di cambiare entrambi i combiner() e characteristics() metodi:

@Override 
public BinaryOperator<List<E>> combiner() { 
    return (l1, l2) -> { 
     if (l1.isEmpty()) { 
      return l2; 
     } else if (l2.isEmpty()) { 
      return l1; 
     } else { 
      final int comparison = comparator.compare(l1.get(0), l2.get(0)); 
      if (comparison == 0) { 
       l1.addAll(l2); 
       return l1; 
      } else if (comparison < 0) { 
       return l2; 
      } else { 
       return l1; 
      } 
     } 
    }; 
} 

@Override 
public Set<Characteristics> characteristics() { 
    return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED); 
} 
+1

Non vedo nulla di sbagliato nell'implementazione di Collector (che è fondamentalmente la stessa implementazione della risposta accettata di questa domanda: http://stackoverflow.com/questions/29334404/how-to-force-max-to -return-all-maximum-values-in-a-java-stream/29334774) –

+1

È interessante notare che la rimozione della caratteristica 'CONCURRENT' lo rende funzionante per me, quindi suppongo che dovresti guardare in questa direzione. –

risposta

7

il codice ha un solo errore significativo: se il collettore non è thread-safe, non dovrebbe riferire Characteristics.CONCURRENT come questo è esattamente rivendicando che era sicuro.

Il punto importante che dovete capire è che per i non CONCURRENT collezionisti, il quadro eseguirà i passi necessari per utilizzarlo in un thread-safe, ma ancora modo efficiente:

  • per ciascun lavoratore, filo un nuovo contenitore verrà acquisito via supplier()
  • ciascun lavoratore utilizzerà la funzione accumulator() insieme al suo contenitore locale
  • il combiner() sarà utilizzata una volta due thread di lavoro hanno terminato il loro lavoro
  • il finisher() verrà utilizzato quando tutti thread di lavoro hanno finito il loro lavoro e tutti i contenitori vengono combinati

Quindi tutto quello che dovete fare è quello di garantire che il vostro fornitore ritorna veramente una nuova istanza per ogni invocazione e che tutte le funzioni sono non interferenti e prive di effetti collaterali (per quanto riguarda altro che il contenitore che ricevono come argomenti) e, ovviamente, non riportano Characteristics.CONCURRENT quando il raccoglitore non è un collector concorrente.

Non è necessaria la parola chiave synchronized né raccolte simultanee qui.


proposito, un Comparator della forma (p1, p2) -> score(p1).compareTo(score(p2)) può essere implementato usando Comparator.comparing(p -> score(p)) o se il valore è un punteggio int: Comparator.comparingInt(p -> score(p)).


Infine, la funzione combinatore non controlla se uno degli elenchi è vuoto. Questo spiega perfettamente un IndexOutOfBoundsException all'interno del combiner mentre il IndexOutOfBoundsException all'interno del accumulator è il risultato del vostro raccoglitore segnalazione Characteristics.CONCURRENT ...


E 'anche importante capire che l'aggiunta di una parola chiave synchronized a un metodo di accumulator() o combiner() non custodisce la funzione costruita tramite espressione lambda. Proteggerà il metodo che costruisce l'istanza della funzione, ma non il codice stesso della funzione. A differenza di una classe interna, non è possibile aggiungere una parola chiave synchronized al metodo di implementazione della funzione effettiva.

+0

Sì, ho visto che il 'synchronized' non si comporta come si usa ... Anche io provo ad aggiungere un blocco sincronizzato:' synchronized (list) {... 'nel metodo' accumulator() ', tieni 'CONCURRENT' nel set di caratteristiche e non controllare' isEmpty() 'nel metodo' combiner() 'e funziona anche ma la tua soluzione è più veloce in termini di tempo di esecuzione – kwisatz

Problemi correlati