2017-10-16 68 views
5

Przetwarzam pliki PDF o bardzo różnych rozmiarach (proste skany od 2 MB do wysokiej rozdzielczości z kilkuset MB) za pośrednictwem Parallel.ForEach i od czasu do czasu dochodzę do OutOfMemoryException - co jest zrozumiałe z powodu proces jest 32-bitowy, a wątki spawnowane przez Parallel.ForEach zajmują nieznaną ilość pracy pochłaniającej pamięć.Parallel.ForEach z niestandardowym TaskScheduler, aby zapobiec OutOfMemoryException

Ograniczenie działa, chociaż przepustowość w czasach, gdy istnieje duża (10k +) partia małych plików PDF do pracy, nie jest wystarczająca, ponieważ może istnieć więcej wątków pracujących ze względu na mały ślad pamięciowy wspomnianych wątków. Jest to proces obciążający procesor z Parallel.ForEach z łatwością osiągający 100% CPU zanim trafi się sporadycznie do grupy dużych plików PDF i uzyska wyjątek OutOfMemoryException. Uruchamianie Performance Profiler robi to z powrotem.

Z mojego rozumienia, posiadanie partycji dla mojego Parallel.ForEach nie poprawi mojej wydajności.

To prowadzi mnie do używania niestandardowego TaskScheduler przekazanego do mojego Parallel.ForEach z czekiem MemoryFailPoint. Podczas wyszukiwania dookoła niego istnieje niewielka ilość informacji na temat tworzenia niestandardowych obiektów o nazwie TaskScheduler.

Patrząc między Specialized Task Schedulers in .NET 4 Parallel Extensions Extras, A custom TaskScheduler in C# i różne odpowiedzi tutaj na Stackoverflow, stworzyłem własną TaskScheduler i mam moją metodę QueueTask jako takie:

protected override void QueueTask(Task task) 
{ 
    lock (tasks) tasks.AddLast(task); 
    try 
    { 
     using (MemoryFailPoint memFailPoint = new MemoryFailPoint(600)) 
     { 
      if (runningOrQueuedCount < maxDegreeOfParallelism) 
      { 
       runningOrQueuedCount++; 
       RunTasks(); 
      } 
     } 
    } 
    catch (InsufficientMemoryException e) 
    {  
     // somehow return thread to pool?   
     Console.WriteLine("InsufficientMemoryException"); 
    } 
} 

Choć try/catch jest trochę drogie, moim celem tutaj ma złapać, gdy prawdopodobny maksymalny rozmiar pliku PDF (+ niewielki dodatkowy narzut pamięci) wynoszący 600 MB spowoduje odrzucenie wyjątku OutOfMemoryException. To rozwiązanie wydaje się zabijać wątek próbujący wykonać pracę, gdy złapię wyjątek InsufficientMemoryException. Przy wystarczającej ilości dużych plików PDF mój kod kończy się jako pojedynczy wątek Parallel.ForEach.

Inne pytania znajdujące się na Stackoverflow na Parallel.ForEach i OutOfMemoryExceptions nie wydaje się, aby dopasować mój przypadek użycia maksymalnej przepustowości z dynamicznym zużycie pamięci na nici, a często po prostu wykorzystać MaxDegreeOfParallelism jako statyczny rozwiązania, np

Tak, aby mieć maksymalną przepustowość na rozmiar pamięci zmienna pracy, albo:

  • Jak mogę przywrócić wątek do wątku, gdy odmówiono mu pracy za pomocą sprawdzenia MemoryFailPoint?
  • W jaki sposób/gdzie bezpiecznie odradzam nowe wątki, aby ponownie wykonać pracę, gdy dostępna jest wolna pamięć?

Edit: Wielkość PDF na dysku może nie liniowo reprezentować rozmiar w pamięci z powodu rasteryzacji i rastrowych komponentu Image Manipulation, która jest zależna od zawartości PDF.

Odpowiedz

0

Używanie od Samples for Parallel Programming with the .NET Framework Udało mi się dokonać drobnej korekty, aby uzyskać coś, co wyglądało tak, jak chciałem.Poniżej znajduje się metoda NotifyThreadPoolOfPendingWork klasy LimitedConcurrencyLevelTaskScheduler po modyfikacji:

private void NotifyThreadPoolOfPendingWork() 
{ 
    ThreadPool.UnsafeQueueUserWorkItem(_ => 
    { 
     // Note that the current thread is now processing work items. 
     // This is necessary to enable inlining of tasks into this thread. 
     _currentThreadIsProcessingItems = true; 
     try 
     { 
      // Process all available items in the queue. 
      while (true) 
      { 
       Task item; 
       lock (_tasks) 
       { 
        // When there are no more items to be processed, 
        // note that we're done processing, and get out. 
        if (_tasks.Count == 0) 
        { 
         --_delegatesQueuedOrRunning; 
         break; 
        } 

        // Get the next item from the queue 
        item = _tasks.First.Value; 
        _tasks.RemoveFirst(); 
       } 

       // Execute the task we pulled out of the queue 
       //base.TryExecuteTask(item); 

       try 
       { 
        using (MemoryFailPoint memFailPoint = new MemoryFailPoint(650)) 
        { 
         base.TryExecuteTask(item); 
        } 
       } 
       catch (InsufficientMemoryException e) 
       { 
        Thread.Sleep(500); 

        lock (_tasks) 
        { 
         _tasks.AddLast(item); 
        } 
       } 

      } 
     } 
     // We're done processing items on the current thread 
     finally { _currentThreadIsProcessingItems = false; } 
    }, null); 
} 

Przyjrzymy się połowu, ale w odwrotnej kolejności. Dodajemy zadanie, nad którym zamierzaliśmy pracować, do listy zadań (_tasks), która uruchamia zdarzenie, aby uzyskać dostępny wątek do pobrania tej pracy. Ale najpierw przesypiamy bieżący wątek, aby nie podnosił pracy w prosty sposób i nie powrócił do nieudanego sprawdzenia MemoryFailPoint.

Powiązane problemy