Voglio avere qualcosa di simile a Collectors.maxBy()
, un raccoglitore che ottiene gli elementi in cima a una collezione (maxBy
ne riceve uno solo).Come implementare un servizio di raccolta thread-safe?
Ho uno stream di Possibility
oggetti che possono essere valutati con un metodo Integer score(Possibility)
.
Per prima cosa ho provato:
List<Possibity> possibilities = getPossibilityStream()
.parallel()
.collect(Collectors.toList());
if(!possibilities.isEmpty()) {
int bestScore = possibilities.stream()
.mapToInt(p -> score(p))
.max()
.getAsInt();
possibilities = possibilities.stream()
.filter(p -> score(p)==bestScore)
.collect(Collectors.toList());
}
Ma farlo, scruto la collezione tre volte. Una volta per costruirlo, una seconda volta per ottenere il punteggio più alto, e una terza volta per filtrarlo e questo non è ottimale. Inoltre il numero di possibilità potrebbe essere enorme (> 10).
Il modo migliore dovrebbe essere quello di ottenere direttamente le migliori possibilità nel primo raccoglitore, ma non sembra esserci alcun compilatore incorporato per fare una cosa del genere.
Così ho implementato il mio Collector
:
public class BestCollector<E> implements Collector<E, List<E>, List<E>> {
private final Comparator<E> comparator;
private final Class<? extends List> listImpl ;
public BestCollector(Comparator<E> comparator, Class<? extends List> listImpl) {
this.comparator = comparator;
this.listImpl = listImpl;
}
public BestCollector(Comparator<E> comparator) {
this.comparator= comparator;
listImpl = ArrayList.class;
}
@Override
public Supplier<List<E>> supplier() {
return() -> {
try {
return listImpl.newInstance();
} catch (InstantiationException | IllegalAccessException ex) {
throw new RuntimeException(ex);
}
};
}
@Override
public BiConsumer<List<E>, E> accumulator() {
return (list, e) -> {
if (list.isEmpty()) {
list.add(e);
} else {
final int comparison = comparator.compare(list.get(0), e);
if (comparison == 0) {
list.add(e);
} else if (comparison < 0) {
list.clear();
list.add(e);
}
}
};
}
@Override
public BinaryOperator<List<E>> combiner() {
return (l1, l2) -> {
final int comparison = comparator.compare(l1.get(0), l2.get(0));
if (comparison == 0) {
l1.addAll(l2);
return l1;
} else if (comparison < 0) {
return l2;
} else {
return l1;
}
};
}
@Override
public Function<List<E>, List<E>> finisher() {
return Function.identity();
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT, Characteristics.UNORDERED);
}
}
E poi:
List<Possibity> possibilities = getPossibilityStream()
.parallel()
.collect(new BestCollector<Possibility>((p1, p2) -> score(p1).compareTo(score(p2)));
E che fa il lavoro in una modalità sequenziale (senza il .parallel()
), ma in modo parallelo Ci sono alcune eccezioni occasionalmente in due punti:
A
java.lang.IndexOutOfBoundsException Index: 0, Size: 0
nella linea:final int comparison = comparator.compare(list.get(0), e);
del accumulator()
metodo
comprendo succede quando un list.clear()
è chiamato tra list.isEmpty()
e list.get(0)
.
A
java.lang.NullPointerException
nel metodo punteggio (possibilità) perché la possibilità ènull
. Anche in questo caso la stessa linea è coinvolto:final int comparison = comparator.compare(list.get(0), e);
Non capisco come list.get(0)
potrebbe tornare null
...
In modo parallelo, a volte list.get(0)
solleva un IndexOutOfBoundsException
e, a volte ritornano null
.
Capisco che il mio codice non è thread-safe così ho provato diverse soluzioni:
- Aggiungi
synchronized
in tutti i metodi di BestCollector:public synchronized …
- Utilizzare una raccolta thread-safe, invece di
ArrayList
:java.util.concurrent.CopyOnWriteArrayList
- Aggiungi
synchronized
e utilizzaCopyOnWriteArrayList
allo stesso tempo Rimuovi
Characteristics.CONCURRENT
dalSet<Characteristics>
delcharacteristics()
metodo di@Override public Set<Characteristics> characteristics() { return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED); }
Ma io non so se il Characteristics.CONCURRENT
è qui per indicare che il mio codice è thread-safe o che il mio codice sarà utilizzato in un processo di concorrenza.
Ma nessuna di queste soluzioni risolve il problema.
Infatti quando viene rimosso CONCURRENT su caratteristiche v'è, a volte, un java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
ma in linea:
final int comparison = comparator.compare(l1.get(0), l2.get(0));
del metodo combiner()
.
Tuttavia, le eccezioni sollevate dal metodo accumulator()
sembrano non verificarsi più.
@ La risposta di Holger è giusta.
La soluzione completa è di cambiare entrambi i combiner()
e characteristics()
metodi:
@Override
public BinaryOperator<List<E>> combiner() {
return (l1, l2) -> {
if (l1.isEmpty()) {
return l2;
} else if (l2.isEmpty()) {
return l1;
} else {
final int comparison = comparator.compare(l1.get(0), l2.get(0));
if (comparison == 0) {
l1.addAll(l2);
return l1;
} else if (comparison < 0) {
return l2;
} else {
return l1;
}
}
};
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}
Non vedo nulla di sbagliato nell'implementazione di Collector (che è fondamentalmente la stessa implementazione della risposta accettata di questa domanda: http://stackoverflow.com/questions/29334404/how-to-force-max-to -return-all-maximum-values-in-a-java-stream/29334774) –
È interessante notare che la rimozione della caratteristica 'CONCURRENT' lo rende funzionante per me, quindi suppongo che dovresti guardare in questa direzione. –