2013-08-08 13 views
6

Używam ExecutorService do uruchamiania wielu zadań w różnych wątkach. Czasami zbyt wiele uruchomionych instancji oczekujących w puli wątków może powodować problem braku pamięci.W jaki sposób ograniczasz wątki w ExecutorService?

Próbuję napisać executor zadań blokujących, aby go rozwiązać. Czy istnieje jakieś oficjalne rozwiązanie, aby to zrobić?

Na przykład:

BlockingJobExecutor executor = new BlockingJobExecutor(3); 
    for (int i = 0; i < 1000; i++) { 
     executor.addJob(new Runnable() { 

      @Override 
      public void run() { 
       try { 
        Thread.sleep(1000); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       LogFactory.getLog(BTest.class).info("test " + System.currentTimeMillis()); 
      } 
     }); 
    } 
    executor.shutdown(); 

tutaj jest klasa BlockingJobExecutor:

import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicInteger; 

public class BlockingJobExecutor { 

    AtomicInteger counter = new AtomicInteger(); 
    ExecutorService service; 
    int threads; 

    public BlockingJobExecutor(int threads) { 
     if (threads < 1) { 
      throw new IllegalArgumentException("threads must be greater than 1."); 
     } 
     service = Executors.newFixedThreadPool(threads); 
     this.threads = threads; 
    } 

    static class JobWrapper implements Runnable { 
     BlockingJobExecutor executor; 
     Runnable job; 

     public JobWrapper(BlockingJobExecutor executor, Runnable job) throws InterruptedException { 
      synchronized (executor.counter) { 
       while (executor.counter.get() >= executor.limit()) { 
        executor.counter.wait(); 
       } 
      } 
      this.executor = executor; 
      this.job = job; 
     } 

     @Override 
     public void run() { 
      try { 
       job.run(); 
      } finally { 
       synchronized (executor.counter) { 
        executor.counter.decrementAndGet(); 
        executor.counter.notifyAll(); 
       } 
      } 
     } 
    } 

    public int limit() { 
     return threads; 
    } 

    public void shutdown() { 
     service.shutdown(); 
     try { 
      service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); 
     } catch (InterruptedException e) { 
      throw new RuntimeException(e); 
     } 
    } 

    public void addJob(Runnable job) { 
     try { 
      service.execute(new JobWrapper(this, job)); 
     } catch (InterruptedException e) { 
      throw new RuntimeException(e); 
     } 
    } 

} 
+0

Czy masz na myśli, że masz zbyt wiele zadań oczekujących na uruchomienie, lub że masz zbyt wiele wątków na żywo jednocześnie? – chrylis

+0

Zbyt wiele zadań czekających na uruchomienie. – qrtt1

Odpowiedz

12

Istnieją dwa sposoby, że to może się zdarzyć. Możesz mieć zbyt wiele oczekujących na uruchomienie oczekujących na uruchomienie lub zbyt wiele wątków uruchomionych w tym samym czasie. Jeśli w kolejce jest zbyt dużo zadań, można użyć stałego rozmiaru BlockingQueue w ExecutorService, aby ograniczyć liczbę elementów, które można umieścić w kolejce. Następnie, podczas próby kolejkowania nowego zadania, operacja zostanie zablokowana, dopóki nie będzie miejsca w kolejce.

Jeśli istnieje zbyt wiele wątków działających jednocześnie, można ograniczyć liczbę wątków dostępnych do uruchamiania zadań w ExecutorService, wywołując Executors.newFixedThreadPool z żądaną liczbą wątków.

Powiązane problemy