2013-03-06 14 views
7

Utworzono operator SlidingWindow() dla reaktywnych rozszerzeń, ponieważ chcę łatwo monitorować takie rzeczy, jak średnie kroczące itp. Jako prosty przykład chcę subskrybować, aby usłyszeć zdarzenia myszy, ale za każdym razem jest wydarzenie, które chcę otrzymać trzy ostatnie (zamiast czekać na co trzecie wydarzenie, aby otrzymać trzy ostatnie). Właśnie dlatego przeciążenie okna, które znalazłem nie wydaje mi się, co muszę z pudełka.Problemy z zaimplementowaniem okna przesuwnego w Rx

Oto, co wymyśliłem. Obawiam się, że może nie być najbardziej wydajnych rozwiązanie, biorąc pod uwagę jego częste operacje Lista:

public static IObservable<List<T>> SlidingWindow<T>(this IObservable<T> seq, int length) 
{ 
    var seed = new List<T>(); 

    Func<List<T>, T, List<T>> accumulator = (list, arg2) => 
    { 
     list.Add(arg2); 

     if (list.Count > length) 
      list.RemoveRange(0, (list.Count - length)); 

     return list; 
    }; 

    return seq.Scan(seed, accumulator) 
       .Where(list => list.Count == length); 
} 

To można nazwać w ten sposób:

var rollingSequence = Observable.Range(1, 5).SlidingWindow().ToEnumerable(); 

Jednak, ku mojemu wielkiemu zaskoczeniu, zamiast otrzymaniu spodziewane wyniki

1,2,3 
2,3,4 
3,4,5 

otrzymam wyniki

2,3,4 
3,4,5 
3,4,5 

Wszelkie spostrzeżenia są mile widziane!

Odpowiedz

5

Spróbuj zamiast tego - Musiałbym siedzieć i mają o tym myśleć krewny wydajność, ale to przynajmniej prawdopodobne jako dobre, a sposób łatwiejsze do odczytania: amatorskie

public static IObservable<IList<T>> SlidingWindow<T>(
     this IObservable<T> src, 
     int windowSize) 
{ 
    var feed = src.Publish().RefCount();  
    // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list 
    return Observable.Zip(
     Enumerable.Range(0, windowSize) 
      .Select(skip => feed.Skip(skip)) 
      .ToArray()); 
} 

testową :

var source = Observable.Range(0, 10); 
var query = source.SlidingWindow(3); 
using(query.Subscribe(Console.WriteLine)) 
{    
    Console.ReadLine(); 
} 

wyjściowa:

ListOf(0,1,2) 
ListOf(1,2,3) 
ListOf(2,3,4) 
ListOf(3,4,5) 
ListOf(4,5,6) 
... 

EDYCJA: Odkładam na bok, że jestem od tego czasu palony, nie robiąc tego ... Nie sądzę, że jest to bezwzględnie wymagane tutaj, o tysiąc dziewięćset dziewięćdziesiąt jeden.

EDIT dla yzorg:

Jeśli poszerzyć metodę jak tak, zobaczysz zachowanie wykonania jaśniej:

public static IObservable<IList<T>> SlidingWindow<T>(
    this IObservable<T> src, 
    int windowSize) 
{ 
    var feed = src.Publish().RefCount();  
    // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list 
    return Observable.Zip(
    Enumerable.Range(0, windowSize) 
     .Select(skip => 
     { 
      Console.WriteLine("Skipping {0} els", skip); 
      return feed.Skip(skip); 
     }) 
     .ToArray()); 
} 
+0

@blaster Bez problemu - w rzeczywistości, dziękuję za "zrobienie" mnie, że to piszę, ponieważ użyłem go sam kilka razy, od czasu odpowiedzi. ;) – JerKimball

+0

Nie sądzę, że to jest dobre. .Publish(), .Range (0, x) i .Skip() - gdy są połączone, to wygląda na złą wydajność, konkretnie O n^2, ponieważ Skip ma zamiar iterować cały strumień w kółko.Na przykład musisz przetestować 30 000 liczb całkowitych, aby uzyskać (10000, 10001, 10002). Tak naprawdę nie przechowujesz w pamięci przesuwnego bufora strumienia źródłowego, musisz przechowywać cały strumień źródłowy (od początku czasu) w pamięci, co uważałem za unikanie. – yzorg

+0

@yzorg sprawdź edycję – JerKimball

9

Używanie oryginalnego testu z argumentem 3 dla zliczania, to daje pożądane rezultaty:

public static IObservable<IList<T>> SlidingWindow<T>(
    this IObservable<T> source, int count) 
{ 
    return source.Buffer(count, 1) 
       .Where(list => list.Count == count); 
} 

Testowanie tak:

var source = Observable.Range(1, 5); 
var query = source.SlidingWindow(3); 
using (query.Subscribe(i => Console.WriteLine(string.Join(",", i)))) 
{ 

} 

wyjściowa:

1,2,3 
2,3,4 
3,4,5 
6

Wystarczy source.Window(count, 1) - lub source.Buffer(count, 1) to być okno/bufor "Count" pozycje, przesuwając się o jeden.