Mam następujące ustawieniaJak anulować Wybierz w RX, jeśli nie jest zakończone przed kolejnym wydarzeniu przybywa
IObservable<Data> source = ...;
source
.Select(data=>VeryExpensiveOperation(data))
.Subscribe(data=>Console.WriteLine(data));
Normalnie wydarzenia przychodzą oddzielone rozsądnych ramach czasowych. Wyobraź sobie użytkownika aktualizującego pole tekstowe w formularzu. Nasz VeryExpensiveOperation
może zająć 5 sekund, aby zakończyć, a gdy zajmie godzinę, na ekranie wyświetli się szklanka.
Jeśli jednak przez 5 sekund użytkownik zaktualizuje pole tekstowe ponownie Chciałbym wysłać anulowanie do bieżącego VeryExpensiveOperation
przed rozpoczęciem nowego.
Mogę sobie wyobrazić scenariusz, jak
source
.SelectWithCancel((data, cancelToken)=>VeryExpensiveOperation(data, token))
.Subscribe(data=>Console.WriteLine(data));
więc za każdym razem lambda nazywa się nazywa z cancelToken, które mogą być używane do zarządzania anulowanie Task
. Jednak teraz mieszamy Task, CancelationToken i RX. Nie jestem pewien, jak to wszystko zmieścić. Jakieś sugestie.
Bonus Points na zastanawianie się, jak testować operatora za pomocą xUnit :)
pierwsza próba
public static IObservable<U> SelectWithCancelation<T, U>(this IObservable<T> This, Func<CancellationToken, T, Task<U>> fn)
{
CancellationTokenSource tokenSource = new CancellationTokenSource();
return This
.ObserveOn(Scheduler.Default)
.Select(v=>{
tokenSource.Cancel();
tokenSource=new CancellationTokenSource();
return new {tokenSource.Token, v};
})
.SelectMany(o=>Observable.FromAsync(()=>fn(o.Token, o.v)));
}
jeszcze nie testowano. Mam nadzieję, że zadanie, które nie jest kompletne, generuje IObservable, które kończy się bez uruchamiania żadnych zdarzeń OnNext
.
Nie bardzo rozumiem, gdzie "token" jest anulowane. Czy dzieje się to automagicznie, gdy Switch unsubscribes z poprzedniej wewnętrznej obserwowalne? – bradgonesurfing
'DeferAsync' tworzy token, gdy obserwowalny jest zasubskrybowany. Następnie anuluje token, jeśli obserwowalne zostanie anulowane, zanim zadanie wygeneruje * rzeczywiste * obserwowalne (informując, że nie musisz pracować nad wygenerowaniem rzeczywistego obserwowalnego). 'Switch' utrzymuje subskrypcję nowego obserwowalnego, który przybywa i anuluje subskrypcję poprzedniego, co powoduje anulowanie poprzedniego. – Brandon
Niesamowity wgląd w to, jak to działa. Dzięki! – bradgonesurfing