2014-12-31 14 views
23

Używamy funkcjonalność Stream w RavenDB załadować, transformacji i migracji danych między 2 baz danych tak:RavenDB Stream dla nieograniczonej wyszukiwania - Odporność połączenia

var query = originSession.Query<T>(IndexForQuery); 

using (var stream = originSession.Advanced.Stream(query)) 
{ 
    while (stream.MoveNext()) 
    { 
     var streamedDocument = stream.Current.Document; 

     OpenSessionAndMigrateSingleDocument(streamedDocument); 
    } 
} 

Problem polega na tym, że jedna z kolekcji ma miliony wierszy i trzymamy otrzymaniu IOException w następującym formacie:

Application: MigrateToNewSchema.exe 
Framework Version: v4.0.30319 
Description: The process was terminated due to an unhandled exception. 
Exception Info: System.IO.IOException 
Stack: 
    at System.Net.ConnectStream.Read(Byte[], Int32, Int32) 
    at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32) 
    at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32) 
    at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef) 
    at System.IO.StreamReader.Read(Char[], Int32, Int32) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue() 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal() 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read() 
    at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext() 
    at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext() 
    at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection() 
    at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore) 
    at MigrateToNewSchema.Program.Main(System.String[]) 

dzieje się dość długą drogę do strumieniowania i oczywiście problemy z połączeniem przemijające nastąpi na tego rodzaju okresu (trwa godzin do zakończenia).

Jednakże, gdy próbujemy ponownie, ponieważ używamy Query, musimy zacząć od zera. Tak więc ostatecznie, jeśli wystąpi awaria połączenia podczas całego Stream, musimy spróbować ponownie i ponownie, aż do końca.

Wiem, że można użyć ETag ze strumieniem do efektywnego restartu w pewnym punkcie, jednak nie ma przeciążenia, aby to zrobić z Query, który musimy filtrować migrowane wyniki i określić poprawną kolekcję.

Tak, czy w RavenDB istnieje sposób na poprawienie wewnętrznej odporności połączenia (właściwości ciągu połączenia, ustawień wewnętrznych itp.) Lub efektywne "odzyskanie" strumienia w przypadku błędu?

+0

odkryłem [Dane Subskrypcje] (http://ravendb.net/docs/article-page/3.0/csharp/client-api/data- subskrypcje/how-to-create-subskrypcja danych), funkcja RavenDb 3.0, która zapewnia niezawodny mechanizm do iteracji w zbiorze dokumentów spełniających określone kryteria i umożliwia łatwe przechwytywanie od miejsca, w którym zostało przerwane. Gdyby ktoś chciał złożyć kilka próbek kodu pokazujących, w jaki sposób ta funkcja może odpowiedzieć na to pytanie, uznałbym to za warte nagrody. – StriplingWarrior

+0

Czy masz powiązanie z zapytaniem? Chociaż będzie to bardziej nieefektywne, jest to migracja, więc pamięć nie jest problemem - dlaczego nie przerobić surowych kolekcji dokumentów i filtrować w pamięci, aby można było wznowić działanie przy użyciu Etag? Tak radzę sobie z wszystkimi strumieniami, nigdy nie używam zapytań. – kamranicus

+0

@StriplingWarrior Minęło już trochę czasu :-) Nie pracuję już dla firmy korzystającej z RavenDB, ale nadal mnie to interesuje, więc podpiszę dzisiaj kod subskrypcji danych. –

Odpowiedz

2

Zgodnie z sugestią @StriplingWarrior, odtworzyłem rozwiązanie przy użyciu Data Subscriptions.

Dzięki temu podejściu udało mi się powtórzyć wszystkie 2 miliony wierszy (choć wprawdzie przy znacznie mniejszej liczbie operacji na sztukę); 2 punkty tutaj, które pomogły kiedy staraliśmy się realizować tę samą logikę za pomocą strumieni:

„kolejka”
  1. Partie tylko zostaną usunięte z subskrypcji raz potwierdzony (jak większość standardowych kolejek)
    1. subskrybowanego IObserver<T> musi zakończyć się pomyślnie, aby ustawić to potwierdzenie.
    2. Informacje te są obsługiwane przez serwer zamiast klient więc pozwala klientowi na ponowne uruchomienie bez wpływu na ostatnią udaną pozycję przetwarzane w subskrypcji
    3. See here for more details
  2. Jak @StriplingWarrior wskazane, ponieważ można utworzyć z subskrypcji od razu do poziomu właściwości możliwe byłoby powtórzenie z mniejszym zestawem wyników w przypadku wystąpienia wyjątku w samej subskrypcji.
    1. Pierwszy punkt naprawdę zastępuje to; ale pozwala nam dodatkową elastyczność nie widział w strumieniu API

środowisku testowym jest 3,0 bazie RavenDB (komputer lokalny, działa jako usługa Windows) z ustawieniami domyślnymi przeciwko kolekcji 2 miliony rekordów.

Kod do generowania fikcyjne rekordy:

using (IDocumentStore store = GetDocumentStore()) 
{ 
    store.Initialize(); 

    using (var bulkInsert = store.BulkInsert()) 
    { 
     for (var i = 0; i != recordsToCreate; i++) 
     { 
      var person = new Person 
      { 
       Id = Guid.NewGuid(), 
       Firstname = NameGenerator.GenerateFirstName(), 
       Lastname = NameGenerator.GenerateLastName() 
      }; 

      bulkInsert.Store(person); 
     } 
    } 
} 

korzystającymi z tej kolekcji jest wówczas przypadek:

using (IDocumentStore store = GetDocumentStore()) 
{ 
    store.Initialize(); 

    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>()); 

    var personSubscription = store.Subscriptions.Open<Person>(
     subscriptionId, new SubscriptionConnectionOptions() 
    { 
     BatchOptions = new SubscriptionBatchOptions() 
     { 
      // Max number of docs that can be sent in a single batch 
      MaxDocCount = 16 * 1024, 
      // Max total batch size in bytes 
      MaxSize = 4 * 1024 * 1024, 
      // Max time the subscription needs to confirm that the batch 
      // has been successfully processed 
      AcknowledgmentTimeout = TimeSpan.FromMinutes(3) 
     }, 
     IgnoreSubscribersErrors = false, 
     ClientAliveNotificationInterval = TimeSpan.FromSeconds(30) 
    }); 

    personSubscription.Subscribe(new PersonObserver()); 

    while (true) 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(500)); 
    } 
} 

zanotować PersonObserver; to jest po prostu podstawowy realizacja IObserver tak:

public class PersonObserver : IObserver<Person> 
{ 
    public void OnCompleted() 
    { 
     Console.WriteLine("Completed"); 
    } 

    public void OnError(Exception error) 
    { 
     Console.WriteLine("Error occurred: " + error.ToString()); 
    } 

    public void OnNext(Person person) 
    { 
     Console.WriteLine($"Received '{person.Firstname} {person.Lastname}'"); 
    } 
} 
+1

Fajna wiadomość. Zauważyłem, że warto przekazać w 'Task' (lub utworzyć' Task' na podstawie danego 'CancellationToken') i' czekać' na zadanie zamiast 'while (true)'. W ten sposób kod wywołujący może bezpiecznie anulować operację bez zabicia całego wątku lub procesu. Wymyśliłem także mechanizm oparty na ETag, aby pomóc migrantom dowiedzieć się, kiedy "wykonano" uderzenie we wszystkie dokumenty docelowe, aby mógł się zatrzymać, ale jest to dość skomplikowane i nie jest dobre dla wszystkich celów. – StriplingWarrior