2011-11-09 18 views
5

Próbuję wykonać , który zwraca true, jeśli komunikat UDP został odebrany w ciągu ostatnich 5 sekund i jeśli wystąpi przekroczenie limitu czasu, zwracana jest wartość false.Limit czasu reakcji, który nie zatrzymuje sekwencji?

Do tej pory mam to:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    var udp = BaseComms.UDPBaseStringListener(localEP) 
     .Where(msg => msg.Data.Contains("running")) 
     .Select(s => true); 

    return Observable 
     .Timeout(udp, TimeSpan.FromSeconds(5)) 
     .Catch(Observable.Return(false)); 
} 

na problemy z tym to: -

  • Po fałszywym jest zwracany, sekwencja zatrzymuje
  • ja tylko naprawdę trzeba true lub false na zmiany stanu.

Mogę użyć Subject<T>, ale muszę zachować ostrożność, aby pozbyć się UDPBaseStringListener obserwowalnego, gdy nie ma już abonentów.

Aktualizacja

Za każdym razem pojawia się komunikat UDP chciałbym go zwrócić true. Jeśli nie otrzymałem wiadomości UDP w ciągu ostatnich 5 sekund, chciałbym, aby zwróciła ona false.

+0

FYI, 'Timeout' ma przeciążenia, które ma zastępcę do zaobserwowania, gdy czas timeout występuje raczej niż "rzucanie" i potrzeba "Catch". –

+0

Czytelnicy mogą również być zainteresowani [1] (http://stackoverflow.com/q/23394441/1267663), [2] (http://stackoverflow.com/q/12786901/1267663) oraz [3] (http://stackoverflow.com/q/35873244/1267663). – Whymarrh

Odpowiedz

3

Jak wskazano przez Bj0, rozwiązanie z BufferWithTime nie zwróci punktu danych, gdy tylko zostanie odebrany, a limit czasu bufora nie zostanie zresetowany po otrzymaniu punktu danych.

Z Rx Extensions 2.0, twój może rozwiązać oba problemy ze nowy bufor przeciążenie przyjmowanie zarówno timeout i rozmiar:

static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    return BaseComms 
     .UDPBaseStringListener(localEP) 
     .Where(msg => msg.Data.Contains("running")) 
     .Buffer(TimeSpan.FromSeconds(5), 1) 
     .Select(s => s.Count > 0) 
     .DistinctUntilChanged(); 
} 
1

Sugerowałbym unikanie użycia Timeout - powoduje to wyjątki i kodowanie z wyjątkami jest złe.

Wydaje się, że ma sens tylko to, że obserwowalne zatrzymanie następuje po jednej wartości. Być może będziesz musiał wyjaśnić więcej, na czym polega twoje zachowanie.

Moje obecne rozwiązanie problemu jest:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    return Observable.Create<bool>(o => 
    { 
     var subject = new AsyncSubject<bool>(); 
     return new CompositeDisposable(
      Observable.Amb(
       BaseComms 
        .UDPBaseStringListener(localEP) 
        .Where(msg => msg.Data.Contains("running")) 
        .Select(s => true), 
       Observable 
        .Timer(TimeSpan.FromMilliseconds(10.0)) 
        .Select(_ => false) 
      ).Take(1).Subscribe(subject), subject.Subscribe(o)); 
    }); 
} 

Czy to pomoże?

+0

To nie kompiluje. Observable.Create spodziewa się zwrotu akcji ... – Tim

+0

'Limit czasu 'powoduje domyślnie wyjątki. Ma przeciążenie, które pobiera alternatywę, którą można obserwować, jeśli zostanie przekroczony limit czasu, który domyślne przeciążenie wywołuje z 'Observable.Throw' jako alternatywnym. –

+0

@Jim - Musisz używać innej wersji Rx - skompilował się dobrze z wersją 1.1.10621. – Enigmativity

1

Jeśli nie chcesz, aby zatrzymać sekwencję, tylko owinąć go odroczyć + Repeat:

Observable.Defer(() => GettingUDPMessages(endpoint) 
    .Repeat(); 
2

Problem z buforem jest to, że „timeout” przerwa nie zostanie zresetowany, gdy pojawi się nowa wartość, okna bufora to po prostu skrawki czasu (w tym przypadku 5s), które następują po sobie. Oznacza to, że w zależności od tego, kiedy otrzymasz ostatnią wartość, możesz poczekać niemal dwukrotnie więcej niż wartość limitu czasu. To również może przegapić limity czasu:

   should timeout here 
         v 
0s   5s   10s  15s 
|x - x - x | x - - - - | - - - x -| ... 
      true  true  true 

IObservable.Throttle jednak resetuje się za każdym razem, gdy nowa wartość przychodzi i tylko generuje wartość po upływie Okres (ostatnia wartość przychodzące). Może być używany jako limitu czasu i połączone z IObservable wstawić „time-out” wartości w strumieniu:

var obs = BaseComms.UDPBaseStringListener(localEP) 
      .Where(msg => msg.Data.Contains("running")); 

return obs.Merge(obs.Throttle(TimeSpan.FromSeconds(5)) 
         .Select(x => false)) 
      .DistinctUntilChanged(); 

Działający przykład LINQPad:

var sub = new Subject<int>(); 

var script = sub.Timestamp() 
    .Merge(sub.Throttle(TimeSpan.FromSeconds(2)).Select(i => -1).Timestamp()) 
    .Subscribe(x => 
{ 
    x.Dump("val"); 
}); 


Thread.Sleep(1000); 

sub.OnNext(1); 
sub.OnNext(2); 

Thread.Sleep(10000); 

sub.OnNext(5); 

-1 dodaje się do strumienia po limit czasu 2s.