Próbuję użyć Rx do odczytu ze strumienia odbierającego TCPClient i przeanalizowania danych do IObservable z łańcucha, rozdzielonego przez znak nowej linii "\ r \ n" Poniżej znajduje się sposób odbierania z gniazda stream ...Obserwowalna analiza IO sieci
var messages = new Subject<string>();
var functionReceiveSocketData =
Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
(client.Client.BeginReceive, client.Client.EndReceive);
Func<byte[], int, byte[]> copy = (bs, n) =>
{
var rs = new byte[buffer.Length];
bs.CopyTo(rs, 0);
return rs;
};
Observable
.Defer(() =>
{
var buffer = new byte[50];
return
from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
select copy(buffer, n);
}).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));
Oto, co wymyśliłem, aby przeanalizować ciąg znaków. To obecnie nie działa ...
obsStrings = messages.Buffer<string,string>(() =>
messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
);
Przedmiotem wiadomość otrzymuje wiadomość w kawałkach więc staram się je Concat i sprawdzić, czy łączone ciąg zawiera znak nowej linii, sygnalizując w ten sposób bufor zamknąć i wyjście buforowane Kawałki. Nie wiem, dlaczego to nie działa. Wydaje się, że otrzymałem tylko pierwszy kawałek z obsStrings.
Poszukuję dwóch rzeczy. Chciałbym uprościć odczytywanie strumienia io i wyeliminować użycie tematu wiadomości. Po drugie, chciałbym, aby działał proces parsowania. Hackowałem to trochę i nie mogę wymyślić działającego rozwiązania. Jestem początkującym z Rx.
EDIT: Oto gotowy produkt po problem został rozwiązany ....
var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
.SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
.Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
.Where(x => x.EndsWith("\r\n"))
.Select(buffered => String.Join("", buffered))
.Select(a => a.Replace("\n", ""));
"ReceiveUntilCompleted" jest rozszerzeniem z projektu RXX.
Znalazłem, że nie potrzebuję .Buffer (1); na końcu. – TK3
Usunięto z odpowiedzi. – ronag