(NOTA:. Sto utilizzando .Net 4, non .Net 4.5, quindi non posso utilizzare le classi DataflowBlock del TPL)Pipelines, multiplexing, e il buffering illimitata
TL; DR Versione
In definitiva, sto solo cercando un modo per elaborare elementi di lavoro sequenziali utilizzando più thread in un modo che mantenga il loro ordine nell'output finale, senza richiedere un buffer di output illimitato.
motivazione
Ho codice per fornire un meccanismo multithread per elaborare più blocchi di dati in cui un I filo/O-bound (il "fornitore") è reponsible per enqueuing blocchi di dati per l'elaborazione esistenti. Questi blocchi di dati comprendono gli elementi di lavoro.
Uno o più thread (i "processori") sono responsabili dell'eliminazione di un elemento di lavoro alla volta, che elaborano e quindi scrivono i dati elaborati in una coda di emissione prima di eliminare il successivo elemento di lavoro.
Un thread di I/O (il "consumatore") finale è responsabile della rimozione delle voci di lavoro completate dalla coda di emissione e della loro scrittura nella destinazione finale. Questi elementi di lavoro sono (e devono essere) scritti nello stesso ordine in cui sono stati accodati. L'ho implementato utilizzando una coda di priorità concorrente, in cui la priorità di ogni elemento è definita dal suo indice di origine.
Sto utilizzando questo schema per eseguire una compressione personalizzata su un flusso di dati di grandi dimensioni, in cui la compressione è relativamente lenta ma la lettura dei dati non compressi e la scrittura dei dati compressi è relativamente veloce (sebbene I/O -limite).
Elaborare i dati in blocchi piuttosto grandi dell'ordine di 64 KB, quindi il sovraccarico della pipeline è relativamente piccolo.
La mia soluzione attuale sta funzionando bene ma coinvolge un sacco di codice personalizzato scritto 6 anni fa usando molti eventi di sincronizzazione, e il design sembra un po 'goffo; quindi ho intrapreso un esercizio accademico per vedere se può essere riscritto usando le più moderne librerie .Net.
Il nuovo design
mio nuovo design utilizza la classe BlockingCollection<>
, e si basa su un po 'this Microsoft article.
In particolare, consultare la sezione Bilanciamento del carico con più produttori. Ho provato a utilizzare tale approccio e, pertanto, ho diverse attività di elaborazione, ognuna delle quali prende gli elementi di lavoro da un input condiviso BlockingCollection e scrive gli elementi completati nella propria coda di output BlockingCollection.
Poiché ogni attività di elaborazione ha una propria coda di emissione, sto tentando di utilizzare BlockingCollection.TakeFromAny()
per riagganciare il primo elemento di lavoro completato disponibile.
multiplexer problema
Fin qui tutto bene, ma ora qui viene il problema. L'articolo di Microsoft afferma:
Le lacune sono un problema.La fase successiva della pipeline, la fase di visualizzazione dell'immagine, deve mostrare le immagini in ordine e senza interruzioni nella sequenza. È qui che entra in funzione il multiplexer. Usando il metodo TakeFromAny, il multiplexer attende l'input da entrambe le code di produzione del filtro. Quando arriva un'immagine, il multiplexer cerca se il numero di sequenza dell'immagine è il successivo nella sequenza prevista. Se lo è, il multiplexer lo passa alla fase di visualizzazione dell'immagine. Se l'immagine non è la successiva nella sequenza, il multiplatore conserva il valore in un buffer di previsione interno e ripete l'operazione di acquisizione per la coda di input che non ha un valore look-ahead. Questo algoritmo consente al multiplexer di mettere insieme gli input dalle code dei produttori in entrata in modo da garantire l'ordine sequenziale senza ordinare i valori.
Ok, quindi ciò che accade è che le attività di elaborazione possono produrre articoli finiti praticamente in qualsiasi ordine. Il multiplexer è responsabile dell'output di questi articoli nell'ordine corretto.
Tuttavia ...
Immaginiamo che dobbiamo 1000 elementi da elaborare. Inoltre immagina che per qualche strana ragione, il primo elemento impieghi più tempo per elaborare tutti gli altri elementi combinati.
Utilizzando il mio schema corrente, il multiplexer manterrà la lettura e il buffering degli elementi da tutte le code di output di elaborazione fino a quando non troverà il successivo che dovrebbe produrre. Dal momento che l'oggetto che sta aspettando è (secondo il mio "immaginare se" sopra) che verrà visualizzato solo dopo che TUTTI gli altri elementi di lavoro sono stati elaborati, sarà effettivamente il buffering di tutti gli elementi di lavoro nell'intero input!
La quantità di dati è troppo grande per consentire che ciò accada. Devo essere in grado di interrompere le attività di elaborazione dall'invio degli elementi di lavoro completati quando la coda di output ha raggiunto una certa dimensione massima (vale a dire è una coda di emissione limitata) A MENO CHE l'elemento di lavoro sia quello che il multiplexer sta aspettando.
Ed è qui che sto diventando un po 'bloccato. Posso pensare a molti modi per implementarlo realmente, ma sembrano tutti eccessivamente complessi nella misura in cui non sono migliori del codice che sto pensando di sostituire!
Qual è la mia domanda?
La mia domanda è: sto andando su questo nel modo giusto?
Avrei pensato che questo sarebbe stato un problema ben compreso, ma la mia ricerca ha prodotto solo articoli che sembrano ignorare il problema di buffer illimitato che si verifica se un elemento di lavoro richiede molto tempo rispetto a tutti gli altri lavori elementi.
Qualcuno può indicarmi degli articoli che descrivono un modo ragionevole per raggiungere questo obiettivo?
TL; DR Versione
definitiva, sto solo cercando un modo per elaborare gli elementi di lavoro sequenziali utilizzando più thread in modo da preservare il loro ordine nell'output finale, senza richiedere un buffer di uscita illimitata .
Se si implementa questa, in qualsiasi modo, il fornitore dovrà eseguire il backup, o di essere strozzato. È accettabile? –
Sì, è quello che fa la mia attuale implementazione. Il fornitore sta solo leggendo da un file, quindi non c'è problema. –