2009-10-28 14 views
23

Napotkałem dwukrotnie problem polegający na tym, że wątek producenta tworzy N elementów roboczych, przesyła je do ExecutorService, a następnie musi poczekać, aż wszystkie N zostaną przetworzone.Elastyczne CountDownLatch?

Ostrzeżenia

  • N nie jest znana z góry. Gdyby tak było, po prostu utworzyłbym CountDownLatch, a następnie miał wątek producenta await(), aż wszystkie prace zostały zakończone.
  • Używanie CompletionService jest niewłaściwe, ponieważ mimo, że mój wątek producent musi blokować (tzn wywołując take()) nie żaden sposób sygnalizowania, że ​​wszystkie prace są kompletne, aby spowodować wątek producent przestanie czekać.

My obecne rozwiązanie preferowane jest użycie licznika całkowitą oraz przyrostu tym, gdy element pracy jest złożony i ubytek, gdy element roboczy jest przetwarzany. Po przejściu wszystkich zadań N mój wątek producenta będzie musiał poczekać na blokadę, sprawdzając, czy counter == 0, gdy jest powiadomiony. Gwinty konsumenckie będą musiały powiadomić producenta, jeśli zmniejszyło licznik, a nowa wartość to 0.

Czy istnieje lepsze podejście do tego problemu lub czy istnieje odpowiednia konstrukcja w java.util.concurrent Powinienem raczej użyć niż "toczyć własne"?

Z góry dziękuję.

+0

W którym momencie producent wie, ile jest elementów roboczych? Kiedy ostatni element został wyprodukowany? –

+0

Twoje obecne rozwiązanie może ucierpieć z powodu wyścigu: wyprodukuj przedmiot 1 -> counter ++ -> element procesu 1 -> counter-- -> wyprodukuj pozycję 2. Ponieważ licznik został zmniejszony, zanim producent wyprodukuje następny element, producent myśli, że jest gotowy. –

+0

@rwwilden: Masz rację, że taki scenariusz może wystąpić. Jednak mój producent sprawdzał/czekał na liczniku po przesłaniu * wszystkich * elementów pracy, więc nie stanowi to stanu wyścigu w tym konkretnym przypadku. – Adamski

Odpowiedz

24

java.util.concurrent.Phaser wygląda na to, że będzie dobrze działać. Planowane jest wydanie na platformie Java 7, ale najbardziej stabilną wersję można znaleźć na stronie grupy zainteresowań jsr166.

Fazer to uwielbiona Cykliczna Bariera. Możesz zarejestrować liczbę stron N i kiedy będziesz gotowy czekać na ich awans w konkretnej fazie.

Szybki przykład jak to działa:

final Phaser phaser = new Phaser(); 

public Runnable getRunnable(){ 
    return new Runnable(){ 
     public void run(){ 
      ..do stuff... 
      phaser.arriveAndDeregister(); 
     } 
    }; 
} 
public void doWork(){ 
    phaser.register();//register self 
    for(int i=0 ; i < N; i++){ 
     phaser.register(); // register this task prior to execution 
     executor.submit(getRunnable()); 
    } 
    phaser.arriveAndAwaitAdvance(); 
} 
+3

Cool - Dzięki; Wygląda to dokładnie na to, czego szukam ... Nie mogę uwierzyć, że nazwali to Phaser. – Adamski

+2

Szkoda, że ​​nie udało mi się zdobyć x100 więcej razy. Wielkie dzięki. –

+2

Jeśli punktem 'getRunnable()' jest przedstawienie zadania, które tylko sygnalizuje fazer, a następnie kończy się, chcesz wywołać 'phaser.arriveAndDeregister()' i ** nie ** 'phaser.arrive()', ponieważ w przeciwnym razie gdy zadanie nadrzędne wywołuje 'phaser.arriveAndAwaitAdvance()' po raz drugi będzie zakleszczone, ponieważ zadanie będzie czekało na zakończone zadania, które są nadal rejestrowane w phaserze. –

2

Można oczywiście użyć CountDownLatch chroniony przez AtomicReference tak, że zadania się owinięty w ten sposób:

public class MyTask extends Runnable { 
    private final Runnable r; 
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; } 

    public void run() { 
     r.run(); 
     while (l.get() == null) Thread.sleep(1000L); //handle Interrupted 
     l.get().countDown(); 
    } 
} 

Wskazówka że zadania uruchomić swoją pracę, a następnie wirowania aż odliczanie jest set (tzn. znana jest całkowita liczba zadań). Po ustawieniu odliczania odliczają i znikają. Należą się składać w następujący sposób:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>(); 
executor.submit(new MyTask(r, l)); 

Po momencie jej tworzenia/złożenia pracy, gdy wiesz, ile zadań utworzono:

latch.set(new CountDownLatch(nTasks)); 
latch.get().await(); 
+0

Dlaczego zawijanie CountDownLatch z AtomicReference pomaga w tym przypadku? Odwołanie nie musi być chronione, ponieważ jest przekazywane do MyTask w jego konstruktorze i nigdy później się nie zmienia. – Avi

+0

Pamiętaj też, aby obsługiwać arbitralne throwables (Wyjątki i błędy w czasie wykonywania) w metodzie run(), przynajmniej przez umieszczenie coundDown() w końcu {} bloku. – Avi

+0

Oczywiście pytanie dotyczyło również przypadku, gdy nie znasz liczby zadań w czasie tworzenia zadania. – Avi

0

I zakładamy, że producent nie nie musi wiedzieć, kiedy kolejka jest pusta, ale musi wiedzieć, kiedy ostatnie zadanie zostało zakończone.

Dodałbym do konsumenta metodę waitforWorkDone(producer). Producent może dodać swoje N zadań i wywołać metodę wait. Metoda wait blokuje wątek przychodzący, i kolejka robocza nie jest pusta i nie są wykonywane żadne zadania.

Nitki konsumenckie notifyAll() w funkcji waitfor lock iff jej zadanie zostało zakończone, kolejka jest pusta i żadne inne zadanie nie jest wykonywane.

1

Użyłem się ExecutorCompletionService na coś takiego:

ExecutorCompletionService executor = ...; 
int count = 0; 
while (...) { 
    executor.submit(new Processor()); 
    count++; 
} 

//Now, pull the futures out of the queue: 
for (int i = 0; i < count; i++) { 
    executor.take().get(); 
} 

Wiąże utrzymując kolejkę zadań, które zostały złożone, więc jeśli lista jest dowolnie długo, metoda może być lepiej.

Ale upewnij się, że do koordynacji używasz AtomicInteger, dzięki czemu będziesz mógł go zwiększyć w jednym wątku i zmniejszyć go w wątkach roboczych.

+0

Jak możesz "czekać na wypowiedzenie"? Co zamyka twoją usługę? Ponadto, nie możesz ponownie użyć usługi uzupełniania w swojej odpowiedzi, ponieważ możesz czekać na zadania, które logicznie stanowiły część * innego zestawu *. I (jak wspomniano w komentarzach do mojej odpowiedzi), nadal musisz znać 'nTasks' w pewnym momencie –

+0

@oxbow_lakes: True. Dlatego mam go tylko jako opcję (którą zredagowałem, ponieważ nie pomaga w tym przypadku). Zakładam, że ta usługa zakończenia nie będzie używana do innego zestawu zadań w tym samym czasie (nakładania się) z tym. Można go użyć później, z tego samego wątku. – Avi

+0

Zaletą tego sposobu śledzenia liczby zadań, zamiast korzystania z AtomicBoolean, jest to, że nie musisz ręcznie obsługiwać funkcji wait() i notify() - kolejka zajmie się tym. Wszystko, co musisz śledzić, to liczba elementów w kolejce. – Avi

0

Co ty opisany jest dość podobny do używania standardowego semafora, ale używany jest "wstecz".

  • Twój semaforów zaczyna się od 0 zezwoleń
  • Każda jednostka pracy uwalnia jedno zezwolenie, gdy zakończy
  • zablokować czekając nabyć N pozwala

Plus masz elastyczność tylko nabycia M < N zezwoleń, które są przydatne, jeśli chcesz sprawdzić stan pośredni. Na przykład testuję asynchroniczną, ograniczoną kolejkę komunikatów, więc oczekuję, że kolejka będzie pełna dla jakiegoś M < N, więc mogę pobrać M i sprawdzić, czy kolejka jest rzeczywiście pełna, a następnie uzyskać pozostałe pozwolenia N - M po pobraniu wiadomości od kolejka.

Powiązane problemy