2011-02-09 6 views
6

Ho un bel po 'di esperienza nell'usare i metodi base di comunicazione e di gruppo MPI2, e faccio un po' di lavoro di simulazione parallelo imbarazzante usando MPI. Fino ad ora, ho strutturato il mio codice per avere un nodo di invio e un mucchio di nodi di lavoro. Il nodo di invio ha un elenco di file di parametri che verranno eseguiti con il simulatore. Seziona ciascun nodo di lavoro con un file di parametri. I nodi worker eseguono la simulazione, quindi richiedono un altro file di parametri, fornito dal nodo di invio. Una volta che tutti i file dei parametri sono stati eseguiti, il nodo di invio arresta ogni nodo di lavoro, prima di spegnersi.Creazione di un contatore che rimane sincronizzato tra i processi MPI

I file dei parametri sono in genere denominati "Par_N.txt" dove N è il numero intero identificativo (ad es. N = 1-1000). Quindi stavo pensando, se potessi creare un contatore, e potessi avere questo contatore sincronizzato su tutti i miei nodi, potrei eliminare la necessità di avere un nodo di invio e rendere il sistema un po 'più semplice. Per quanto semplice possa sembrare, in pratica sospetto che sia un po 'più difficile, dato che dovrei assicurarmi che il contatore sia bloccato mentre viene cambiato, ecc. E pensavo che ci potesse essere un modo integrato per MPI di gestire questo. qualche idea? Sto pensando troppo a questo?

+0

Puoi spiegare quali benefici speri di ottenere dall'eliminazione del dispatcher? – NPE

+0

@ aix- sicuro. In alcuni dei nostri run più grandi, ho notato che il nodo di invio si satura di comunicazioni (ad esempio, una corsa con np = 10 nodi nodi). Per superare questo, ho iniziato a consentire più nodi di invio, in cui ogni nodo di invio prende un sottogruppo. Tuttavia, questo porta a un codice più complesso (più difficile da mantenere). Quindi è soprattutto un problema di cercare di semplificare le cose (se è qualcosa che potrebbe essere fatto semplicemente). – MarkD

+0

Inoltre, su run più piccoli (ad esempio 5-10 nodi) che vengono eseguiti più spesso, sarebbe opportuno non consegnare un nodo intero a un nodo di invio. Il nostro sys-admin è molto contro il sovraccarico dei core, e ha impostato il job scheduler per non consentire i lavori in cui il numero di processi> il numero di core richiesti. – MarkD

risposta

10

L'implementazione di un contatore condiviso non è banale, ma una volta eseguita e conservata in una libreria da qualche parte, è possibile eseguire un lotto con esso.

Nel libro Using MPI-2, che si dovrebbe avere a disposizione se si intende implementare questa roba, uno degli esempi (il codice è available online) è un contatore condiviso. Lo "non scalabile" dovrebbe funzionare bene con diverse dozzine di processi: il contatore è un array di 0..size-1 di interi, uno per grado, e quindi l'operazione di `get next work item # 'consiste di bloccando la finestra, leggendo il contributo di tutti gli altri al contatore (in questo caso, quanti elementi hanno preso), aggiornando il proprio (++), chiudendo la finestra e calcolando il totale. Questo è tutto fatto con operazioni passive unilaterali. (La migliore scala utilizza solo un albero piuttosto che un array 1-d).

Quindi l'uso sarebbe dire che il grado 0 ospita il contatore e tutti continuano a fare unità di lavoro e ad aggiornare il contatore per ottenere quello successivo fino a quando non c'è più lavoro; quindi aspetti ad una barriera o qualcosa e finalizzi.

Una volta che hai qualcosa di simile - utilizzando un valore condiviso per mettere a disposizione la prossima unità di lavoro - funzionante, puoi generalizzare a un approccio più sofisticato. Quindi, come suggerito da suzterpatt, tutti quelli che prendono "la loro parte" di unità di lavoro all'inizio funzionano alla grande, ma cosa fare se alcuni finiscono più velocemente di altri? La solita risposta ora è il furto del lavoro; ognuno mantiene la propria lista di unità di lavoro in una coda, e poi quando si esaurisce il lavoro, ruba unità di lavoro dall'altra parte di qualcuno, finché non rimane più lavoro.Questa è davvero la versione completamente distribuita di master-worker, dove non c'è più un singolo lavoro di partizionamento master. Una volta che hai funzionato un singolo contatore condiviso, puoi creare mutex da quelli e da questo puoi implementare la rimozione. Ma se il semplice contatore condiviso funziona abbastanza bene, potresti non aver bisogno di andare lì.

Aggiornamento: Ok, quindi ecco una hacky-tentativo di fare il contatore condiviso - la mia versione della semplice nel libro MPI-2: sembra funzionare, ma non avrebbe detto niente molto più forte di quello (Non ho giocato con questa roba per molto tempo). C'è una semplice implementazione del contatore (corrispondente alla versione non scalabile nel libro MPI-2) con due semplici test, uno corrispondente all'incirca al tuo caso di lavoro; ogni articolo aggiorna il contatore per ottenere un oggetto di lavoro, quindi esegue il "lavoro" (dorme per una quantità casuale di tempo). Alla fine di ogni test, viene stampata la struttura dei dati del contatore, che è il numero di incrementi che ogni grado ha eseguito.

#include <mpi.h> 
#include <stdlib.h> 
#include <stdio.h> 
#include <unistd.h> 

struct mpi_counter_t { 
    MPI_Win win; 
    int hostrank ; 
    int myval; 
    int *data; 
    int rank, size; 
}; 

struct mpi_counter_t *create_counter(int hostrank) { 
    struct mpi_counter_t *count; 

    count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t)); 
    count->hostrank = hostrank; 
    MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank)); 
    MPI_Comm_size(MPI_COMM_WORLD, &(count->size)); 

    if (count->rank == hostrank) { 
     MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data)); 
     for (int i=0; i<count->size; i++) count->data[i] = 0; 
     MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int), 
         MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win)); 
    } else { 
     count->data = NULL; 
     MPI_Win_create(count->data, 0, 1, 
         MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win)); 
    } 
    count -> myval = 0; 

    return count; 
} 

int increment_counter(struct mpi_counter_t *count, int increment) { 
    int *vals = (int *)malloc(count->size * sizeof(int)); 
    int val; 

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win); 

    for (int i=0; i<count->size; i++) { 

     if (i == count->rank) { 
      MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM, 
          count->win); 
     } else { 
      MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win); 
     } 
    } 

    MPI_Win_unlock(0, count->win); 
    count->myval += increment; 

    vals[count->rank] = count->myval; 
    val = 0; 
    for (int i=0; i<count->size; i++) 
     val += vals[i]; 

    free(vals); 
    return val; 
} 

void delete_counter(struct mpi_counter_t **count) { 
    if ((*count)->rank == (*count)->hostrank) { 
     MPI_Free_mem((*count)->data); 
    } 
    MPI_Win_free(&((*count)->win)); 
    free((*count)); 
    *count = NULL; 

    return; 
} 

void print_counter(struct mpi_counter_t *count) { 
    if (count->rank == count->hostrank) { 
     for (int i=0; i<count->size; i++) { 
      printf("%2d ", count->data[i]); 
     } 
     puts(""); 
    } 
} 

int test1() { 
    struct mpi_counter_t *c; 
    int rank; 
    int result; 

    c = create_counter(0); 

    MPI_Comm_rank(MPI_COMM_WORLD, &rank); 
    result = increment_counter(c, 1); 
    printf("%d got counter %d\n", rank, result); 

    MPI_Barrier(MPI_COMM_WORLD); 
    print_counter(c); 
    delete_counter(&c); 
} 


int test2() { 
    const int WORKITEMS=50; 

    struct mpi_counter_t *c; 
    int rank; 
    int result = 0; 

    c = create_counter(0); 

    MPI_Comm_rank(MPI_COMM_WORLD, &rank); 
    srandom(rank); 

    while (result < WORKITEMS) { 
     result = increment_counter(c, 1); 
     if (result <= WORKITEMS) { 
      printf("%d working on item %d...\n", rank, result); 
      sleep(random() % 10); 
     } else { 
      printf("%d done\n", rank); 
     } 
    } 

    MPI_Barrier(MPI_COMM_WORLD); 
    print_counter(c); 
    delete_counter(&c); 
} 

int main(int argc, char **argv) { 

    MPI_Init(&argc, &argv); 

    test1(); 
    test2(); 

    MPI_Finalize(); 
} 
+1

MPI_Fetch_and_op in MPI-3 semplifica enormemente questo codice. – Jeff

3

Non riesco a pensare a nessun meccanismo integrato per risolvere questo problema, dovresti implementarlo manualmente. A giudicare dai commenti si desidera decentrare il programma, nel qual caso ogni processo (o almeno gruppi di processi) dovrebbe mantenere i propri valori del contatore e mantenerlo sincronizzato. Ciò potrebbe probabilmente essere fatto con un uso intelligente di send/receive non bloccanti, ma la semantica di quelli non è banale.

Invece, risolverei il problema di saturazione semplicemente immettendo più file contemporaneamente in processi di lavoro. Ciò ridurrebbe il traffico di rete e consentirà di mantenere la configurazione semplice di un singolo dispatcher.

+0

@ suszterpatt- Ho pensato un po 'a quello che hai menzionato, dato un ID di processo e il numero totale di processi, posso facilmente afferrare un "pezzo" del lavoro che deve essere fatto da ciascun processo. La mia preoccupazione qui, tuttavia, è che le simulazioni hanno tempi di calcolo molto diversi (2+ ordini di grandezza a seconda dei tassi di convergenza), e posso vedere sorgere una situazione, in cui a un singolo nodo viene dato un numero elevato di processi a lungo termine, e il bilanciamento del carico diventerebbe un problema. – MarkD

+0

@ MarkD: In teoria, questo è certamente possibile. Tuttavia, sembra che tu stia elaborando quantità davvero massicce di dati, quindi le probabilità che questo non sia effettivamente così grandioso. Tuttavia, una possibile soluzione potrebbe essere quella di lasciare che il dispatcher "sganci" i file che il loro nodo di lavoro non ha ancora iniziato a elaborare, e invece li invia ad un lavoratore attualmente inattivo. Continuo a considerare questo approccio più semplice dell'implementazione di una variabile condivisa. – suszterpatt

0

Sembra che si stia utilizzando il nodo di invio per eseguire il bilanciamento dinamico del carico (assegnazione del lavoro ai processori quando diventano disponibili). Un contatore condiviso che non richiede l'arresto di tutti i processori non lo farà. Vorrei raccomandare di stare con quello che hai ora o fare ciò che suszterpatt suggerisce, inviare batch di file alla volta.

0

Non è chiaro se sia necessario passare attraverso i file in modo rigoroso o meno. In caso contrario, perché non fare in modo che ogni nodo i gestisca tutti i file in cui è N % total_workers == i - cioè la distribuzione ciclica del lavoro?

Problemi correlati