2016-02-25 7 views
9

Quando si crea un batchblock con capacità limitata e call triggerBatch mentre (in parallelo a) postare un nuovo articolo, la pubblicazione di un nuovo elemento fallirà durante il tempo di esecuzione del batch trigger.Comportamento imprevisto - TPL DataFlow BatchBlock Rifiuta gli articoli mentre TriggerBatch è in esecuzione

La chiamata al batch di trigger (ogni X volta) viene eseguita per garantire che i dati non vengano ritardati per troppo tempo nel blocco, nei casi in cui il flusso di dati in entrata si è interrotto o rallentato.

Il codice seguente emetterà alcuni eventi "post failure". Per esempio:

public static void Main(string[] args) 
    { 
     var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 }); 
     var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 }); 
     batchBlock.LinkTo(actionBlock); 

     var producerTask = Task.Factory.StartNew(() => 
     { 
      //Post 10K Items 
      for (int i = 0; i < 10000; i++) 
      { 
       var postResult = batchBlock.Post(i); 
       if (!postResult) 
        Console.WriteLine("Failed to Post"); 
      } 
     }); 

     var triggerBatchTask = Task.Factory.StartNew(() => 
      {      
       //Trigger Batch.. 
       for (int i = 0; i < 1000000; i++) 
        batchBlock.TriggerBatch(); 
      }); 

     producerTask.Wait(); 
     triggerBatchTask.Wait(); 
    } 

    public static void ProcessBatch(int[] batch) 
    { 
     Console.WriteLine("{0} - {1}", batch.First(), batch.Last()); 
    } 

* Si noti che questo scenario è riproducibile solo quando il batchBlock è limitato.

Mi manca qualcosa o si tratta di un problema con batchBlock?

risposta

3

Il BatchBlock in realtà non rifiuta l'elemento, tenta di rimandarlo. Tranne che nel caso di Post(), il rinvio non è un'opzione. Un modo semplice per risolvere questo problema è utilizzare await batchBlock.SendAsync(i) anziché batchBlock.Post(i) (ciò significa anche che è necessario modificare Task.Factory.StartNew(() => in Task.Run(async() =>).

Perché succede? Secondo the source code, se lo BatchBlock è limitato, TriggerBatch() viene elaborato in modo asincrono e mentre viene elaborato, non vengono accettati nuovi articoli.

In ogni caso, non si deve aspettare che Post() restituirà sempre true su un blocco limitato, se il blocco è pieno, Post() sarà anche tornare false.

+0

Questo è stato molto sorprendente per me ... – i3arnon

+0

Nel frattempo sto usando una soluzione diversa, introducendo un altro blocco che accetterà i guasti, e infine chiamerò triggerbatch in modo seriale su entrambi i blocchi. Per la tua soluzione suggerita - attendi e asincronizzazione creerà un compito per gestire ogni elemento in arrivo, questo potrebbe causare problemi di memoria quando si ha una grande esplosione di eventi molte attività verranno create senza limiti. –

+0

@AlYaros No, non lo farà. Se l'oggetto è accettato, si ottiene un 'Task' nella cache, quindi nessuna allocazione lì. E se l'oggetto viene posticipato, il codice che hai mostrato non aggiungerà nuovi oggetti finché non sarà accettato. Se nel tuo codice effettivo "attendi" causerebbe problemi, allora IMO dovresti essere in grado di risolverli, altrimenti avrai problemi anche senza. – svick

Problemi correlati