2012-04-12 13 views
8

Jako część naszej aplikacji (w produkcji przez około 4 miesięcy) mamy strumienia danych pochodzących z zewnętrznego urządzenia, które przekształcają się do IObservablepreferowaną metodą generowania IObservable <String> ze strumienia

Aż teraz używamy następujących po to, aby ją wygenerować i działa ona całkiem dobrze.

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    var streamReader = new StreamReader(inputStream); 
    return Observable 
      .Create<string>(observer => Scheduler.ThreadPool 
      .Schedule(() => ReadLoop(streamReader, observer))); 
} 

private void ReadLoop(StreamReader reader, IObserver<string> observer) 
{ 
    while (true) 
    { 
     try 
     { 
      var line = reader.ReadLine(); 
      if (line != null) 
      { 
       observer.OnNext(line); 
      } 
      else 
      { 
       observer.OnCompleted(); 
       break; 
      } 
     } 
     catch (Exception ex) 
     { 
      observer.OnError(ex); 
      break; 
     } 
    } 
} 

Ostatniej nocy zastanawiałem się, czy istnieje sposób, aby użyć składni yield return aby osiągnąć ten sam wynik i wyszedł z tego:

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    var streamReader = new StreamReader(inputStream); 
    return ReadLoop(streamReader) 
      .ToObservable(Scheduler.ThreadPool); 
} 

private IEnumerable<string> ReadLoop(StreamReader reader) 
{ 
    while (true) 
    { 
     var line = reader.ReadLine(); 
     if (line != null) 
     { 
      yield return line; 
     } 
     else 
     { 
      yield break; 
     } 
    } 
} 

Wydaje się całkiem dobrze i jest znacznie czystsze, ale zastanawiałem się, czy są jakieś plusy i minusy w jedną stronę, czy w inny sposób.

+2

Pro: 'return' wydajność obsługuje leniwy/późne ładowanie kolekcji. –

+2

Con: kiedy jest wyjątek nie nazywają onException, to po prostu się pęcherzyki –

+0

Myślę, że to zależy, jeśli nie przeszkadza palenie nić zrobić swoją pętlę odczytu, który jedzie w dół, ile urządzeń trzeba wspierać. Napisałem AsyncTextReader, który sam był Observable , aby zrobić coś podobnego, ale na dużą skalę. Z pewnością w tych dniach można było czekać na coś ... – piers7

Odpowiedz

11

Myślę, że masz tam dobry pomysł (zamień Stream na Enumerable, a następnie IObservable). Jednak kod Enumberable może być znacznie czystsze:

IEnumerable<string> ReadLines(Stream stream) 
{ 
    using (StreamReader reader = new StreamReader(stream)) 
    { 
     while (!reader.EndOfStream) 
      yield return reader.ReadLine(); 
    } 
} 

A potem obserwowalnym:

IObservable<string> ObserveLines(Stream inputStream) 
{ 
    return ReadLines(inputStream).ToObservable(Scheduler.ThreadPool); 
} 

ten jest krótszy, bardziej czytelny, a właściwie zbywa strumieni. Jest też leniwy.

Rozszerzenie ToObservable zajmuje się przechwytywaniem zdarzeń OnNext (nowe linie), jak również zdarzenia OnCompleted (koniec przeliczania) i OnError.

+0

Ładne, bardzo czyste. Muszę spróbować jutro. Moją jedyną obawą jest to, że mogę otrzymać końcową wartość zerową jako ostatni element w Obserwowalnym, ale to jest łatwe do odfiltrowania za pomocą .Where (line => line! = Null) – baralong

2

Nie mam kodu do przekazania, ale oto jak to zrobić: async pre-asynchroniczny CTP.

[Uwaga dla odtłuszczonego czytelników: nie trzeba się przejmować, jeśli nie trzeba przeskalować znacznie]

Załóż realizację AsyncTextReader, że sama jest widoczne. Ctor pobiera strumień i wykonuje BeginRead (256bytes) w strumieniu, przekazując siebie jako kontynuację, a następnie powraca.

Po wprowadzeniu kontynuacji wywołaj funkcję EndRead i dodaj zwrócone bajty do małego bufora klasy. Powtarzaj tę czynność, dopóki bufor nie zawiera jednej lub więcej sekwencji końca linii (jak w TextWriter). Kiedy tak się stanie, wyślij te bity bufora jako ciąg za pośrednictwem interfejsu Obserwuj i powtórz.

Po zakończeniu, zasygnalizuj opcję OnComplete itd ... (i wyślij strumień). Jeśli otrzymasz wyjątek od EndReadByte w kontynuacji, złap go i przekaż interfejs OnError.

wywołanie kodu wówczas wygląda następująco:

IObservable = new AsyncTextReader (stream);

To dobrze się skaluje. Po prostu upewnij się, że nie robisz nic zbyt głupiego z obsługą bufora.

pseudokod:

public ctor(Stream stream){ 
    this._stream = stream; 
    BeginRead(); 
    return; 
} 

private void BeginRead(){ 
    // kick of async read and return (synchronously) 
    this._stream.BeginRead(_buffer,0,256,EndRead,this); 
} 

private void EndRead(IAsyncResult result){ 
    try{ 
     // bytesRead will be *up to* 256 
     var bytesRead = this._stream.EndRead(result); 
     if(bytesRead < 1){ 
      OnCompleted(); 
      return; 
     } 
     // do work with _buffer, _listOfBuffers 
     // to get lines out etc... 
     OnNext(aLineIFound); // times n 
     BeginRead(); // go round again 
    }catch(Exception err){ 
     OnException(err); 
    } 
} 

Ok, to jest APM i coś tylko matka może kochać. Z uwagą czekam na alternatywę.

PS czy czytelnik powinien zostać zamknięty strumień jest interesujące pytanie. Mówię "nie", ponieważ go nie stworzyłem.

0

Z async/oczekują wsparcia, po to najprawdopodobniej najlepiej:

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    return Observable.Using(() => new StreamReader(inputStream), 
     sr => Observable.Create<string>(async (obs, ct) => 
     { 
      while (true) 
      { 
       ct.ThrowIfCancellationRequested(); 
       var line = await sr.ReadLineAsync().ConfigureAwait(false); 
       if (line == null) 
        break; 
       obs.OnNext(line); 
      } 
      obs.OnCompleted(); 
    })); 
} 
Powiązane problemy