2016-02-03 7 views
5

Ho una tabella con Guid come chiave primaria. La tabella ha già contenuto molte righe. Inoltre, ho un win service, che fa alcune serie di azioni con ogni riga (possibili esigenze leggere e scrivere dati da un altro database). Quindi l'elaborazione di una riga richiede molto tempo. (In media circa 100 secondi)Ho bisogno di una funzione contributon per le istanze del servizio win

Il mio servizio win funziona in questo modo:

public class MyDto 
{ 
    public Guid Id { get; set; } 
} 

while (true){ 
    if(time to start){ 
      List<MyDto> rows = LoadData(); 
      foreach(MyDto obj in rows){ 
       Process(obj);//it takes in average about 100 sec 
      } 
    } 
} 

ho bisogno di ridurre i tempi di esecuzione di tutte le mie righe. Per alcune ragioni ho deciso di aumentare le insatnces del mio servizio di vincita. Quindi ho bisogno che ogni servizio di vincita scriva il proprio set di righe.

ho parametrizzata mia LoadData() divertimento:

public List<MyDto> LoadData(int winServInstanceNumber){ 

} 

quindi ho bisogno di una funzione di contributo dipende dai casi in totale Servizio vittoria contare e concreate numero di istanza del servizio vittoria.

potete offrire qualcosa di meglio di

//on .net side 
obj.Id.GetHashCode()%totalWinServiceInstancesCount 

o

--on sql side 
HASHBYTES('MD5', CAST(id as varbinary(16)))%totalWinServiceInstancesCount 

risposta

1

Sembra che tutto ciò che serve è quello di far girare più thread al trattamento dei dati. Ma per fare questo, è necessario un controllo di ciò che si sta elaborando, al fine di non elaborare la stessa cosa due volte. Per ottenere un controllo è possibile utilizzare MSMQ, ad esempio, oppure System.Collections.Queue. Il tuo servizio dovrebbe essere responsabile di interrogare il database e caricare le righe non elaborate nella tua coda.

Quindi, è possibile chiamare il metodo statico ProcessBatch. Andrà in coda e farà girare un thread (s), e passerà l'id (s) della riga (s) al processore (s)/lavoratori. worker elaborerà solo una riga. Il lavoratore può essere EXE separato e finire fuori processo. Il tuo 'ProcessBatch' dovrebbe controllare, ciò che viene elaborato/non elaborato. Dovrebbe controllare quanti thread sono attualmente in esecuzione. Non vuoi girare troppi.

Così

Service   ProcessControl   Worker 
    |      |    | 
    |---Load Queue   |    | 
    |   |    |    | 
    |<--------|    |    | 
    |      |    | 
    |-----Call When Q ----->|---Queue  | 
    |      |  |  | 
    |      |<------|  | 
    |      |    | 
    |---Load Queue   |----Start------>| 
    |  |    |<---Success-----| 
    |<-------|    |    | 
    |      |---Permanent | 
    |-----Call When Q ----->| Dequeue  | 
    |      |  |  | 
    |      |<------|  | 

Questo è probabilmente tipico spaccato carico di lavoro che accelera i processi altrimenti lenti

+0

Buon punto, grazie per la risposta, ma stavo chiedendo informazioni sulla funzione di contributo. Votazione BTW per la soluzione – isxaker

+0

@isxaker Spiacente, potrei perdersi qualcosa qui.Ho pensato che la tua domanda riguardava l'accelerazione dell'elaborazione dei tuoi dati invece di 'foreach (MyDto obj in rows)'. Potresti spiegare cosa significa per te "funzione di contributo"? Grazie –

+0

È una funzione (si presenta come una qualche funzione di hash), dipende da uno (il numero di istanze di win service corrente) o da due (il numero di istanze di win serv attuale e il numero totale di win service) globali. Anche quella funzione ha un parametro di input (GUID nel mio caso). La funzione deve fare un po 'di "magia" con valore di input e return int number appartiene a [0; totalWinServInstancesNumber -1]; sembra un po 'un hash di Guid nel mio caso – isxaker

1

Invece di cercare di eseguire più istanze dello stesso servizio, si dovrebbe adottare asincrono modello produttore/consumatore. Usa l'oggetto Task per dare il via a un produttore, quindi crea molti consumatori. Se i tuoi dati devono essere elaborati in un determinato ordine, dovrai organizzare i consumatori affinché lavorino solo sul blocco di dati assegnato. Altrimenti possono riprendere il loro lavoro e iniziare l'elaborazione.

L'esempio seguente presuppone che il lavoro possa continuare in qualsiasi ordine. È possibile ottimizzare il numero di utenti ottimizzato in base alle risorse di sistema. Usa AppSetting per configurare MaxConsumer e trova il numero ideale che ottimizza l'elaborazione.

Collega il metodo di avvio/arresto all'avvio/arresto del servizio, nonché qualsiasi altra registrazione/gestione delle eccezioni necessaria. L'esempio qui è semplicistico e mostra le basi del modello.

public class MyService 
{ 
    BlockingCollection<MyDto> sharedResource = new BlockingCollection<MyDto>(); 
    CancellationTokenSource cancellation; 
    private Task producer; 
    private List<Task> consumers; 
    //Load/Set this from configuration 
    private static readonly int MaxConsumer = 3; 

    public void Start() 
    { 
     this.cancellation = new CancellationTokenSource(); 

     // Start the producer & Consumers, as long running task 
     this.producer = Task.Factory.StartNew(() => this.Produce(), TaskCreationOptions.LongRunning); 
     this.consumers = new List<Task>(); 
     for(int i=0; i<MaxConsumer; i++) 
     { 
      this.consumers.Add(Task.Factory.StartNew(() => this.Consume() , TaskCreationOptions.LongRunning)); 
     } 

     // If you need primary service loop you can do 
     // something like the following 
     // while(!this.cancellation.IsCancellationRequested) 
     //{ 
     //  this.cancellation.Token.WaitHandle.WaitOne(1000); 
     //} 
    } 

    public void Stop() 
    { 
     this.cancellation.Cancel(); 
     WaitOnTask(producer); 
     foreach(var t in this.consumers) 
     { 
      WaitOnTask(t); 
     } 
     this.cancellation.Dispose(); 
    } 

    private void WaitOnTask(Task task) 
    { 
     try 
     { 
      if (!task.IsCompleted) 
      { 
       //May want to use timeout 
       //instead of blindly waiting 
       task.Wait(); 
      } 
     } 
     catch(ObjectDisposedException oex)   
     { 
      // Task might have been disposed/closed already 
     } 

    } 

    public void Produce() 
    { 
     var token = this.cancellation.Token; 
     while(!token.IsCancellationRequested) 
     { 
      //Code for your data loading 
      if (time to start) 
      { 
       List<MyDto> rows = LoadData(); 
       foreach(var data in rows) 
       { 
        this.sharedResource.Add(data, token); 
       } 
      } 

      //Wait and repeat 
      token.WaitHandle.WaitOne(1000); 
     } 
    } 

    public void Consume() 
    { 
     var token = this.cancellation.Token; 
     try 
     { 
      foreach (var data in this.sharedResource.GetConsumingEnumerable(token)) 
      { 
       // Code for your data processing 
       Process(data); 
      } 
     } 
     catch(OperationCanceledException ex) 
     { 
      // service stop requested, can log here 
      // or take action for saving state as needed 
     } 
    } 
} 
+0

Il mio commento è lo stesso di T.S. risposta - Buon punto, grazie per la tua risposta, ma stavo chiedendo informazioni sulla funzione di contribuzione. BTW vota per la tua soluzione – isxaker

Problemi correlati