2012-10-10 11 views
19

Ho una lista di ID e ho bisogno di eseguire diverse stored procedure su ciascun ID.Parallel non funziona con Entity Framework

Quando utilizzo un ciclo foreach standard, funziona correttamente, ma quando ho molti record, funziona piuttosto lentamente.

Volevo convertire il codice per lavorare con EF, ma sto ricevendo un'eccezione: "Il provider sottostante non è riuscito su Apri".

Sto usando questo codice, all'interno del Parallel.ForEach:

using (XmlEntities osContext = new XmlEntities()) 
{ 
    //The code 
} 

Ma getta ancora l'eccezione.

Qualche idea su come utilizzare Parallel con EF? devo creare un nuovo contesto per ogni procedura che sto utilizzando? Ho circa 10 procedure, quindi penso sia molto brutto creare 10 contesti, uno per ciascuno.

+1

Io non sono un guru multithreading, ma se si utilizzano le transazioni o la lettura/scrittura viene bloccata, è possibile che non si ottengano prestazioni migliori rispetto alla semplice esecuzione seriale. – Matthew

+0

Dubito che martellare il server SQL stressato con più thread non aiuti le prestazioni ... – rene

+2

Lo sql non è affatto stressato, ecco perché voglio usare un parallelo. e funzionerà sicuramente più velocemente. –

risposta

31

The underlying database connections that the Entity Framework are using are not thread-safe. È necessario creare un nuovo contesto per ogni operazione su un altro thread che si sta per eseguire.

La tua preoccupazione su come parallelizzare l'operazione è valida; che molti contesti saranno costosi da aprire e chiudere.

Invece, potresti voler invertire la tua opinione sulla parallelizzazione del codice. Sembra che tu stia eseguendo il loop su un numero di elementi e poi chiamando le stored procedure in serie per ciascun elemento.

Se è possibile, creare un nuovo Task<TResult> (o Task, se non hai bisogno di un risultato) per ogni procedura di e poi in quella Task<TResult>, aprire un unico contesto, il ciclo di tutti gli elementi, e quindi eseguire la stored procedure. In questo modo, hai solo un numero di contesti uguale al numero di stored procedure che stai eseguendo in parallelo.

Supponiamo di avere un MyDbContext con due stored procedure, DoSomething1 e DoSomething2, entrambi i quali assumono un'istanza di una classe, MyItem.

Attuare quanto sopra sarebbe simile:

// You'd probably want to materialize this into an IList<T> to avoid 
// warnings about multiple iterations of an IEnumerable<T>. 
// You definitely *don't* want this to be an IQueryable<T> 
// returned from a context. 
IEnumerable<MyItem> items = ...; 

// The first stored procedure is called here. 
Task t1 = Task.Run(() => { 
    // Create the context. 
    using (var ctx = new MyDbContext()) 
    // Cycle through each item. 
    foreach (MyItem item in items) 
    { 
     // Call the first stored procedure. 
     // You'd of course, have to do something with item here. 
     ctx.DoSomething1(item); 
    } 
}); 

// The second stored procedure is called here. 
Task t2 = Task.Run(() => { 
    // Create the context. 
    using (var ctx = new MyDbContext()) 
    // Cycle through each item. 
    foreach (MyItem item in items) 
    { 
     // Call the first stored procedure. 
     // You'd of course, have to do something with item here. 
     ctx.DoSomething2(item); 
    } 
}); 

// Do something when both of the tasks are done. 

Se non si può eseguire le stored procedure in parallelo (ognuno è dipendente da essere investito in un certo ordine), allora si può ancora parallelizzare le operazioni, è solo un po 'più complesso.

Si guarderebbe creating custom partitions attraverso i propri articoli (utilizzando lo statico Create method su Partitioner class). Questo ti darà i mezzi per ottenere le implementazioni IEnumerator<T> (nota, questo è nonIEnumerable<T> quindi non puoi farlo su foreach).

Per ogni IEnumerator<T> esempio torni, devi creare un nuovo Task<TResult> (se avete bisogno di un risultato), e nel corpo Task<TResult>, si potrebbe creare il contesto e poi scorrere le voci restituite dalla IEnumerator<T>, chiamando le stored procedure in ordine.

che sarebbe simile a questa:

// Get the partitioner. 
OrdinalPartitioner<MyItem> partitioner = Partitioner.Create(items); 

// Get the partitions. 
// You'll have to set the parameter for the number of partitions here. 
// See the link for creating custom partitions for more 
// creation strategies. 
IList<IEnumerator<MyItem>> paritions = partitioner.GetPartitions(
    Environment.ProcessorCount); 

// Create a task for each partition. 
Task[] tasks = partitions.Select(p => Task.Run(() => { 
     // Create the context. 
     using (var ctx = new MyDbContext()) 
     // Remember, the IEnumerator<T> implementation 
     // might implement IDisposable. 
     using (p) 
     // While there are items in p. 
     while (p.MoveNext()) 
     { 
      // Get the current item. 
      MyItem current = p.Current; 

      // Call the stored procedures. Process the item 
      ctx.DoSomething1(current); 
      ctx.DoSomething2(current); 
     } 
    })). 
    // ToArray is needed (or something to materialize the list) to 
    // avoid deferred execution. 
    ToArray(); 
0

E 'un po' difficile per risolvere questo senza sapere che cosa il risultato interno eccezione è, se presente. Questo potrebbe essere semplicemente un problema con il modo in cui è configurata la stringa di connessione o la configurazione del provider.

In generale, è necessario fare attenzione con il codice parallelo e EF. Quello che stai facendo, tuttavia, dovrebbe funzionare. Una domanda nella mia mente; Qual è il lavoro svolto su un'altra istanza di tale contesto prima del parallelo? Secondo il tuo post, stai facendo un contesto separato in ogni thread. Quello è buono. Una parte di me si chiede tuttavia se ci sia qualche contesa di costruzione interessante tra i molteplici contesti. Se non stai usando quel contesto prima di quella chiamata parallela, ti suggerirei di provare a eseguire anche una semplice query sul contesto per aprirlo e assicurarti che tutti i bit EF vengano attivati ​​prima di eseguire il metodo parallelo. Devo ammettere che non ho provato esattamente quello che hai fatto qui, ma ho chiuso e ha funzionato.

+1

si sa che questa domanda è stata pubblicata nel 2012, giusto? – Claies

2

Questo è quello che uso e funziona alla grande. Supporta inoltre la gestione delle eccezioni di errore e ha una modalità di debug, che rende molto più facile per rintracciare le cose

public static ConcurrentQueue<Exception> Parallel<T>(this IEnumerable<T> items, Action<T> action, int? parallelCount = null, bool debugMode = false) 
{ 
    var exceptions = new ConcurrentQueue<Exception>(); 
    if (debugMode) 
    { 
     foreach (var item in items) 
     { 
      try 
      { 
       action(item); 
      } 
      // Store the exception and continue with the loop.      
      catch (Exception e) 
      { 
       exceptions.Enqueue(e); 
      } 
     } 
    } 
    else 
    { 
     var partitions = Partitioner.Create(items).GetPartitions(parallelCount ?? Environment.ProcessorCount).Select(partition => Task.Factory.StartNew(() => 
     { 
      while (partition.MoveNext()) 
      { 
       try 
       { 
        action(partition.Current); 
       } 
       // Store the exception and continue with the loop.      
       catch (Exception e) 
       { 
        exceptions.Enqueue(e); 
       } 
      } 
     })); 
     Task.WaitAll(partitions.ToArray()); 
    } 
    return exceptions; 
} 

Si usa come il seguente dove come db è l'originale DbContext e db.CreateInstance() crea un nuova istanza che utilizza la stessa stringa di connessione.

 var batch = db.Set<SomeListToIterate>().ToList(); 
     var exceptions = batch.Parallel((item) => 
     { 
      using (var batchDb = db.CreateInstance()) 
      { 
       var batchTime = batchDb.GetDBTime(); 
       var someData = batchDb.Set<Permission>().Where(x=>x.ID = item.ID).ToList(); 
       //do stuff to someData 
       item.WasMigrated = true; //note that this record is attached to db not batchDb and will only be saved when db.SaveChanges() is called 
       batchDb.SaveChanges();   
      }     
     }); 
     if (exceptions.Count > 0) 
     { 
      logger.Error("ContactRecordMigration : Content: Error processing one or more records", new AggregateException(exceptions)); 
      throw new AggregateException(exceptions); //optionally throw an exception 
     } 
     db.SaveChanges(); //save the item modifications 
Problemi correlati