2009-09-08 11 views
13

Wystąpiła sytuacja, gdy ThreadPoolExecutor jest zaparkowany w funkcji execute(Runnable), podczas gdy wszystkie wątki czekają w getTask func, workQueue jest pusta.Zakleszczenie w ThreadPoolExecutor

Czy ktoś ma jakieś pomysły?

ThreadPoolExecutor jest tworzony ArrayBlockingQueue i corePoolSize == maximumPoolSize = 4

[Edycja] Bardziej szczegółowo, gwint jest zablokowany w ThreadPoolExecutor.exec(Runnable command) func. Ma zadanie wykonać, ale tego nie robi.

[Edytuj2] Executor jest zablokowany gdzieś w kolejce roboczej (ArrayBlockingQueue).

[Edit3] callstack:

thread = front_end(224) 
at sun.misc.Unsafe.park(Native methord) 
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1114) 
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) 
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) 
at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:224) 
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:653) 
at net.listenThread.WorkersPool.execute(WorkersPool.java:45) 

w tym samym czasie workQueue pustego (sprawdzone za pomocą pilota zdalnego debugowania)

[Edit4] Kod pracy z ThreadPoolExecutor:

public WorkersPool(int size) { 
    pool = new ThreadPoolExecutor(size, size, IDLE_WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(WORK_QUEUE_CAPACITY), 
     new ThreadFactory() { 
     @NotNull 
     private final AtomicInteger threadsCount = new AtomicInteger(0); 

     @NotNull 
     public Thread newThread(@NotNull Runnable r) { 
      final Thread thread = new Thread(r); 
      thread.setName("net_worker_" + threadsCount.incrementAndGet()); 
      return thread; 
     } 
     }, 

     new RejectedExecutionHandler() { 
     public void rejectedExecution(@Nullable Runnable r, @Nullable ThreadPoolExecutor executor) { 
      Verify.warning("new task " + r + " is discarded"); 
     } 
     }); 
    } 

    public void execute(@NotNull Runnable task) { 
    pool.execute(task); 
    } 

    public void stopWorkers() throws WorkersTerminationFailedException { 
    pool.shutdownNow(); 
    try { 
     pool.awaitTermination(THREAD_TERMINATION_WAIT_TIME, TimeUnit.SECONDS); 
    } catch (InterruptedException e) { 
     throw new WorkersTerminationFailedException("Workers-pool termination failed", e); 
    } 
    } 
} 
+0

Jaki jest charakter zadania przekazywanego do TPE.execute() func.? Jeśli zadanie ma dostęp do TPE, może to być twój problem. – artemv

+1

Myślę, że mam porównywalny problem w wersji 1.7.0_13.Proces rozpoczyna się i działa bez problemu ... a następnie w pewnym momencie mam ~~ 200 zadań, ale moja kolejka blokująca jest pusta. Rozmiar puli rdzeniowej to 3 ... Używam ArrayBlockingQueue też .... – cljk

Odpowiedz

2

Nie widzę żadnych blokad w kodzie ThreadPoolExecutor 's execute(Runnable). Jedyną zmienną jest workQueue. Jakiego rodzaju BlockingQueue dostarczyłeś swojemu ThreadPoolExecutor?

Na temat zakleszczenia:

Można to potwierdzić to zakleszczenia poprzez zbadanie Pełny zrzut wątku, przewidziane przez <ctrl><break> w systemie Windows lub kill -QUIT w systemach UNIX.

Po uzyskaniu tych danych można sprawdzić wątki. Oto stosowny fragment Sun's article on examining thread dumps (suggested reading):

do wieszania, zakleszczony lub zamrożone programy: Jeśli uważasz, że program jest wiszące, wygenerować ślad stosu i zbadać wątki w stanach MW lub CW. Jeśli program jest zakleszczony, niektóre wątki systemowe prawdopodobnie pojawią się jako bieżące wątki, ponieważ nie ma nic więcej do roboty JVM.

Mniejsza uwaga: jeśli pracujesz w środowisku IDE, możesz upewnić się, że w tych metodach nie ma włączonych punktów wstrzymania.

+0

Jak napisałem w moim pytaniu, użyto ArrayBlockingQueue. I jest puste. Tak, wątek blokuje się gdzieś w kolejce roboczej. – Vitaly

+0

Użyłem zdalnego debugowania. Edytowano pytanie - dodano dzwonek. – Vitaly

+0

Możesz także sprawdzić zakleszczenia za pomocą JConsole – pjp

0

Jak już wspomniano, brzmi to jak normalne zachowanie, ThreadPoolExecutor czeka tylko na wykonanie pracy. Jeśli chcesz go zatrzymać, trzeba zadzwonić:

executor.shutdown()

aby zmusić go do zakończenia, zazwyczaj następuje wykonawca.awaitTermination kod

+0

Edytowane pytanie – Vitaly

0

Biblioteka źródłem jest poniżej (to w rzeczywistości klasa od http://spymemcached.googlecode.com/files/memcached-2.4.2-sources.zip)
- nieco skomplikowana - dodatkowa ochrona przed wielokrotnych wezwań FutureTask jeśli się nie mylę - ale nie wydaje się impasu skłonnej - bardzo proste użycie ThreadPool:

package net.spy.memcached.transcoders; 

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.Future; 
import java.util.concurrent.FutureTask; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 
import java.util.concurrent.atomic.AtomicBoolean; 

import net.spy.memcached.CachedData; 
import net.spy.memcached.compat.SpyObject; 

/** 
* Asynchronous transcoder. 
*/ 
public class TranscodeService extends SpyObject { 

    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60L, 
      TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100), 
      new ThreadPoolExecutor.DiscardPolicy()); 

    /** 
    * Perform a decode. 
    */ 
    public <T> Future<T> decode(final Transcoder<T> tc, 
      final CachedData cachedData) { 

     assert !pool.isShutdown() : "Pool has already shut down."; 

     TranscodeService.Task<T> task = new TranscodeService.Task<T>(
       new Callable<T>() { 
        public T call() { 
         return tc.decode(cachedData); 
        } 
       }); 

     if (tc.asyncDecode(cachedData)) { 
      this.pool.execute(task); 
     } 
     return task; 
    } 

    /** 
    * Shut down the pool. 
    */ 
    public void shutdown() { 
     pool.shutdown(); 
    } 

    /** 
    * Ask whether this service has been shut down. 
    */ 
    public boolean isShutdown() { 
     return pool.isShutdown(); 
    } 

    private static class Task<T> extends FutureTask<T> { 
     private final AtomicBoolean isRunning = new AtomicBoolean(false); 

     public Task(Callable<T> callable) { 
      super(callable); 
     } 

     @Override 
     public T get() throws InterruptedException, ExecutionException { 
      this.run(); 
      return super.get(); 
     } 

     @Override 
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
       ExecutionException, TimeoutException { 
      this.run(); 
      return super.get(timeout, unit); 
     } 

     @Override 
     public void run() { 
      if (this.isRunning.compareAndSet(false, true)) { 
       super.run(); 
      } 
     } 
    } 

} 
0

Zdecydowanie dziwne.

Ale przed napisaniem własnego TPE spróbować.

  • inny BlockingQueue Impl, np LinkedBlockingQueue

  • określić rzetelności = true w ArrayBlockingQueue, to znaczy używać new ArrayBlockingQueue(n, true)

Od tych dwóch zdecyduje bym wybrał drugą bo to bardzo dziwne, że offer() jest zablokowany; jeden powód, który przychodzi na myśl - polityka planowania wątków w twoim Linuksie. Podobnie jak założenie.

7

Wygląda na to, że jest to błąd związany z maszyną JVM starszą niż 6u21. Wystąpił problem w skompilowanym natywnym kodzie dla niektórych (być może wszystkich) systemów operacyjnych.

z linku:

Błąd jest spowodowany przez brak barier pamięci w różnych Parker :: Park() ścieżek, które mogą spowodować utratę wybudzeń i wisi. (Zauważ, że PlatformEvent :: park używany przez wbudowaną synchronizację nie jest zagrożony ). -XX: + UseMembar stanowi obejście problemu, ponieważ bariera membarowa w logice przejścia stanu ukrywa problem w Parker ::. (to znaczy, nie ma nic złego w korzystaniu z mechanizmu -UseMembar , ale + UseMembar ukrywa błąd Parkera: :). To jest jeden dzień-jeden błąd wprowadzony z dodatkiem java.util.concurrent w JDK 5.0. Opracowałem prosty tryb C awarii i wydaje się bardziej prawdopodobne, że zamanifestuje się na nowoczesnych platformach AMD i Nehalem, prawdopodobnie z powodu głębszych buforów sklepowych, które wymagają więcej czasu. Podałem próbną poprawkę dla Douga Lea dla Parker :: park, która wydaje się eliminować błąd. Będę dostarczać tę poprawkę do środowiska wykonawczego. (Rozszerzę również CR o dodatkowe przypadki testowe i dłuższe wyjaśnienie). Jest to prawdopodobnie dobry kandydat na back-ports.

Link: JVM Bug

Obejścia są dostępne, ale to prawdopodobnie najlepiej będzie się po prostu coraz najnowszą kopię Java.

+1

Uaktualniłem do 'build 1.6.0_27-b07' (działającego na systemie Solaris 10 SPARC), ale nadal nie rozwiązało problemu. Mój JBoss ESB wciąż tworzy tysiące wątków i nie zamyka ich. –

1

Ten impas prawdopodobnie dlatego, że uruchamiasz zadanie od samego executora. Na przykład zadajesz jedno zadanie, a ten odpala kolejne 4 zadania. Jeśli masz rozmiar puli równy 4, to po prostu całkowicie przepełnisz go, a ostatnie zadanie będzie czekało, aż ktoś z wartości zwracanej przez zadanie. Ale pierwsze zadanie czeka na zakończenie wszystkich rozwidlonych zadań.

Powiązane problemy