2009-02-07 11 views
6

Uwaga: Pozwól mi przeprosić za długość tego pytania, musiałem włożyć do niego wiele informacji. Mam nadzieję, że to nie spowoduje, że zbyt wielu ludzi po prostu przejrzy i przyjmie założenia. Proszę przeczytać w całości. Dzięki.Jaka jest dobra metoda obsługi sieciowych strumieni We/Wy?

Mam strumień danych przychodzących przez gniazdo. Te dane są zorientowane liniowo.

Używam APM (Async Programming Method) .NET (BeginRead, etc ..). Wyklucza to używanie operacji we/wy opartych na strumieniu, ponieważ Asynchroniczne operacje we/wy są oparte na buforze. Możliwe jest przepakowanie danych i wysłanie ich do strumienia, takiego jak strumień pamięci, ale są tam również problemy.

Problem polega na tym, że mój strumień wejściowy (który nie mam nad nim kontroli) nie daje mi żadnych informacji o tym, jak długi jest strumień. To po prostu jest strumieniem linii nowej linii wygląda tak:

COMMAND\n 
...Unpredictable number of lines of data...\n 
END COMMAND\n 
....repeat.... 

Tak, używając APM, a ponieważ nie wiem, jak długo dana zestaw danych będzie, to jest prawdopodobne, że bloki danych przetnie bufor granice wymagające wielu odczytów, ale te wielokrotne odczyty będą obejmowały również wiele bloków danych.

przykład:

Byte buffer[1024] = ".................blah\nThis is another l" 
[another read] 
        "ine\n.............................More Lines..." 

My pierwsza myśl było użyć StringBuilder i po prostu dodać do linii bufor do SB. Działa to w pewnym stopniu, ale było mi trudno wyodrębnić bloki danych. Próbowałem użyć StringReader do odczytu nowych danych, ale nie było sposobu, aby dowiedzieć się, czy otrzymujesz kompletną linię, czy nie, ponieważ StringReader zwraca częściową linię na końcu ostatniego dodanego bloku, a następnie powraca null na końcu. Nie ma sposobu, aby dowiedzieć się, czy zwrócono całkowicie nową linię danych.

Przykład:

// Note: no newline at the end 
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp.."); 
StringReader sr = new StringReader(sb); 
string s = sr.ReadLine(); // returns "This is a line" 
s = sr.ReadLine();  // returns "This is incomp.." 

Co gorsza, jest to, że jeśli po prostu zachować dodanie do danych, bufory stają się coraz większe i większe, a ponieważ może to trwać kilka tygodni lub miesięcy, w czasie, który nie jest dobry rozwiązanie.

Moją kolejną myślą było usunięcie bloków danych z SB, kiedy je czytałem. Wymagało to napisania mojej własnej funkcji ReadLine, ale utknąłem blokując dane podczas odczytu i zapisu. Ponadto, większe bloki danych (które mogą składać się z setek odczytów i megabajtów danych) wymagają skanowania całego bufora w poszukiwaniu nowych linii. To nie jest wydajne i dość brzydkie.

Szukam czegoś, co ma prostotę StreamReader/Writer z wygodą asynchronicznego I/O.

Moja następna myśl polegała na użyciu obiektu MemoryStream i zapisaniu bloków danych do strumienia pamięci, a następnie dołączeniu StreamReadera do strumienia i skorzystaniu z ReadLine, ale znowu mam problemy ze stwierdzeniem, czy ostatni odczyt w buforze jest kompletną linię lub nie, a jeszcze trudniej jest usunąć "nieaktualne" dane ze strumienia.

Myślałem również o używaniu wątku z synchronicznymi odczytami. Ma to tę zaletę, że przy użyciu StreamReadera, zawsze zwróci pełną linię z ReadLine(), z wyjątkiem zerwanych sytuacji połączenia. Ma to jednak problemy z anulowaniem połączenia, a niektóre rodzaje problemów sieciowych mogą powodować zawieszanie blokujących gniazd przez dłuższy czas. Używam asynchronicznej operacji wejścia, ponieważ nie chcę powiązywać wątku przez cały czas blokowania programu podczas odbierania danych.

Połączenie jest długotrwałe.Dane będą nadal płynąć z czasem. Podczas początkowego połączenia istnieje duży przepływ danych, a po wykonaniu tego przepływu gniazdo pozostaje otwarte, czekając na aktualizacje w czasie rzeczywistym. Nie wiem dokładnie, kiedy początkowy przepływ został "zakończony", ponieważ jedynym sposobem, aby się dowiedzieć, jest to, że nie są już wysyłane żadne dane. Oznacza to, że nie mogę się doczekać, aż wstępne ładowanie danych zostanie zakończone przed przetworzeniem, prawie utknąłem przetwarzanie "w czasie rzeczywistym", jak to jest.

Czy ktoś może zaproponować dobrą metodę radzenia sobie z tą sytuacją? w sposób, który nie jest zbyt skomplikowany? Naprawdę chcę, żeby to było tak proste i eleganckie, jak to tylko możliwe, ale wciąż wymyślam coraz bardziej skomplikowane rozwiązania ze względu na wszystkie przypadki skrajne. Domyślam się, że chcę czegoś w rodzaju FIFO, w którym mogę łatwo dołączyć więcej danych, jednocześnie wyskakując z niego dane, które pasują do pewnych kryteriów (np. Ciągi zakończone znakiem nowej linii).

+0

Pomyślałem, że to także interesujący problem, dlatego napisałem post o rozwiązaniu go z CCR, który można znaleźć na stronie http: //iodyner.spaces.live.com, jeśli jesteś zainteresowany ... –

Odpowiedz

5

To dość interesujące pytanie. Rozwiązaniem dla mnie w przeszłości było użycie osobnego wątku z synchronicznymi operacjami, jak to proponujesz. (Udało mi się obejść większość problemów z blokowaniem gniazd przy użyciu blokad i wielu wyjątkowych procedur obsługi). Mimo to zalecane jest używanie wbudowanych operacji asynchronicznych, ponieważ pozwala to na prawdziwe asynchroniczne operacje wejścia/wyjścia na poziomie systemu operacyjnego, więc rozumiem twój punkt.

Cóż, poszedłem i napisałem klasę dla osiągnięcia tego, o czym myślę, że potrzebujesz (w stosunkowo czysty sposób, powiedziałbym). Powiedz mi co myślisz.

using System; 
using System.Collections.Generic; 
using System.IO; 
using System.Text; 

public class AsyncStreamProcessor : IDisposable 
{ 
    protected StringBuilder _buffer; // Buffer for unprocessed data. 

    private bool _isDisposed = false; // True if object has been disposed 

    public AsyncStreamProcessor() 
    { 
     _buffer = null; 
    } 

    public IEnumerable<string> Process(byte[] newData) 
    { 
     // Note: replace the following encoding method with whatever you are reading. 
     // The trick here is to add an extra line break to the new data so that the algorithm recognises 
     // a single line break at the end of the new data. 
     using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine)) 
     { 
      // Read all lines from new data, returning all but the last. 
      // The last line is guaranteed to be incomplete (or possibly complete except for the line break, 
      // which will be processed with the next packet of data). 
      string line, prevLine = null; 
      while ((line = newDataReader.ReadLine()) != null) 
      { 
       if (prevLine != null) 
       { 
        yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine; 
        _buffer = null; 
       } 
       prevLine = line; 
      } 

      // Store last incomplete line in buffer. 
      if (_buffer == null) 
       // Note: the (* 2) gives you the prediction of the length of the incomplete line, 
       // so that the buffer does not have to be expanded in most/all situations. 
       // Change it to whatever seems appropiate. 
       _buffer = new StringBuilder(prevLine, prevLine.Length * 2); 
      else 
       _buffer.Append(prevLine); 
     } 
    } 

    public void Dispose() 
    { 
     Dispose(true); 
     GC.SuppressFinalize(this); 
    } 

    private void Dispose(bool disposing) 
    { 
     if (!_isDisposed) 
     { 
      if (disposing) 
      { 
       // Dispose managed resources. 
       _buffer = null; 
       GC.Collect(); 
      } 

      // Dispose native resources. 

      // Remember that object has been disposed. 
      _isDisposed = true; 
     } 
    } 
} 

Instancja tej klasy powinny być tworzone dla każdego NetworkStream a funkcja procesu powinna nazywać, gdy pojawią się nowe dane (w metodzie zwrotnej dla BeginRead, przed wywołaniem kolejnej BeginRead Mogę sobie wyobrazić).

Uwaga: zweryfikowałem ten kod tylko z danymi testowymi, a nie rzeczywistymi danymi przesyłanymi przez sieć. Jednak nie spodziewałbym się żadnych różnic ...

Ostrzegam również, że klasa nie jest oczywiście bezpieczna dla wątków, ale tak długo, jak BeginRead nie zostanie ponownie uruchomiony, dopóki aktualne dane nie zostaną przetworzone (jak przypuszczam, że robisz), nie powinno być żadnych problemów.

Mam nadzieję, że to zadziała. Daj mi znać, jeśli pozostały problemy i spróbuję zmodyfikować rozwiązanie, aby sobie z nimi poradzić. (Może być trochę subtelności pytania, które przeoczyłem, pomimo uważnego przeczytania!)

+0

To interesujące rozwiązanie. Też uważałem Iteratory za użyteczne, ale to nie było rozwiązanie, które wymyśliłby mój umysł. Lubię to. –

+1

Czy możesz wyjaśnić, dlaczego musisz wdrożyć IDispose? Powiedziano mi, że wywoływanie GC.Collect() jest złą praktyką i może skutkować słabą wydajnością. Czy martwi cię szybkie przydzielanie w krótkim czasie wyczerpywania sterty? –

+0

Tak, iteratory są przydatne. W tym przypadku równie dobrze można to zrobić z ogólną listą, choć oczywiście nie wygląda to tak pięknie. Jeśli chcesz poradzić sobie z wynikiem w postaci listy/tablicy, to i tak konwersja do tych typów jest trywialna, a implementacja jest jeszcze prostsza. – Noldorin

0

To, co wyjaśniasz w swoim pytaniu, przypomina mi bardzo struny ASCIZ. (link text). To może być pomocny początek.

Musiałem napisać coś podobnego do tego w college'u dla projektu, nad którym pracowałem. Niestety, miałem kontrolę nad gniazdem nadawczym, więc wstawiłem długość pola komunikatu jako część protokołu. Myślę jednak, że podobne podejście może przynieść ci korzyści.

Jak zbliżyłem się do mojego rozwiązania, wysłałem coś takiego jak 5HELLO, więc najpierw zobaczyłem 5 i wiem, że miałem wiadomość o długości 5, i dlatego potrzebowałem 5 znaków. Jednakże, jeśli na moim asynchronicznym czytaniu, mam tylko 5HE, zobaczyłbym, że mam długość wiadomości 5, ale byłem w stanie odczytać tylko 3 bajty z przewodu (Przyjmijmy znaki ASCII). Z tego powodu wiedziałem, że brakuje mi niektórych bajtów i zapisałem to, co miałem w buforze fragmentów. Miałem jeden bufor fragmentów na gniazdo, aby uniknąć problemów z synchronizacją. Surowy proces jest.

  1. Odczyt z gniazdem do tablicy bajtów, ile bajtów rekord został odczytany
  2. Skanowanie bajt po bajcie, aż znajdziesz znak nowej linii (ten staje się bardzo skomplikowane, jeśli nie jesteś odbierania znaków ASCII, ale znaki, które mogą być wielokrotnymi bajtami, jesteś na swój własny sposób)
  3. Zamień swój bufor na fragmenty w łańcuch i dodaj bufor do odczytu aż do nowego wiersza. Upuść ten ciąg jako zakończoną wiadomość do kolejki lub własnego delegata do przetworzenia. (możesz zoptymalizować te bufory przez fakt, że czytasz gniazdo do tej samej tablicy bajtów, co fragment, ale jest to trudniejsze do wyjaśnienia)
  4. Kontynuuj pętlę, za każdym razem, gdy znajdziemy nową linię, utwórz ciąg z bajtu ustaw z zapisanej pozycji początkowej/końcowej i upuść w kolejce/delegacie do przetworzenia.
  5. Po dotarciu do końca naszego bufora odczytu, skopiuj wszystko, co pozostało do bufora frag.
  6. Zadzwoń do BeginRead na gnieździe, które przeskoczy do kroku 1., gdy dane będą dostępne w gnieździe.

Następnie użyć innego wątku czytać jesteś kolejkę incommign wiadomości, lub po prostu niech puli wątków obsługiwać go za pomocą delegatów. I wykonaj dowolne przetwarzanie danych, które musisz wykonać. Ktoś mnie poprawi, jeśli się mylę, ale jest z tym bardzo mało problemów z synchronizacją wątków, ponieważ możesz tylko czytać lub oczekiwać na odczyt z gniazda w tym samym czasie, więc nie martw się o blokady (z wyjątkiem sytuacji, gdy jesteś zapełnianie kolejki, użyłem delegatów w mojej implementacji). Jest kilka szczegółów, które będziesz potrzebować, aby się upewnić, że jesteś sam, np. Jak duża część bufora fragu do opuszczenia, jeśli otrzymasz 0 nowych linii podczas czytania, cała wiadomość musi być dołączona do bufora fragmentów bez nadpisywania byle co. Wydaje mi się, że w końcu obsłużyło mi to około 700 - 800 linii kodu, ale zawierało to elementy konfiguracji połączenia, negocjacje szyfrowania i kilka innych rzeczy.

Ta konfiguracja sprawdziła się bardzo dobrze; Byłem w stanie wykonać do 80 Mb/s na 100 Mbps sieci Ethernet przy użyciu tej implementacji 1,8 GHz opteron, w tym przetwarzania szyfrowania. A ponieważ jesteś przywiązany do gniazda, serwer skaluje się, ponieważ wiele gniazd może pracować w tym samym czasie. Jeśli potrzebujesz produktów przetworzonych w kolejności, musisz użyć kolejki, ale jeśli zamówienie nie ma znaczenia, to delegaci dadzą ci bardzo skalowalną wydajność z poziomu wątku.

Mam nadzieję, że to pomogło, nie było to kompletne rozwiązanie, ale kierunek, w którym należy zacząć szukać.

* Zauważyłem, że moja implementacja została obniżona wyłącznie na poziomie bajtów i obsługiwane szyfrowanie. Użyłem znaków z mojego przykładu, aby ułatwić wizualizację.

+0

Tak, wprowadziłem już podejście podobne do tego, ale nie podoba mi się to. Jest zbyt brudny i skomplikowany dla moich gustów, dlatego proszę o sugestie tutaj. Lubię podejście Noldorina, ma on elongację i ponowne wykorzystanie istniejącego kodu szkieletowego, którego pragnę. –

Powiązane problemy