2014-12-01 10 views
10

Se ho un flusso parallelo in java 8 e termino con anyMatch e la mia raccolta ha un elemento che corrisponde al predicato, sto cercando di capire cosa succede quando un thread elabora questo elemento.Streaming Java 8 parallelo + anyMatch: i thread vengono interrotti una volta trovata una corrispondenza?

So che anyMatch è in cortocircuito, quindi non mi aspetto che ulteriori elementi vengano elaborati una volta che l'elemento corrispondente è stato elaborato. La mia confusione riguarda ciò che accade agli altri thread, che sono presumibilmente nel mezzo di elementi di elaborazione. Posso pensare a 3 scenari plausibili: a) Vengono interrotti? b) Continuano a elaborare l'elemento su cui stanno lavorando e quindi, una volta che tutti i thread non stanno facendo nulla, ottengo il mio risultato? c) Ottengo il risultato, ma i thread che stavano elaborando altri elementi continuano a elaborare quegli elementi (ma non assumono altri elementi una volta terminati)?

Ho un predicato di lunga durata, in cui è molto utile terminare rapidamente non appena so che un elemento corrisponde. Mi preoccupo un po 'perché non riesco a trovare questa informazione nella documentazione che potrebbe essere una cosa dipendente dall'implementazione, che sarebbe anche utile sapere.

Grazie

risposta

21

Dopo un po 'scavare attraverso il codice sorgente di Java Credo di aver trovato la risposta.

Gli altri thread controllano periodicamente per vedere se un altro thread ha trovato la risposta e in caso affermativo, quindi smettono di funzionare e annulla tutti i nodi non ancora in esecuzione.

java.util.Stream.FindOps$FindTask ha questo metodo:

private void foundResult(O answer) { 
     if (isLeftmostNode()) 
      shortCircuit(answer); 
     else 
      cancelLaterNodes(); 
    } 

sua classe genitore, AbstractShortcircuitTask implementa shortCircuit come questo:

/** 
* Declares that a globally valid result has been found. If another task has 
* not already found the answer, the result is installed in 
* {@code sharedResult}. The {@code compute()} method will check 
* {@code sharedResult} before proceeding with computation, so this causes 
* the computation to terminate early. 
* 
* @param result the result found 
*/ 
protected void shortCircuit(R result) { 
    if (result != null) 
     sharedResult.compareAndSet(null, result); 
} 

E il metodo effettivo compute() che fa il lavoro ha questa importante linea:

AtomicReference<R> sr = sharedResult; 
    R result; 
    while ((result = sr.get()) == null) { 
     ...//does the actual fork stuff here 
    } 

dove sharedResult viene aggiornato dal metodo shortCircuit() in modo che il calcolo lo visualizzi la prossima volta che controlla la condizione del ciclo while.

EDIT Quindi, in sintesi:

  1. Le discussioni non sono interrotti
  2. Invece, essi saranno periodicamente controllare per vedere se qualcuno ha trovato la risposta e si fermerà l'ulteriore elaborazione se la risposta è stata trovata .
  3. Non verranno avviati nuovi thread una volta trovata la risposta.
+4

Ecco informazioni molto dettagliate sul codice di implementazione, ma vi consiglio di aggiungere un riepilogo che si rivolge, infine, la domanda del PO, vale a dire “Che cosa accadrà” → 'B' – Holger

+0

Qualcuno può chiarire - questo si applica anche per le operazioni di noneMatch e allmatch ? Quindi se un thread di allMatch trova un elemento che non corrisponde, gli altri thread controllano periodicamente e quindi si fermano come risultato di questo? – Tranquility

Problemi correlati