2011-11-22 12 views
13

Czy istnieje struktura danych/kombinacji danych .NET, która pozwala na dołączenie danych bajtowych na końcu bufora, ale wszystkie odczyty i odczyty są od początku, skracając bufor, gdy Czytam?FIFO/Bufor kolejki specjalizujący się w strumieniach bajtów

Klasa MemoryStream wydaje się być częścią tego, ale muszę utrzymywać oddzielne lokalizacje do czytania i pisania i nie odrzuca automatycznie danych na początku po przeczytaniu.

Odpowiedź została wysłana w odpowiedzi na this question, co jest zasadniczo tym, co próbuję zrobić, ale wolałbym coś, co mogę zrobić asynchroniczne operacje we/wy na różnych komponentach tego samego procesu, tak jak normalna rura lub nawet strumień sieciowy (najpierw muszę filtrować/przetwarzać dane).

+1

Czy jest coś nie tak z przeskakiwaniem w buforze odczytu? – Ryan

+0

Tylko to co powiedziałem i muszę to śledzić w przeciwieństwie do stylu NetworkStream odczytu, odczytu, odczytu itp. – Deanna

+0

Czy potrzebujesz czytać i pisać tablice o różnych rozmiarach? Czy kolejka 'byte []' nie byłaby dla ciebie wystarczająco dobra? – svick

Odpowiedz

10

Powiem pustą kopię jakiejś logiki, którą napisałem dla projektu w pracy. Zaletą tej wersji jest to, że działa z połączoną listą zbuforowanych danych i dlatego nie trzeba buforować dużych ilości pamięci i/lub kopiować pamięci podczas czytania. ponadto wątek bezpieczny i zachowuje się jak strumień sieci, to znaczy: Podczas czytania, gdy nie ma dostępnych danych: Poczekaj na dostępność danych lub limit czasu. Ponadto, podczas odczytywania x ilości bajtów i są tylko y ilości bajtów, zwracaj po przeczytaniu wszystkich bajtów. Mam nadzieję, że to pomoże!

public class SlidingStream : Stream 
{ 
    #region Other stream member implementations 

    ... 

    #endregion Other stream member implementations 

    public SlidingStream() 
    { 
     ReadTimeout = -1; 
    } 

    private readonly object _writeSyncRoot = new object(); 
    private readonly object _readSyncRoot = new object(); 
    private readonly LinkedList<ArraySegment<byte>> _pendingSegments = new LinkedList<ArraySegment<byte>>(); 
    private readonly ManualResetEventSlim _dataAvailableResetEvent = new ManualResetEventSlim(); 

    public int ReadTimeout { get; set; } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     if (_dataAvailableResetEvent.Wait(ReadTimeout)) 
      throw new TimeoutException("No data available"); 

     lock (_readSyncRoot) 
     { 
      int currentCount = 0; 
      int currentOffset = 0; 

      while (currentCount != count) 
      { 
       ArraySegment<byte> segment = _pendingSegments.First.Value; 
       _pendingSegments.RemoveFirst(); 

       int index = segment.Offset; 
       for (; index < segment.Count; index++) 
       { 
        if (currentOffset < offset) 
        { 
         currentOffset++; 
        } 
        else 
        { 
         buffer[currentCount] = segment.Array[index]; 
         currentCount++; 
        } 
       } 

       if (currentCount == count) 
       { 
        if (index < segment.Offset + segment.Count) 
        { 
         _pendingSegments.AddFirst(new ArraySegment<byte>(segment.Array, index, segment.Offset + segment.Count - index)); 
        } 
       } 

       if (_pendingSegments.Count == 0) 
       { 
        _dataAvailableResetEvent.Reset(); 

        return currentCount; 
       } 
      } 

      return currentCount; 
     } 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     lock (_writeSyncRoot) 
     { 
      byte[] copy = new byte[count]; 
      Array.Copy(buffer, offset, copy, 0, count); 

      _pendingSegments.AddLast(new ArraySegment<byte>(copy)); 

      _dataAvailableResetEvent.Set(); 
     } 
    } 
} 
+1

Wygląda dobrze i był taki sam jak byłem. Spróbuję tego dziś wieczorem. – Deanna

+0

Wydaje mi się, że spowoduje to awarię, jeśli spróbujesz odczytać dane, gdy żadne z nich nie będzie dostępne. – svick

+0

@svick - Absolutnie w prawo, to tylko szkic, brak sprawdzania poprawności argumentów itp. ManualResetEvent jest tam z tego powodu, po prostu zapomniałem na niego poczekać na początku metody odczytu. Naprawiono teraz. Dzięki za headsup – Polity

1

Kod może być prostszy niż w zaakceptowanej odpowiedzi. Nie ma potrzeby używania pętli for .:

/// <summary> 
/// This class is a very fast and threadsafe FIFO buffer 
/// </summary> 
public class FastFifo 
{ 
    private List<Byte> mi_FifoData = new List<Byte>(); 

    /// <summary> 
    /// Get the count of bytes in the Fifo buffer 
    /// </summary> 
    public int Count 
    { 
     get 
     { 
      lock (mi_FifoData) 
      { 
       return mi_FifoData.Count; 
      } 
     } 
    } 

    /// <summary> 
    /// Clears the Fifo buffer 
    /// </summary> 
    public void Clear() 
    { 
     lock (mi_FifoData) 
     { 
      mi_FifoData.Clear(); 
     } 
    } 

    /// <summary> 
    /// Append data to the end of the fifo 
    /// </summary> 
    public void Push(Byte[] u8_Data) 
    { 
     lock (mi_FifoData) 
     { 
      // Internally the .NET framework uses Array.Copy() which is extremely fast 
      mi_FifoData.AddRange(u8_Data); 
     } 
    } 

    /// <summary> 
    /// Get data from the beginning of the fifo. 
    /// returns null if s32_Count bytes are not yet available. 
    /// </summary> 
    public Byte[] Pop(int s32_Count) 
    { 
     lock (mi_FifoData) 
     { 
      if (mi_FifoData.Count < s32_Count) 
       return null; 

      // Internally the .NET framework uses Array.Copy() which is extremely fast 
      Byte[] u8_PopData = new Byte[s32_Count]; 
      mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count); 
      mi_FifoData.RemoveRange(0, s32_Count); 
      return u8_PopData; 
     } 
    } 

    /// <summary> 
    /// Gets a byte without removing it from the Fifo buffer 
    /// returns -1 if the index is invalid 
    /// </summary> 
    public int PeekAt(int s32_Index) 
    { 
     lock (mi_FifoData) 
     { 
      if (s32_Index < 0 || s32_Index >= mi_FifoData.Count) 
       return -1; 

      return mi_FifoData[s32_Index]; 
     } 
    } 
} 
+0

Jest to zasadniczo to samo, co połączone pytanie, które nie spełnia wymagań asynchronicznych lub blokujących. W każdym razie dzięki. – Deanna

+0

OK, ale ten kod nie jest tak elegancki i nie jest bezpieczny dla wątków. Możesz to zrobić za pomocą 6 linii zamiast 16 linii. – Elmue

Powiązane problemy