2012-02-27 7 views
14

Interesuje mnie struktura danych identyczna z Java BlockingQueue, z tym wyjątkiem, że musi mieć możliwość grupowania obiektów w kolejce. Innymi słowy, chciałbym, aby producent mógł umieścić obiekty w kolejce, ale mieć blok konsumenta na take(), aż kolejka osiągnie określony rozmiar (wielkość partii).Java BlockingQueue z dozowaniem?

Następnie, po osiągnięciu przez kolejkę wielkości partii, producent musi zablokować na put(), aż konsument zużyje wszystkie elementy w kolejce (w takim przypadku producent rozpocznie produkcję ponownie, a blok konsumenta do partii zostanie ponownie osiągnięty).

Czy istnieje podobna struktura danych? Czy powinienem to napisać (co nie przeszkadza mi), po prostu nie chcę tracić czasu, jeśli coś tam jest.


UPDATE

Może wyjaśnienie rzeczy trochę:

sytuacja zawsze będzie wyglądał następująco. Może istnieć wielu producentów dodających elementy do kolejki, ale nigdy nie będzie więcej niż jednego konsumenta, który będzie pobierał elementy z kolejki.

Problem polega na tym, że wiele z tych konfiguracji jest równoległych i seryjnych. Innymi słowy, producenci produkują przedmioty dla wielu kolejek, podczas gdy konsumenci samodzielnie mogą być również producentami. Można to łatwiej uznać za skierowany wykres producentów, konsumentów i konsumentów.

Powód, dla którego producenci powinni blokować, dopóki kolejki nie są puste (@Peter Lawrey), ponieważ każdy z nich będzie działał w wątku. Jeśli zostawisz je po prostu w celu wytworzenia, gdy przestrzeń stanie się dostępna, skończy się sytuacja, w której zbyt wiele wątków będzie próbowało przetwarzać zbyt wiele rzeczy naraz.

Może połączenie tego z usługą wykonawczą może rozwiązać problem?

Odpowiedz

9

Proponuję użyć BlockingQueue.drainTo(Collection, int). Możesz użyć go z take(), aby uzyskać minimalną liczbę elementów.

Zaletą stosowania tego podejścia jest to, że rozmiar partii rośnie dynamicznie wraz z obciążeniem, a producent nie musi blokować, gdy konsument jest zajęty. tj. sam się optymalizuje pod kątem opóźnień i przepustowości.


Aby zrealizować dokładnie tak, jak zapytał (co moim zdaniem jest to zły pomysł) można użyć SynchronousQueue z ruchliwej zużywającego wątku.

to gwint czasochłonne robi

list.clear(); 
while(list.size() < required) list.add(queue.take()); 
// process list. 

producent będzie blokować gdy kiedykolwiek konsument jest zajęty.

+0

Chcę, aby producent blokował, gdy konsument jest zajęty. –

+1

Ciekawe, że większość systemów robi duże starania, aby tego uniknąć. ;) Druga sugestia zrobi dokładnie to. Jeśli chcesz zablokować producenta, dlaczego używasz wielu wątków? Czy nie byłoby prostsze, gdyby "producent" był procesorem/konsumentem, a jednocześnie wydaje się, że nie chcesz, aby działały w tym samym czasie. –

+0

Proszę zobaczyć moją aktualizację. Projekt wymaga od producentów blokowania, tak aby liczba wykonywanych wątków była niska. Rozwiązuje również problem zależności między producentami a konsumentami. –

1

Nie jestem tego świadomy. Jeśli dobrze rozumiem, chcesz, aby producent działał (gdy konsument jest zablokowany), dopóki nie wypełni kolejki lub konsumenta do pracy (dopóki producent blokuje), dopóki nie wyczyści kolejki. Jeśli tak jest, mogę zasugerować, że nie potrzebujesz struktury danych, ale mechanizm blokujący jedną ze stron, podczas gdy druga działa w muteksie. Możesz zablokować obiekt i wewnętrznie mieć logikę, czy jest pełna, czy pusta, aby zwolnić blokadę i przekazać ją drugiej stronie. Krótko mówiąc, powinieneś napisać to sam :)

1

Brzmi to tak, jak działa RingBuffer we wzorcu LMAX Disruptor. Aby uzyskać więcej informacji, patrz http://code.google.com/p/disruptor/.

Bardzo przybliżonym wyjaśnieniem jest twoja główna struktura danych to RingBuffer. Producenci umieszczają kolejno dane w buforze pierścieniowym, a konsumenci mogą pobrać tyle danych, ile producent umieścił w buforze (czyli w zasadzie wsadowo). Jeśli bufor jest pełny, producent blokuje, aż konsument skończy i zwolni miejsca w buforze.

2

Oto szybka (= prosta, ale nie do końca przetestowana) implementacja, która może być odpowiednia dla twoich żądań - powinieneś być w stanie ją rozszerzyć, aby obsługiwać pełny interfejs kolejek, jeśli potrzebujesz.

celu zwiększenia wydajności można przełączyć się ReentrantLock Zamiast używać słowa kluczowego „zsynchronizowany” ..

public class BatchBlockingQueue<T> { 

    private ArrayList<T> queue; 
    private Semaphore readerLock; 
    private Semaphore writerLock; 
    private int batchSize; 

    public BatchBlockingQueue(int batchSize) { 
     this.queue = new ArrayList<>(batchSize); 
     this.readerLock = new Semaphore(0); 
     this.writerLock = new Semaphore(batchSize); 
     this.batchSize = batchSize; 
    } 

    public synchronized void put(T e) throws InterruptedException { 
     writerLock.acquire(); 
     queue.add(e); 
     if (queue.size() == batchSize) { 
      readerLock.release(batchSize); 
     } 
    } 

    public synchronized T poll() throws InterruptedException { 
     readerLock.acquire(); 
     T ret = queue.remove(0); 
     if (queue.isEmpty()) { 
      writerLock.release(batchSize); 
     } 
     return ret; 
    } 

} 

Mam nadzieję, że okaże się przydatny.