2013-05-16 16 views
5

Mam N pracowników, którzy dzielą kolejkę elementów do obliczenia. W każdej iteracji każdy pracownik usuwa element z kolejki i może wytworzyć więcej elementów do obliczenia, które zostaną umieszczone w tej samej kolejce. Zasadniczo każdy producent jest również konsumentem. Obliczenia kończą się, gdy w kolejce nie ma elementów, a wszyscy pracownicy zakończyli obliczanie bieżącego elementu (dlatego nie można już tworzyć żadnych elementów do obliczenia). Chcę uniknąć dyspozytora/koordynatora, aby pracownicy byli skoordynowani. Jaki jest najlepszy wzór, aby pracownik mógł sprawdzić, czy warunki zatrzymania są ważne, a tym samym zatrzymać obliczenia w imieniu innych?Producent Java-konsument z warunkiem zatrzymania

Na przykład, jeśli wszystkie wątki po prostu zrobić tego cyklu, gdy elementy są obliczane, to spowodowałoby wszystkie wątki są zablokowane wiecznie:

while (true) { 
    element = queue.poll(); 
    newElements[] = compute(element); 
    if (newElements.length > 0) { 
     queue.addAll(newElements); 
    } 
} 

Odpowiedz

6

Utrzymanie liczby aktywnych wątków. Pętla odpytywania

public class ThreadCounter { 
    public static final AtomicInteger threadCounter = new AtomicInteger(N); 
    public static final AtomicInteger queueCounter = new AtomicInteger(0); 
    public static final Object poisonPill = new Object(); 
    public static volatile boolean cancel = false; // or use a final AomticBoolean instead 
} 

aĹ wątki powinny wyglądać następująco (Zakładam, że używasz BlockingQueue)

while(!ThreadCounter.cancel) { 
    int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking 
    if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) { 
     ThreadCounter.cancel = true; 
     queue.offer(ThreadCounter.poisonPill); 
    } else { 
     Object obj = queue.take(); 
     ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking 
     ThreadCounter.queueCounter.decrementAndGet(); 
     if(obj == ThreadCounter.poisonPill) { 
      queue.offer(obj); // send the poison pill back through the queue so the other threads can read it 
      continue; 
     } 
    } 
} 

Jeśli gwint ma zamiar blokować na BlockingQueue to zmniejsza Licznik; jeśli wszystkie wątki już czekają w kolejce (czyli counter == 0), to ostatni wątek ustawia wartość true, a następnie wysyła trującą pigułkę przez kolejkę, aby obudzić pozostałe wątki; każdy wątek widzi pigułkę trucizny, wysyła ją z powrotem do kolejki, aby obudzić pozostałe wątki, a następnie zamyka pętlę, gdy widzi, że jest ustawiona na wartość true.

Edit: Usunąłem wyścig danych dodając queueCounter który utrzymuje rachunek liczby obiektów w kolejce (oczywiście będziesz też trzeba dodać queueCounter.incrementAndGet() wezwanie do gdziekolwiek jesteś dodawania obiektów do kolejki). Działa to w następujący sposób: jeśli threadCount == 0, ale queueCount != 0, oznacza to, że wątek właśnie usunął element z kolejki, ale nie nazwał jeszcze threadCount.getAndIncrement, a zatem zmienna anulowania jest , a nie ustawiona na true. Ważne jest, aby wywołanie threadCount.getAndIncrement poprzedzało wywołanie queueCount.getAndDecrement, w przeciwnym razie nadal będziesz mieć wyścig danych. Nie powinno mieć znaczenia, w jakiej kolejności zadzwonisz pod numer queueCount.getAndIncrement, ponieważ nie będziesz tego przeplatał wywołaniem threadCount.getAndDecrement (ten ostatni zostanie wywołany na końcu pętli, pierwszy zostanie wywołany na początku pętli).

Należy pamiętać, że nie można po prostu użyć queueCount, aby określić, kiedy zakończyć proces, ponieważ wątek może nadal być aktywny bez umieszczania jakichkolwiek danych w kolejce - innymi słowy, queueCount będzie wynosił zero, ale być niezerowe, gdy wątek zakończy swoją bieżącą iterację.

Zamiast wielokrotnie przesyłać poisonPill przez kolejkę, można zamiast tego przesłać anulujący wątek wysyłając (N-1) poisonPills przez kolejkę. Po prostu zachowaj ostrożność, jeśli używasz tego podejścia, używając innej kolejki, ponieważ niektóre kolejki (np. Prosta usługa kolejkowa firmy Amazon) mogą zwracać wiele elementów na odpowiedniki ich metod take. W takim przypadku musisz wielokrotnie przesyłać przez numer poisonPill, aby zapewnić że wszystko się kończy.

Dodatkowo, zamiast używać while(!cancel) pętli, można użyć while(true) pętlę i złamać, gdy pętla wykryje poisonPill

+0

Chyba anulować powinien być lotny? – marcorossi

+0

co się dzieje, gdy wątek jest między wywołaniem queue.take() a incrementAndGet(), a drugi wątek ocenia stan count == 0 i queue.isEmpty()? – marcorossi

+0

@marcorossi yeah 'cancel' powinno być zmienne, więc zmiana jest widoczna we wszystkich wątkach. Na przykład wszystkie wątki widzą najnowszy zapis w polu 'cancel', gwarantuje to, że odczyt i zapis są atomowe. Zmieniono odpowiedź. – adamjmarkham