2013-01-31 12 views
11

Oto mój przypadek użycia.Używanie Spring @Scheduled i @Sync razem

Istniejący system aktualizuje tabelę kolejek baz danych QUEUE.

Chcę zaplanowane powracającym zadanie, które - sprawdza zawartość kolejki - jeśli istnieją wiersze w tabeli to blokuje wiersz i robi trochę pracy - usuwa wiersz w KOLEJCE

jeśli poprzednia praca nadal działa, a następnie zostanie utworzony nowy wątek do wykonania pracy. Chcę skonfigurować maksymalną liczbę współbieżnych wątków.

Używam Spring 3 i mój obecny rozwiązaniem jest wykonanie następujących czynności (za pomocą fixedRate z 1 milisekundę, aby uzyskać nici uruchomić w zasadzie w sposób ciągły)

@Scheduled(fixedRate = 1) 
@Async 
public void doSchedule() throws InterruptedException { 
    log.debug("Start schedule"); 
    publishWorker.start(); 
    log.debug("End schedule"); 
} 

<task:executor id="workerExecutor" pool-size="4" /> 

Ta utworzona 4 wątki prosto i nici poprawnie udostępnił obciążenie pracą z kolejki. Jednak wydaje mi się, że dostaję wycieku pamięci, gdy wątkom zajmuje dużo czasu.

java.util.concurrent.ThreadPoolExecutor @ 0xe097b8f0        |    80 | 373,410,496 |  89.74% 
|- java.util.concurrent.LinkedBlockingQueue @ 0xe097b940       |   48 | 373,410,136 |  89.74% 
| |- java.util.concurrent.LinkedBlockingQueue$Node @ 0xe25c9d68 

Więc

1: Czy mogę używać @Async i @Scheduled razem?

2: Jeśli nie, to w jaki inny sposób mogę użyć sprężyny, aby spełnić moje wymagania?

3: Jak mogę utworzyć nowe wątki tylko wtedy, gdy inne wątki są zajęte?

Dzięki!

EDIT: Myślę, że kolejka pracy był już nieskończenie długo ... Teraz za pomocą

<task:executor id="workerExecutor" 
    pool-size="1-4" 
    queue-capacity="10" rejection-policy="DISCARD" /> 

przygotuje sprawozdanie z wyników

+4

Czy nie działać prawidłowo bez '@ Async'? Metoda opisana za pomocą '@ Scheduled' powinna być mimo to wykonana asynchronicznie. – ach

+0

Jeśli chcesz, aby wątki działały nieprzerwanie, nie powinieneś naprawdę używać funkcji @Scheduled. Jego użycie byłoby dla "zaplanowanych" działań, a nie ciągłych ... – JoeG

+0

możesz rozważyć utworzenie publishWorker.start(); metoda Async. –

Odpowiedz

0
//using a fixedRate of 1 millisecond to get the threads to run basically continuously 
@Scheduled(fixedRate = 1) 

Podczas korzystania @Scheduled nowy wątek zostanie utworzony i wywoła metodę doSchedule o określonej wartości stałej o 1 milisekundach. Po uruchomieniu aplikacji możesz już zobaczyć 4 wątki rywalizujące o tabelę QUEUE i prawdopodobnie martwą blokadę.

Sprawdź, czy występuje zakleszczenie, wykonując zrzut wątku. http://helpx.adobe.com/cq/kb/TakeThreadDump.html

@ Adnotacja asynchroniczna nie będzie tutaj przydatna.

Lepszym sposobem implementacji tego jest utworzenie klasy jako wątku poprzez wdrożenie działającego i przekazanie klasy do TaskExecutor z wymaganą liczbą wątków.

Using Spring threading and TaskExecutor, how do I know when a thread is finished?

również sprawdzić swój projekt to nie wydaje się być obsługa synchronizacji poprawnie. Jeśli poprzednie zadanie jest uruchomione i przytrzymuje blokadę w rzędzie, następne zadanie, które utworzysz, będzie nadal widzieć ten wiersz i będzie oczekiwać na uzyskanie blokady w tym konkretnym wierszu.

2

Można spróbować

  1. Uruchom program planujący z jednej sekundowym opóźnieniem, co będzie blokować & pobrać wszystkie rekordy kolejce, które nie były do ​​tej pory zamknięte.
  2. Dla każdego rekordu wywołaj metodę Async, która przetworzy ten rekord & go usunąć.
  3. Zasady odrzucania executora powinny być ABORT, aby program planujący mógł odblokować QUEUE, które nie zostały jeszcze podane do przetworzenia. W ten sposób program planujący może ponownie przetworzyć te KOLEJKI w następnym uruchomieniu.

Oczywiście będziesz musiał obsłużyć scenariusz, w którym program planujący zablokował KOLEJNOŚĆ, ale program obsługi nie zakończył przetwarzania z jakiegokolwiek powodu.

Pseudo kod:

public class QueueScheduler { 
    @AutoWired 
    private QueueHandler queueHandler; 

    @Scheduled(fixedDelay = 1000) 
    public void doSchedule() throws InterruptedException { 
     log.debug("Start schedule"); 
     List<Long> queueIds = lockAndFetchAllUnlockedQueues(); 
     for (long id : queueIds) 
      queueHandler.process(id); 
     log.debug("End schedule"); 
    } 
} 

public class QueueHandler { 

    @Async 
    public void process(long queueId) { 
     // process the QUEUE & delete it from DB 
    } 
} 
<task:executor id="workerExecutor" pool-size="1-4" queue-capcity="10" 
    rejection-policy="ABORT"/> 
Powiązane problemy