2016-04-13 10 views
10

consente di dire che abbiamo qualcosa di simile:Do Java 8 flussi paralleli utilizzano lo stesso filo per una sequenza

LongStream.range(0, 10).parallel() 
.filter(l -> { 
    System.out.format("filter: %s [%s]\n", l, Thread.currentThread().getName()); 
    return l % 2 == 0; 
}) 
.map(l -> { 
    System.out.format("map: %s [%s]\n", l, Thread.currentThread().getName()); 
    return l; 
}); 

Se si esegue questa uscita programma sarebbe qualcosa di simile:

filter: 6 [main] 
map: 6 [main] 
filter: 5 [main] 
filter: 4 [ForkJoinPool.commonPool-worker-2] 
map: 4 [ForkJoinPool.commonPool-worker-2] 
filter: 1 [ForkJoinPool.commonPool-worker-3] 
filter: 2 [ForkJoinPool.commonPool-worker-1] 
filter: 0 [ForkJoinPool.commonPool-worker-3] 
filter: 3 [ForkJoinPool.commonPool-worker-2] 
filter: 8 [main] 
filter: 7 [ForkJoinPool.commonPool-worker-2] 
filter: 9 [ForkJoinPool.commonPool-worker-2] 
map: 0 [ForkJoinPool.commonPool-worker-3] 
map: 2 [ForkJoinPool.commonPool-worker-1] 
map: 8 [main]` 

Come abbiamo può vedere ogni sequenza di attività per ogni long viene eseguita esattamente da uno stesso thread. È qualcosa su cui possiamo contare, o è solo una coincidenza? I thread possono 'condividere' compiti durante l'esecuzione?

risposta

9

Da stream package summary sezione Effetti collaterali:

Se i parametri comportamentali hanno effetti collaterali, se non esplicitamente dichiarato, non ci sono garanzie per quanto riguarda la visibilità di questi effetti collaterali ad altri thread, né esiste alcuna garanzia che diverse operazioni sullo "stesso" elemento all'interno della stessa pipeline di flusso vengano eseguite nella stessa thread.

4

non è il caso, è come flusso API attualmente implementati in OracleJDK/OpenJDK: operazioni stateless (come filter, map, peek e flatMap) sono fusi insieme in un'unica operazione che esegue le operazioni in sequenza in single thread. Tuttavia, l'introduzione di alcune operazioni stateful potrebbe cambiare le cose. Per esempio, aggiungiamo un limit:

LongStream.range(0, 10).parallel() 
.filter(l -> { 
    System.out.format("filter: %s [%s]\n", l, Thread.currentThread().getName()); 
    return l % 2 == 0; 
}) 
.limit(10) 
.map(l -> { 
    System.out.format("map: %s [%s]\n", l, Thread.currentThread().getName()); 
    return l; 
}) 
.forEach(x -> {}); 

limite introduce ora una barriera che divide il gasdotto in due parti. Il risultato è simile:

filter: 8 [ForkJoinPool.commonPool-worker-2] 
filter: 9 [ForkJoinPool.commonPool-worker-7] 
filter: 0 [ForkJoinPool.commonPool-worker-6] 
filter: 1 [ForkJoinPool.commonPool-worker-3] 
filter: 4 [ForkJoinPool.commonPool-worker-5] 
filter: 2 [ForkJoinPool.commonPool-worker-1] 
filter: 6 [main] 
filter: 7 [ForkJoinPool.commonPool-worker-4] 
filter: 3 [ForkJoinPool.commonPool-worker-6] 
filter: 5 [ForkJoinPool.commonPool-worker-2] 
map: 0 [ForkJoinPool.commonPool-worker-6] 
map: 2 [ForkJoinPool.commonPool-worker-2] 
map: 8 [ForkJoinPool.commonPool-worker-4] 
map: 6 [main] 
map: 4 [ForkJoinPool.commonPool-worker-6] 

Vedi quell'elemento # 2 è stato filtrato in PLG-1 thread, ma mappato in PLG-2 thread.

Si noti che, come citato correttamente da @Misha, anche per le operazioni senza stato non si garantisce che venga utilizzato lo stesso thread. È possibile che implementazioni future o alternative di Stream API modificheranno questo comportamento (ad esempio, utilizzando l'approccio produttore-consumatore).

Problemi correlati