2013-06-14 9 views
12

Vorrei eseguire un Task che ha un "heartbeat" che continua a essere eseguito in un intervallo di tempo specifico fino al completamento dell'attività.Creazione di un'attività con heartbeat

Sto pensando un metodo di estensione come questo dovrebbe funzionare bene:

public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) 

Ad esempio:

public class Program { 
    public static void Main() { 
     var cancelTokenSource = new CancellationTokenSource(); 
     var cancelToken = cancelTokenSource.Token; 
     var longRunningTask = Task.Factory.StartNew(SomeLongRunningTask, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current); 
     var withHeartbeatTask = longRunningTask.WithHeartbeat(TimeSpan.FromSeconds(1), PerformHeartbeat, cancelToken); 
     withHeartbeatTask.Wait(); 
     Console.WriteLine("Long running task completed!"); 
     Console.ReadLine() 
    } 

    private static void SomeLongRunningTask() { 
     Console.WriteLine("Starting long task"); 
     Thread.Sleep(TimeSpan.FromSeconds(9.5)); 
    } 
    private static int _heartbeatCount = 0; 
    private static void PerformHeartbeat(CancellationToken cancellationToken) { 
     Console.WriteLine("Heartbeat {0}", ++_heartbeatCount); 
    } 
} 

Questo programma dovrebbe uscita:

Starting long task 
Heartbeat 1 
Heartbeat 2 
Heartbeat 3 
Heartbeat 4 
Heartbeat 5 
Heartbeat 6 
Heartbeat 7 
Heartbeat 8 
Heartbeat 9 
Long running task completed! 

Nota che non dovrebbe (in circostanze normali) output "Heartbeat 10" poiché l'heartbeat inizia dopo il timeout iniziale (es. 1 secondo). Allo stesso modo, se l'attività richiede meno tempo dell'intervallo di battito cardiaco, il battito cardiaco non dovrebbe verificarsi affatto.

cosa è un buon modo per implementare questo?

Informazioni di base: Ho un servizio che sta ascoltando una coda Azure Service Bus. Mi piacerebbe non Complete il messaggio (che eliminerebbe definitivamente dalla coda) fino a quando avrò finito di elaborarlo, che potrebbe richiedere più tempo rispetto al messaggio massima LockDuration di 5 minuti. Così, ho bisogno di usare questo approccio battito cardiaco per chiamare RenewLockAsync prima che la durata di blocco scade in modo che il messaggio non timeout mentre lungo l'elaborazione è in corso.

+0

Questo suona simile a comunicare i progressi in un task asincrona (basta che la cosa innescando un rapporto è un intervallo di tempo, e non c'è un reale progresso per segnalare, ad eccezione forse conta il battito cardiaco). È utile uno di questi link? http://blogs.msdn.com/b/dotnet/archive/2012/06/06/async-in-4-5-enabling-progress-and-cancellation-in-async-apis.aspx http: // stackoverflow .com/questions/15408148/c-sharp-async-await-progress-event-on-task-object –

+0

@TimS. Sono simili ma non proprio quello che voglio, specialmente con il caso di non riportare mai se l'attività si completa rapidamente. Inoltre, il battito cardiaco non conosce il progresso per dire. Tuttavia, sarei felice di vedere se è possibile implementare l'approccio progress per abbinare la mia API di estensione e avere lo stesso effetto netto con un codice più semplice. –

risposta

11

Ecco il mio tentativo:

public static class TaskExtensions { 
    /// <summary> 
    /// Issues the <paramref name="heartbeatAction"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running. 
    /// </summary> 
    public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) { 
     if (cancellationToken.IsCancellationRequested) { 
      return; 
     } 

     var stopHeartbeatSource = new CancellationTokenSource(); 
     cancellationToken.Register(stopHeartbeatSource.Cancel); 

     await Task.WhenAny(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatAction, stopHeartbeatSource.Token)); 
     stopHeartbeatSource.Cancel(); 
    } 

    private static async Task PerformHeartbeats(TimeSpan interval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) { 
     while (!cancellationToken.IsCancellationRequested) { 
      try { 
       await Task.Delay(interval, cancellationToken); 
       if (!cancellationToken.IsCancellationRequested) { 
        heartbeatAction(cancellationToken); 
       } 
      } 
      catch (TaskCanceledException tce) { 
       if (tce.CancellationToken == cancellationToken) { 
        // Totally expected 
        break; 
       } 
       throw; 
      } 
     } 
    } 
} 

o con un leggero ritocco, si può anche fare il battito cardiaco async come in:

/// <summary> 
    /// Awaits a fresh Task created by the <paramref name="heartbeatTaskFactory"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running. 
    /// </summary> 
    public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) { 
     if (cancellationToken.IsCancellationRequested) { 
      return; 
     } 

     var stopHeartbeatSource = new CancellationTokenSource(); 
     cancellationToken.Register(stopHeartbeatSource.Cancel); 

     await Task.WhenAll(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatTaskFactory, stopHeartbeatSource.Token)); 

     if (!stopHeartbeatSource.IsCancellationRequested) { 
      stopHeartbeatSource.Cancel(); 
     } 
    } 

    public static Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory) { 
     return WithHeartbeat(primaryTask, heartbeatInterval, heartbeatTaskFactory, CancellationToken.None); 
    } 

    private static async Task PerformHeartbeats(TimeSpan interval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) { 
     while (!cancellationToken.IsCancellationRequested) { 
      try { 
       await Task.Delay(interval, cancellationToken); 
       if (!cancellationToken.IsCancellationRequested) { 
        await heartbeatTaskFactory(cancellationToken); 
       } 
      } 
      catch (TaskCanceledException tce) { 
       if (tce.CancellationToken == cancellationToken) { 
        // Totally expected 
        break; 
       } 
       throw; 
      } 
     } 
    } 

che consentirebbe di modificare il codice di esempio a qualcosa di simile this:

private static async Task PerformHeartbeat(CancellationToken cancellationToken) { 
    Console.WriteLine("Starting heartbeat {0}", ++_heartbeatCount); 
    await Task.Delay(1000, cancellationToken); 
    Console.WriteLine("Finishing heartbeat {0}", _heartbeatCount); 
} 

PerformHeartbeat può essere sostituito con una chiamata asincrona come RenewLockAsync in modo che non avrebbe dovuto perdere tempo filo con una chiamata di blocco come RenewLock che l'approccio di azione richiederebbe.

Sono answering my own question per SO guidelines, ma io sono aperto anche a più eleganti approcci a questo problema.

+0

Ciao, sono arrivato a questo post dal tuo commento sulla mia domanda di interesse simile. Nel caso di SB, quando e dove stai esattamente rinnovando il Blocco? il ruolo di lavoratore che ho è solo un singolo thread in quanto tale e la ricezione e l'elaborazione del messaggio viene eseguita in un ciclo while nel metodo Run(). – Aravind

+0

@Aravind Quando ricevo il messaggio SB, creo un compito per elaborarlo. L'attività utilizza questo helper heartbeat per l'heartbeat fintanto che è in esecuzione. –

+0

Oh ok. Poiché l'elaborazione dei messaggi che utilizzo potrebbe non essere quella utilizzata di frequente, non ho creato attività per l'elaborazione di ogni messaggio. Non ho trovato l'uso di RenewLock nel codice di esempio per cui è stato chiesto così. – Aravind

0

Ecco il mio approccio

using System; 
using System.Threading; 
using System.Threading.Tasks; 

namespace ConsoleApplication3 
{ 
class Program 
{ 
    static void Main(string[] args) 
    { 
     Console.WriteLine("Start Main"); 
     StartTest().Wait(); 
     Console.ReadLine(); 
     Console.WriteLine("Complete Main"); 
    } 

    static async Task StartTest() 
    { 
     var cts = new CancellationTokenSource(); 

     // ***Use ToArray to execute the query and start the download tasks. 
     Task<bool>[] tasks = new Task<bool>[2]; 
     tasks[0] = LongRunningTask("", 20, cts.Token); 
     tasks[1] = Heartbeat("", 1, cts.Token); 

     // ***Call WhenAny and then await the result. The task that finishes 
     // first is assigned to firstFinishedTask. 
     Task<bool> firstFinishedTask = await Task.WhenAny(tasks); 

     Console.WriteLine("first task Finished."); 
     // ***Cancel the rest of the downloads. You just want the first one. 
     cts.Cancel(); 

     // ***Await the first completed task and display the results. 
     // Run the program several times to demonstrate that different 
     // websites can finish first. 
     var isCompleted = await firstFinishedTask; 
     Console.WriteLine("isCompleted: {0}", isCompleted); 
    } 

    private static async Task<bool> LongRunningTask(string id, int sleep, CancellationToken ct) 
    { 
     Console.WriteLine("Starting long task"); 


     await Task.Delay(TimeSpan.FromSeconds(sleep)); 

     Console.WriteLine("Completed long task"); 
     return true; 
    } 

    private static async Task<bool> Heartbeat(string id, int sleep, CancellationToken ct) 
    { 
     while(!ct.IsCancellationRequested) 
     { 
      await Task.Delay(TimeSpan.FromSeconds(sleep)); 
      Console.WriteLine("Heartbeat Task Sleep: {0} Second", sleep); 
     } 

     return true; 
    } 

} 

}