2013-07-04 14 views
7

Mam async metodę, która jest długo działa metoda, która czyta strumień i gdy znajdzie coś uruchamia zdarzenie:Konwersja metody asynchronicznej wrócić IObservable <>

public static async void GetStream(int id, CancellationToken token) 

Zajmuje token anulowania ponieważ to jest tworzenie w nowym zadaniu. Wewnętrznie nazywa to await gdy czyta strumień:

var result = await sr.ReadLineAsync() 

Teraz chcę przekonwertować to do metody, która zwraca IObservable <> tak, że mogę używać tego z reaktywnymi rozszerzeń. Z tego co czytałem, najlepszym sposobem na to jest za pomocą Observable.Create i od RX 2.0 obsługuje teraz także asynchroniczny mogę to wszystko do pracy z mniej więcej tak:

public static IObservable<Message> ObservableStream(int id, CancellationToken token) 
{ 
    return Observable.Create<Message>(
     async (IObserver<Message> observer) => 
      { 

reszty kodu wewnątrz jest taki sam, ale zamiast wystrzeliwania zdarzeń dzwonię pod numer observer.OnNext(). Ale to wydaje się złe. Z jednej strony miksuję tam CancellationTokeny, i chociaż dodanie słowa asynchronicznego sprawiło, że działa, czy to naprawdę najlepsza rzecz do zrobienia? Dzwonię do mojego ObservableStream tak:

Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m)); 
+1

Należy prawie nigdy nie używać 'asynchroniczny void', na pewno nie w metodach bibliotecznych. – svick

Odpowiedz

1

należy zmienić GetStream zwrócić zadanie, zamiast pustki (powrót asynchronicznej pustkę nie jest dobre, z wyjątkiem gdy jest to absolutnie konieczne, ponieważ svick skomentował). Gdy zwrócisz zadanie, możesz po prostu zadzwonić .ToObservable() i gotowe.

Na przykład:

public static async Task<int> GetStream(int id, CancellationToken token) { ... } 

Następnie

GetStream(1, new CancellationToken(false)) 
    .ToObservable() 
    .Subscribe(Console.Write); 
+0

Myślę, że 'GetStream()' uruchamia zdarzenie kilka razy, więc 'Task ' nie wystarczy. – svick

+0

Tak, podpis był przykładem, ale w edycji sugeruję pozbycie się GetStream bezpośrednio za pomocą ToObservable(). Aktualizuję moją odpowiedź z kilkoma wyjaśnieniami. –

+0

Ach dobrze, dzięki. Tak, GetStream działa wiecznie i uruchamia zdarzenie po znalezieniu wiadomości. Więc jeśli użyłem 'Task ' zamiast, czy to ma sens? Czy porzuciłbym strzelaninę tam na coś innego? –

11

Jesteś poprawne. Po przedstawieniu interfejsu za pomocą interfejsu IObservable należy unikać wymagania od dzwoniących do dostarczenia numeru CancellationToken. To nie znaczy, że nie możesz ich używać wewnętrznie. Rx udostępnia kilka mechanizmów do tworzenia instancji CancellationToken, które są anulowane, gdy obserwator zrezygnuje z obserwowania.

Istnieje wiele sposobów na rozwiązanie problemu. Najprostszy wymaga prawie żadnych zmian w kodzie. Wykorzystuje przeciążenie Observable.Create który dostarcza Ci CancellationToken skutkującym jeśli unsubscribes dzwoniących:

public static IObservable<Message> ObservableStream(int id) 
{ 
    return Observable.Create<Message>(async (observer, token) => 
    { 
     // no exception handling required. If this method throws, 
     // Rx will catch it and call observer.OnError() for us. 
     using (var stream = /*...open your stream...*/) 
     { 
      string msg; 
      while ((msg = await stream.ReadLineAsync()) != null) 
      { 
       if (token.IsCancellationRequested) { return; } 
       observer.OnNext(msg); 
      } 
      observer.OnCompleted(); 
     } 
    }); 
} 
+0

Nie należy "if (token.IsCancellationRequested) {return; } 'be' if (token.IsCancellationRequested) {break; } 'tak, aby wywołano' OnCompleted'? – TiMoch

+0

@TiMoch no obserwator został już anulowany w tym przypadku nie ma potrzeby wysyłania im żadnych dodatkowych powiadomień – Brandon

Powiązane problemy