2011-02-09 8 views
5

Sono un neofita del multithreading in Windows, quindi questa potrebbe essere una domanda banale: qual è il modo più semplice per assicurarsi che i thread eseguano un ciclo in sequenza?Come fare loop fili Win32/MFC in lockstep?

Ho provato a passare un array condiviso di Event s a tutti i thread e utilizzando WaitForMultipleObjects alla fine del ciclo per sincronizzarli, ma questo mi dà un deadlock dopo uno, a volte due cicli. Ecco una versione semplificata del mio codice corrente (con solo due fili, ma vorrei renderlo scalabile):

typedef struct 
{ 
    int rank; 
    HANDLE* step_events; 
} IterationParams; 

int main(int argc, char **argv) 
{ 
    // ... 

    IterationParams p[2]; 
    HANDLE step_events[2]; 
    for (int j=0; j<2; ++j) 
    { 
     step_events[j] = CreateEvent(NULL, FALSE, FALSE, NULL); 
    } 

    for (int j=0; j<2; ++j) 
    { 
     p[j].rank = j; 
     p[j].step_events = step_events; 
     AfxBeginThread(Iteration, p+j); 
    } 

    // ... 
} 

UINT Iteration(LPVOID pParam) 
{ 
    IterationParams* p = (IterationParams*)pParam; 
    int rank = p->rank; 

    for (int i=0; i<100; i++) 
    { 
     if (rank == 0) 
     { 
      printf("%dth iteration\n",i); 
      // do something 
      SetEvent(p->step_events[0]); 
      WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE); 
     } 
     else if (rank == 1) 
     { 
      // do something else 
      SetEvent(p->step_events[1]); 
      WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE); 
     } 
    } 
    return 0; 
} 

(so che sto mescolando C e C++, in realtà è l'eredità di codice C che ho Sto cercando di parallelizzare.)

Leggere i documenti su MSDN, penso che questo dovrebbe funzionare. Tuttavia, il thread 0 stampa solo una volta, occasionalmente due volte, quindi il programma si blocca. È un modo corretto per sincronizzare i thread? In caso contrario, cosa consiglieresti (non esiste davvero un supporto integrato per una barriera in MFC?).


EDIT: questa soluzione è SBAGLIATO, tra cui anche Alessandro's fix. Ad esempio, si consideri questo scenario:

  1. Discussione 0 imposta il suo evento e chiede Attendere, blocchi
  2. filettatura 1 definisce i suoi eventi e le chiamate Attendere, blocchi
  3. discussione 0 rendimenti Attendere, ripristina la sua manifestazione, e completa un ciclo senza Thread 1 ottenendo il controllo
  4. Il thread 0 imposta il proprio evento e chiama Wait. Poiché Thread 1 non ha ancora avuto la possibilità di reimpostare il suo evento, Thread Wait di Wait ritorna immediatamente ei thread non sono sincronizzati.

Quindi la domanda rimane: come si fa a tranquillamente assicurarsi che i fili rimanere in sincronia?

risposta

4

Introduzione

ho implementato un semplice programma C++ per la vostra considerazione (testato in Visual Studio 2010). Sta usando solo le API Win32 (e la libreria standard per l'output della console e un po 'di randomizzazione). Dovresti essere in grado di rilasciarlo in un nuovo progetto di console Win32 (senza intestazioni precompilate), compilare ed eseguire.


Soluzione

#include <tchar.h> 
#include <windows.h> 


//--------------------------------------------------------- 
// Defines synchronization info structure. All threads will 
// use the same instance of this struct to implement randezvous/ 
// barrier synchronization pattern. 
struct SyncInfo 
{ 
    SyncInfo(int threadsCount) : Awaiting(threadsCount), ThreadsCount(threadsCount), Semaphore(::CreateSemaphore(0, 0, 1024, 0)) {}; 
    ~SyncInfo() { ::CloseHandle(this->Semaphore); } 
    volatile unsigned int Awaiting; // how many threads still have to complete their iteration 
    const int ThreadsCount; 
    const HANDLE Semaphore; 
}; 


//--------------------------------------------------------- 
// Thread-specific parameters. Note that Sync is a reference 
// (i.e. all threads share the same SyncInfo instance). 
struct ThreadParams 
{ 
    ThreadParams(SyncInfo &sync, int ordinal, int delay) : Sync(sync), Ordinal(ordinal), Delay(delay) {}; 
    SyncInfo &Sync; 
    const int Ordinal; 
    const int Delay; 
}; 


//--------------------------------------------------------- 
// Called at the end of each itaration, it will "randezvous" 
// (meet) all the threads before returning (so that next 
// iteration can begin). In practical terms this function 
// will block until all the other threads finish their iteration. 
static void RandezvousOthers(SyncInfo &sync, int ordinal) 
{ 
    if (0 == ::InterlockedDecrement(&(sync.Awaiting))) { // are we the last ones to arrive? 
     // at this point, all the other threads are blocking on the semaphore 
     // so we can manipulate shared structures without having to worry 
     // about conflicts 
     sync.Awaiting = sync.ThreadsCount; 
     wprintf(L"Thread %d is the last to arrive, releasing synchronization barrier\n", ordinal); 
     wprintf(L"---~~~---\n"); 

     // let's release the other threads from their slumber 
     // by using the semaphore 
     ::ReleaseSemaphore(sync.Semaphore, sync.ThreadsCount - 1, 0); // "ThreadsCount - 1" because this last thread will not block on semaphore 
    } 
    else { // nope, there are other threads still working on the iteration so let's wait 
     wprintf(L"Thread %d is waiting on synchronization barrier\n", ordinal); 
     ::WaitForSingleObject(sync.Semaphore, INFINITE); // note that return value should be validated at this point ;) 
    } 
} 


//--------------------------------------------------------- 
// Define worker thread lifetime. It starts with retrieving 
// thread-specific parameters, then loops through 5 iterations 
// (randezvous-ing with other threads at the end of each), 
// and then finishes (the thread can then be joined). 
static DWORD WINAPI ThreadProc(void *p) 
{ 
    ThreadParams *params = static_cast<ThreadParams *>(p); 
    wprintf(L"Starting thread %d\n", params->Ordinal); 

    for (int i = 1; i <= 5; ++i) { 
     wprintf(L"Thread %d is executing iteration #%d (%d delay)\n", params->Ordinal, i, params->Delay); 
     ::Sleep(params->Delay); 
     wprintf(L"Thread %d is synchronizing end of iteration #%d\n", params->Ordinal, i); 
     RandezvousOthers(params->Sync, params->Ordinal); 
    } 

    wprintf(L"Finishing thread %d\n", params->Ordinal); 
    return 0; 
} 


//--------------------------------------------------------- 
// Program to illustrate iteration-lockstep C++ solution. 
int _tmain(int argc, _TCHAR* argv[]) 
{ 
    // prepare to run 
    ::srand(::GetTickCount()); // pseudo-randomize random values :-) 
    SyncInfo sync(4); 
    ThreadParams p[] = { 
     ThreadParams(sync, 1, ::rand() * 900/RAND_MAX + 100), // a delay between 200 and 1000 milliseconds will simulate work that an iteration would do 
     ThreadParams(sync, 2, ::rand() * 900/RAND_MAX + 100), 
     ThreadParams(sync, 3, ::rand() * 900/RAND_MAX + 100), 
     ThreadParams(sync, 4, ::rand() * 900/RAND_MAX + 100), 
    }; 

    // let the threads rip 
    HANDLE t[] = { 
     ::CreateThread(0, 0, ThreadProc, p + 0, 0, 0), 
     ::CreateThread(0, 0, ThreadProc, p + 1, 0, 0), 
     ::CreateThread(0, 0, ThreadProc, p + 2, 0, 0), 
     ::CreateThread(0, 0, ThreadProc, p + 3, 0, 0), 
    }; 

    // wait for the threads to finish (join) 
    ::WaitForMultipleObjects(4, t, true, INFINITE); 

    return 0; 
} 

Esempio di output

L'esecuzione di questo programma sulla mia macchina (dual-core) produce il seguente output:

Starting thread 1 
Starting thread 2 
Starting thread 4 
Thread 1 is executing iteration #1 (712 delay) 
Starting thread 3 
Thread 2 is executing iteration #1 (798 delay) 
Thread 4 is executing iteration #1 (477 delay) 
Thread 3 is executing iteration #1 (104 delay) 
Thread 3 is synchronizing end of iteration #1 
Thread 3 is waiting on synchronization barrier 
Thread 4 is synchronizing end of iteration #1 
Thread 4 is waiting on synchronization barrier 
Thread 1 is synchronizing end of iteration #1 
Thread 1 is waiting on synchronization barrier 
Thread 2 is synchronizing end of iteration #1 
Thread 2 is the last to arrive, releasing synchronization barrier 
---~~~--- 
Thread 2 is executing iteration #2 (798 delay) 
Thread 3 is executing iteration #2 (104 delay) 
Thread 1 is executing iteration #2 (712 delay) 
Thread 4 is executing iteration #2 (477 delay) 
Thread 3 is synchronizing end of iteration #2 
Thread 3 is waiting on synchronization barrier 
Thread 4 is synchronizing end of iteration #2 
Thread 4 is waiting on synchronization barrier 
Thread 1 is synchronizing end of iteration #2 
Thread 1 is waiting on synchronization barrier 
Thread 2 is synchronizing end of iteration #2 
Thread 2 is the last to arrive, releasing synchronization barrier 
---~~~--- 
Thread 4 is executing iteration #3 (477 delay) 
Thread 3 is executing iteration #3 (104 delay) 
Thread 1 is executing iteration #3 (712 delay) 
Thread 2 is executing iteration #3 (798 delay) 
Thread 3 is synchronizing end of iteration #3 
Thread 3 is waiting on synchronization barrier 
Thread 4 is synchronizing end of iteration #3 
Thread 4 is waiting on synchronization barrier 
Thread 1 is synchronizing end of iteration #3 
Thread 1 is waiting on synchronization barrier 
Thread 2 is synchronizing end of iteration #3 
Thread 2 is the last to arrive, releasing synchronization barrier 
---~~~--- 
Thread 2 is executing iteration #4 (798 delay) 
Thread 3 is executing iteration #4 (104 delay) 
Thread 1 is executing iteration #4 (712 delay) 
Thread 4 is executing iteration #4 (477 delay) 
Thread 3 is synchronizing end of iteration #4 
Thread 3 is waiting on synchronization barrier 
Thread 4 is synchronizing end of iteration #4 
Thread 4 is waiting on synchronization barrier 
Thread 1 is synchronizing end of iteration #4 
Thread 1 is waiting on synchronization barrier 
Thread 2 is synchronizing end of iteration #4 
Thread 2 is the last to arrive, releasing synchronization barrier 
---~~~--- 
Thread 3 is executing iteration #5 (104 delay) 
Thread 4 is executing iteration #5 (477 delay) 
Thread 1 is executing iteration #5 (712 delay) 
Thread 2 is executing iteration #5 (798 delay) 
Thread 3 is synchronizing end of iteration #5 
Thread 3 is waiting on synchronization barrier 
Thread 4 is synchronizing end of iteration #5 
Thread 4 is waiting on synchronization barrier 
Thread 1 is synchronizing end of iteration #5 
Thread 1 is waiting on synchronization barrier 
Thread 2 is synchronizing end of iteration #5 
Thread 2 is the last to arrive, releasing synchronization barrier 
---~~~--- 
Finishing thread 4 
Finishing thread 3 
Finishing thread 2 
Finishing thread 1 

Si noti che per semplicità che ogni filo ha durata casuale dell'iterazione, ma tutte le iterazioni di quel thread useranno la stessa durata casuale (es. non cambia tra le iterazioni).


Come funziona?

Il "nucleo" della soluzione è nella funzione "RandezvousOthers". Questa funzione bloccherà su un semaforo condiviso (se il thread su cui questa funzione è stata chiamata non è l'ultimo a chiamare la funzione) o ripristina la struttura Sync e sblocca tutti i thread bloccati su un semaforo condiviso (se il thread su cui è presente la funzione è stata chiamata è stata l'ultima a chiamare la funzione).

+0

Per conto delle generazioni future, grazie per aver pubblicato un esempio completo. Ho implementato la stessa logica (contatore condiviso e semaforo), e sembra funzionare perfettamente. – suszterpatt

2

Per farlo funzionare, impostare il secondo parametro di CreateEvent su TRUE. Ciò renderà gli eventi "reset manuale" e impedirà allo Waitxxx di ripristinarlo. Quindi posizionare uno ResetEvent all'inizio del ciclo.

+0

Non desidero ** ** il comando "Attendi ..." per resettare gli eventi? Altrimenti i thread non sono sincronizzati dopo il primo ciclo, no? – suszterpatt

+0

No, perché il primo Wait che restituisce reimposta gli eventi, impedendo l'altro Wait to return. – Loghorn

+0

Ah, capisco. Grazie. – suszterpatt

1

Ho trovato questo SyncTools (scarica SyncTools.zip) googling "finestre di sincronizzazione barriera". Usa una CriticalSection e un Event per implementare una barriera per N thread.