2013-02-11 16 views
5

Jak zrobić w RX prostą, stanową transformację sekwencji?RX: Stateful transformacja sekwencji, np. wykładnicza średnia ruchoma

Powiedzmy, że chcemy dokonać wykładniczej średniej ruchomej transformacji IObservable noisySequence.

Ilekroć noisySequence kleszcze, emaSequence należy zaznaczyć i zwrócić wartość (previousEmaSequenceValue * (1-lambda) + latestNoisySequenceValue * lambda)

Chyba używamy przedmiotów, ale jak dokładnie?

public static void Main() 
    { 

     var rand = new Random(); 

     IObservable<double> sequence = Observable 
      .Interval(TimeSpan.FromMilliseconds(1000)) 
      .Select(value => value + rand.NextDouble()); 

     Func<double, double> addNoise = x => x + 10*(rand.NextDouble() - 0.5); 

     IObservable<double> noisySequence = sequence.Select(addNoise); 

     Subject<double> exponentialMovingAverage = new Subject<double>(); // ??? 


     sequence.Subscribe(value => Console.WriteLine("original sequence "+value)); 
     noisySequence.Subscribe(value => Console.WriteLine("noisy sequence " + value)); 
     exponentialMovingAverage.Subscribe(value => Console.WriteLine("ema sequence " + value)); 

     Console.ReadLine(); 
    } 
+1

Aby wyjaśnić, jestem mniej zainteresowany konkretnym sposobem zrobić średnio, ale raczej ogólne sposoby aby stanowe transformat. – Sputnik2513

Odpowiedz

3

Dla wielu z tych rodzajów obliczeń Buffer jest najprostszym sposobem

var movingAverage = noisySequence.Buffer(/*last*/ 3, /*move forward/* 1 /*at a time*/) 
    .Select(x => (x[0] + x[1] + x[2])/3.0); 

Jeśli musisz nosić stan dookoła, użyj operatora Scan, czyli jak Aggregate oprócz tego, że daje wartości co iteracja.

+0

Działa to w tym konkretnym przykładzie średniej. Bardziej ogólnie, jeśli potrzebujemy stanów wewnętrznych, jak najlepiej to zrobić w RX? – Sputnik2513

+0

Również bez żadnych stanów. Aby wprowadzić wykładniczą średnią kroczącą, musisz znać ostatnią wartość EMA. – Sputnik2513

+0

Jak powiedział Paul, możesz użyć Scan, który umożliwia agregowanie w locie, zaczynając od "stanu" nasion i wykonując pewne działania w tym stanie dla każdego OnNext. Następnie możesz wybrać ze stanu wartość, którą chcesz opublikować poza sekwencją. Jeśli jednak potrzebujesz dostępu do więcej niż tylko aktualnej wartości zagregowanej i najnowszej, operatorzy okien mogą z nich korzystać. –

7

W ten sposób można dołączyć stan do sekwencji. W tym przypadku oblicza średnią z ostatnich 10 wartości.

var movingAvg = noisySequence.Scan(new List<double>(), 
(buffer, value)=> 
{ 
    buffer.Add(value); 
    if(buffer.Count>MaxSize) 
    { 
     buffer.RemoveAt(0); 
    } 
    return buffer; 
}).Select(buffer=>buffer.Average()); 

Ale można użyć okna (który to bufor jest generalizacją), aby uzyskać również średnią.

noisySequence.Window(10) 
    .Select(window=>window.Average()) 
    .SelectMany(averageSequence=>averageSequence); 
+0

Styczna: twoje artykuły na temat 'Okna',' Bufora', 'Dołączania' i' GroupJoin' były wyjątkowo pouczające - w rzeczywistości, wydaje mi się, że "zacytowałem" cię kilka razy, odpowiadając na pytania związane z Rx. :) – JerKimball

1

Dzięki! Oto rozwiązanie wykorzystujące skanowanie

const double lambda = 0.99; 
    IObservable<double> emaSequence = noisySequence.Scan(Double.NaN, (emaValue, value) => 
     { 
      if (Double.IsNaN(emaValue)) 
      { 
       emaValue = value; 
      } 
      else 
      { 
       emaValue = emaValue*lambda + value*(1-lambda); 
      } 
      return emaValue; 
     }).Select(emaValue => emaValue);