2013-07-24 6 views
5

Mam następujące ustawieniaJak anulować Wybierz w RX, jeśli nie jest zakończone przed kolejnym wydarzeniu przybywa

IObservable<Data> source = ...; 

source 
    .Select(data=>VeryExpensiveOperation(data)) 
    .Subscribe(data=>Console.WriteLine(data)); 

Normalnie wydarzenia przychodzą oddzielone rozsądnych ramach czasowych. Wyobraź sobie użytkownika aktualizującego pole tekstowe w formularzu. Nasz VeryExpensiveOperation może zająć 5 sekund, aby zakończyć, a gdy zajmie godzinę, na ekranie wyświetli się szklanka.

Jeśli jednak przez 5 sekund użytkownik zaktualizuje pole tekstowe ponownie Chciałbym wysłać anulowanie do bieżącego VeryExpensiveOperation przed rozpoczęciem nowego.

Mogę sobie wyobrazić scenariusz, jak

source 
    .SelectWithCancel((data, cancelToken)=>VeryExpensiveOperation(data, token)) 
    .Subscribe(data=>Console.WriteLine(data)); 

więc za każdym razem lambda nazywa się nazywa z cancelToken, które mogą być używane do zarządzania anulowanie Task. Jednak teraz mieszamy Task, CancelationToken i RX. Nie jestem pewien, jak to wszystko zmieścić. Jakieś sugestie.

Bonus Points na zastanawianie się, jak testować operatora za pomocą xUnit :)

pierwsza próba

public static IObservable<U> SelectWithCancelation<T, U>(this IObservable<T> This, Func<CancellationToken, T, Task<U>> fn) 
    { 
     CancellationTokenSource tokenSource = new CancellationTokenSource(); 

     return This 
      .ObserveOn(Scheduler.Default) 
      .Select(v=>{ 
       tokenSource.Cancel(); 
       tokenSource=new CancellationTokenSource(); 
       return new {tokenSource.Token, v}; 
      }) 
      .SelectMany(o=>Observable.FromAsync(()=>fn(o.Token, o.v))); 
    } 

jeszcze nie testowano. Mam nadzieję, że zadanie, które nie jest kompletne, generuje IObservable, które kończy się bez uruchamiania żadnych zdarzeń OnNext.

Odpowiedz

7

Musisz modelować VeryExpensiveOperation jako anulowaną rzecz asynchroniczną. Albo Task lub IObservable. Ja zakładam, że jest to zadanie o CancellationToken:

Task<TResult> VeryExpensiveOperationAsync<TSource, TResult>(TSource item, CancellationToken token); 

Potem robisz to tak:

source 
    .Select(item => Observable.DeferAsync(async token => 
    { 
     // do not yield the observable until after the operation is completed 
     // (ie do not just do VeryExpensiveOperation(...).ToObservable()) 
     // because DeferAsync() will dispose of the token source as soon 
     // as you provide the observable (instead of when the observable completes) 
     var result = await VeryExpensiveOperationAsync(item, token); 
     return Observable.Return(result); 
    }) 
    .Switch(); 

Select tylko odroczony tworzy się zaobserwować, że gdy subskrybowane, stworzy token i rozpocznij operację. Jeśli obserwowalne zostanie anulowane przed zakończeniem operacji, token zostanie anulowany.

Subskrybuje do każdego nowego obserwowalnego, który pochodzi z Select, anulując subskrypcję z poprzedniego obserwowalnego, do którego został subskrybowany.

Ma to pożądany efekt.

P.S. to jest łatwe do sprawdzenia. Po prostu podaj fałszywe źródło i próbę: VeryExpensiveOperation, które używa TaskCompletetionSource dostarczonego przez test jednostkowy, dzięki czemu test jednostki może kontrolować dokładnie, kiedy są produkowane nowe elementy źródłowe i kiedy zadania są zakończone. Coś takiego:

void SomeTest() 
{ 
    // create a test source where the values are how long 
    // the mock operation should wait to do its work. 
    var source = _testScheduler.CreateColdObservable<int>(...); 

    // records the actions (whether they completed or canceled) 
    List<bool> mockActionsCompleted = new List<bool>(); 
    var resultStream = source.SelectWithCancellation((token, delay) => 
    { 
     var tcs = new TaskCompletionSource<string>(); 
     var tokenRegistration = new SingleAssignmentDisposable(); 

     // schedule an action to complete the task 
     var d = _testScheduler.ScheduleRelative(delay,() => 
     { 
      mockActionsCompleted.Add(true); 
      tcs.SetResult("done " + delay); 
      // stop listening to the token 
      tokenRegistration.Dispose(); 
     }); 

     // listen to the token and cancel the task if the token signals 
     tokenRegistration.Disposable = token.Register(() => 
     { 
      mockActionsCompleted.Add(false); 
      tcs.TrySetCancelled(); 
      // cancel the scheduled task 
      d.Dispose(); 
     }); 

     return tcs.Task; 
    }); 

    // subscribe to resultStream 
    // start the scheduler 
    // assert the mockActionsCompleted has the correct sequence 
    // assert the results observed were what you expected. 
} 

Można popaść w kłopoty z wykorzystaniem testScheduler.Start() powodu nowych działań zaplanowanych dynamicznie. pętla while z testScheduler.AdvanceBy(1) może działać lepiej.

+0

Nie bardzo rozumiem, gdzie "token" jest anulowane. Czy dzieje się to automagicznie, gdy Switch unsubscribes z poprzedniej wewnętrznej obserwowalne? – bradgonesurfing

+2

'DeferAsync' tworzy token, gdy obserwowalny jest zasubskrybowany. Następnie anuluje token, jeśli obserwowalne zostanie anulowane, zanim zadanie wygeneruje * rzeczywiste * obserwowalne (informując, że nie musisz pracować nad wygenerowaniem rzeczywistego obserwowalnego). 'Switch' utrzymuje subskrypcję nowego obserwowalnego, który przybywa i anuluje subskrypcję poprzedniego, co powoduje anulowanie poprzedniego. – Brandon

+0

Niesamowity wgląd w to, jak to działa. Dzięki! – bradgonesurfing

0

Dlaczego po prostu nie użyć przepustnicy?

http://rxwiki.wikidot.com/101samples#toc30

przepustnicy zatrzymuje przepływ zdarzeń, dopóki nie ma więcej wydarzeń produkowanych przez określony okres czasu. Na przykład, jeśli dławisz zdarzenie TextChanged pola tekstowego do 0,5 sekundy, żadne zdarzenia nie będą przekazywane, dopóki użytkownik nie przestanie pisać przez 5 sekund. Jest to użyteczne w polach wyszukiwania, w których nie chcesz rozpoczynać nowego wyszukiwania po każdym naciśnięciu klawisza, ale chcesz poczekać, aż użytkownik się zatrzyma.

SearchTextChangedObservable = Observable.FromEventPattern<TextChangedEventArgs>(this.textBox, "TextChanged"); 
_currentSubscription = SearchTextChangedObservable.Throttle(TimeSpan.FromSeconds(.5)).ObserveOnDispatcher 
+0

Zrobiłeś to źle. Wydarzenia nie przychodzą zbyt szybko. Nie ma potrzeby otwierania przepustnicy. Przetwarzanie jest bardzo kosztowne i powinno zostać anulowane, jeśli są dostępne nowe dane. – bradgonesurfing

+0

Właściwie miałem na myśli przepustnicę przed rozpoczęciem kosztownego przetwarzania, abyśmy mogli go uruchomić tylko wtedy, gdy wiemy, że będzie miał czas na zakończenie. Jednak teraz widzę, że byłby to prymitywny szacunek czasu potrzebnego do ukończenia w porównaniu z wdrożeniem Brandona. – AlSki

Powiązane problemy