2010-06-13 7 views
5

Uświadomiłem sobie, że gdy próbuję przetwarzać elementy w równoległej kolejce przy użyciu wielu wątków, podczas gdy wiele wątków może w nich umieszczać elementy, idealnym rozwiązaniem byłoby użycie rozszerzeń reaktywnych z równoległymi strukturami danych.Jak używać IObservable/IObserver z ConcurrentQueue lub ConcurrentStack

Moje oryginalne pytanie jest:

While using ConcurrentQueue, trying to dequeue while looping through in parallel

Więc jestem ciekawy czy jest jakiś sposób, aby mieć LINQ (lub PLINQ) kwerendę, która będzie stale dequeueing jako przedmioty są wprowadzane do niego.

Próbuję uruchomić to w taki sposób, że mogę mieć liczbę producentów pchających do kolejki i ograniczoną liczbę wątków do przetworzenia, więc nie przeładowuję bazy danych.

Gdybym mógł użyć Rx framework, to spodziewam się, że mógłbym go uruchomić, a jeśli 100 elementów zostanie umieszczonych w ciągu 100ms, wtedy 20 wątków, które są częścią zapytania PLINQ, przetworzyłoby właśnie kolejkę.

Istnieją trzy technologie Staram się współpracować:

  1. Rx Framework (Reactive LINQ)
  2. Pling
  3. System.Collections.Concurrent struktur
+0

Czy możesz opracować sposób, w jaki oczekiwałeś, że Rx Ci tu pomoże? –

+0

@Richard Szalay - Jak już wspomniałem pod koniec, myślę, że nie muszę sondować, aby zobaczyć, czy coś jest w kolejce, mógłbym zareagować, gdy coś się tam znajdzie, więc jeśli duża liczba przedmiotów nagle się popychałem, mogłem przetworzyć kilka wątków. Próbuję unikać głosowania, co właśnie teraz robię. –

Odpowiedz

3

I don” t wiedzieć, jak najlepiej to osiągnąć z Rx, ale polecam po prostu używając BlockingCollection<T> i producer-consumer pattern. Główny wątek dodaje elementy do kolekcji, która domyślnie używa pod spodem ConcurrentQueue<T>. Następnie masz oddzielny Task, który rozpędzasz się przed tym, który używa Parallel::ForEach ponad BlockingCollection<T> do przetworzenia jak największej liczby elementów z kolekcji, co ma sens dla systemu jednocześnie. Teraz prawdopodobnie będziesz chciał również skorzystać z metody GetConsumingPartitioner biblioteki ParallelExtensions, aby była najbardziej wydajna, ponieważ domyślny program partycjonujący wygeneruje więcej narzutów, niż chcesz w tym przypadku. Możesz przeczytać więcej na ten temat od this blog post.

Gdy główny wątek jest gotowy zadzwonić CompleteAdding na BlockingCollection<T> i Task::Wait na Task ty obrócił się czekać na wszystkich konsumentów, aby zakończyć przetwarzanie wszystkich elementów w kolekcji.

+0

Głównym haczykiem do użycia 'BlockingCollection' jest blokowanie wątku zużywającego. Obserwowalny wzorzec zajmowałby tylko wątek, gdy było coś do przetworzenia. –

6

Drew ma rację, myślę, że ConcurrentQueue, nawet jeśli brzmi idealnie do tego zadania, jest w rzeczywistości podstawową strukturą danych używaną przez BlockingCollection. Wydaje mi się, że jest to bardzo ważne. Zapoznaj się z rozdziałem 7 tej książki * http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 , która wyjaśni, jak korzystać z BlockingCollection i ma wielu producentów i wielu klientów, z których każdy bierze "kolejkę". Będziesz chciał spojrzeć na metodę "GetConsumingEnumerable()" i ewentualnie po prostu wywołać .ToObservable() na tym.

* Reszta książki jest dość przeciętna.

edit:

Oto przykładowy program, który myślę, że robi to, co chcesz?

class Program 
{ 
    private static ManualResetEvent _mre = new ManualResetEvent(false); 
    static void Main(string[] args) 
    { 
     var theQueue = new BlockingCollection<string>(); 
     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000)); 


     LoadQueue(theQueue, "Producer A"); 
     LoadQueue(theQueue, "Producer B"); 
     LoadQueue(theQueue, "Producer C"); 

     _mre.Set(); 

     Console.WriteLine("Processing now...."); 

     Console.ReadLine(); 
    } 

    private static void ProcessNewValue(string value, string consumerName, int delay) 
    { 
     Thread.SpinWait(delay); 
     Console.WriteLine("{1} consuming {0}", value, consumerName); 
    } 

    private static void LoadQueue(BlockingCollection<string> target, string prefix) 
    { 
     var thread = new Thread(() => 
            { 
             _mre.WaitOne(); 
             for (int i = 0; i < 100; i++) 
             { 
              target.Add(string.Format("{0} {1}", prefix, i)); 
             } 
            }); 
     thread.Start(); 
    } 
} 
+0

To naprawdę .... genialny człowiek ... łączący Rx z BlockingCollection. Wow .. możesz nawet zrobić z tym potokiem: https://msdn.microsoft.com/en-us/library/ff963548.aspx – Oooogi

Powiązane problemy