2009-07-31 11 views
26

Mam klasyczny problem wątku przesuwającego zdarzenia do przychodzącej kolejki drugiego wątku. Tylko tym razem jestem bardzo zainteresowany osiągami. Co chcę osiągnąć to:Kolejka współbieżna i blokująca w Javie

  • Chcę mieć równoczesny dostęp do kolejki, producent popycha, odbiornik wyskakuje.
  • Gdy kolejka jest pusta, chcę, aby konsument zablokował się w kolejce, czekając na producenta.

Mój pierwszy pomysł polegał na użyciu LinkedBlockingQueue, ale wkrótce zdałem sobie sprawę, że nie jest to współbieżne, a wydajność uległa pogorszeniu. Z drugiej strony używam teraz ConcurrentLinkedQueue, ale nadal płacę za każdą publikację kosztami: wait()/notify(). Ponieważ konsument, po znalezieniu pustej kolejki, nie blokuje, muszę zsynchronizować i wait() na zamku. Z drugiej strony producent musi uzyskać tę blokadę i notify() przy każdej publikacji. Ogólny wynik jest taki, że płacę kosztem sycnhronized (lock) {lock.notify()} w każdej publikacji, nawet jeśli nie jest to konieczne.

To, co w tym przypadku jest potrzebne, to kolejka blokująca i współbieżna. Wyobrażam sobie operację push(), która działa tak jak w ConcurrentLinkedQueue, z dodatkowym notify() obiektem, gdy element pchany jest pierwszy na liście. Takie sprawdzanie uważam za już istniejące w ConcurrentLinkedQueue, ponieważ pchanie wymaga połączenia z kolejnym elementem. Tak więc byłoby to znacznie szybsze niż synchronizacja za każdym razem na zewnętrznej blokadzie.

Czy coś takiego jest dostępne/rozsądne?

+0

Dlaczego uważasz, że java.util.concurrent.LinkedBlockingQueue nie jest współbieżne? Myślę, że jest idealnie równoległy, widząc jego javadoc i kod źródłowy. Ale nie mam pojęcia o wydajności. – Rorick

+0

Zobacz także http://stackoverflow.com/questions/1301691/java-queue-implementations-which-one – Vadzim

Odpowiedz

11

Myślę, że możesz trzymać się java.util.concurrent.LinkedBlockingQueue niezależnie od swoich wątpliwości. Jest równoległy. Chociaż nie mam pojęcia o jego wydajności. Prawdopodobnie inna wersja BlockingQueue będzie ci pasować. Nie ma ich zbyt wielu, więc przygotuj testy wydajności i zmierzyć.

+0

Mówię to tylko obserwując moją przepustowość, zmniejszając około 8 razy w porównaniu do ConcurrentLinkedQueue. Domyśliłem się, że blokowało to wszystko wewnętrznie, aby zapewnić bezpieczeństwo nici. Masz jednak rację, równie dobrze może być tylko gorsza wydajność w porównaniu do ConcurrentLinkedQueue. Jaki rodzaj bije przyczynę ;-) – Yiannis

+1

Których kiedykolwiek używasz w kolejce, w końcu użyjesz blokady, aby podtrzymać czekanie na nowy wpis. (chyba, że ​​jesteś zajęty) Blokada naprawdę nie jest aż tak droga (około 0,5 mikro-sekund dla blokady), więc jeśli jest to problem z wydajnością, możesz mieć problem z projektem, np. stwórz mniej zadań/znajdź sposób, aby dodać mniej obiektów do kolejki/uzupełnij swoją pracę. –

+6

Uwaga, jeśli chcesz uzyskać wyższą przepustowość, użyj metody drainTo() na swojej LinkedBlcokingQueue, zmierzyliśmy prawie 500% większą przepustowość w jednej z naszych aplikacji, zamiast pobrać() jeden i jeden element z kolejki – nos

2

Oto list of classes implementing BlockingQueue.

Polecam sprawdzenie pod numerem SynchronousQueue.

Podobnie jak w @Rorick wspomniano w jego komentarzu, uważam, że wszystkie te wdrożenia są równoległe. Myślę, że twoje obawy z LinkedBlockingQueue mogą być nie na miejscu.

+1

SynchronousQueue nie jest tym, czego chcę, ponieważ zablokuje mojego producenta za każdym razem, gdy próbuje opublikować. – Yiannis

+0

SynchronousQueue wygląda bardziej jak potok, niż kolejka. Wygląda na to, że nie może zawierać zadań oczekujących na przetwarzanie, tylko "popychać" pojedyncze zadania od producenta do konsumenta. – Rorick

+0

Hehe, przepraszam. – jjnguy

5

Proponuję spojrzeć na ThreadPoolExecutor newSingleThreadExecutor. Zajmie się on utrzymywaniem zleconych zadań dla ciebie, a jeśli prześlesz do executora kod Callables, będziesz w stanie uzyskać również pożądane działanie blokujące.

3

Używam ArrayBlockingQueue ilekroć muszę przekazywać dane z jednego wątku do drugiego. Korzystanie z metod put i take (które będą blokowane, jeśli są pełne/puste).

4

Można spróbować LinkedTransferQueue z jsr166: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/

Spełnia wymagania i mają mniejsze obciążenie dla operacji oferta/odpytywania. Jak widzę z kodu, gdy kolejka nie jest pusta, wykorzystuje ona operacje atomowe do odpytywania elementów. A gdy kolejka jest pusta, obraca się przez jakiś czas i zaparkuje wątek, jeśli się nie powiedzie. Myślę, że to może pomóc w twoim przypadku.

6

Podobna do tej odpowiedzi https://stackoverflow.com/a/1212515/1102730, ale nieco inna .. Skończyło się na użyciu ExecutorService. Możesz utworzyć instancję za pomocą Executors.newSingleThreadExecutor(). Potrzebowałem równoległej kolejki do odczytu/zapisu BufferedImages do plików, a także atomowości z odczytami i zapisami. Potrzebuję tylko jednego wątku, ponieważ plik IO jest o rzędy wielkości szybszy od źródła, net IO. Co więcej, bardziej przejmowałem się atomowością działań i poprawnością niż wydajnością, ale to podejście można również wykonać za pomocą wielu wątków w puli, aby przyspieszyć działanie.

Aby uzyskać obraz (try-catch-Wreszcie pominięta):

Future<BufferedImage> futureImage = executorService.submit(new Callable<BufferedImage>() { 
    @Override 
     public BufferedImage call() throws Exception { 
      ImageInputStream is = new FileImageInputStream(file); 
      return ImageIO.read(is); 
     } 
    }) 

image = futureImage.get(); 

Aby zapisać obraz (try-catch-Wreszcie pominięta):

Future<Boolean> futureWrite = executorService.submit(new Callable<Boolean>() { 
    @Override 
    public Boolean call() { 
     FileOutputStream os = new FileOutputStream(file); 
     return ImageIO.write(image, getFileFormat(), os); 
    } 
}); 

boolean wasWritten = futureWrite.get(); 

Ważne jest, aby pamiętać, że powinien przepłukać i zamknąć strumienie w bloku finally. Nie wiem, jak to działa w porównaniu do innych rozwiązań, ale jest dość uniwersalny.