2011-01-12 7 views
14

Na temat oczekiwania do zakończenia zadań i synchronizacji wątków.Parallel.ForEach - Graceful Cancellation

Mam obecnie iterację, którą zawarłem w Parallel.ForEach. W poniższym przykładzie zadałem kilka pytań w komentarzach o tym, jak najlepiej poradzić sobie z płynnym zakończeniem pętli (.NET 4.0);

private void myFunction() 
    { 

     IList<string> iListOfItems = new List<string>(); 
     // populate iListOfItems 

     CancellationTokenSource cts = new CancellationTokenSource(); 

     ParallelOptions po = new ParallelOptions(); 
     po.MaxDegreeOfParallelism = 20; // max threads 
     po.CancellationToken = cts.Token; 

     try 
     { 
      var myWcfProxy = new myWcfClientSoapClient(); 

      if (Parallel.ForEach(iListOfItems, po, (item, loopsate) => 
      { 
       try 
       { 
        if (_requestedToStop) 
         loopsate.Stop(); 
        // long running blocking WS call, check before and after 
        var response = myWcfProxy.ProcessIntervalConfiguration(item); 
        if (_requestedToStop) 
         loopsate.Stop(); 

        // perform some local processing of the response object 
       } 
       catch (Exception ex) 
       { 
        // cannot continue game over. 
        if (myWcfProxy.State == CommunicationState.Faulted) 
        { 
         loopsate.Stop(); 
         throw; 
        } 
       } 

       // else carry on.. 
       // raise some events and other actions that could all risk an unhanded error. 

      } 
      ).IsCompleted) 
      { 
       RaiseAllItemsCompleteEvent(); 
      } 
     } 
     catch (Exception ex) 
     { 
      // if an unhandled error is raised within one of the Parallel.ForEach threads, do all threads in the 
      // ForEach abort? or run to completion? Is loopsate.Stop (or equivalent) called as soon as the framework raises an Exception? 
      // Do I need to call cts.Cancel here? 

      // I want to wait for all the threads to terminate before I continue at this point. Howe do we achieve that? 

      // do i need to call cts.Dispose() ? 

      MessageBox.Show(Logging.FormatException(ex)); 
     } 
     finally 
     { 

      if (myWcfProxy != null) 
      { 
      // possible race condition with the for-each threads here unless we wait for them to terminate. 
       if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted) 
        myWcfProxy.Abort(); 

       myWcfProxy.Close(); 
      } 

      // possible race condition with the for-each threads here unless we wait for them to terminate. 
      _requestedToStop = false; 

     } 

    } 

Każda pomoc będzie najbardziej ceniona. Dokumentacja MSDN mówi o ManualResetEventSlim i cancellationToken.WaitHandle. ale nie wiem, jak je do tego podłączyć, wydaje się, że trudno jest zrozumieć przykłady MSDN, ponieważ większość z nich nie ma zastosowania.

Odpowiedz

8

Zrobiłem kpiny z poniższego kodu, który może odpowiedzieć na twoje pytanie. Podstawowym celem jest równoległość wideł/równoległości z Parallel.ForEach, więc nie musisz się martwić o warunki wyścigu poza zadaniem równoległym (wątki wywołujące bloki, dopóki zadania nie zakończą się pomyślnie lub w inny sposób). Musisz tylko upewnić się, że używasz zmiennej LoopState (drugi argument do lambda), aby kontrolować stan twojej pętli.

Jeśli jakakolwiek iteracja pętli spowodowała nieobsługiwany wyjątek, ogólna pętla spowoduje wychwycenie wyjątku AggregateException na końcu.

Inne linki wspomnieć, że ten wątek:

Parallel.ForEach throws exception when processing extremely large sets of data

http://msdn.microsoft.com/en-us/library/dd460720.aspx

Does Parallel.ForEach limits the number of active threads?

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 
using System.ServiceModel; 

namespace Temp 
{ 
    public class Class1 
    { 
     private class MockWcfProxy 
     { 
      internal object ProcessIntervalConfiguration(string item) 
      { 
       return new Object(); 
      } 

      public CommunicationState State { get; set; } 
     } 

     private void myFunction() 
     { 

      IList<string> iListOfItems = new List<string>(); 
      // populate iListOfItems 

      CancellationTokenSource cts = new CancellationTokenSource(); 

      ParallelOptions po = new ParallelOptions(); 
      po.MaxDegreeOfParallelism = 20; // max threads 
      po.CancellationToken = cts.Token; 

      try 
      { 
       var myWcfProxy = new MockWcfProxy(); 

       if (Parallel.ForEach(iListOfItems, po, (item, loopState) => 
        { 
         try 
         { 
          if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional) 
           loopState.Stop(); 

          // long running blocking WS call, check before and after 
          var response = myWcfProxy.ProcessIntervalConfiguration(item); 

          if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional) 
           loopState.Stop(); 

          // perform some local processing of the response object 
         } 
         catch (Exception ex) 
         { 
          // cannot continue game over. 
          if (myWcfProxy.State == CommunicationState.Faulted) 
          { 
           loopState.Stop(); 
           throw; 
          } 

          // FYI you are swallowing all other exceptions here... 
         } 

         // else carry on.. 
         // raise some events and other actions that could all risk an unhanded error. 
        } 
       ).IsCompleted) 
       { 
        RaiseAllItemsCompleteEvent(); 
       } 
      } 
      catch (AggregateException aggEx) 
      { 
       // This section will be entered if any of the loops threw an unhandled exception. 
       // Because we re-threw the WCF exeption above, you can use aggEx.InnerExceptions here 
       // to see those (if you want). 
      } 
      // Execution will not get to this point until all of the iterations have completed (or one 
      // has failed, and all that were running when that failure occurred complete). 
     } 

     private void RaiseAllItemsCompleteEvent() 
     { 
      // Everything completed... 
     } 
    } 
} 
+0

dzięki za wgląd. Powinienem powiedzieć, że w punkcie, w którym słusznie wskazujesz "połknięcie wszystkich innych wyjątków", robię wywołanie logujące, które rejestruje usługę Web Service lub wyjątek WCF po stronie klienta. Intencją jest, aby pętla była kontynuowana, jeśli wyjątek nie spowoduje unieważnienia proxy WCF. Przewiduję błędy przekroczenia limitu czasu lub wyjątki błędu po stronie serwera. z których dla tej konkretnej funkcjonalności potrzebna jest funkcja złagodzenia w haczyku. jednakże będziemy przeglądać plik dziennika i każdy taki wyjątek zostanie zbadany. – Terry

+0

Co mnie myli o Parallel.ForEach było to, że ja też założyłem, że powinno to być wywołanie blokujące, dopóki wszystkie wątki w puli nie zostaną ukończone (wyjątki są buforowane lub nie), jednak liczba wątków zgłaszanych jako działająca w punkcie przerwania ustawionym na przykład Twój blok (AggregateException aggEx) blokowałby się jako 20 wątków w przeglądarce wątków VS 2010. Dostałem więc sysinternals i spojrzałem na debugowany plik wykonywalny vshost, a także pokazano 22 wątki, w tym interfejs użytkownika i pompę komunikatów. – Terry

+0

Co więcej, zdarzenia, które zostały podniesione w pętli, rzuciły kolejne wyjątki po wykonaniu funkcji i uruchomieniu bloku finally. – Terry

Powiązane problemy