2012-08-26 12 views
5

Piszę w pełni zarządzaną bibliotekę Mercurial (do użycia w pełni zarządzanym Mercurial Server for Windows, już wkrótce) i jeden z najbardziej poważnych problemów z wydajnością, z jakimi się spotykam jest, co dziwne, dzielić tablicę na części.Najszybszy (przenośny) sposób na rozdzielenie tablicy w języku C#

Pomysł jest następujący: istnieje tablica bajtów o rozmiarze od kilkuset bajtów do megabajtów, a wszystko, co muszę zrobić, to podzielić ją na części rozdzielone według, w moim konkretnym przypadku, \n znaków .

Teraz co dotTrace pokazuje mi, że mój "optimized" version z Split (kod jest poprawne, here's the naive wersja Zacząłem) trwa 11 sekund do 2300 połączeń (jest oczywiste trafienie wydajność wprowadzony przez samego dotTrace, ale wszystko jest do skali).

Oto numery:

  • unsafe wersja: 11 297 ms dla 2 312 połączeń
  • udało ("naiwny") Wersja: 20 001 ms dla 2 312 połączeń

więc tutaj: co będzie najszybszym (najlepiej przenośnym, czyli obsługującym zarówno x86, jak i x64) sposobem dzielenia tablicy w C#.

+3

możemy zobaczyć swój * „zoptymalizowany” wersję Splicie * tak, że możemy myśleć o tym, jak ją poprawić? –

+0

Przenośny w sensie przenośnej biblioteki klasowej opisanej tutaj: http://msdn.microsoft.com/en-us/library/gg597391.aspx? –

+2

Możesz znaleźć jego kod w "zoptymalizowanej" wersji ". –

Odpowiedz

3

Dla Splitu, obsługa ulong na maszynie 32-bitowej jest naprawdę powolna, więc zdecydowanie zredukuj do uint. Jeśli naprawdę chcesz ulong, zaimplementuj dwie wersje, jedną dla wersji 32-bitowej, drugą dla wersji 64-bitowej.

Powinieneś także zmierzyć, czy obsługa bajtu na raz jest szybsza.

Konieczność profilu kosztów alokacji pamięci. Jeśli jest wystarczająco duży, spróbuj ponownie użyć pamięci dla wielu połączeń.

Inne:

ToString: to szybsze w użyciu "(" + Offset.ToString() + "" + Length.ToString() + ")";

GetHashCode: spróbuj stałą (bajt * b = bufor & [offset])


Ta wersja powinna być bardzo szybko, jeśli są wykorzystywane wielokrotnie. Kluczowy punkt: brak nowej alokacji pamięci po rozszerzeniu wewnętrznej tablicy do odpowiedniego rozmiaru, minimalna kopia danych.

class ArraySplitter 
{ 
    private byte[] m_data; 
    private int m_count; 
    private int[] m_stops; 

    private void AddRange(int start, int stop) 
    { 
     // Skip empty range 
     if (start > stop) 
     { 
      return; 
     } 

     // Grow array if needed 
     if ((m_stops == null) || (m_stops.Length < (m_count + 2))) 
     { 
      int[] old = m_stops; 

      m_stops = new int[m_count * 3/2 + 4]; 

      if (old != null) 
      { 
       old.CopyTo(m_stops, 0); 
      } 
     } 

     m_stops[m_count++] = start; 
     m_stops[m_count++] = stop; 
    } 

    public int Split(byte[] data, byte sep) 
    { 
     m_data = data; 
     m_count = 0;  // reuse m_stops 

     int last = 0; 

     for (int i = 0; i < data.Length; i ++) 
     { 
      if (data[i] == sep) 
      { 
       AddRange(last, i - 1); 
       last = i + 1; 
      } 
     } 

     AddRange(last, data.Length - 1); 

     return m_count/2; 
    } 

    public ArraySegment<byte> this[int index] 
    { 
     get 
     { 
      index *= 2; 
      int start = m_stops[index]; 

      return new ArraySegment<byte>(m_data, start, m_stops[index + 1] - start + 1); 
     } 
    } 
} 

Program testowy:

static void Main(string[] args) 
    { 
     int count = 1000 * 1000; 

     byte[] data = new byte[count]; 

     for (int i = 0; i < count; i++) 
     { 
      data[i] = (byte) i; 
     } 

     Stopwatch watch = new Stopwatch(); 

     for (int r = 0; r < 10; r++) 
     { 
      watch.Reset(); 
      watch.Start(); 

      int len = 0; 

      foreach (var seg in data.MySplit(13)) 
      { 
       len += seg.Count; 
      } 

      watch.Stop(); 

      Console.WriteLine("MySplit  : {0} {1,8:N3} ms", len, watch.Elapsed.TotalMilliseconds); 

      watch.Reset(); 
      watch.Start(); 

      ArraySplitter splitter = new ArraySplitter(); 

      int parts = splitter.Split(data, 13); 

      len = 0; 

      for (int i = 0; i < parts; i++) 
      { 
       len += splitter[i].Count; 
      } 

      watch.Stop(); 
      Console.WriteLine("ArraySplitter: {0} {1,8:N3} ms", len, watch.Elapsed.TotalMilliseconds); 
     } 
    } 

Wynik:

MySplit  : 996093 9.514 ms 
ArraySplitter: 996093 4.754 ms 
MySplit  : 996093 7.760 ms 
ArraySplitter: 996093 2.710 ms 
MySplit  : 996093 8.391 ms 
ArraySplitter: 996093 3.510 ms 
MySplit  : 996093 9.677 ms 
ArraySplitter: 996093 3.468 ms 
MySplit  : 996093 9.685 ms 
ArraySplitter: 996093 3.370 ms 
MySplit  : 996093 9.700 ms 
ArraySplitter: 996093 3.425 ms 
MySplit  : 996093 9.669 ms 
ArraySplitter: 996093 3.519 ms 
MySplit  : 996093 9.844 ms 
ArraySplitter: 996093 3.416 ms 
MySplit  : 996093 9.721 ms 
ArraySplitter: 996093 3.685 ms 
MySplit  : 996093 9.703 ms 
ArraySplitter: 996093 3.470 ms 
+0

Jeszcze jedno: twoja wewnętrzna pętla ma zbyt wiele zmiennych. –

+0

Ile różnych segmentów ma te dane testowe? –

+0

1000 * 1000/256 = 3906 –

4

Uważam, że problemem jest to, że robisz wiele skomplikowanych operacji w pętli. Ten kod usuwa wszystkie operacje oprócz pojedynczego dodania i porównania wewnątrz pętli. Inne skomplikowane rzeczy dzieją się tylko wtedy, gdy zostanie wykryty split lub na końcu tablicy.

Trudno też określić, z jakimi danymi przeprowadzasz testy, więc to tylko domysły.

public static unsafe Segment[] Split2(byte[] _src, byte value) 
{ 
    var _ln = _src.Length; 

    if (_ln == 0) return new Segment[] { }; 

    fixed (byte* src = _src) 
    { 
     var segments = new LinkedList<Segment>(); // Segment[c]; 

     byte* last = src; 
     byte* end = src + _ln - 1; 
     byte lastValue = *end; 
     *end = value; // value-termination 

     var cur = src; 
     while (true) 
     { 
      if (*cur == value) 
      { 
       int begin = (int) (last - src); 
       int length = (int) (cur - last + 1); 
       segments.AddLast(new Segment(_src, begin, length)); 

       last = cur + 1; 

       if (cur == end) 
       { 
        if (lastValue != value) 
        { 
         *end = lastValue; 
        } 
        break; 
       } 
      } 
      cur++; 
     } 

     return segments.ToArray(); 
    } 
} 

Edit: Poprawiono kod, więc zwraca poprawne wyniki.

+0

To bardzo trudne/sprytne użycie wartownika. Jednak tylko redux jednego warunkowego/bajtu ... Byłoby miło zobaczyć taktowanie. –

+0

Nice! '4 696' ms dla wywołań' 2 312' w obszarze dotTrace. –

+0

@pst: To prawie trzy razy szybciej niż moja "niebezpieczna" wersja. –

2

Anton,

ja nie wiem, czy nadal jesteś zainteresowany optymalizacji ta od tego wątku jest dość stary, ale widziałem, że kod był prawie taki sam w repozytorium online, więc myślałem, że Spróbuj. Przejrzałem twój kod HgSharp na bitbucket.org podczas oceniania twojej aplikacji HgLab. Przepisałem funkcję za pomocą natywnych konstrukcji, które znacznie ją uprościły. Moje testy zaowocowały ponad połowę czasu w porównaniu z oryginalną rutyną. Przetestowałem go, ładując plik źródłowy o wielkości kilku megabajtów i porównując taktowanie z wydajnością tej samej operacji przy użyciu oryginalnej procedury.

Oprócz przepisywania podstawowej logiki, postanowiłem użyć natywnej wersji ArraySegment<> wbudowanej w strukturę zamiast niestandardowej implementacji. Jedyną znaczącą różnicą jest to, że ArraySegment<> ujawnia właściwość Count zamiast właściwości Length. Poniższy kod nie wymaga słowa kluczowego "unsafe", ponieważ nie używam wskaźników, ale wydaje się, że istnieje niewielkie polepszenie wydajności, jeśli zostanie zmienione.


public static ArraySegment<byte>[] SplitEx(this byte[] source, byte value) { 
     var _previousIndex = -1; 
     var _segments = new List<ArraySegment<byte>>(); 
     var _length = source.Length; 

     if (_length > 0) { 
      int _index; 

      for (_index = 0; _index < _length; _index++) { 
       var _value = source[_index]; 
       if (_value == value) { 
        _segments.Add(new ArraySegment<byte>(source, _previousIndex + 1, _index - _previousIndex)); 
        _previousIndex = _index; 
       } 
      } 

      if (--_index != _previousIndex) { 
       _segments.Add(new ArraySegment<byte>(source, _previousIndex + 1, _index - _previousIndex)); 
      } 
     } 

     return _segments.ToArray(); 
    } 
Powiązane problemy