2013-07-24 11 views
6

I przyjął moje wykonanie równoległego/konsumenta w oparciu o kod w this questionParallel.ForEach utknęły w martwym punkcie, gdy zintegrowany z BlockingCollection

class ParallelConsumer<T> : IDisposable 
{ 
    private readonly int _maxParallel; 
    private readonly Action<T> _action; 
    private readonly TaskFactory _factory = new TaskFactory(); 
    private CancellationTokenSource _tokenSource; 
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>(); 
    private Task _task; 

    public ParallelConsumer(int maxParallel, Action<T> action) 
    { 
     _maxParallel = maxParallel; 
     _action = action; 
    } 

    public void Start() 
    { 
     try 
     { 
      _tokenSource = new CancellationTokenSource(); 
      _task = _factory.StartNew(
       () => 
       { 
        Parallel.ForEach(
         _entries.GetConsumingEnumerable(), 
         new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token }, 
         (item, loopState) => 
         { 
          Log("Taking" + item); 
          if (!_tokenSource.IsCancellationRequested) 
          { 
           _action(item); 
           Log("Finished" + item); 
          } 
          else 
          { 
           Log("Not Taking" + item); 
           _entries.CompleteAdding(); 
           loopState.Stop(); 
          } 
         }); 
       }, 
       _tokenSource.Token); 
     } 
     catch (OperationCanceledException oce) 
     { 
      System.Diagnostics.Debug.WriteLine(oce); 
     } 
    } 

    private void Log(string message) 
    { 
     Console.WriteLine(message); 
    } 

    public void Stop() 
    { 
     Dispose(); 
    } 

    public void Enqueue(T entry) 
    { 
     Log("Enqueuing" + entry); 
     _entries.Add(entry); 
    } 

    public void Dispose() 
    { 
     if (_task == null) 
     { 
      return; 
     } 

     _tokenSource.Cancel(); 
     while (!_task.IsCanceled) 
     { 
     } 

     _task.Dispose(); 
     _tokenSource.Dispose(); 
     _task = null; 
    } 
} 

A oto kod testowy

class Program 
{ 
    static void Main(string[] args) 
    { 
     TestRepeatedEnqueue(100, 1); 
    } 

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount) 
    { 
     bool[] flags = new bool[itemCount]; 
     var consumer = new ParallelConsumer<int>(parallelCount, 
               (i) => 
               { 
                flags[i] = true; 
               } 
      ); 
     consumer.Start(); 
     for (int i = 0; i < itemCount; i++) 
     { 
      consumer.Enqueue(i); 
     } 
     Thread.Sleep(1000); 
     Debug.Assert(flags.All(b => b == true)); 



    } 
} 

Test zawsze kończy się niepowodzeniem - to zawsze zatrzymywało się na około 93 pozycji ze 100 testowanych. Każdy pomysł, która część mojego kodu spowodowała ten problem i jak to naprawić?

Odpowiedz

8

Nie możesz użyć Parallel.Foreach() z BlockingCollection.GetConsumingEnumerable(), jak odkryłeś.

Dla wyjaśnienia można znaleźć w tym blogu:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

To blog zapewnia również kod źródłowy metody zwanej GetConsumingPartitioner() których można użyć, aby rozwiązać ten problem.

Wyciąg z bloga: GetConsumingEnumerable realizacja

BlockingCollection jest z wykorzystaniem wewnętrznego synchronizacji BlockingCollection, które już obsługuje wielu klientów jednocześnie, ale ForEach tym nie wie, a jej logika przeliczalny-partycjonowania musi także brać blokady podczas uzyskiwania dostępu do przeliczalności.

W związku z tym jest tu więcej synchronizacji, niż jest to konieczne, co powoduje potencjalnie niezauważalne działanie.

[Również] algorytm partycjonowania użyty domyślnie przez oba Parallel.ForEach i PLINQ używać chunking w celu zminimalizowania kosztów synchronizacji: zamiast zabrać blokadę raz na element, to zajmie zamek, chwyci grupę elementów (kawałek), a następnie zwolnij blokadę.

Chociaż ten projekt może pomóc w ogólnej przepustowości, w scenariuszach, które koncentrują się bardziej na małym opóźnieniu, to chunking może być zaporowy.

+0

Dzięki. To rozwiązało mój problem. W każdym razie, kiedy testuję dalej, kod w moim OP nie zawiedzie, gdy numer pozycji jest członkiem tej sekwencji, [A200672] (http://oeis.org/A200672) np. 1, 2, 3, 5, 7, 9, 13, 17, 21, 29, 37, 45, 61, 77, 93, ... Jakiś pomysł dlaczego? po prostu ciekawy. – user69715

+0

@ user69715 To dziwne zachowanie, które znalazłem, gdy próbowałem zrobić coś podobnego. Przypuszczam, że jest to po prostu dziwna interakcja między Parallel.ForEach() i podstawową BlockingCollection, ale naprawdę nie mogę tego wyjaśnić. –

2

Powodem awarii jest z powodu następującego powodu jak wyjaśniono here

Algorytm podziału stosowane domyślnie zarówno Parallel.ForEach i wykorzystania PLINQ wyrwy w celu zminimalizowania kosztów synchronizacji: raczej niż zabranie blokady raz na element, to zajmie zamek, chwyci grupę elementów (kawałek), a następnie zwolni blokadę.

Aby zmusić go do pracy, można dodać metodę na klasy ParallelConsumer<T> aby wskazać, że dodawanie jest zakończone, jak poniżej

public void StopAdding() 
    { 
     _entries.CompleteAdding(); 
    } 

I teraz nazywają tę metodę po for loop, jak poniżej

 consumer.Start(); 
     for (int i = 0; i < itemCount; i++) 
     { 
      consumer.Enqueue(i); 
     } 
     consumer.StopAdding(); 

W przeciwnym razie, Parallel.ForEach() będzie czekać na osiągnięcie progu, aby pobrać porcję i rozpocząć przetwarzanie.

+0

rzecz jest w produkcji, zadania są ustawiane w kolejce w sposób ciągły, więc oznaczanie "StopAdding" nie pomaga. Dziękuję za odpowiedź +1, ale przyjdę z inną odpowiedzią. – user69715

+0

Ups, wygląda na to, że nie mogę jeszcze dodawać +1 – user69715

Powiązane problemy