2014-10-05 11 views
8

Ho alcune tabelle Azure di base che ho l'interrogazione in modo seriale:Azure TableQuery filo di sicurezza con Parallel.ForEach

var query = new TableQuery<DynamicTableEntity>() 
    .Where(TableQuery.GenerateFilterCondition("PartitionKey", 
    QueryComparisons.Equal, myPartitionKey)); 

foreach (DynamicTableEntity entity in myTable.ExecuteQuery(query)) { 
    // Process entity here. 
} 

Per accelerare l'operazione, mi parallelized questo modo:

Parallel.ForEach(myTable.ExecuteQuery(query), (entity, loopState) => { 
    // Process entity here in a thread-safe manner. 

    // Edited to add: Details of the loop body below: 

    // This is the essence of the fixed loop body: 
    lock (myLock) { 
    DataRow myRow = myDataTable.NewRow(); 
    // [Add entity data to myRow.] 
    myDataTable.Rows.Add(myRow); 
    } 

    // Old code (apparently not thread-safe, though NewRow() is supposed to create 
    // a DataRow based on the table's schema without changing the table state): 
    /* 
    DataRow myRow = myDataTable.NewRow(); 
    lock (myLock) { 
     // [Add entity data to myRow.] 
     myDataTable.Rows.Add(myRow); 
    } 
    */ 
}); 

Ciò produce una significativa accelerazione, ma i risultati tendono ad essere leggermente diversi tra le esecuzioni (ad esempio, alcune entità differiscono di tanto in tanto, sebbene il numero di entità restituite sia esattamente lo stesso).

Da questa e da alcune ricerche sul Web, concludo che l'enumeratore di cui sopra non è sempre thread-safe. La documentazione sembra suggerire che la sicurezza del thread è garantita solo se gli oggetti della tabella sono pubblici statici, ma ciò non ha fatto la differenza per me.

Qualcuno potrebbe suggerire come risolvere questo problema? Esiste un modello standard per la parallelizzazione delle query di tabella di Azure?

+1

L'enumeratore non deve essere thread-safe, 'Parallel.ForEach()' può gestirlo. Un problema potrebbe essere se le entità condividessero qualche stato. – svick

+1

Puoi chiarire cosa significano risultati leggermente diversi? Se registri tutte le entità in Parallel.ForEach, stai ottenendo lo stesso insieme di entità in ordine diverso? –

+0

Ho ordinato le entità per determinare le differenze esatte e gli insiemi sono quasi identici. Tuttavia, occasionalmente manca un'entità specifica mentre un'altra è stata duplicata (rispetto ai risultati che sto ottenendo in serie, che sono sempre identici e presumibilmente comprendono la verità fondamentale per il contenuto della tabella). Questo sembra essere il modello generale: alcune entità sono mancanti, ma altre sono duplicate per mantenere l'entità conteggiata allo stesso modo. È quasi come se un indice non fosse incrementato in modo thread-safe, portando a una condizione di competizione durante la lettura delle entità dalla memoria. –

risposta

4

Il tuo commento è corretto: DataTable non è adatto per operazioni simultanee che coinvolgono la mutazione ed è la fonte delle voci duplicate. Bloccare l'oggetto DataTable per le operazioni di modifica riga risolva il problema:

lock (myTable) 
{ 
    DataRow myRow = myTable.NewRow(); 
    myRow.SetField<int>("c1", (int)value); 
    myTable.Rows.Add(myRow); 
} 

Mettere NewRow() fuori dal blocco si tradurrà in modo intermittente riga voci duplicate nella tabella o "Un'eccezione di tipo 'System.ArgumentException' avvenuto in System.Data.dll "eccezioni sulla riga NewRow(). Per ulteriori dettagli e alternative per l'utilizzo simultaneo di DataTable, vedere Thread safety for DataTable

Per riprodurre la condizione di errore, utilizzare questo codice. Alcune esecuzioni saranno pulite, alcune conterranno voci duplicate e altre incontreranno eccezioni.

class Program 
    { 
     static DataTable myTable = GetTable(); 
     static ManualResetEvent waitHandle = new ManualResetEvent(false); 

     static void Main(string[] args) 
     { 
     const int threadCount = 10; 
     List<Thread> threads = new List<System.Threading.Thread>(); 
     for (int i = 0; i < threadCount; ++i) 
     { 
      threads.Add(new Thread(new ParameterizedThreadStart(AddRowThread))); 
      threads[i].Start(i); 
     } 
     waitHandle.Set(); // Release all the threads at once 
     for (int i = 0; i < threadCount; ++i) 
     { 
      threads[i].Join(); 
     } 

     // Print results once threads return 
     for (int i = 0; i < myTable.Rows.Count; ++i) 
     { 
      Console.WriteLine(myTable.Rows[i].Field<int>(0)); 
     } 
     Console.WriteLine("---Processing Complete---"); 
     Console.ReadKey(); 
     } 

     static void AddRowThread(object value) 
     { 
     waitHandle.WaitOne(); 
     DataRow myRow = myTable.NewRow(); // THIS RESULTS IN INTERMITTENT ERRORS 
     lock (myTable) 
     { 
      //DataRow myRow = myTable.NewRow(); // MOVE NewRow() CALL HERE TO RESOLVE ISSUE 
      myRow.SetField<int>("c1", (int)value); 
      myTable.Rows.Add(myRow); 
     } 
     } 

     static DataTable GetTable() 
     { 
     // Here we create a DataTable with four columns. 
     DataTable table = new DataTable(); 
     table.Columns.Add("c1", typeof(int));  
     return table; 
     } 
    } 
Problemi correlati