L'implementazione di un contatore condiviso non è banale, ma una volta eseguita e conservata in una libreria da qualche parte, è possibile eseguire un lotto con esso.
Nel libro Using MPI-2, che si dovrebbe avere a disposizione se si intende implementare questa roba, uno degli esempi (il codice è available online) è un contatore condiviso. Lo "non scalabile" dovrebbe funzionare bene con diverse dozzine di processi: il contatore è un array di 0..size-1 di interi, uno per grado, e quindi l'operazione di `get next work item # 'consiste di bloccando la finestra, leggendo il contributo di tutti gli altri al contatore (in questo caso, quanti elementi hanno preso), aggiornando il proprio (++), chiudendo la finestra e calcolando il totale. Questo è tutto fatto con operazioni passive unilaterali. (La migliore scala utilizza solo un albero piuttosto che un array 1-d).
Quindi l'uso sarebbe dire che il grado 0 ospita il contatore e tutti continuano a fare unità di lavoro e ad aggiornare il contatore per ottenere quello successivo fino a quando non c'è più lavoro; quindi aspetti ad una barriera o qualcosa e finalizzi.
Una volta che hai qualcosa di simile - utilizzando un valore condiviso per mettere a disposizione la prossima unità di lavoro - funzionante, puoi generalizzare a un approccio più sofisticato. Quindi, come suggerito da suzterpatt, tutti quelli che prendono "la loro parte" di unità di lavoro all'inizio funzionano alla grande, ma cosa fare se alcuni finiscono più velocemente di altri? La solita risposta ora è il furto del lavoro; ognuno mantiene la propria lista di unità di lavoro in una coda, e poi quando si esaurisce il lavoro, ruba unità di lavoro dall'altra parte di qualcuno, finché non rimane più lavoro.Questa è davvero la versione completamente distribuita di master-worker, dove non c'è più un singolo lavoro di partizionamento master. Una volta che hai funzionato un singolo contatore condiviso, puoi creare mutex da quelli e da questo puoi implementare la rimozione. Ma se il semplice contatore condiviso funziona abbastanza bene, potresti non aver bisogno di andare lì.
Aggiornamento: Ok, quindi ecco una hacky-tentativo di fare il contatore condiviso - la mia versione della semplice nel libro MPI-2: sembra funzionare, ma non avrebbe detto niente molto più forte di quello (Non ho giocato con questa roba per molto tempo). C'è una semplice implementazione del contatore (corrispondente alla versione non scalabile nel libro MPI-2) con due semplici test, uno corrispondente all'incirca al tuo caso di lavoro; ogni articolo aggiorna il contatore per ottenere un oggetto di lavoro, quindi esegue il "lavoro" (dorme per una quantità casuale di tempo). Alla fine di ogni test, viene stampata la struttura dei dati del contatore, che è il numero di incrementi che ogni grado ha eseguito.
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
struct mpi_counter_t {
MPI_Win win;
int hostrank ;
int myval;
int *data;
int rank, size;
};
struct mpi_counter_t *create_counter(int hostrank) {
struct mpi_counter_t *count;
count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
count->hostrank = hostrank;
MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
MPI_Comm_size(MPI_COMM_WORLD, &(count->size));
if (count->rank == hostrank) {
MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data));
for (int i=0; i<count->size; i++) count->data[i] = 0;
MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int),
MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
} else {
count->data = NULL;
MPI_Win_create(count->data, 0, 1,
MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
}
count -> myval = 0;
return count;
}
int increment_counter(struct mpi_counter_t *count, int increment) {
int *vals = (int *)malloc(count->size * sizeof(int));
int val;
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);
for (int i=0; i<count->size; i++) {
if (i == count->rank) {
MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,
count->win);
} else {
MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
}
}
MPI_Win_unlock(0, count->win);
count->myval += increment;
vals[count->rank] = count->myval;
val = 0;
for (int i=0; i<count->size; i++)
val += vals[i];
free(vals);
return val;
}
void delete_counter(struct mpi_counter_t **count) {
if ((*count)->rank == (*count)->hostrank) {
MPI_Free_mem((*count)->data);
}
MPI_Win_free(&((*count)->win));
free((*count));
*count = NULL;
return;
}
void print_counter(struct mpi_counter_t *count) {
if (count->rank == count->hostrank) {
for (int i=0; i<count->size; i++) {
printf("%2d ", count->data[i]);
}
puts("");
}
}
int test1() {
struct mpi_counter_t *c;
int rank;
int result;
c = create_counter(0);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
result = increment_counter(c, 1);
printf("%d got counter %d\n", rank, result);
MPI_Barrier(MPI_COMM_WORLD);
print_counter(c);
delete_counter(&c);
}
int test2() {
const int WORKITEMS=50;
struct mpi_counter_t *c;
int rank;
int result = 0;
c = create_counter(0);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
srandom(rank);
while (result < WORKITEMS) {
result = increment_counter(c, 1);
if (result <= WORKITEMS) {
printf("%d working on item %d...\n", rank, result);
sleep(random() % 10);
} else {
printf("%d done\n", rank);
}
}
MPI_Barrier(MPI_COMM_WORLD);
print_counter(c);
delete_counter(&c);
}
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
test1();
test2();
MPI_Finalize();
}
Puoi spiegare quali benefici speri di ottenere dall'eliminazione del dispatcher? – NPE
@ aix- sicuro. In alcuni dei nostri run più grandi, ho notato che il nodo di invio si satura di comunicazioni (ad esempio, una corsa con np = 10 nodi nodi). Per superare questo, ho iniziato a consentire più nodi di invio, in cui ogni nodo di invio prende un sottogruppo. Tuttavia, questo porta a un codice più complesso (più difficile da mantenere). Quindi è soprattutto un problema di cercare di semplificare le cose (se è qualcosa che potrebbe essere fatto semplicemente). – MarkD
Inoltre, su run più piccoli (ad esempio 5-10 nodi) che vengono eseguiti più spesso, sarebbe opportuno non consegnare un nodo intero a un nodo di invio. Il nostro sys-admin è molto contro il sovraccarico dei core, e ha impostato il job scheduler per non consentire i lavori in cui il numero di processi> il numero di core richiesti. – MarkD