2012-09-27 14 views
5

Opracowujemy usługę WCF do przesyłania strumieniowego dużej ilości danych, dlatego zdecydowaliśmy się użyć funkcji WCF Streaming w połączeniu z serializacją protobuf-net.Lazy, serializacja obiektów sterowanych strumieniowo za pomocą protobuf-net

Kontekst:

Generalnie pomysł jest do serializacji obiektów w służbie, zapisać je do strumienia i wysłać. Po drugiej stronie wywołujący odbierze obiekt Stream i będzie mógł odczytać wszystkie dane.

Więc aktualnie usługa kod metoda wygląda nieco jak poniżej:

public Result TestMethod(Parameter parameter) 
{ 
    // Create response 
    var responseObject = new BusinessResponse { Value = "some very large data"}; 

    // The resposne have to be serialized in advance to intermediate MemoryStream 
    var stream = new MemoryStream(); 
    serializer.Serialize(stream, responseObject); 
    stream.Position = 0; 

    // ResultBody is a stream, Result is a MessageContract 
    return new Result {ResultBody = stream}; 
} 

Przedmiotem BusinessResponse jest szeregowane do MemoryStream i który jest zwracany z metody. Po stronie klienta kod wywołujący wygląda tak:

var parameter = new Parameter(); 

// Call the service method 
var methodResult = channel.TestMethod(parameter); 

// protobuf-net deserializer reads from a stream received from a service. 
// while reading is performed by protobuf-net, 
// on the service side WCF is actually reading from a 
// memory stream where serialized message is stored 
var result = serializer.Deserialize<BusinessResponse>(methodResult.ResultBody); 
return result; 

Więc kiedy serializer.Deserialize() nazywa się to czyta ze strumienia methodResult.ResultBody, w tym samym czasie na WCF bocznej usługa czyta MemoryStream, który został zwrócony z a TestMethod.

Problem:

Co chcielibyśmy osiągnąć to, aby pozbyć się z MemoryStream i początkowej serializacji całego obiektu na stronie serwisu naraz. Ponieważ korzystamy z przesyłania strumieniowego, chcielibyśmy, aby przed wysłaniem uniknąć przechowywania w pamięci obiektu zserializowanego.

Pomysł:

Idealnym rozwiązaniem byłoby, aby zwrócić puste, wykonanego na przedmiot strumień (z TestMethod()) w odniesieniu do przedmiotu, który ma być w odcinkach (Object „BusinessResponse”, w naszym przykładzie). Tak więc, gdy WCF wywołuje metodę Read() mojego strumienia, wewnętrznie serializuję fragment obiektu za pomocą protobuf-net i zwracam go do wywołującego bez zapisywania go w pamięci.

A teraz jest problem, ponieważ to, czego potrzebujemy, to możliwość serializacji obiektu kawałek po kawałku w momencie odczytu strumienia. Rozumiem, że jest to zupełnie inny sposób serializacji - zamiast przesuwania obiektu do serializera chciałbym poprosić o seryjną treść po fragmencie.

Czy ten rodzaj serializacji jest w jakiś sposób możliwy przy użyciu protobuf-net?

+0

Czy to jeden obiekt? Lub seria obiektów (kolekcja)? To, czy warto się na tym skupić, zależy w rzeczywistości od konfiguracji WCF - w większości konfiguracji zawsze buforuje całą wiadomość w pamięci * w każdym razie * - więc łatwo nie można niczego zmienić. –

+0

Witaj Marc, WCF jest skonfigurowany tak, aby w ogóle nie używać buforowania - to jest punkt streamingu - chcę zmniejszyć ślad pamięci po stronie serwera. Dodatkowo, jeśli chciałbym serializować kolekcję obiektów, użyłbym 'SerializeWithLengthPrefix()' za każdym razem, gdy Klient wywołuje 'Read()' a mój bazowy bufor jest mniejszy niż żądana ilość danych. Problem polega na tym, że chciałbym móc podzielić serializację pojedynczych obiektów. –

+0

ciekawe pytanie. Myślę, że * można to uogólnić, zasadniczo na fałszywy strumień, który sprawia, że ​​praca z czytaniem i pisaniem jest rutynowa. Jeśli nie masz nic przeciwko dodatkowemu wątkowi, można to zrobić za pomocą prostej bramy, jednak Jon Jon miał kilka interesujących pomysłów. Będę musiał rzucić okiem i wrócić do ciebie. Mogę jednak bez wątpliwości powiedzieć, że nie zamierzam hakować rdzenia protobuf-net w tym celu :) –

Odpowiedz

2

Ugotowałam kod, który prawdopodobnie jest zgodny z ideą bramki Marca.

public class PullStream : Stream 
{ 
    private byte[] internalBuffer; 
    private bool ended; 
    private static ManualResetEvent dataAvailable = new ManualResetEvent(false); 
    private static ManualResetEvent dataEmpty = new ManualResetEvent(true); 

    public override bool CanRead 
    { 
     get { return true; } 
    } 

    public override bool CanSeek 
    { 
     get { return false; } 
    } 

    public override bool CanWrite 
    { 
     get { return true; } 
    } 

    public override void Flush() 
    { 
     throw new NotImplementedException(); 
    } 

    public override long Length 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public override long Position 
    { 
     get 
     { 
      throw new NotImplementedException(); 
     } 
     set 
     { 
      throw new NotImplementedException(); 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     dataAvailable.WaitOne(); 
     if (count >= internalBuffer.Length) 
     { 
      var retVal = internalBuffer.Length; 
      Array.Copy(internalBuffer, buffer, retVal); 
      internalBuffer = null; 
      dataAvailable.Reset(); 
      dataEmpty.Set(); 
      return retVal; 
     } 
     else 
     { 
      Array.Copy(internalBuffer, buffer, count); 
      internalBuffer = internalBuffer.Skip(count).ToArray(); // i know 
      return count; 
     } 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     throw new NotImplementedException(); 
    } 

    public override void SetLength(long value) 
    { 
     throw new NotImplementedException(); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     dataEmpty.WaitOne(); 
     dataEmpty.Reset(); 

     internalBuffer = new byte[count]; 
     Array.Copy(buffer, internalBuffer, count); 

     Debug.WriteLine("Writing some data"); 

     dataAvailable.Set(); 
    } 

    public void End() 
    { 
     dataEmpty.WaitOne(); 
     dataEmpty.Reset(); 

     internalBuffer = new byte[0]; 

     Debug.WriteLine("Ending writes"); 

     dataAvailable.Set(); 
    } 
} 

Jest to prosta klasa potomna strumienia implementująca tylko odczyt i zapis (i koniec). Odczytuje bloki, gdy nie są dostępne żadne dane, oraz bloki zapisu, gdy dane są dostępne. W ten sposób zaangażowany jest tylko jeden bajtowy bufor. Kopiowanie pozostałych reszt jest otwarte dla optymalizacji ;-) Dodaje się metodę End, więc nie ma blokowania, gdy Read jest wykonywany, gdy żadne dane nie są dostępne i żadne dane nie zostaną zapisane.

Musisz napisać do tego strumienia z oddzielnego wątku.Pokażę to poniżej:

// create a large object 
    var obj = new List<ToSerialize>(); 
    for(int i = 0; i <= 1000; i ++) 
     obj.Add(new ToSerialize { Test = "This is my very loooong message" }); 
    // create my special stream to read from 
    var ms = new PullStream(); 
    new Thread(x => 
    { 
     ProtoBuf.Serializer.Serialize(ms, obj); 
     ms.End(); 
    }).Start(); 
    var buffer = new byte[100]; 
    // stream to write back to (just to show deserialization is working too) 
    var ws = new MemoryStream(); 
    int read; 
    while ((read = ms.Read(buffer, 0, 100)) != 0) 
    { 
     ws.Write(buffer, 0, read); 
     Debug.WriteLine("read some data"); 
    } 
    ws.Position = 0; 
    var back = ProtoBuf.Serializer.Deserialize<List<ToSerialize>>(ws); 

Mam nadzieję, że rozwiąże to Twój problem :-) Mimo to zabawnie było to zakodować.

Pozdrawiam, Jacco

+0

Powinno być możliwe zwrócenie Pullstream (nie jestem pewien, czy nazwa obejmuje to, co próbuje się zrobić) jako ResultBody – Jacco

Powiązane problemy