2012-05-01 10 views
7

Próbuję użyć Rx do odczytu ze strumienia odbierającego TCPClient i przeanalizowania danych do IObservable z łańcucha, rozdzielonego przez znak nowej linii "\ r \ n" Poniżej znajduje się sposób odbierania z gniazda stream ...Obserwowalna analiza IO sieci

var messages = new Subject<string>(); 

var functionReceiveSocketData = 
      Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int> 
      (client.Client.BeginReceive, client.Client.EndReceive); 

Func<byte[], int, byte[]> copy = (bs, n) => 
     { 
      var rs = new byte[buffer.Length]; 
      bs.CopyTo(rs, 0); 
      return rs; 
     }; 

Observable 
    .Defer(() => 
      { 
       var buffer = new byte[50]; 
       return 
        from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None) 
       select copy(buffer, n); 
      }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x))); 

Oto, co wymyśliłem, aby przeanalizować ciąg znaków. To obecnie nie działa ...

obsStrings = messages.Buffer<string,string>(() => 
       messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n")) 
      ); 

Przedmiotem wiadomość otrzymuje wiadomość w kawałkach więc staram się je Concat i sprawdzić, czy łączone ciąg zawiera znak nowej linii, sygnalizując w ten sposób bufor zamknąć i wyjście buforowane Kawałki. Nie wiem, dlaczego to nie działa. Wydaje się, że otrzymałem tylko pierwszy kawałek z obsStrings.

Poszukuję dwóch rzeczy. Chciałbym uprościć odczytywanie strumienia io i wyeliminować użycie tematu wiadomości. Po drugie, chciałbym, aby działał proces parsowania. Hackowałem to trochę i nie mogę wymyślić działającego rozwiązania. Jestem początkującym z Rx.

EDIT: Oto gotowy produkt po problem został rozwiązany ....

var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None) 
      .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray()) 
      .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b) 
      .Where(x => x.EndsWith("\r\n")) 
      .Select(buffered => String.Join("", buffered)) 
      .Select(a => a.Replace("\n", "")); 

"ReceiveUntilCompleted" jest rozszerzeniem z projektu RXX.

Odpowiedz

3
messages 
    .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b) 
    .Where(x => x.EndsWith("\r\n")) 
+0

Znalazłem, że nie potrzebuję .Buffer (1); na końcu. – TK3

+0

Usunięto z odpowiedzi. – ronag

1

Zamiast Subscribe i korzystania z Subject, można spróbować tylko Select:

.Repeat().Select(x => System.Text.Encoding.UTF8.GetString(x));

Teraz zakładając To wszystko włożono nowy obserwowalne nazywa messages, następnym problemem jest to, że w tym wierszu

var obsStrings = messages.Buffer<string,string>(() => 
       messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n")) 
      ); 

Używasz obu Buffer i Scan i próbujesz zrobić to samo w obu! Zauważ, że Buffer potrzebuje selektora zamykania.

Co naprawdę chcesz to:

var obsStrings = messages.Buffer(() => messages.Where(x => x.Contains("\r\n"))) 
         .Select(buffered => String.Join(buffered)); 

Które buforowane daje zauważalny dotyczące kiedy, aby zamknąć okno (gdy zawiera \ r \ n) i daje Wybierz buforowany kwotę złączyć. Powoduje to nowe obserwowalne z podzielonych ciągów.

Jedną z kwestii jest to, że wciąż możesz wstawić nową linię pośrodku porcji, co spowoduje problemy. Jeden prosty pomysł jest obserwować na znaki zamiast pełnej kawałkami strunowych, takich jak:

obsStrings.Repeat().SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray().ToObservable());

Następnie można zrobić messages.Where(c => c != '\r') pominąć \r i zmienić bufor:

var obsStrings = messages.Buffer(() => messages.Where(x => x == '\n'))) 
         .Select(buffered => String.Join("", buffered)); 
+0

Sprawdź moją edycję. Wciąż otrzymuję dziwne wyniki. – TK3

+0

Myślisz, że chcesz 'messages.Where (x => x == '\ n')' not' messages.Where (x => x! = '\ N') 'w wywołaniu' Buffer'. Oznacza to, że złamie bufor na każdej nowej linii. – yamen

+0

Ciągle dostaję rozłączne kawałki z tego możliwego do zaobserwowania.Zastanawiam się, czy jest to wątek, w którym testowanie jest wykonywane na innym wątku, a buforowanie trwa lub wcześnie się załamuje, więc buforowanie staje się nieokreślone. – TK3