2009-11-01 16 views
21

Niedawno natknąłem się na implementację C# producenta/konsumenta. jest bardzo prosty i (przynajmniej dla mnie) bardzo elegancki.C# producent/konsument

wydaje wynaleziono około 2006, więc zastanawiałem się, czy ta implementacja jest
- bezpieczny
- nadal stosowane

Kod jest poniżej (oryginalny kod był określany w http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375)

using System; 
using System.Collections; 
using System.Threading; 

public class Test 
{ 
    static ProducerConsumer queue; 

    static void Main() 
    { 
     queue = new ProducerConsumer(); 
     new Thread(new ThreadStart(ConsumerJob)).Start(); 

     Random rng = new Random(0); 
     for (int i=0; i < 10; i++) 
     { 
      Console.WriteLine ("Producing {0}", i); 
      queue.Produce(i); 
      Thread.Sleep(rng.Next(1000)); 
     } 
    } 

    static void ConsumerJob() 
    { 
     // Make sure we get a different random seed from the 
     // first thread 
     Random rng = new Random(1); 
     // We happen to know we've only got 10 
     // items to receive 
     for (int i=0; i < 10; i++) 
     { 
      object o = queue.Consume(); 
      Console.WriteLine ("\t\t\t\tConsuming {0}", o); 
      Thread.Sleep(rng.Next(1000)); 
     } 
    } 
} 

public class ProducerConsumer 
{ 
    readonly object listLock = new object(); 
    Queue queue = new Queue(); 

    public void Produce(object o) 
    { 
     lock (listLock) 
     { 
      queue.Enqueue(o); 

      // We always need to pulse, even if the queue wasn't 
      // empty before. Otherwise, if we add several items 
      // in quick succession, we may only pulse once, waking 
      // a single thread up, even if there are multiple threads 
      // waiting for items.    
      Monitor.Pulse(listLock); 
     } 
    } 

    public object Consume() 
    { 
     lock (listLock) 
     { 
      // If the queue is empty, wait for an item to be added 
      // Note that this is a while loop, as we may be pulsed 
      // but not wake up before another thread has come in and 
      // consumed the newly added object. In that case, we'll 
      // have to wait for another pulse. 
      while (queue.Count==0) 
      { 
       // This releases listLock, only reacquiring it 
       // after being woken up by a call to Pulse 
       Monitor.Wait(listLock); 
      } 
      return queue.Dequeue(); 
     } 
    } 
} 
+0

Do której wersji .NET dążysz? –

+0

Witaj lboregard, istnieje działający przykład, używając BlockingCollection do wdrożenia prostego wzorca Producent-konsument w [ten link] (https://stackoverflow.com/questions/733793/implementing-the-producer-consumer-pattern-in- c-sharp/47179576 # 47179576). –

Odpowiedz

28

Kod jest starszy - napisałem go jakiś czas przed wydaniem .NET 2.0. pojęcie z kolejki producenta/konsumenta jest sposób starsze niż że chociaż :)

Tak, że kod jest bezpieczny, o ile jestem świadomy - ale to ma pewne braki:

  • Jest nietypowy. Nowoczesna wersja z pewnością byłaby ogólna.
  • Nie ma możliwości zatrzymania kolejki. Jednym z prostych sposobów zatrzymania kolejki (aby wszystkie wątki konsumenckie wycofały się) jest posiadanie znacznika "zatrzymania pracy", który można umieścić w kolejce. Następnie dodajesz tyle żetonów, ile masz wątków. Alternatywnie masz osobną flagę, która wskazuje, że chcesz się zatrzymać. (Umożliwia to zatrzymanie innych wątków przed zakończeniem wszystkich bieżących prac w kolejce.)
  • Jeśli zadania są bardzo małe, jednorazowe zlecenie może nie być najbardziej wydajną czynnością.

Pomysły na kod są ważniejsze niż sam kod, szczerze mówiąc.

+0

pulsowanie niekoniecznie obudzić konsumenta, wiem, że nie jest prawdopodobne, ale teoretycznie tylko producenci mogą działać w nieskończoność. także ive mierzony monitor wait/pulse nie ma przewagi wydajności nad zdarzeniami waha się – TakeMeAsAGuest

+0

@TakeMeAsAGuest: Nie jesteś pewien, co masz na myśli mówiąc o pierwszej części - czy mówisz, że widziałeś przypadki, w których wątki czekały na monitorze, ale impuls nie zrobił nic ? Jeśli chodzi o wydajność - istnieje wiele, wiele różnych scenariuszy do rozważenia (sprzęt, oprogramowanie, liczba oczekujących wątków itp.). Zobaczę, czy uda mi się wykopać jakieś wzmianki w książce Joego Duffy'ego ... –

23

Możesz zrobić coś takiego, jak poniższy fragment kodu. Jest generyczny i ma metodę wpisywania wartości null (lub dowolnej flagi, której chcesz użyć), aby powiadomić wątki robocze, aby zakończyły pracę.

Kod pochodzi stąd: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 

namespace ConsoleApplication1 
{ 

    public class TaskQueue<T> : IDisposable where T : class 
    { 
     object locker = new object(); 
     Thread[] workers; 
     Queue<T> taskQ = new Queue<T>(); 

     public TaskQueue(int workerCount) 
     { 
      workers = new Thread[workerCount]; 

      // Create and start a separate thread for each worker 
      for (int i = 0; i < workerCount; i++) 
       (workers[i] = new Thread(Consume)).Start(); 
     } 

     public void Dispose() 
     { 
      // Enqueue one null task per worker to make each exit. 
      foreach (Thread worker in workers) EnqueueTask(null); 
      foreach (Thread worker in workers) worker.Join(); 
     } 

     public void EnqueueTask(T task) 
     { 
      lock (locker) 
      { 
       taskQ.Enqueue(task); 
       Monitor.PulseAll(locker); 
      } 
     } 

     void Consume() 
     { 
      while (true) 
      { 
       T task; 
       lock (locker) 
       { 
        while (taskQ.Count == 0) Monitor.Wait(locker); 
        task = taskQ.Dequeue(); 
       } 
       if (task == null) return;   // This signals our exit 
       Console.Write(task); 
       Thread.Sleep(1000);    // Simulate time-consuming task 
      } 
     } 
    } 
} 
+3

To jest jak dotąd najlepsze wdrożenie wzorców konsumenckich. Używałem tego ostatnio w mojej aplikacji wielowątkowej i działałem sprawnie nawet przy 1000-1500 wątkach. –

+0

Jestem pewna, że ​​czegoś tu brakuje (bycie "zardzewiałym" przy moich umiejętnościach C#), ale ten przykład nie wywołuje metody na zużytym 'zadaniu' (w przeciwieństwie do przywoływanego [kodu nietypowego] (http: // www.albahari.com/threading/part4.aspx#%5FWait%5Fand%5FPulse), który przechowuje i wywołuje delegata 'Action' w konsumencie.) Więc jaki jest sens tworzenia tego ogólnego, jeśli nie demonstruje inwokacji metody w zużyte zadanie? (Próbuję omijać ten przykład w stosunku do implementacji referencyjnej). – Inactivist

+2

@Inaktywista Możesz dodać akcję do Ctor, zapisać jako pole _action i wywołać to zaraz po wykonaniu Dequeue(), tak jak to: _ action (zadanie); – Marcus

1

Ostrzeżenie: Jeśli czytasz komentarze, musisz zrozumieć, moja odpowiedź jest źle :)

Istnieje możliwość impas w Twój kod.

wyobrazić sobie następujący przypadek, dla jasności, użyłem podejście pojedynczej nici, ale powinny być łatwe do konwersji do wielowątkowego ze snu:

// We create some actions... 
object locker = new object(); 

Action action1 =() => { 
    lock (locker) 
    { 
     System.Threading.Monitor.Wait(locker); 
     Console.WriteLine("This is action1"); 
    } 
}; 

Action action2 =() => { 
    lock (locker) 
    { 
     System.Threading.Monitor.Wait(locker); 
     Console.WriteLine("This is action2"); 
    } 
}; 

// ... (stuff happens, etc.) 

// Imagine both actions were running 
// and there's 0 items in the queue 

// And now the producer kicks in... 
lock (locker) 
{ 
    // This would add a job to the queue 

    Console.WriteLine("Pulse now!"); 
    System.Threading.Monitor.Pulse(locker); 
} 

// ... (more stuff) 
// and the actions finish now! 

Console.WriteLine("Consume action!"); 
action1(); // Oops... they're locked... 
action2(); 

Proszę dać mi znać, jeśli tego nie robi jakiś sens.

Jeśli jest to potwierdzone, odpowiedź na twoje pytanie brzmi "nie, to nie jest bezpieczne";) Mam nadzieję, że to pomoże.

+0

Nie widzę żadnego zakleszczenia z oryginalnym kodem plakatu, ponieważ elementy można dodawać do kolejki w momencie, gdy każdy konsument znajduje się poza blokadą (w takim przypadku konsument, przy następnym nabyciu blokady, zaobserwuje, że kolejka nie jest pusta i dlatego nie musi czekać) lub czeka na impuls (w takim przypadku puls gwarantuje obudzić konsumenta). – supercat

+0

Muszę przyznać, że w większości zapomniałem o tym, o czym myślałem. Odczytam, uważam, że problemem jest pętla while. Dzięki temu wątek konsumenta będzie zablokowany, dopóki nie znajdzie się coś w kolejce, co uniemożliwi producentowi blokowanie i sklejanie. Czy to ma jakiś sens? – DiogoNeves

+1

Metoda 'Monitor.Wait()' zwalnia blokadę, na którą czeka, aż do chwili, gdy jakiś inny wątek ją pulsuje. Nić konsumenta zostanie zablokowana w ramach metody "Pospiesz się", ale to nie przeszkodzi producentowi w dostarczaniu jej. Największym niebezpieczeństwem byłoby to, że jeśli producent zrezygnuje z generowania wszystkich danych, których oczekuje konsument, konsument będzie czekał na zawsze na rzeczy, które nigdy nie nadejdą. To może być obsługiwane przez np. posiadanie flagi 'AllDone'. – supercat

13

Wczoraj dowiedziałem się, jak działa Monitor.Wait/Pulse (i dużo o wątkach w ogóle) z powyższego fragmentu kodu i article series, z którego pochodzi. Tak jak mówi Jon, ma to wiele wartości i jest rzeczywiście bezpieczny i odpowiedni.

Jednak od.NET 4, istnieje producer-consumer queue implementation in the framework. Znalazłem to sam, ale do tego momentu robi wszystko, czego potrzebuję.