2014-06-23 11 views
6

Stavo eseguendo alcuni test con l'elaborazione parallela e ho creato un programma che, data una matrice di numeri interi, ricalcola il valore di ogni posizione in base ai vicini.Idee per evitare di fare un trucco su CyclicBarrier

ho bisogno di una copia della matrice in modo che i valori non sarebbero sovrascritte e usato un CyclicBarrier per unire i risultati parziali volta che i problemi sono stati risolti:

CyclicBarrier cyclic_barrier = new CyclicBarrier(n_tasks + 1, new Runnable() { 
    public void run() { 
     ParallelProcess.mergeResult(); 
    } 
}); 
ParallelProcess p = new ParallelProcess(cyclic_barrier, n_rows, r_cols); // init 

Ogni task viene assegnata una porzione della matrice : Lo sto dividendo in parti per file. Ma potrebbe accadere che le divisioni non siano esatte, quindi ci sarebbe un piccolo pezzo corrispondente alla riga dura che non sarebbe stata inviata al pool di thread.

Esempio: se ho 16 righe e n_tasks = 4 nessun problema, tutti i 4 verranno inviati al pool. Ma se avessi il 18, saranno presentati i primi 16, ma non gli ultimi due.

Quindi sto forzando un invio se questo caso accade. Bene, non sto presentando effettivamente, perché sto usando un pool di thread fisso che ho creato in questo modo ExecutorService e = Executors.newFixedThreadPool(n_tasks). Poiché tutti gli slot della piscina sono occupati e i fili sono bloccati dalla barriera (mybarrier.await() è chiamato nel metodo run) non potevo inviarlo al pool, quindi ho usato solo Thread.start().

Andiamo al punto. Dal momento che ho bisogno di prendere in considerazione per il CyclicBarrier la possibilità di quel chunk rimanente, il numero di parti deve essere incrementato di 1.

Ma se questo caso non accadesse, sarei una parte breve per innescare la barriera .

Qual è la mia soluzione ?:

if (lower_limit != n_rows) { // the remaining chunk to be processed 
    Thread t = new Thread(new ParallelProcess(lower_limit, n_rows)); 
    t.start(); 
    t.join(); 
} 
else { 
    cyclic_barrier.await(); 
} 

Mi sento come sto imbrogli quando si utilizza il cyclic_barrier.await() trucco per aumentare la barriera con la forza.

C'è un altro modo in cui posso affrontare questo problema, quindi non ho dovuto fare quello che sto facendo?

+0

Che dire di questo: Nella operaio ultimo ad arrivare alla barriera (controllare tramite valore restituito da ' barrier.await() '), puoi controllare se ci sono oggetti da continuare. In caso affermativo, il lavoratore può presentare una nuova attività per gestire gli oggetti rimanenti.Forse, dovrai ancora usare un po 'di sincronizzazione aggiuntiva all'interno di 'mergeResult()' oltre 'barrier.await()' - proprio come mostri con 't.join()'. –

+0

@VictorSorokin Non capisco come controllare il valore restituito di 'await()' mi aiuterebbe a controllare se c'è una parte sinistra. – dabadaba

+0

Non sono noti i valori di 'rows' e' n_threads 'in precedenza (prima di iniziare l'invio delle attività)? –

risposta

1

Sebbene questo non risponda alla tua domanda su CyclicBarriers, posso consigliare l'uso di Phaser? Questo ha la possibilità di includere il numero di parti e consente inoltre di eseguire il mergeResult quando una fase è scattata.

Quindi, prima di eseguire un calcolo asincrono, semplicemente register. Quindi all'interno di quel calcolo arriva il thread su phaser. Quando tutti i thread sono arrivati, avanza la fase e può richiamare un metodo sovrascritto onAdvance.

La presentazione:

ParallelProcess process = new ParallelProcess(lower_limit, n_rows)); 
phaser.register(); 
executor.submit(process); 

Il processore

public void run(){ 
    //do stuff 
    phaser.arrive(); 
} 

Il phaser

Phaser phaser = new Phaser(){ 
    protected boolean onAdvance(int phase, int registeredParties) { 
     ParallelProcess.mergeResult(); 
     return true; 
    } 
} 
Problemi correlati