2009-09-16 13 views
5

Czy istnieje jakiś sposób, aby utworzyć executor będzie zawsze co najmniej 5 wątki, a maksymalnie 20 wątków, a kolejka nieograniczony dla zadania (czyli nie zadanie zostanie odrzucony)określenie problemu ThreadPoolExecutor

Próbowałem nowy ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) z wszystkie możliwości, że myślałem o kolejce:

new LinkedBlockingQueue() // never runs more than 5 threads 
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new SynchronousQueue() // no tasks can wait, after 20, they are rejected 

i żaden pracował jako chciał.

+0

Co masz na myśli, że to nie zadziałało? Wyjątek? – Gandalf

+4

To jest zbyt blisko: Gandalf i Sarmun! – akf

+0

Entowie powinni tu być w każdej chwili. . . i gdzie jest ten nieznośny Orzeł ... – Gandalf

Odpowiedz

5

Może coś takiego przydałoby się dla Ciebie? Po prostu podniosłem to, więc proszę, włóż to. Zasadniczo, to realizuje puli wątków przelewowy, który jest używany do zasilania instrumentu bazowego ThreadPoolExecutor

Istnieją dwa główne grzbiety remis widzę z nim:

  • brak zwracanej przyszłości obiektu na submit(). Ale może to nie jest dla ciebie problemem.
  • Kolejka drugorzędna zostanie opróżniona tylko do ThreadPoolExecutor po przesłaniu zadań. Musi być eleganckie rozwiązanie, ale jeszcze tego nie widzę. Jeśli wiesz, że będzie stały strumień zadań do StusMagicExecutor, to może nie być problemem. ("Maj" jest kluczowym słowem.) Być może opcja wypełnienia złożonych zadań może zostać przesłana do StusMagicExecutor po ich zakończeniu?

Stu Magic Realizator:

public class StusMagicExecutor extends ThreadPoolExecutor { 
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>(); //capacity is Integer.MAX_VALUE. 

    public StusMagicExecutor() { 
     super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler()); 
    } 
    public void queueRejectedTask(Runnable task) { 
     try { 
      secondaryQueue.put(task); 
     } catch (InterruptedException e) { 
      // do something 
     } 
    } 
    public Future submit(Runnable newTask) { 
     //drain secondary queue as rejection handler populates it 
     Collection<Runnable> tasks = new ArrayList<Runnable>(); 
     secondaryQueue.drainTo(tasks); 

     tasks.add(newTask); 

     for (Runnable task : tasks) 
      super.submit(task); 

     return null; //does not return a future! 
    } 
} 

class RejectionHandler implements RejectedExecutionHandler { 
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { 
     ((StusMagicExecutor)executor).queueRejectedTask(runnable); 
    } 
} 
+1

Świetnie, tnx, to jest prawie to, co chciałem. W przypadku zawijania newTasks z opcją, która wykonuje jedno zadanie po zakończeniu, tylko jeden wątek na 20 może być bezczynny, podczas gdy mamy zadania w kolejce dodatkowej. – Sarmun

1

Javadocs dla ThreadPoolExecutor są całkiem jasne, że po utworzeniu wątków corePoolSize nowe wątki zostaną utworzone dopiero po zapełnieniu kolejki. Jeśli więc ustawisz core na 5 i max na 20, nigdy nie osiągniesz pożądanego zachowania.

Jeśli jednak ustawisz zarówno core, jak i max na 20, zadania zostaną dodane do kolejki tylko wtedy, gdy wszystkie 20 wątków będzie zajęty. Oczywiście sprawia to, że twoje wymaganie "minimum 5 wątków" jest trochę bezsensowne, ponieważ wszystkie 20 pozostaną przy życiu (dopóki i tak nie przestaną działać).

+0

"bezczynność" nigdy się nie wydarzy, chyba że powiesz, że wszystkie rdzenne nitki mogą umrzeć. W żaden sposób nie widać punktu zarówno rozmiaru podstawowego, jak i maksymalnego, jeśli nie można go poprawnie użyć. Czy istnieje jakakolwiek inna klasa (inna niż ThreadPoolExecutor), która spełni moje wymagania? – Sarmun

1

myślę, że problemem jest to wada klasy i bardzo mylące ze względu na kombinacje parametrów konstruktora. Oto rozwiązanie zaczerpnięte z wewnętrznego ThreadPoolExecutor SwingWorker, do którego stworzyłem klasę najwyższego poziomu. Nie ma minimum, ale przynajmniej używa górnej granicy. Jedyne, czego nie wiem, to jaki wynik osiągnąłeś w wykonaniu blokującym.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor { 
    private final ReentrantLock pauseLock = new ReentrantLock(); 
    private final Condition unpaused = pauseLock.newCondition(); 
    private boolean isPaused = false; 
    private final ReentrantLock executeLock = new ReentrantLock(); 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       handler); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, 
      RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory, handler); 
    } 

    @Override 
    public void execute(Runnable command) { 
     executeLock.lock(); 
     try { 
      pauseLock.lock(); 
      try { 
       isPaused = true; 
      } finally { 
       pauseLock.unlock(); 
      } 
      setCorePoolSize(getMaximumPoolSize()); 
      super.execute(command); 
      setCorePoolSize(0); 
      pauseLock.lock(); 
      try { 
       isPaused = false; 
       unpaused.signalAll(); 
      } finally { 
       pauseLock.unlock(); 
      } 
     } finally { 
      executeLock.unlock(); 
     } 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     pauseLock.lock(); 
     try { 
      while (isPaused) { 
       unpaused.await(); 
      } 
     } catch (InterruptedException ignore) { 

     } finally { 
      pauseLock.unlock(); 
     } 
    } 
}