2013-02-22 18 views
6

(NOTA:. Sto utilizzando .Net 4, non .Net 4.5, quindi non posso utilizzare le classi DataflowBlock del TPL)Pipelines, multiplexing, e il buffering illimitata

TL; DR Versione

In definitiva, sto solo cercando un modo per elaborare elementi di lavoro sequenziali utilizzando più thread in un modo che mantenga il loro ordine nell'output finale, senza richiedere un buffer di output illimitato.

motivazione

Ho codice per fornire un meccanismo multithread per elaborare più blocchi di dati in cui un I filo/O-bound (il "fornitore") è reponsible per enqueuing blocchi di dati per l'elaborazione esistenti. Questi blocchi di dati comprendono gli elementi di lavoro.

Uno o più thread (i "processori") sono responsabili dell'eliminazione di un elemento di lavoro alla volta, che elaborano e quindi scrivono i dati elaborati in una coda di emissione prima di eliminare il successivo elemento di lavoro.

Un thread di I/O (il "consumatore") finale è responsabile della rimozione delle voci di lavoro completate dalla coda di emissione e della loro scrittura nella destinazione finale. Questi elementi di lavoro sono (e devono essere) scritti nello stesso ordine in cui sono stati accodati. L'ho implementato utilizzando una coda di priorità concorrente, in cui la priorità di ogni elemento è definita dal suo indice di origine.

Sto utilizzando questo schema per eseguire una compressione personalizzata su un flusso di dati di grandi dimensioni, in cui la compressione è relativamente lenta ma la lettura dei dati non compressi e la scrittura dei dati compressi è relativamente veloce (sebbene I/O -limite).

Elaborare i dati in blocchi piuttosto grandi dell'ordine di 64 KB, quindi il sovraccarico della pipeline è relativamente piccolo.

La mia soluzione attuale sta funzionando bene ma coinvolge un sacco di codice personalizzato scritto 6 anni fa usando molti eventi di sincronizzazione, e il design sembra un po 'goffo; quindi ho intrapreso un esercizio accademico per vedere se può essere riscritto usando le più moderne librerie .Net.

Il nuovo design

mio nuovo design utilizza la classe BlockingCollection<>, e si basa su un po 'this Microsoft article.

In particolare, consultare la sezione Bilanciamento del carico con più produttori. Ho provato a utilizzare tale approccio e, pertanto, ho diverse attività di elaborazione, ognuna delle quali prende gli elementi di lavoro da un input condiviso BlockingCollection e scrive gli elementi completati nella propria coda di output BlockingCollection.

Poiché ogni attività di elaborazione ha una propria coda di emissione, sto tentando di utilizzare BlockingCollection.TakeFromAny() per riagganciare il primo elemento di lavoro completato disponibile.

multiplexer problema

Fin qui tutto bene, ma ora qui viene il problema. L'articolo di Microsoft afferma:

Le lacune sono un problema.La fase successiva della pipeline, la fase di visualizzazione dell'immagine, deve mostrare le immagini in ordine e senza interruzioni nella sequenza. È qui che entra in funzione il multiplexer. Usando il metodo TakeFromAny, il multiplexer attende l'input da entrambe le code di produzione del filtro. Quando arriva un'immagine, il multiplexer cerca se il numero di sequenza dell'immagine è il successivo nella sequenza prevista. Se lo è, il multiplexer lo passa alla fase di visualizzazione dell'immagine. Se l'immagine non è la successiva nella sequenza, il multiplatore conserva il valore in un buffer di previsione interno e ripete l'operazione di acquisizione per la coda di input che non ha un valore look-ahead. Questo algoritmo consente al multiplexer di mettere insieme gli input dalle code dei produttori in entrata in modo da garantire l'ordine sequenziale senza ordinare i valori.

Ok, quindi ciò che accade è che le attività di elaborazione possono produrre articoli finiti praticamente in qualsiasi ordine. Il multiplexer è responsabile dell'output di questi articoli nell'ordine corretto.

Tuttavia ...

Immaginiamo che dobbiamo 1000 elementi da elaborare. Inoltre immagina che per qualche strana ragione, il primo elemento impieghi più tempo per elaborare tutti gli altri elementi combinati.

Utilizzando il mio schema corrente, il multiplexer manterrà la lettura e il buffering degli elementi da tutte le code di output di elaborazione fino a quando non troverà il successivo che dovrebbe produrre. Dal momento che l'oggetto che sta aspettando è (secondo il mio "immaginare se" sopra) che verrà visualizzato solo dopo che TUTTI gli altri elementi di lavoro sono stati elaborati, sarà effettivamente il buffering di tutti gli elementi di lavoro nell'intero input!

La quantità di dati è troppo grande per consentire che ciò accada. Devo essere in grado di interrompere le attività di elaborazione dall'invio degli elementi di lavoro completati quando la coda di output ha raggiunto una certa dimensione massima (vale a dire è una coda di emissione limitata) A MENO CHE l'elemento di lavoro sia quello che il multiplexer sta aspettando.

Ed è qui che sto diventando un po 'bloccato. Posso pensare a molti modi per implementarlo realmente, ma sembrano tutti eccessivamente complessi nella misura in cui non sono migliori del codice che sto pensando di sostituire!

Qual è la mia domanda?

La mia domanda è: sto andando su questo nel modo giusto?

Avrei pensato che questo sarebbe stato un problema ben compreso, ma la mia ricerca ha prodotto solo articoli che sembrano ignorare il problema di buffer illimitato che si verifica se un elemento di lavoro richiede molto tempo rispetto a tutti gli altri lavori elementi.

Qualcuno può indicarmi degli articoli che descrivono un modo ragionevole per raggiungere questo obiettivo?

TL; DR Versione

definitiva, sto solo cercando un modo per elaborare gli elementi di lavoro sequenziali utilizzando più thread in modo da preservare il loro ordine nell'output finale, senza richiedere un buffer di uscita illimitata .

+0

Se si implementa questa, in qualsiasi modo, il fornitore dovrà eseguire il backup, o di essere strozzato. È accettabile? –

+0

Sì, è quello che fa la mia attuale implementazione. Il fornitore sta solo leggendo da un file, quindi non c'è problema. –

risposta

2

Creare un pool di elementi all'avvio, 1000, ad esempio. Memorizzali su BlockingCollection: una "coda di riserva".

Il fornitore riceve elementi dalla coda del pool, li carica dal file, carica il numero di sequenza/qualsiasi e li invia al threadpool del processore.

I processori eseguono le loro operazioni e inviano l'output al multiplexer. Il multiplexer svolge la funzione di archiviazione di tutti gli articoli fuori ordine fino a quando gli articoli precedenti non sono stati elaborati.

Quando un elemento è stato completamente utilizzato da qualsiasi uscita del multiplexer, viene restituito alla coda del pool per essere riutilizzato dal fornitore.

Se un "elemento lento" richiede un'enorme quantità di elaborazione, la raccolta fuori ordine nel multiplexer crescerà man mano che gli "oggetti rapidi" scivoleranno negli altri thread del pool, ma poiché il multiplexer non è effettivamente alimentando i suoi articoli al suo output, la coda del pool non viene reintegrata.

Quando il pool si svuota, il fornitore bloccherà e non sarà in grado di fornire altri articoli.

Gli 'elementi rapidi' rimanenti sull'input del pool di elaborazione verranno elaborati e quindi l'elaborazione verrà interrotta tranne che per l''elemento lento'. Il fornitore è bloccato, il multiplexer ha [poolSize-1] elementi nella sua collezione. Non viene utilizzata memoria extra, nessuna CPU viene sprecata, l'unica cosa che accade è l'elaborazione dell'elemento lento.

Quando l'elemento "lento" viene finalmente completato, viene inviato al multiplexer.

Il multiplexer può ora inviare tutti gli articoli [poolSize] nell'ordine sequenziale richiesto. Man mano che questi articoli vengono consumati, il pool viene nuovamente riempito e il fornitore, ora in grado di ottenere gli articoli dal pool, viene eseguito, leggendo nuovamente il file e accodando gli elementi al pool di processori.

Auto-regolazione, nessun buffer limitato necessario, nessuna memoria in fuga.

Edit: Volevo dire 'nessun buffer limitati richiesti' :)

Inoltre, nessun GC rapine - dal momento che gli articoli siano riutilizzati, non hanno bisogno GC'ing.

+0

Sembra una soluzione interessante. Indagherò! :) –

+0

Sto contrassegnando questo come soluzione perché ho usato una versione modificata dell'idea di una "coda di biliardo". Pubblicherò il mio codice nei prossimi giorni o due (quando avrò tempo). Non sono ancora completamente soddisfatto del mio codice multiplexor - sembra ancora un po 'complicato. Spero di poter ottenere critiche costruttive su di esso quando lo post. ;) –

1

Penso che tu abbia frainteso l'articolo. Secondo la descrizione, non ha un buffer illimitato, ci sarà al massimo un valore nel buffer look-ahread per ogni coda. Quando si rimuove un valore che non è il successivo, lo si salva e quindi si attende solo sulla coda che non ha un valore nel buffer. (Se si dispone di più buffer di input, la logica dovrà essere più complicata o sarà necessario un albero di 2 multiplexer di coda.)

Se si combina questo valore con BlockingCollection s con capacità limitata specificata, si ottiene esattamente il comportamento desiderato: se un produttore è troppo lento, gli altri si interromperanno fino a quando il thread lento non si avvicina.

+0

Le code che entrano nel multiplexer sono effettivamente limitate. Ma l'esattezza che sto trovando è che se hai più di due code che entrano nel multiplexer non puoi usare il metodo 'TakeFromAny()' come descritto nell'articolo di Microsoft. Bene, puoi ottenere il primo oggetto, ma se quell'elemento non è in sequenza devi iniziare a sbirciare nelle code fino a trovare l'elemento successivo che desideri. Con due code, è facile, ma con più di due le mie soluzioni diventano ingombranti. Forse l'albero di 2 multiplexer di coda funzionerebbe, ma ancora non suona meglio di quello che ho già! –

+0

Analizzerò ulteriormente l'idea dell'albero. –

+0

@MatthewWatson Si * può * usare 'TakeFromAny()', è sufficiente usarlo nel sottoinsieme giusto di code. – svick

1

Avete considerato di non utilizzare il buffer manuale produttore/consumatore ma invece l'alternativa PLINQ .AsParallel().AsOrdered()? Semanticamente, questo è esattamente quello che vuoi: una sequenza di elementi elaborati in parallelo ma ordinati in output. Il vostro codice potrebbe sembrare semplice come ...

var orderedOutput = 
    ReadSequentialBlocks() 
    .AsParallel() 
    .AsOrdered() 
    .Select(ProcessBlock) 
foreach(var item in orderedOutput) 
    Sink(item); 

Il grado di default di parallelismo è il numero di processori sulla vostra macchina, ma si può sintonizzare. C'è un buffer di output automatico. Se il buffer di default consuma troppe risorse, è possibile disattivarlo:

.WithMergeOptions(ParallelMergeOptions.NotBuffered) 

Tuttavia, mi piacerebbe sicuramente dare la versione disadorna pianura un colpo prima - non si sa mai, potrebbe essere solo funzionare bene fuori dalla scatola .Infine, se si desidera la semplicità di auto-multiplexing ma un buffer più grande di zero e non automatico, è possibile utilizzare sempre la query PLINQ per riempire una dimensione fissa BlockingCollection<> che viene letta con un'enumerazione che consuma su un altro thread.

+0

Se si utilizza 'NotBuffered' che non disattiva il buffer di output, significa solo che gli elementi dal buffer di output verranno restituiti il ​​prima possibile, non in blocchi (che è ciò che normalmente PLINQ fa). Ma penso che in questo modo ci sia ancora il buffer di output illimitato, quindi questo in realtà non fa quello che chiede la domanda. – svick

+0

Non è così che MSDN lo descrive: http://msdn.microsoft.com/en-us/library/dd997424.aspx.MSDN dice che è analogo allo streaming e che potrebbe richiedere più tempo in totale (che è quello che ci si aspetterebbe se non ci fosse un buffer). Ovviamente, _effettivamente_ c'è sempre un piccolo buffer sotto forma di elementi in-flight, e probabilmente anche nel crossoff thread handoff, ma ai fini dell'utilizzo della memoria questo non è rilevante. –

+0

Si noti che potrebbe esserci ancora un buffer di input, ma questo approccio è molto più semplice della codifica manuale, quasi sicuramente vale la pena provare. Se, in effetti, il benchmarking rivela che esiste un problema di utilizzo della memoria, un approccio più complesso potrebbe valerne la pena. –

1

Follow up

Per completezza, ecco il codice che ho liquidata con. Grazie a Martin James per la sua risposta, che ha fornito le basi per la soluzione.

Non sono ancora completamente soddisfatto del multiplexor (vedere ParallelWorkProcessor.multiplex()). Funziona, ma sembra un po 'klunky.

Ho usato l'idea di Martin James su un pool di lavoro per impedire la crescita illimitata del buffer multiplexor, tuttavia ho sostituito un SemaphoreSlim per la coda del pool di lavoro (poiché fornisce la stessa funzionalità, ma è un po 'più semplice da utilizzare e utilizzare meno risorse).

Le attività di lavoro scrivono gli elementi completati su una coda di priorità concorrente. Questo mi permette di trovare facilmente ed efficacemente il prossimo oggetto da produrre.

Ho utilizzato uno sample concurrent priority queue from Microsoft, modificato per fornire un evento di autoreset segnalato ogni volta che un nuovo elemento viene accodato.

Ecco la classe ParallelWorkProcessor. Lo usi fornendolo con tre delegati; uno per fornire gli elementi di lavoro, uno per elaborare un elemento di lavoro e uno per produrre un oggetto di lavoro completato.

using System; 
using System.Collections.Concurrent; 
using System.Collections.Generic; 
using System.Diagnostics.Contracts; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Demo 
{ 
    public sealed class ParallelWorkProcessor<T> where T: class // T is the work item type. 
    { 
     public delegate T Read();   // Called by only one thread. 
     public delegate T Process(T block); // Called simultaneously by multiple threads. 
     public delegate void Write(T block); // Called by only one thread. 

     public ParallelWorkProcessor(Read read, Process process, Write write, int numWorkers = 0) 
     { 
      _read = read; 
      _process = process; 
      _write = write; 

      numWorkers = (numWorkers > 0) ? numWorkers : Environment.ProcessorCount; 

      _workPool = new SemaphoreSlim(numWorkers*2); 
      _inputQueue = new BlockingCollection<WorkItem>(numWorkers); 
      _outputQueue = new ConcurrentPriorityQueue<int, T>(); 
      _workers  = new Task[numWorkers]; 

      startWorkers(); 
      Task.Factory.StartNew(enqueueWorkItems); 
      _multiplexor = Task.Factory.StartNew(multiplex); 
     } 

     private void startWorkers() 
     { 
      for (int i = 0; i < _workers.Length; ++i) 
      { 
       _workers[i] = Task.Factory.StartNew(processBlocks); 
      } 
     } 

     private void enqueueWorkItems() 
     { 
      int index = 0; 

      while (true) 
      { 
       T data = _read(); 

       if (data == null) // Signals end of input. 
       { 
        _inputQueue.CompleteAdding(); 
        _outputQueue.Enqueue(index, null); // Special sentinel WorkItem . 
        break; 
       } 

       _workPool.Wait(); 
       _inputQueue.Add(new WorkItem(data, index++)); 
      } 
     } 

     private void multiplex() 
     { 
      int index = 0; // Next required index. 
      int last = int.MaxValue; 

      while (index != last) 
      { 
       KeyValuePair<int, T> workItem; 
       _outputQueue.WaitForNewItem(); // There will always be at least one item - the sentinel item. 

       while ((index != last) && _outputQueue.TryPeek(out workItem)) 
       { 
        if (workItem.Value == null) // The sentinel item has a null value to indicate that it's the sentinel. 
        { 
         last = workItem.Key; // The sentinel's key is the index of the last block + 1. 
        } 
        else if (workItem.Key == index) // Is this block the next one that we want? 
        { 
         // Even if new items are added to the queue while we're here, the new items will be lower priority. 
         // Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at. 

         _outputQueue.TryDequeue(out workItem); 
         Contract.Assume(workItem.Key == index); // This *must* be the case. 
         _workPool.Release();     // Allow the enqueuer to queue another work item. 
         _write(workItem.Value); 
         ++index; 
        } 
        else // If it's not the block we want, we know we'll get a new item at some point. 
        { 
         _outputQueue.WaitForNewItem(); 
        } 
       } 
      } 
     } 

     private void processBlocks() 
     { 
      foreach (var block in _inputQueue.GetConsumingEnumerable()) 
      { 
       var processedData = _process(block.Data); 
       _outputQueue.Enqueue(block.Index, processedData); 
      } 
     } 

     public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite. 
     { 
      return _multiplexor.Wait(maxMillisecondsToWait); 
     } 

     private sealed class WorkItem 
     { 
      public WorkItem(T data, int index) 
      { 
       Data = data; 
       Index = index; 
      } 

      public T Data { get; private set; } 
      public int Index { get; private set; } 
     } 

     private readonly Task[] _workers; 
     private readonly Task _multiplexor; 
     private readonly SemaphoreSlim _workPool; 
     private readonly BlockingCollection<WorkItem> _inputQueue; 
     private readonly ConcurrentPriorityQueue<int, T> _outputQueue; 
     private readonly Read _read; 
     private readonly Process _process; 
     private readonly Write _write; 
    } 
} 

Ed ecco il mio codice di prova:

using System; 
using System.Diagnostics; 
using System.Threading; 

namespace Demo 
{ 
    public static class Program 
    { 
     private static void Main(string[] args) 
     { 
      _rng = new Random(34324); 

      int threadCount = 8; 
      _maxBlocks = 200; 
      ThreadPool.SetMinThreads(threadCount + 2, 4); // Kludge to prevent slow thread startup. 

      var stopwatch = new Stopwatch(); 

      _numBlocks = _maxBlocks; 
      stopwatch.Restart(); 
      var processor = new ParallelWorkProcessor<byte[]>(read, process, write, threadCount); 
      processor.WaitForFinished(Timeout.Infinite); 

      Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n"); 
     } 

     private static byte[] read() 
     { 
      if (_numBlocks-- == 0) 
      { 
       return null; 
      } 

      var result = new byte[128]; 
      result[0] = (byte)(_maxBlocks-_numBlocks); 
      Console.WriteLine("Supplied input: " + result[0]); 
      return result; 
     } 

     private static byte[] process(byte[] data) 
     { 
      if (data[0] == 10) // Hack for test purposes. Make it REALLY slow for this item! 
      { 
       Console.WriteLine("Delaying a call to process() for 5s for ID 10"); 
       Thread.Sleep(5000); 
      } 

      Thread.Sleep(10 + _rng.Next(50)); 
      Console.WriteLine("Processed: " + data[0]); 
      return data; 
     } 

     private static void write(byte[] data) 
     { 
      Console.WriteLine("Received output: " + data[0]); 
     } 

     private static Random _rng; 
     private static int _numBlocks; 
     private static int _maxBlocks; 
    } 
} 
+0

puoi inserire il codice del metodo WaitForNewItem personalizzato in ConcurrentPriorityQueue? –

+0

@KishanGajjar Purtroppo il codice è al lavoro, quindi dovrò aspettare fino a domani mattina per postarlo - scusa! –

+0

Puoi fornire il metodo oggi? –