Oto rozsądnie prosty sposób Rx, aby zrobić to, co chcesz. Stworzyłem metodę rozszerzenia o nazwie Pausable
, która ma źródło obserwowalne, a druga obserwowalną z boolean
, która zatrzymuje lub wznawia obserwowalne.
public static IObservable<T> Pausable<T>(
this IObservable<T> source,
IObservable<bool> pauser)
{
return Observable.Create<T>(o =>
{
var paused = new SerialDisposable();
var subscription = Observable.Publish(source, ps =>
{
var values = new ReplaySubject<T>();
Func<bool, IObservable<T>> switcher = b =>
{
if (b)
{
values.Dispose();
values = new ReplaySubject<T>();
paused.Disposable = ps.Subscribe(values);
return Observable.Empty<T>();
}
else
{
return values.Concat(ps);
}
};
return pauser.StartWith(false).DistinctUntilChanged()
.Select(p => switcher(p))
.Switch();
}).Subscribe(o);
return new CompositeDisposable(subscription, paused);
});
}
Może być stosowany tak:
var xs = Observable.Generate(
0,
x => x < 100,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(0.1));
var bs = new Subject<bool>();
var pxs = xs.Pausable(bs);
pxs.Subscribe(x => { /* Do stuff */ });
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Teraz jedyną rzeczą, nie mogłem się zorientować, co masz na myśli przez „przychodzącego strumienia zdarzeń jest IObserver<T>
”. Strumienie to IObservable<T>
. Obserwatorzy nie są strumieniami. Wygląda na to, że czegoś tu nie robisz. Czy możesz dodać swoje pytanie i wyjaśnić dalej?
Myśląc, że jeśli wyjście jest wstrzymane, można przekierować wydarzenia w kolejce wewnętrznej, a gdy outout jest wstrzymane, to może opróżnić kolejkę na zewnątrz. – Contango
Nie zaimplementowałeś własnego 'IObserver', prawda? –
Enigmativity
Nie, wszystko, co zrobiłem, to wrzucenie wewnętrznego 'Subject' do 'IObserver ', aby można było ujawnić metodę '.OnNext'. –
Contango