8

Sto lavorando a un framework di threading del lavoro semplice che è molto simile a quello descritto in id Tech 5 Challenges. Al livello più elementare, ho una serie di elenchi di lavori, e voglio programmare questi elenchi su un gruppo di thread della CPU (usando un pool di thread standard per l'invio effettivo). Tuttavia, mi chiedo come questo segnale/aspetti all'interno di una lista di attesa può essere implementato in modo efficiente. A quanto ho capito, il token di attesa blocca l'esecuzione della lista se il token del segnale non è stato eseguito. Questo implicitamente significa che tutto prima che un segnale debba finire prima che il segnale possa essere sollevato. Quindi diciamo che abbiamo una lista come questa:Implementazione di un elenco di processi con sincronizzazione interna

J1, J2, S, J3, W, J4 

quindi l'invio può andare in questo modo:

#1: J1, J2, J3 
<wait for J1, J2, run other lists if possible> 
#2: J4 

Tuttavia, questo non è così facile come sembra, come dato un insieme di liste , Dovrei spostare alcuni di essi tra ready e waiting e avere anche un codice speciale per raccogliere tutti i lavori prima di un segnale e taggare qualcosa su di essi, in modo che possano attivare il segnale se e solo se hanno tutto finito (ad esempio, per esempio che non è più possibile aggiungere lavori all'elenco mentre viene eseguito, come i seguenti segnali accedono ai lavori precedentemente inseriti.)

Esiste un modo "standard" per implementarlo in modo efficiente? Mi chiedo anche come pianificare al meglio l'esecuzione della lista di lavoro, in questo momento, ogni core acquisisce una lista di lavoro e pianifica tutti i lavori al suo interno, che offre un ridimensionamento abbastanza buono (per lavori a 32k a 0,7 ms, ottengo il 101%, che suppongo è in parte dovuto al fatto che la versione a thread singolo viene pianificata su diversi core alcune volte.)

+0

Quanti elenchi di lavoro hai in media? – fabrizioM

+0

10-40 elenchi di lavoro, con un totale di ~ 300-500 lavori (alcuni elenchi possono contenere più di 50 lavori) – Anteru

+0

È possibile che le coppie segnale/sincronizzazione vengano annidate? –

risposta

4

Questo è un algoritmo di programmazione relativamente semplice. Un paio di questioni sembrano inizialmente complicate, ma in realtà non lo sono (signal/wait e cache locality). Spiegherò le tecniche, poi fornirò del codice che ho scritto per illustrare i concetti, quindi fornirò alcune note finali sull'ottimizzazione.

algoritmi da usare

Manipolazione del segnale/attendere in modo efficiente è sembra difficile in un primo momento, ma in realtà si rivela essere estremamente facile. Dato che le coppie segnale/attesa non possono annidarsi o sovrapporsi, ci possono essere solo due persone soddisfatte e una in attesa in un dato momento. È sufficiente mantenere un puntatore "Indicatore attuale" sul segnale insoddisfatto più recente per fare tutto ciò che è necessario per la contabilità.

Assicurarsi che i core non saltino troppo da una lista all'altra e che una data lista non sia condivisa da troppi core è anche relativamente facile: ogni core continua a ricevere lavori dalla stessa lista finché non blocca, quindi passa in un'altra lista. Per mantenere tutti i nuclei coinvolti in un unico elenco, viene mantenuto un WorkerCount per ogni elenco che indica quanti nuclei lo stanno utilizzando e gli elenchi sono organizzati in modo che i nuclei selezionino innanzitutto gli elenchi con meno lavoratori.

Il blocco può essere mantenuto semplice bloccando solo lo scheduler o l'elenco su cui si sta lavorando in qualsiasi momento, mai entrambi.

Hai espresso qualche preoccupazione sull'aggiunta di lavori a un elenco dopo che l'elenco ha già iniziato l'esecuzione. Si scopre che il supporto di questo è quasi banale: tutto ciò di cui ha bisogno è una chiamata dall'elenco all'utilità di pianificazione quando un lavoro viene aggiunto a un elenco che è attualmente completato, quindi lo schedulatore può pianificare il nuovo lavoro.

Strutture dati

Qui ci sono le strutture di dati di base di cui ha bisogno:

class Scheduler 
{ 
    LinkedList<JobList>[] Ready; // Indexed by number of cores working on list 
    LinkedList<JobList> Blocked; 
    int ReadyCount; 
    bool Exit; 

    public: 
    void AddList(JobList* joblist); 
    void DoWork(); 

    internal: 
    void UpdateQueues(JobList* joblist); 

    void NotifyBlockedCores(); 
    void WaitForNotifyBlockedCores(); 
} 

class JobList 
{ 
    Scheduler Scheduler; 
    LinkedList<JobList> CurrentQueue; 

    LinkedList<Job> Jobs;   // All jobs in the job list 
    LinkedList<SignalPoint> Signals; // All signal/wait pairs in the job list, 
             plus a dummy 

    Job* NextJob;     // The next job to schedule, if any 
    int NextJobIndex;    // The index of NextJob 

    SignalPoint* CurrentSignal;  // First signal not fully satisfied 

    int WorkerCount;     // # of cores executing in this list 

    public: 
    void AddJob(Job* job); 
    void AddSignal(); 
    void AddWait(); 

    internal: 
    void Ready { get; } 
    void GetNextReadyJob(Job& job, int& jobIndex); 
    void MarkJobCompleted(Job job, int jobIndex); 
} 
class SignalPoint 
{ 
    int SignalJobIndex = int.MaxValue; 
    int WaitJobIndex = int.MaxValue; 
    int IncompleteCount = 0; 
} 

noti che i punti di segnale per una data lista di job vengono più comodamente memorizzate separatamente dalla lista reale di posti di lavoro .

implementazione di pianificazione

Lo scheduler tiene traccia delle liste di lavoro, li assegna al core, ed esegue i lavori dalle liste di lavoro.

AddList aggiunge un lavoro allo scheduler. Deve essere collocato sulla coda Ready o Blocked a seconda che abbia qualche lavoro da fare (ad esempio se siano già stati aggiunti dei lavori), quindi chiama UpdateQueues.

void Scheduler.AddList(JobList* joblist) 
{ 
    joblist.Scheduler = this; 
    UpdateQueues(joblist); 
} 

UpdateQueues centralizza la logica di aggiornamento della coda. Si noti l'algoritmo per la selezione di una nuova coda, e anche la notifica al minimo core quando il lavoro diventa disponibile:

void Scheduler.UpdateQueues(JobList* joblist) 
{ 
    lock(this) 
    { 
    // Remove from prior queue, if any 
    if(joblist.CurrentQueue!=null) 
    { 
     if(joblist.CurrentQueue!=Blocked) ReadyCount--; 
     joblist.CurrentQueue.Remove(joblist); 
    } 

    // Select new queue 
    joblist.CurrentQueue = joblist.Ready ? Ready[joblist.WorkerCount] : Blocked; 

    // Add to new queue 
    joblist.CurrentQueue.Add(joblist); 
    if(joblist.CurrentQueue!=Blocked) 
     if(++ReadyCount==1) NotifyBlockedCores(); 
    } 
} 

DoWork è un normale lavoro di pianificazione ad eccezione di: 1. Seleziona l'ElencoProcessi con i lavoratori più basso 2. lavora da una data lista di lavoro fino a quando non ne può più e 3. Memorizza jobIndex e il lavoro in modo che la lista di lavoro possa facilmente aggiornare lo stato di completamento (dettaglio dell'implementazione).

void Scheduler.DoWork() 
{ 
    while(!Exit) 
    { 
    // Get a job list to work on 
    JobList *list = null; 
    lock(this) 
    { 
     for(int i=0; i<Ready.Length; i++) 
     if(!Ready[i].Empty) 
     { 
      list = Ready[i].First; 
      break; 
     } 
     if(list==null) // No work to do 
     { 
     WaitForNotifyBlockedCores(); 
     continue; 
     } 
     list.WorkerCount++; 
     UpdateQueues(list); 
    } 

    // Execute jobs in the list as long as possible 
    while(true) 
    { 
     int jobIndex; 
     Job job; 
     if(!GetNextReadyJob(&job, &jobIndex)) break; 

     job.Execute(); 

     list.MarkJobCompleted(job, jobIndex); 
    } 

    // Release the job list 
    lock(this) 
    { 
     list.WorkerCount--; 
     UpdateQueues(list); 
    } 
    } 
} 

implementazione ElencoProcessi

L'ElencoProcessi tiene traccia di come il segnale/attesa sono intervallate da posti di lavoro e tiene traccia di quale segnale/attendere coppie hanno già completato tutto ciò prima del loro punto di segnale.

Il costruttore crea un punto di segnale fittizio per aggiungere lavori a. Questo punto di segnale diventa un vero punto di segnale (e viene aggiunto un nuovo dummy) ogni volta che viene aggiunto un nuovo "segnale".

JobList.JobList() 
{ 
    // Always have a dummy signal point at the end 
    Signals.Add(CurrentSignal = new SignalPoint()); 
} 

AddJob aggiunge un lavoro all'elenco. È contrassegnato come incompleto nel SignalPoint. Quando il lavoro viene effettivamente eseguito, il valore IncompleteCount dello stesso SignalPoint viene decrementato. È anche necessario comunicare allo scheduler che le cose potrebbero essere cambiate, poiché il nuovo lavoro potrebbe essere immediatamente eseguibile. Si noti che lo scheduler viene chiamato dopo che il blocco su "this" viene rilasciato per evitare deadlock.

void JobList.AddJob(Job job) 
{ 
    lock(this) 
    { 
    Jobs.Add(job); 
    Signals.Last.IncompleteCount++; 
    if(NextJob == null) 
     NextJob = job; 
    } 
    if(Scheduler!=null) 
    Scheduler.UpdateQueues(this); 
} 

AddSignal e AddWait aggiungere segnali e attende l'elenco dei lavori. Si noti che AddSignal crea effettivamente un nuovo SignalPoint e AddWait riempie semplicemente le informazioni del punto di attesa nel SignalPoint creato in precedenza.

void JobList.AddSignal() 
{ 
    lock(this) 
    { 
    Signals.Last.SignalJobIndex = Jobs.Count; // Reify dummy signal point 
    Signals.Add(new SignalPoint());   // Create new dummy signal point 
    } 
} 


void JobList.AddWait() 
{ 
    lock(this) 
    { 
    Signals.Last.Previous.WaitJobIndex = Jobs.Count; 
    } 
} 

La proprietà Pronto determina se la lista è pronta per core aggiuntivi ad esso assegnati. Ci possono essere due o tre core che lavorano sulla lista senza che la lista sia "pronta" se il lavoro successivo è in attesa di un segnale prima che possa iniziare.

bool JobList.Ready 
{ 
    get 
    { 
    lock(this) 
    { 
     return NextJob!=null && 
     (CurrentSignal==Signals.Last || 
     NextJobIndex < CurrentSignal.WaitJobIndex); 
    } 
    } 
} 

GetNextReadyJob è molto semplice: se noi siamo pronti, basta restituire il lavoro successivo nella lista.

void JobList.GetNextReadyJob(Job& job, int& jobIndex) 
{ 
    lock(this) 
    { 
    if(!Ready) return false; 
    jobIndex = list.NextJobIndex++; 
    job = list.NextJob; list.NextJob = job.Next; 
    return true; 

    } 
} 

MarkJobCompleted è probabilmente il più interessante di tutti. A causa della struttura dei segnali e delle attese, il lavoro corrente è prima di CurrentSignal o è compreso tra CurrentSignal e CurrentSignal.Next (se si trova dopo l'ultimo segnale effettivo, verrà contato tra CurrentSignal e Dummy SignalPoint alla fine). Dobbiamo ridurre il numero di lavori incompleti. Potremmo anche aver bisogno di passare al segnale successivo se questo conteggio va a zero. Ovviamente non superiamo mai il dummy SignalPoint alla fine.

Si noti che questo codice non ha una chiamata a Scheduler.UpdateQueue perché sappiamo che lo scheduler chiamerà GetNextReadyJob in appena un secondo e se restituisce false chiamerà comunque UpdateQueue.

void JobList.MarkJobCompleted(Job job, int jobIndex) 
{ 
    lock(this) 
    { 
    if(jobIndex >= CurrentSignal.SignalJobIndex) 
     CurrentSignal.Next.IncompleteCount--; 
    else 
    { 
     CurrentSignal.IncompleteCount--; 
     if(CurrentSignal.IncompleteCount==0) 
     if(CurrentSignal.WaitJobIndex < int.MaxValue) 
      CurrentSignal = CurrentSignal.Next; 
    } 
    } 
} 

sintonizzazione in base alla durata lista, le stime di lunghezza di lavoro, ecc

Il codice di cui sopra non prestare alcuna attenzione a quanto tempo gli elenchi dei lavori sono, quindi se ci sono un centinaio di piccole liste di lavoro e uno enorme è possibile per ogni nucleo di prendere una piccola lista di lavoro separata e poi tutti si riuniscono su quello enorme, portando a inefficienza. Ciò può essere risolto rendendo Ready [] una serie di code prioritarie con priorità su (joblist.Jobs.Count - joblist.NextJobIndex), ma con la priorità solo effettivamente aggiornata nelle normali situazioni di UpdateQueue per l'efficienza.

Questo potrebbe diventare ancora più sofisticato creando un'euristica che tenga conto del numero e della spaziatura delle combinazioni segnale/attesa per determinare la priorità. Questa euristica dovrebbe essere ottimizzata utilizzando una distribuzione delle durate di lavoro e l'utilizzo delle risorse.

Se sono note durate di lavoro individuali o se sono disponibili stime valide per loro, l'euristica potrebbe utilizzare la durata rimanente stimata invece della sola lunghezza dell'elenco.

Note finali

Si tratta di una soluzione piuttosto standard per il problema che si presenti. È possibile utilizzare gli algoritmi che ho dato e lavoreranno, tra cui il blocco, ma non sarà in grado di compilare il codice che ho scritto sopra per diversi motivi:

  1. Si tratta di un mix folle di C++ e C# sintassi. Inizialmente avevo iniziato a scrivere in C#, poi ho cambiato un po 'di sintassi in stile C++ poiché pensavo che fosse più probabile quello che usereste per un progetto del genere. Ma ho lasciato in parecchi C# -ismi. Fortunatamente no LINQ ;-).

  2. I dettagli di LinkedList presentano alcuni movimenti a mano. Presumo che l'elenco possa fare Primo, Ultimo, Aggiungi e Rimuovi e che gli elementi nell'elenco possono fare Precedente e Successivo. Ma non ho usato l'API vera e propria per qualsiasi classe di elenco linkato che conosco.

  3. Non l'ho compilato o testato. Garantisco che c'è un bug o due in là da qualche parte.

Bottom line: si dovrebbe trattare il codice sopra come pseudocodice anche se sembra il vero McCoy.

Divertiti!

+0

Ciao, bel post. Hai qualche suggerimento per leggere materiale su questo argomento? Grazie. –

+0

@Mark: Scusate, so di aver letto diversi buoni articoli relativi a questo 25-30 anni fa, ma non potevo darvi un riferimento ora. Cose del genere si attaccano al tuo cervello e non vanno via, se sai cosa intendo. –

1

Se si ha accesso ad un work stealing framework nell'ambiente (ad esempio, Cilk se siete in C, o fork/join framework di Doug Lea in Java), si può facilmente ottenere una soluzione semplice e pulito (rispetto al basso livello tentativi ad-hoc, che probabilmente dovrai fare se non puoi usare qualcosa del genere), che ti danno il bilanciamento del carico automatico e una buona localizzazione dei dati.

Ecco una descrizione di alto livello di una soluzione: si avvia una discussione per core. Ad ognuno viene assegnata una lista finché non sono esauriti (molti modi per farlo - questo è il compito di meccanismi di accodamento simultanei molto buoni, e questo è un motivo per cui si vorrebbe evitare le soluzioni fai-da-te se possibile). Ogni lavoratore passa le righe degli elenchi uno per uno: - Vengono mantenute due code, una per quei lavori prima di un token signal e uno o quelli successivi. - Quando viene rilevato un lavoro, è biforcuta, e ha aggiunto alla rispettiva lista d'attesa (a seconda se abbiamo visto un signal simbolico o meno) - Quando viene rilevato un token wait, ci uniamo a tutti i lavori prima del segnale (che è la semantica che descrivi se ho capito correttamente). Notare che nel codice che uso helpJoin(), significa che il thread sarà effettivamente di aiuto (eseguendo operazioni biforcute ed eseguendole fino a quando il join può procedere)

"Fork" significa mettere l'attività in una coda locale di thread, che sarà eseguito dal thread stesso in seguito, oppure può essere rubato da un altro thread che cerca qualche lavoro da fare.

A scopo illustrativo, ecco una simulazione di funzionamento a ~ 80 linee di questo scenario, utilizzando il già citato framework java. Crea tanti thread quanti sono i core disponibili e alcuni elenchi e inizia ad eseguirli. Si noti quanto sia semplice il metodo run() - mentre ha ancora i benefici del bilanciamento del carico e che i thread eseguono per lo più attività dal proprio elenco, a meno che non abbiano a corto il lavoro e inizino a rubare per ottenerne alcuni. Ovviamente, se non si è in Java o C, si dovrebbe trovare un framework simile, ma lo stesso insieme di idee di base semplificerebbe similmente il codice indipendentemente dalla lingua.

import java.util.*; 
import java.util.concurrent.*; 
import jsr166y.ForkJoinPool; 
import jsr166y.ForkJoinTask; 
import jsr166y.RecursiveTask; 

public class FJTest { 
    public static void main(String[] args) throws Exception { 
     Iterable<List<TaskType>> lists = createLists(10); 

     ForkJoinPool pool = new ForkJoinPool(); 

     for (final List<TaskType> list : lists) { 
      pool.submit(new Runnable() { 
       public void run() { 
        List<ForkJoinTask> beforeSignal = new ArrayList<ForkJoinTask>(); 
        List<ForkJoinTask> afterSignal = new ArrayList<ForkJoinTask>(); 
        boolean signaled = false; 
        for (TaskType task : list) { 
         switch (task) { 
          case JOB: 
           ForkJoinTask job = new Job(); 
           if (signaled == false) 
            beforeSignal.add(job); 
           else 
            afterSignal.add(job); 
           job.fork(); 
           break; 
          case SIGNAL: 
           signaled = true; 
           break; 
          case WAIT: 
           signaled = false; 
           for (ForkJoinTask t : beforeSignal) { 
            t.helpJoin(); 
           } 
           beforeSignal = afterSignal; 
           afterSignal = new ArrayList<ForkJoinTask>(); 
         } 
        } 
       } 
      }); 
     } 

     pool.shutdown(); 
     pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
    } 

    private static Iterable<List<TaskType>> createLists(int size) { 
     List<List<TaskType>> tasks = new ArrayList<List<TaskType>>(); 
     for (int i = 0; i < size; i++) { 
      tasks.add(createSomeList()); 
     } 
     return tasks; 
    } 

    private static List<TaskType> createSomeList() { 
     return Arrays.asList(
       TaskType.JOB, 
       TaskType.JOB, 
       TaskType.SIGNAL, 
       TaskType.JOB, 
       TaskType.WAIT, 
       TaskType.JOB); 
    } 

} 

enum TaskType { 
    JOB, SIGNAL, WAIT; 
} 
class Job extends RecursiveTask<Void> { 
    @Override 
    protected Void compute() { 
     long x = 1; 
     for (long i = 1; i < 200000001; i++) { 
      x = i * x; 
     } 
     System.out.println(x); //just to use x 
     return null; 
    } 
} 
Problemi correlati