2012-03-27 14 views
12

Próbowałem dostać to, co uważam za najprostszą możliwą formę wątkowania do pracy w mojej aplikacji, ale po prostu nie mogę tego zrobić.C# Threading - Czytanie i mieszanie wielu plików jednocześnie, najłatwiejszą metodą?

Co chcę zrobić: Mam główny formularz z paskiem stanu i paskiem postępu na nim. Muszę przeczytać coś pomiędzy 3 a 99 plikami i dodać ich skróty do napisu [], który chcę dodać do listy wszystkich plików z ich odpowiednimi skrótami. Następnie muszę porównać pozycje z tej listy z bazą danych (która jest dostępna w plikach tekstowych). Po wykonaniu wszystkich czynności, muszę zaktualizować pole tekstowe w formularzu głównym, a pasek postępu do 33%; głównie po prostu nie chcę, aby główna forma zamarzała podczas przetwarzania.

Pliki, z którymi pracuję, zawsze sumują się do 1,2 GB (+/- kilka MB), co oznacza, że ​​powinienem móc odczytać je w bajcie [] s i przetworzyć je stamtąd (muszę obliczyć CRC32 , MD5 i SHA1 każdego z tych plików, więc powinno to być szybsze niż odczytanie ich wszystkich z dysku twardego 3 razy).

Należy również zauważyć, że niektóre pliki mogą mieć 1 MB, a inne mogą mieć 1 GB. Początkowo chciałem utworzyć 99 wątków dla 99 plików, ale to nie jest mądre, przypuszczam, że najlepiej byłoby ponownie użyć wątków małych plików, podczas gdy większe wątki plików nadal działają. Ale to brzmi dość skomplikowanie, więc nie jestem pewien, czy to też jest mądre.

Do tej pory próbowałem workerThreads i backgroundWorkers, ale nie wydają się działać zbyt dobrze dla mnie; przynajmniej praca w tle działała NIEKTÓRE czasu, ale nie potrafię nawet zrozumieć, dlaczego nie będą inne czasy ... tak czy inaczej główna forma wciąż zamiera. Teraz czytałem o Task Parallel Library w .NET 4.0, ale pomyślałem, że powinienem zapytać kogoś, kto wie, co robi, zanim zmarnuje więcej czasu.

Co chcę zrobić coś jak to wygląda (bez gwintu):

List<string[]> fileSpecifics = new List<string[]>(); 

int fileMaxNumber = 42; // something between 3 and 99, depending on file set 

for (int i = 1; i <= fileMaxNumber; i++) 
{ 
    string fileName = "C:\\path\\to\\file" + i.ToString("D2") + ".ext"; // file01.ext - file99.ext 
    string fileSize = new FileInfo(fileName).Length.ToString(); 
    byte[] file = File.ReadAllBytes(fileName); 
    // hash calculations (using SHA1CryptoServiceProvider() etc., no problems with that so I'll spare you that, return strings) 
    file = null; // I didn't yet check if this made any actual difference but I figured it couldn't hurt 
    fileSpecifics.Add(new string[] { fileName, fileSize, fileCRC, fileMD5, fileSHA1 }); 
} 

// look for files in text database mentioned above, i.e. first check for "file bundles" with the same amount of files I have here; then compare file sizes, then hashes 
// again, no problems with that so I'll spare you that; the database text files are pretty small so parsing them doesn't need to be done in an extra thread. 

Czy ktoś będzie na tyle uprzejmy, żeby wskazać mi w dobrym kierunku? Poszukuję najłatwiejszego sposobu czytania i mieszania tych plików szybko (uważam, że haszowanie zajmuje trochę czasu, w którym można już odczytać inne pliki) i zapisać wynik na ciąg [], bez zamrożenia głównej formy, nic więcej , nic mniej.

Jestem wdzięczny za wszelkie dane wejściowe.

EDYCJA w celu wyjaśnienia: przez "backgroundWorkers pracujący przez pewien czas" Miałem na myśli, że (dla tego samego zestawu plików), być może pierwsze i czwarte wykonanie mojego kodu daje prawidłowy wynik, a UI odmawia w ciągu 5 sekund , dla drugiej, trzeciej i piątej realizacji blokuje formularz (i po 60 sekundach pojawia się komunikat o błędzie, że jakiś wątek nie odpowiedział w tym przedziale czasowym) i muszę przerwać wykonywanie przez VS.

Dziękuję za wszystkie sugestie i wskazówki, ponieważ wszyscy prawidłowo odgadliście, że jestem zupełnie nowy w tworzeniu wątków i będę musiał zapoznać się z opublikowanymi świetnymi linkami. Wtedy spróbuję tych metod i zaznaczę odpowiedź, która pomogła mi najbardziej. Dzięki jeszcze raz!

+1

Co masz na myśli przez BackgroundWorker pracy na jakiś czas? Jeśli zostanie poprawnie zaimplementowany, przetwarzanie wykonywane w ramach BackgroundWorker nie powinno powodować zawieszania się formularza. – evasilchenko

+0

Jeśli są na 1 dysku, potrzebujesz tylko 1 (dodatkowego) wątku. –

+1

Ten artykuł może być pomocny dla Ciebie: http://www.hanselman.com/blog/BackToParallelBasicsDontBlockYourThreadsMakeAsyncIOWorkForYou.aspx –

Odpowiedz

18

Z .NET Framework 4.X

  1. Zastosowanie Directory.EnumerateFiles Metoda wydajne/plików leniwe wyliczenie
  2. Korzystanie Parallel.For() aby powierzyć pracę równoległości do PLINQ ram lub wykorzystać TPL delegować jednego zadania na stopień rurociągu
  3. Zastosowanie Pipelines pattern do rurociągów następujących etapach: wyliczający hashcodes porównaj z wzorca, zaktualizuj UI
  4. Aby uniknąć UI zastosowania odpowiednich technik zamrażania: dla WPF użyć Dispatcher.BeginInvoke() dla WinForms używać Invoke() see this SO answer
  5. Biorąc pod uwagę, że wszystkie te rzeczy mają interfejs użytkownika, może być przydatne dodanie funkcji anulowania, aby w razie potrzeby przerwać długotrwałe działanie, spójrz na klasę CreateLinkedTokenSource, która umożliwia wyzwalanie CancellationToken z "zewnętrznego zakresu" Mogę spróbować dodać przykład, ale jest to Warto zrobić to sam, abyś nauczył się wszystkich tych rzeczy, a nie tylko kopiował/wklejał -> działał -> o tym zapomniał.

PS: Musi czytać - Pipelines paper na MSDN


wdrożeniowe TPL specyficzny rurociąg

  • wdrożeniowe Pipeline wzór: trzy etapy: obliczyć hash, mecz, aktualizacja UI
  • trzy zadania , jeden na etap
  • Dwie blokady kolejki

//

// 1) CalculateHashesImpl() should store all calculated hashes here 
// 2) CompareMatchesImpl() should read input hashes from this queue 
// Tuple.Item1 - hash, Typle.Item2 - file path 
var calculatedHashes = new BlockingCollection<Tuple<string, string>>(); 


// 1) CompareMatchesImpl() should store all pattern matching results here 
// 2) SyncUiImpl() method should read from this collection and update 
// UI with available results 
var comparedMatches = new BlockingCollection<string>(); 

var factory = new TaskFactory(TaskCreationOptions.LongRunning, 
           TaskContinuationOptions.None); 


var calculateHashesWorker = factory.StartNew(() => CalculateHashesImpl(...)); 
var comparedMatchesWorker = factory.StartNew(() => CompareMatchesImpl(...)); 
var syncUiWorker= factory.StartNew(() => SyncUiImpl(...)); 

Task.WaitAll(calculateHashesWorker, comparedMatchesWorker, syncUiWorker); 

CalculateHashesImpl():

private void CalculateHashesImpl(string directoryPath) 
{ 
    foreach (var file in Directory.EnumerateFiles(directoryPath)) 
    { 
     var hash = CalculateHashTODO(file); 
     calculatedHashes.Add(new Tuple<string, string>(hash, file.Path)); 
    } 
} 

CompareMatchesImpl():

private void CompareMatchesImpl() 
{ 
    foreach (var hashEntry in calculatedHashes.GetConsumingEnumerable()) 
    { 
     // TODO: obviously return type is up to you 
     string matchResult = GetMathResultTODO(hashEntry.Item1, hashEntry.Item2); 
     comparedMatches.Add(matchResult); 
    } 
} 

SyncUiImpl():

private void UpdateUiImpl() 
{ 
    foreach (var matchResult in comparedMatches.GetConsumingEnumerable()) 
    { 
     // TODO: track progress in UI using UI framework specific features 
     // to do not freeze it 
    } 
} 

TODO: Rozważ używanie CancellationToken jako parametr dla wszystkich GetConsumingEnumerable() połączeń, dzięki czemu łatwo można zatrzymać wykonanie rurociągu, gdy są potrzebne.

+0

Zajrzę do tego, wielkie dzięki! –

+0

Papier Pipeline jest całkiem niezły! :) –

17

Po pierwsze, powinieneś używać wyższego poziomu abstrakcji, aby rozwiązać ten problem. Masz do wykonania mnóstwo zadań, więc użyj abstrakcji "zadania". Powinieneś używać biblioteki zadań do wykonania tego typu rzeczy. Niech TPL poradzi sobie z pytaniem, ile wątków roboczych ma utworzyć - odpowiedź może być tak niska jak jedna, jeśli praca jest zamknięta na I/O.

Jeśli chcesz zrobić własną wątków, niektóre dobra rada:

  • Nie zawsze blokować na wątku UI. To właśnie zamraża twoją aplikację.Wymyśl protokół , za pomocą którego wątki robocze mogą komunikować się z twoim wątkiem interfejsu użytkownika, który nie robi nic poza reakcją na zdarzenia interfejsu użytkownika. Pamiętaj, że metody kontroli interfejsu użytkownika, takie jak paski zakończenia zadania, nie mogą nigdy być wywoływane przez żaden inny wątek inny niż wątek UI.

  • Nie twórz 99 wątków do odczytu 99 plików. To tak, jakby zdobyć 99 przesyłek pocztowych i zatrudnić 99 asystentów do napisania odpowiedzi: niezwykle kosztowne rozwiązanie prostego problemu. Jeśli twoja praca jest intensywnie CPU, nie ma sensu "wynajmować" więcej wątków, niż masz procesory do ich obsługi. (To tak, jakby zatrudnić 99 asystentów w biurze, które ma tylko cztery biurka.) Asystenci spędzają większość czasu czekając na biurko, aby usiąść, zamiast czytać pocztę.) Jeśli twoja praca wymaga dużego nakładu pracy, to większość tych wątków być bezczynnym przez większość czasu, czekając na dysk, który jest jeszcze większym marnowaniem zasobów.

+0

+1. W przypadku zadań nie związanych z dyskiem SSD/dyskiem flash jest to ** prawdopodobnie ** (prototyp i miara), że dodanie dodatkowych wątków spowolni działanie programu z powodu ograniczeń sprzętowych. –

0

Sprawdź TPL Dataflow. Możesz użyć zdławionego ActionBlocka, który zarekomenduje ci trudną część.

0

Jeśli rozumiem, że chcesz wykonywać niektóre zadania w tle i nie blokować interfejsu użytkownika, dobrym wyborem będzie usługa BackgroundWorker interfejsu użytkownika. Wspomniałeś, że pracujesz przez jakiś czas, więc moją rekomendacją byłoby wzięcie tego, co miałeś w stanie półproduktowym, i ulepszanie go poprzez śledzenie niepowodzeń. Jeśli moje przeczucie jest poprawne, twój robotnik rzucił wyjątek, który nie wydaje się, że zajmujesz się kodem. Nieobsługiwane wyjątki, które pojawiają się w ich wątkach zawierających, powodują, że dzieje się coś złego.

2

Po pierwsze, mam nadzieję, że do obliczania skrótów używana jest wbudowana biblioteka. Można pisać własne, ale o wiele bezpieczniej jest użyć czegoś, co działo się przez jakiś czas.

Może być konieczne utworzenie tylko tylu wątków, co procesorów, jeśli procesor jest obciążony przez procesor. Jeśli jest związany przez I/O, możesz być w stanie uciec z większą liczbą wątków.

Nie polecam ładowania całego pliku do pamięci. Twoja biblioteka haszująca powinna obsługiwać aktualizowanie porcji na raz. Odczytuj porcję do pamięci, użyj jej do aktualizacji skrótów każdego algorighma, przeczytaj następny fragment i powtarzaj do końca pliku. Chunked podejście pomoże obniżyć wymagania pamięci programu.

Jak sugerowali inni, spójrz na Task Parallel Library, szczególnie Data Parallelism. To może być tak proste, jak to:

Parallel.ForEach(fileSpecifics, item => CalculateHashes(item)); 
+0

Twoje jedno rozwiązanie liniowe jest oczywiście najbardziej intrygujące :) Ale oczywiście będę musiał przeczytać znacznie więcej. W przypadku MD5 i SHA1 używam System.Security.Cryptography, ale nie mogłem znaleźć tego samego dla CRC32, więc użyłem klasy CRC32 należącej do domeny publicznej Damien Guard. Będę musiał sprawdzić, czy to obsługuje obliczenia z porcjami. Dzięki! –

0

Ten kod mieszania jeden plik (strumień) za pomocą dwóch zadań - jeden do czytania, drugi dla mieszaja, na bardziej solidnej sposób należy odczytać kolejne kawałki do przodu.

Ponieważ przepustowość procesora jest znacznie większa niż w przypadku dysku, o ile nie używasz szybkiej pamięci Flash, nic nie zyskujesz z jednoczesnego mieszania większej liczby plików.

public void TransformStream(Stream a_stream, long a_length = -1) 
{ 
    Debug.Assert((a_length == -1 || a_length > 0)); 

    if (a_stream.CanSeek) 
    { 
     if (a_length > -1) 
     { 
      if (a_stream.Position + a_length > a_stream.Length) 
       throw new IndexOutOfRangeException(); 
     } 

     if (a_stream.Position >= a_stream.Length) 
      return; 
    } 

    System.Collections.Concurrent.ConcurrentQueue<byte[]> queue = 
     new System.Collections.Concurrent.ConcurrentQueue<byte[]>(); 
    System.Threading.AutoResetEvent data_ready = new System.Threading.AutoResetEvent(false); 
    System.Threading.AutoResetEvent prepare_data = new System.Threading.AutoResetEvent(false); 

    Task reader = Task.Factory.StartNew(() => 
    { 
     long total = 0; 

     for (; ;) 
     { 
      byte[] data = new byte[BUFFER_SIZE]; 
      int readed = a_stream.Read(data, 0, data.Length); 

      if ((a_length == -1) && (readed != BUFFER_SIZE)) 
       data = data.SubArray(0, readed); 
      else if ((a_length != -1) && (total + readed >= a_length)) 
       data = data.SubArray(0, (int)(a_length - total)); 

      total += data.Length; 

      queue.Enqueue(data); 
      data_ready.Set(); 

      if (a_length == -1) 
      { 
       if (readed != BUFFER_SIZE) 
        break; 
      } 
      else if (a_length == total) 
       break; 
      else if (readed != BUFFER_SIZE) 
       throw new EndOfStreamException(); 

      prepare_data.WaitOne(); 
     } 
    }); 

    Task hasher = Task.Factory.StartNew((obj) => 
    { 
     IHash h = (IHash)obj; 
     long total = 0; 

     for (; ;) 
     { 
      data_ready.WaitOne(); 

      byte[] data; 
      queue.TryDequeue(out data); 

      prepare_data.Set(); 

      total += data.Length; 

      if ((a_length == -1) || (total < a_length)) 
      { 
       h.TransformBytes(data, 0, data.Length); 
      } 
      else 
      { 
       int readed = data.Length; 
       readed = readed - (int)(total - a_length); 
       h.TransformBytes(data, 0, data.Length); 
      } 

      if (a_length == -1) 
      { 
       if (data.Length != BUFFER_SIZE) 
        break; 
      } 
      else if (a_length == total) 
       break; 
      else if (data.Length != BUFFER_SIZE) 
       throw new EndOfStreamException(); 
     } 
    }, this); 

    reader.Wait(); 
    hasher.Wait(); 
} 

Reszta kod tutaj: http://hashlib.codeplex.com/SourceControl/changeset/view/71730#514336