2010-12-29 10 views
14

Sto cercando di capire quale sarà il modo migliore di lavorare con una coda. Ho un processo che restituisce un DataTable. Ogni DataTable, a sua volta, viene unito al precedente DataTable. C'è un problema, troppi record da conservare fino al BulkCopy finale (OutOfMemory).Come eseguire il threading con ConcurrentQueue <T>

Quindi, ho determinato che dovrei elaborare immediatamente ogni DataTable in arrivo. Pensando allo ConcurrentQueue<T> ... ma non vedo come il metodo WriteQueuedData() saprebbe che rimpicciolisce una tabella e la scrive nel database.

Per esempio:

public class TableTransporter 
{ 
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>(); 

    public TableTransporter() 
    { 
     tableQueue.OnItemQueued += new EventHandler(WriteQueuedData); // no events available 
    } 

    public void ExtractData() 
    { 
     DataTable table; 

     // perform data extraction 
     tableQueue.Enqueue(table); 
    } 

    private void WriteQueuedData(object sender, EventArgs e) 
    { 
     BulkCopy(e.Table); 
    } 
} 

mia prima domanda è, a parte il fatto che io in realtà non ci sono eventi a sottoscrivere, se chiamo ExtractData() modo asincrono Sarà questo tutto quello che mi serve? Secondo, c'è qualcosa che mi manca nel modo in cui funzioni ConcurrentQueue<T> e che necessitano di una qualche forma di trigger per funzionare in modo asincrono con gli oggetti in coda?

Aggiornamento Ho appena derivata una classe da ConcurrentQueue<T> che ha un gestore di eventi OnItemQueued. Quindi:

new public void Enqueue (DataTable Table) 
{ 
    base.Enqueue(Table); 
    OnTableQueued(new TableQueuedEventArgs(Table)); 
} 

public void OnTableQueued(TableQueuedEventArgs table) 
{ 
    EventHandler<TableQueuedEventArgs> handler = TableQueued; 

    if (handler != null) 
    { 
     handler(this, table); 
    } 
} 

Eventuali dubbi su questa implementazione?

risposta

18

Dalla mia comprensione del problema, ti mancano alcune cose.

La coda concorrente è una struttura dati progettata per accettare più thread di lettura e scrittura in coda senza che sia necessario bloccare esplicitamente la struttura dati. (Tutto ciò che il jazz è curato dietro le quinte, o la collezione è implementato in modo tale che non è necessario bloccarlo.)

Con questo in mente, sembra lo schema che stai provando usare è "Produce/Consumer". In primo luogo, hai alcune attività che producono lavoro (e aggiungendo oggetti alla coda). E secondo hai una seconda attività che consuma cose dalla coda (e dalla rimozione di oggetti).

Quindi davvero vuoi due thread: uno che aggiunge elementi e un secondo rimuove gli oggetti. Poiché stai utilizzando una raccolta simultanea, puoi avere più thread che aggiungono elementi e più thread rimuovendo elementi. Ma ovviamente la maggiore contesa che hai sulla coda concorrente è la più veloce che diventerà il collo di bottiglia.

+0

Pensavo di avere 2 fili. In pratica, il thread principale attende l'attivazione dell'evento. Il secondo thread inizia come una chiamata asincrona a 'ExtractData()'. Nel callback asincrono continuerò semplicemente il processo di estrazione. – IAbstract

+0

In realtà, penso di averlo indietro; il thread principale dovrebbe essere costituito da data-code di messa in coda; quindi iniziare il metodo di scrittura asincrona tramite il trigger dell'evento dell'elemento accodato. – IAbstract

3

Questa è la soluzione completa per quello che mi si avvicinò con:

public class TableTransporter 
{ 
    private static int _indexer; 

    private CustomQueue tableQueue = new CustomQueue(); 
    private Func<DataTable, String> RunPostProcess; 
    private string filename; 

    public TableTransporter() 
    { 
     RunPostProcess = new Func<DataTable, String>(SerializeTable); 
     tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued); 
    } 

    void tableQueue_TableQueued(object sender, TableQueuedEventArgs e) 
    { 
     // do something with table 
     // I can't figure out is how to pass custom object in 3rd parameter 
     RunPostProcess.BeginInvoke(e.Table,new AsyncCallback(PostComplete), filename); 
    } 

    public void ExtractData() 
    { 
     // perform data extraction 
     tableQueue.Enqueue(MakeTable()); 
     Console.WriteLine("Table count [{0}]", tableQueue.Count); 
    } 

    private DataTable MakeTable() 
    { return new DataTable(String.Format("Table{0}", _indexer++)); } 

    private string SerializeTable(DataTable Table) 
    { 
     string file = Table.TableName + ".xml"; 

     DataSet dataSet = new DataSet(Table.TableName); 

     dataSet.Tables.Add(Table); 

     Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file); 
     string xmlstream = String.Empty; 

     using (MemoryStream memstream = new MemoryStream()) 
     { 
      XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet)); 
      XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8); 

      xmlSerializer.Serialize(xmlWriter, dataSet); 
      xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray()); 

      using (var fileStream = new FileStream(file, FileMode.Create)) 
       fileStream.Write(StringToUTF8ByteArray(xmlstream), 0, xmlstream.Length + 2); 
     } 
     filename = file; 

     return file; 
    } 

    private void PostComplete(IAsyncResult iasResult) 
    { 
     string file = (string)iasResult.AsyncState; 
     Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file); 

     RunPostProcess.EndInvoke(iasResult); 
    } 

    public static String UTF8ByteArrayToString(Byte[] ArrBytes) 
    { return new UTF8Encoding().GetString(ArrBytes); } 

    public static Byte[] StringToUTF8ByteArray(String XmlString) 
    { return new UTF8Encoding().GetBytes(XmlString); } 
} 

public sealed class CustomQueue : ConcurrentQueue<DataTable> 
{ 
    public event EventHandler<TableQueuedEventArgs> TableQueued; 

    public CustomQueue() 
    { } 
    public CustomQueue(IEnumerable<DataTable> TableCollection) 
     : base(TableCollection) 
    { } 

    new public void Enqueue (DataTable Table) 
    { 
     base.Enqueue(Table); 
     OnTableQueued(new TableQueuedEventArgs(Table)); 
    } 

    public void OnTableQueued(TableQueuedEventArgs table) 
    { 
     EventHandler<TableQueuedEventArgs> handler = TableQueued; 

     if (handler != null) 
     { 
      handler(this, table); 
     } 
    } 
} 

public class TableQueuedEventArgs : EventArgs 
{ 
    #region Fields 
    #endregion 

    #region Init 
    public TableQueuedEventArgs(DataTable Table) 
    {this.Table = Table;} 
    #endregion 

    #region Functions 
    #endregion 

    #region Properties 
    public DataTable Table 
    {get;set;} 
    #endregion 
} 

Come prova di concetto, sembra funzionare abbastanza bene. Al massimo ho visto 4 thread di lavoro.

+0

TODO: aggiornamento con metodo asincrono più recente. – IAbstract

+0

Guardando attraverso questo, è una buona implementazione, tuttavia, dopo aver eseguito un test rapido, quando viene rimosso un oggetto? –

+0

@RichardPriddy: poiché questo era poco più di 5 anni fa (* e mi sono trasferito da tempo nella mia terza azienda *), posso solo supporre che questo non fosse un esempio completo. Nota l'osservazione * proof of concept * alla fine. ;) Detto questo, in base ai requisiti è possibile esporre l'evento 'enqueued' e lasciare che qualcos'altro gestisca il dequeueing. Altrimenti, potrebbe essere logico rimandare da qualche parte la funzione 'AsyncCallback' della funzione di post-elaborazione. Sarebbe davvero difficile individuare qualcosa di più specifico in questa data. – IAbstract

8

Penso che ConcurrentQueue sia utile solo in pochissimi casi. Il suo principale vantaggio è che è privo di blocco. Tuttavia, di solito i thread del produttore devono informare il/i thread/i del consumatore in qualche modo che ci sono dati disponibili per l'elaborazione. Questa segnalazione tra i thread richiede blocchi e annulla il vantaggio dell'utilizzo di ConcurrentQueue. Il modo più rapido per sincronizzare i thread è utilizzare Monitor.Pulse(), che funziona solo all'interno di un blocco. Tutti gli altri strumenti di sincronizzazione sono ancora più lenti.

Ovviamente, il consumatore può controllare continuamente se c'è qualcosa nella coda, che funziona senza serrature, ma è un enorme spreco di risorse del processore. Un po 'meglio è se il consumatore attende tra il controllo.

Alzare un filo durante la scrittura in coda è una pessima idea. Usando ConcurrentQueue per salvare mabe 1 microsecondo sarà completamente sprecato eseguendo l'eventhandler, che potrebbe richiedere 1000 volte più a lungo.

Se tutta l'elaborazione viene eseguita in un gestore di eventi o in una chiamata asincrona, la domanda è: perché è ancora necessaria una coda? È meglio passare i dati direttamente al gestore e non utilizzare affatto una coda.

Si noti che l'implementazione di ConcurrentQueue è piuttosto complicata per consentire la concorrenza. Nella maggior parte dei casi, è meglio usare una coda normale <> e bloccare ogni accesso alla coda. Poiché l'accesso alla coda richiede solo microsecondi, è estremamente improbabile che 2 thread accedano alla coda nello stesso microsecondo e non ci sarà quasi mai un ritardo a causa del blocco. L'utilizzo di una coda normale <> con il blocco causerà spesso un'esecuzione più rapida del codice rispetto a ConcurrentQueue.

+0

Peccato per aver ricevuto il voto negativo. Penso che sia un'opinione valida e pragmatica. – user3085342

Problemi correlati