2012-11-19 18 views
9

Piszę aplikację C# (.NET 4.5), która służy do agregowania zdarzeń opartych na czasie do celów raportowania. Aby moja logika kwerend mogła być ponownie używana zarówno w czasie rzeczywistym, jak iw danych historycznych, korzystam z infrastruktury Reactive Extensions (2.0) i ich infrastruktury IScheduler (i znajomych).Dlaczego metoda Observable.Generate() generuje wyjątek System.StackOverflowException?

Na przykład załóżmy, tworzymy listę zdarzeń (chronologicznie, ale mogą pokrywać się!), Której jedynym ładowność ist ich znacznika czasu i chcesz poznać ich dystrybucji w całej buforów ustalony czas trwania:

const int num = 100000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 

var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

var stream = Observable.Generate<int, DateTimeOffset>(
    0, 
    s => s < events.Count, 
    s => s + 1, 
    s => events[s], 
    s => events[s], 
    time); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 

uruchomiony ten kod wyniki w System.StackOverflowException z poniższego śladu stosu (its ciągu ostatnich 3 linie w dół):

mscorlib.dll!System.Threading.Interlocked.Exchange<System.IDisposable>(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 
System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes  
System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes  
System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem<System.DateTimeOffset>.Cancel() + 0x23 bytes  
... 

Ok, problem wydaje się pochodzić z mojego użytkowania Observable.Generate(), w zależności od listy rozmiar (num) i niezależnie od wyboru programu planującego.

Co robię źle? Lub bardziej ogólnie, jaki byłby preferowany sposób tworzenia IObservable z wydarzeń, które dostarczają własne znaczniki czasu?

+1

Jak duży może być "num", zanim wystąpi ten błąd? Co więcej, jeśli rozwiążesz ten problem w debugerze, jaki jest ostatni wiersz kodu wykonywany przed pojawieniem się błędu? –

+0

Dla mnie próg krytyczny wydaje się mieć wartość ~ 'num = 51600' (w wersji Release config, nieco mniej w konfiguracji debugowania). Obserwowalna sekwencja wydaje się być całkowicie stworzona. Mogę trafić punkty przerwania w wyrażeniach lamdba dla 'Observable.Generate()'. Wyjątek zostanie zgłoszony po ostatnim wywołaniu 'Console.WriteLine()'. –

+1

Zrozum, to tylko przypuszczenie, ale wygląda podejrzanie, tak jak strumień próbuje pozbyć się każdego elementu, a każdy element próbuje zrzucić strumień. Kończy się to, co w istocie wywołuje rekursywne wezwania do "Cancel" lub "Dispose", które dmucha twój stack (domyślny rozmiar to 1 megabajt). Nie jestem wystarczająco obeznany z "Observable", aby powiedzieć, dlaczego tak się dzieje. –

Odpowiedz

3

(aktualizacja - zrealizowane nie stanowić alternatywę: patrz na bottom of answer)

Problem polega na tym, jak działa Observable.Generate - służy do rozwinięcia generatora corecursive (pomyśl, że rekurencja odwrócona) na podstawie argumentów; jeśli te argumenty w końcu generują generator corecursive zagnieżdżony, wyrzucisz swój stos.

Od tego momentu, Spekuluję dużo (nie mam źródła Rx przede mną) (patrz poniżej), ale jestem gotów założyć się, że twoja definicja kończy się rozszerzeniem na coś takiego :

initial_state => 
generate_next(initial_state) => 
generate_next(generate_next(initial_state)) => 
generate_next(generate_next(generate_next(initial_state))) => 
generate_next(generate_next(generate_next(generate_next(initial_state)))) => ... 

I tak długo, aż twój stos wywołań będzie wystarczająco duży, aby się przepełnić.Na przykład sygnatura metody + twój licznik int, który będzie miał wartość około 8-16 bajtów na rekurencyjne wywołanie (więcej w zależności od tego, w jaki sposób jest realizowany generator automatów stanów), więc 60 000 dźwięków w przybliżeniu prawych (1M/16 ~ 62500 maks. głębokość)

EDIT: podciągnął źródło - potwierdził: „Uruchom” metoda Generowanie wygląda następująco - zapoznaje się z zagnieżdżonych wywołań Generate:

protected override IDisposable Run(
    IObserver<TResult> observer, 
    IDisposable cancel, 
    Action<IDisposable> setSink) 
{ 
    if (this._timeSelectorA != null) 
    { 
     Generate<TState, TResult>.α α = 
       new Generate<TState, TResult>.α(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(α); 
     return α.Run(); 
    } 
    if (this._timeSelectorR != null) 
    { 
     Generate<TState, TResult>.δ δ = 
       new Generate<TState, TResult>.δ(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
     setSink(δ); 
     return δ.Run(); 
    } 
    Generate<TState, TResult>._ _ = 
      new Generate<TState, TResult>._(
        (Generate<TState, TResult>) this, 
        observer, 
        cancel); 
    setSink(_); 
    return _.Run(); 
} 

EDIT: Derp, nie oferują wszelkie alternatywy ... oto jedna, która może działać:

(EDYTOWANIE: naprawiono Enumerable.Range, więc rozmiar strumienia nie zostanie pomnożony przez chunkSize)

const int num = 160000; 
const int dist = 10; 

var events = new List<DateTimeOffset>(); 
var curr = DateTimeOffset.Now; 
var gap = new Random(); 
var time = new HistoricalScheduler(curr); 

for (int i = 0; i < num; i++) 
{ 
    events.Add(curr); 
    curr += TimeSpan.FromMilliseconds(gap.Next(dist)); 
} 

    // Size too big? Fine, we'll chunk it up! 
const int chunkSize = 10000; 
var numberOfChunks = events.Count/chunkSize; 

    // Generate a whole mess of streams based on start/end indices 
var streams = 
    from chunkIndex in Enumerable.Range(0, (int)Math.Ceiling((double)events.Count/chunkSize) - 1) 
    let startIdx = chunkIndex * chunkSize 
    let endIdx = Math.Min(events.Count, startIdx + chunkSize) 
    select Observable.Generate<int, DateTimeOffset>(
     startIdx, 
     s => s < endIdx, 
     s => s + 1, 
     s => events[s], 
     s => events[s], 
     time); 

    // E pluribus streamum 
var stream = Observable.Concat(streams); 

stream.Buffer(TimeSpan.FromMilliseconds(num), time) 
    .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); 

time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 
+0

Dzięki, to jest idealne! Wydaje się również być bardziej skuteczny niż moje własne obejście. Musiałem jednak naprawić mały błąd w twojej arytmetyce (patrz edycja). Wciąż nie rozumiem, dlaczego rekurencyjna implementacja jest potrzebna w RX. W końcu wydaje się, że działa z RX v1.0 (znacznie powyżej rozmiaru 60 000). Mimo to, miłe śledztwo, sprytne rozwiązanie. Dzięki jeszcze raz! –

+0

Nie ma problemu! Heh - jestem pod wrażeniem, że miałem tylko * jeden * błąd matematyczny ...;) – JerKimball

3

OK, wziąłem inną metodę fabryczną, która nie wymaga wyrażeń lamdba jako przejść stanu i teraz nie widzę już żadnych przepełnień stosu. Nie jestem jeszcze pewien, czy to zakwalifikować jako poprawną odpowiedź na moje pytanie, ale działa i pomyślałem bym udostępnić go tutaj:

var stream = Observable.Create<DateTimeOffset>(o => 
    { 
     foreach (var e in events) 
     { 
      time.Schedule(e,() => o.OnNext(e)); 
     } 

     time.Schedule(events[events.Count - 1],() => o.OnCompleted()); 

     return Disposable.Empty; 
    }); 

Ręcznie planowania zdarzeń przed powrotem subskrypcję wydaje (!) niezręczne dla mnie, ale w tym przypadku można to zrobić w wyrażeniu lambda.

Jeśli coś jest nie tak z tym podejściem, popraw mnie. Ponadto nadal będę szczęśliwy słysząc, jakie domyślne założenia przez System.Reactive naruszyłem z moim oryginalnym kodem.

(Oh my, powinienem sprawdzili, że wcześniej: z RX v1.0, oryginalny Observable.Generate() ma w rzeczywistości wydają się działać!)

Powiązane problemy