2014-12-31 9 views
23

Stiamo utilizzando la funzionalità di flusso in RavenDB per caricare, trasformare e migrare i dati tra 2 database in questo modo:RavenDB Stream Risultati Unbounded - Resilienza Collegamento

var query = originSession.Query<T>(IndexForQuery); 

using (var stream = originSession.Advanced.Stream(query)) 
{ 
    while (stream.MoveNext()) 
    { 
     var streamedDocument = stream.Current.Document; 

     OpenSessionAndMigrateSingleDocument(streamedDocument); 
    } 
} 

Il problema è che una delle raccolte ha milioni di righe, e continuiamo a ricevere un IOException nel seguente formato:

Application: MigrateToNewSchema.exe 
Framework Version: v4.0.30319 
Description: The process was terminated due to an unhandled exception. 
Exception Info: System.IO.IOException 
Stack: 
    at System.Net.ConnectStream.Read(Byte[], Int32, Int32) 
    at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32) 
    at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32) 
    at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef) 
    at System.IO.StreamReader.Read(Char[], Int32, Int32) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue() 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal() 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read() 
    at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext() 
    at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext() 
    at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection() 
    at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore) 
    at MigrateToNewSchema.Program.Main(System.String[]) 

questo accade abbastanza lontano in streaming e, naturalmente, problemi di connessione transitorie si verificheranno su questo tipo di periodo (ci vogliono ore per completare).

Tuttavia, quando riproviamo, poiché stiamo utilizzando uno Query, dobbiamo ripartire da zero. Quindi, in definitiva, se c'è un errore di connessione durante l'intero Stream, dobbiamo provarlo di nuovo, e di nuovo fino a quando non funziona fino alla fine.

So che è possibile utilizzare ETag con flusso per riavviare in modo efficace in un determinato punto, tuttavia non c'è sovraccarico per fare questo con uno Query che abbiamo bisogno di filtrare i risultati di essere migrati e specificare la raccolta corretta.

Quindi, in RavenDB, c'è un modo per migliorare la resilienza interna della connessione (proprietà stringa di connessione, impostazioni interne ecc.) O "recuperare" efficacemente un flusso su un errore?

+0

ho scoperto [abbonamenti dati] (http://ravendb.net/docs/article-page/3.0/csharp/client-api/data- abbonamenti/sottoscrizione how-to-create-data), una funzionalità RavenDb 3.0 che fornisce un meccanismo affidabile per iterare attraverso una raccolta di documenti che soddisfano determinati criteri e che consente di riprendere facilmente da dove si era interrotto. Se qualcuno fosse disposto a mettere insieme alcuni esempi di codice che mostrano come quella caratteristica potrebbe rispondere a questa domanda, la considero degna della generosità. – StriplingWarrior

+0

Sei legato all'utilizzo di una query? Anche se sarà più inefficiente, si tratta di una migrazione, quindi la memoria non è un problema, perché non iterare le raccolte di documenti non elaborati e filtrare in memoria, in modo che tu possa riprendere da un Etag? Questo è il modo in cui gestisco tutto lo streaming, non uso mai le query. – kamranicus

+0

@StriplingWarrior E 'passato un po' di tempo :-) Non lavoro più per la compagnia usando RavenDB ma questo mi interessa ancora, quindi risponderò con il codice di sottoscrizione dati oggi –

risposta

2

Come da suggerimento di @StriplingWarrior, ho ricreato la soluzione utilizzando Data Subscriptions.

Utilizzando questo approccio sono stato in grado di eseguire l'iterazione su tutti i 2 milioni di righe (anche se ammettiamo con molta meno elaborazione per articolo); 2 punti qui che avrebbe permesso di quando stavamo cercando di attuare la stessa logica con Streams:

  1. lotti solo vengono rimossi dalla sottoscrizione "coda" una volta riconosciuto (come la maggior parte delle code normali)
    1. Il sottoscritto IObserver<T> deve essere completato correttamente per poter impostare questo riconoscimento.
    2. Queste informazioni viene gestito dal server piuttosto che il cliente in modo permette al cliente di riavviare senza influenzare l'ultima posizione di successo elaborati nella sottoscrizione
    3. See here for more details
  2. Come @StriplingWarrior indicato perché è possibile creare abbonamenti con filtri fino al livello della proprietà sarebbe possibile riprodurre con un risultato inferiore impostato in caso di un'eccezione all'interno della sottoscrizione stessa.
    1. Il primo punto sostituisce questo; ma ci permette maggiore flessibilità che non si vedono nelle API flusso

L'ambiente di test è un 3.0 di database RavenDB (macchina locale, in esecuzione come servizio di Windows) con le impostazioni predefinite contro una raccolta di 2 milioni di dischi.

codice per generare i record fittizi:

using (IDocumentStore store = GetDocumentStore()) 
{ 
    store.Initialize(); 

    using (var bulkInsert = store.BulkInsert()) 
    { 
     for (var i = 0; i != recordsToCreate; i++) 
     { 
      var person = new Person 
      { 
       Id = Guid.NewGuid(), 
       Firstname = NameGenerator.GenerateFirstName(), 
       Lastname = NameGenerator.GenerateLastName() 
      }; 

      bulkInsert.Store(person); 
     } 
    } 
} 

Iscrizione a questa collezione è quindi un caso di:

using (IDocumentStore store = GetDocumentStore()) 
{ 
    store.Initialize(); 

    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>()); 

    var personSubscription = store.Subscriptions.Open<Person>(
     subscriptionId, new SubscriptionConnectionOptions() 
    { 
     BatchOptions = new SubscriptionBatchOptions() 
     { 
      // Max number of docs that can be sent in a single batch 
      MaxDocCount = 16 * 1024, 
      // Max total batch size in bytes 
      MaxSize = 4 * 1024 * 1024, 
      // Max time the subscription needs to confirm that the batch 
      // has been successfully processed 
      AcknowledgmentTimeout = TimeSpan.FromMinutes(3) 
     }, 
     IgnoreSubscribersErrors = false, 
     ClientAliveNotificationInterval = TimeSpan.FromSeconds(30) 
    }); 

    personSubscription.Subscribe(new PersonObserver()); 

    while (true) 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(500)); 
    } 
} 

Annotare il PersonObserver; questa è solo un'implementazione di base di IObserver in questo modo:

public class PersonObserver : IObserver<Person> 
{ 
    public void OnCompleted() 
    { 
     Console.WriteLine("Completed"); 
    } 

    public void OnError(Exception error) 
    { 
     Console.WriteLine("Error occurred: " + error.ToString()); 
    } 

    public void OnNext(Person person) 
    { 
     Console.WriteLine($"Received '{person.Firstname} {person.Lastname}'"); 
    } 
} 
+1

Nice write-up. Ho trovato utile passare un 'Task' (o creare un' Task' basato su un dato 'CancellationToken'), e' attendere l'operazione piuttosto che 'while (true)'. In questo modo, il codice chiamante può annullare in modo sicuro l'operazione senza uccidere l'intero thread o processo. Ho anche inventato un meccanismo basato su ETag per aiutare la migrazione a sapere quando ha "finito" di colpire tutti i documenti di destinazione, in modo che possa fermarsi, ma è piuttosto complicato e non è perfetto per tutti gli scopi. – StriplingWarrior