2014-07-10 13 views
10

Szukam elegancki sposób, aby utworzyć Observable od zwykłego delegata wywołania zwrotnego z Rx, coś podobnego do Observable.FromEventPattern?Obserwowalne dla wywołania zwrotnego w Rx

Powiedzmy, że zawijam Win32 EnumWindows API, który oddzwoni pod numer EnumWindowsProc.

Wiem, że mogę utworzyć tymczasowy adapter zdarzeń C# dla tego wywołania zwrotnego i przekazać go FromEventPattern. Poza tym prawdopodobnie mógłbym ręcznie zaimplementować IObservable, więc zadzwoniłbym pod numer IObserver.OnNext z mojego wywołania zwrotnego EnumWindowsProc.

Czy istnieje istniejący wzorzec do zawijania wywołania zwrotnego w Rx, którego mi brakuje?

+1

Albo możesz po prostu użyć 'Subject' i wywołać' OnNext' na tym wywołanie zwrotnym. – Dirk

+0

@Dirk, ciekawe, dzięki. A więc "Subject.OnNext", a następnie "Subject.OnComplete", gdy nie ma już żadnych przedmiotów? – avo

+1

Tak, Subject implementuje zarówno "IObservable', jak i' IObserver'. Wywołaj OnNext/OnError/OnCompleted, aby wydać te polecenia subskrybentom Tematu.Są serwerami jako rodzaj bramy od kodu innego niż Rx do Rx. – Dirk

Odpowiedz

8

Możesz użyć Subject<T>, który może być użyty do przejścia z imperatywnego świata programowania do funkcjonalnego świata Rx.

Subject<T> realizuje zarówno IObservable<T> i IObserver<T>, więc można zadzwonić do jego OnNext, OnError i OnCompleted metod i abonenci zostaną powiadomieni.

Jeśli chcesz wystawiać Subject<T> jako własność to należy to zrobić za pomocą .AsObservable() jak to ukrywa fakt, że IObservable<T> jest w rzeczywistości Subject<T>. Sprawia, że ​​rzeczy takie jak ((Subject<string>) obj.Event).OnNext("Foo") są niemożliwe.

+0

To prawdopodobnie jest droga. Chciałbym zobaczyć, co inni mogą powiedzieć, zanim je zaakceptują. – avo

+1

@avo Jest to z pewnością łatwe rozwiązanie, ale niektórzy ludzie nie lubią używać Tematu, ponieważ jest to jedna z niewielu niefunkcjonalnych rzeczy w Rx. Myślę, że ma kilka dobrych zastosowań, ale jeśli jest inny sposób - na przykład FromEventPattern - użyłbym tego zamiast tego. – Dirk

3

Należy pamiętać, że wywołania zwrotne, takie jak te używane w EnumWindows są subtelnie różne od Rx. W szczególności wywołanie zwrotne może komunikować się z powrotem z osobą dzwoniącą poprzez jego wartość zwracaną. Obserwatorzy Rx nie mogą tego zrobić. Również wywołania zwrotne mogą odbierać wiele parametrów, ale obserwatorzy Rx otrzymują pojedynczą wartość. Musisz więc opakować wiele parametrów w jeden obiekt.

Mając to na uwadze, alternatywą dla używania Subject jest użycie Observable.Create. W ten sposób zarejestrujesz wywołanie zwrotne tylko wtedy, gdy faktycznie jest obserwator i wyrejestrujesz go, jeśli ten obserwator zrezygnuje z subskrypcji.

Dla synchronicznego interfejsu API, którego użyłeś w przykładzie, możesz zrobić coś takiego. Uwaga: w tym przykładzie nie ma sposobu na wyrejestrowanie pośredniego strumienia oddzwaniania, ponieważ wszystko dzieje się synchronicznie, zanim będziemy mogli kiedykolwiek zrezygnować z subskrypcji.

public static IObservable<Foo> WrapFooApi(string arg1, string arg2) 
{ 
    return Observable.Create<Foo>(observer => 
    { 
     FooApi.enumerate(arg1, arg2, e => 
     { 
      observer.OnNext(new Foo(e)); 
      return true; 
     }); 

     // In your case, FooApi.enumerate is actually synchronous 
     // so when we get to this line of code, we know 
     // the stream is complete. 
     observer.OnCompleted(); 
     return Disposable.Empty; 
    }); 
} 

// Usage 
WrapFooApi("a", "b").Take(1).Subscribe(...); // only takes first item 

Możemy rozwiązać problem z byciem w stanie zatrzymać wcześnie wprowadzając trochę asynchroniczność, które dadzą czas obserwatora, aby uzyskać jednorazowe, że można go wyrzucać do powiadamiania. Możemy użyć CreateAsync, aby uzyskać CancellationToken, który zostanie anulowany, gdy obserwator zrezygnuje z subskrypcji. I możemy uruchomić kod FooApi wewnątrz Task.Run:

public static IObservable<Foo> WrapFooApi(string arg1, string arg2) 
{ 
    return Observable.CreateAsync<Foo>(async (observer, ct) => 
    { 
     await Task.Run(() => FooApi.register_callback(arg1, arg2, e => 
     { 
      observer.OnNext(e); 

      // Returning false will stop the enumeration 
      return !ct.IsCancellationRequested; 
     })); 
     observer.OnCompleted(); 
    }); 
} 

W bardziej tradycyjnych asynchronicznych wywołań zwrotnych API, gdzie można zarejestrować się w pewnym momencie i wyrejestrowania w jakimś innym miejscu, może mieć coś więcej tak:

public static IObservable<Foo> WrapFooApi(string args) 
{ 
    return Observable.Create<Foo>(observer => 
    { 
     FooToken token = default(FooToken); 
     var unsubscribe = Disposable.Create(() => FooApi.Unregister(token)); 
     token = FooApi.Register(args, e => 
     { 
      observer.OnNext(new Foo(e)); 
     }); 

     return unsubscribe; 
    }); 
} 
+1

Nie sądzę, aby w tym konkretnym przypadku można było użyć dowolnego asynchronicznego interfejsu API, aby to zrobić - wywołanie zwrotne EnumWindows jest wywoływane synchronicznie od wywołującego (to jest wyobrazić sobie, że interfejs API EnumWindows po prostu używa pętli for i wywołuje wywołanie zwrotne dla każdego elementu) –

Powiązane problemy