2012-06-01 12 views
9

Potrzebuję znaleźć sposób na wykonywanie zadań (zależnych i niezależnych) równolegle w java.Wykonywanie zadań zależnych równolegle w Javie

  1. Zadanie A i Zadanie C mogą działać niezależnie.
  2. Zadanie B jest zależna od mocy Task A.

sprawdziłem java.util.concurrent przyszłości i Fork/Join, ale wygląda na to, nie możemy dodać zależność do zadania.

Czy ktoś może wskazać mi korektę Java API.

+0

Czy uważasz, że zadanie A powiadamia zadanie B, gdy jest ono kompletne? Zanim rozpoczniesz zadanie A, stwórz zadanie B i dodaj je jako obserwator do zadania A (patrz wzorzec obserwatora). –

+0

Guava ['ListenableFuture'] (http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained) jest trochę bardziej przyjazny o tych rzeczach niż zwykłe Futures. –

Odpowiedz

0

To, czego potrzebujesz, to CountDownLatch.

final CountDownLatch gate = new CountDownLatch(2); 
// thread a 
new Thread() { 
    public void run() { 
     // process 
     gate.countDown(); 
    } 
}.start(); 

// thread c 
new Thread() { 
    public void run() { 
     // process 
     gate.countDown(); 
    } 
}.start(); 

new Thread() { 
    public void run() { 
     try { 
      gate.await(); 
      // both thread a and thread c have completed 
      // process thread b 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

    } 
}.start(); 

Jako alternatywa, w zależności od scenariusza, to może również być w stanie wykorzystać BlockingQueue zaimplementować wzorzec Producent konsumentów. Zobacz przykład na stronie dokumentacji.

+0

"CountDownLatch" jest tutaj przesadzone i zgodnie z OP, zadanie B jest zależne tylko od zadania A, a nie od obu zadań A i C. To powiedziawszy, -1 jest po prostu za niepoprawne traktowanie fragmentu "InterruptedException". –

+0

Dzięki, pomysł na fragment kodu miał pokazać mu działanie CountDownLatch, a nie pokazać mu, jak poprawnie obsługiwać wyjątki. – Jeshurun

0

Jeśli zadanie B zależy od wyniku zadania A, najpierw zadałbym pytanie, czy zadanie B naprawdę jest osobnym zadaniem. Rozdzielenie zadań miałoby sens, jeśli istnieje:

  • Niektóre nietrywialne ilość pracy, że zadanie B może zrobić zanim konieczna wyników Zadaniem za
  • Zadanie B to długo trwający proces, który obsługuje wyjście z wielu różnych przypadkach zadania
  • Istnieje kilka innych zadań (powiedzmy D), które również użyć zadań wYNIKI za

Zakładając, że jest to oddzielne zadanie, to można pozwolić zadaniem & B dzielić BlockingQueue takie, że zadaniem może przekazywać dane zadania B.

10

W Scala to jest bardzo łatwe do zrobienia i myślę, że jesteś lepiej wyłączyć za pomocą Scala. Oto przykład, który wyciągnąłem stąd: http://danielwestheide.com/ (Przewodnik Neofity do Scali, część 16: Dokąd się udać) ten facet ma świetnego bloga (nie jestem tym facetem)

Pozwala wziąć barristę do parzenia kawy.Zadania uwagi są:

  1. Rozetrzeć wymagane ziaren kawy (nie Powyższe zadania)
  2. ciepła pewną ilość wody (nie Powyższe zadania)
  3. Brew espresso pomocą mieloną kawą i ogrzanej wody (w zależności od 1 & 2)
  4. piana część mleka (nie poprzednich zadań)
  5. Połączyć spienienia mleka i kawy espresso (w zależności od 3,4)

lub jako drzewa:

Grind _ 
Coffe \ 
      \ 
Heat ___\_Brew____ 
Water    \_____Combine 
        /
Foam ____________/ 
Milk 

w Javie używając API współbieżności to byłoby:

import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.FutureTask; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 

public class Barrista { 

    static class HeatWater implements Callable<String> { 
     @Override 
     public String call() throws Exception { 
      System.out.println("Heating Water"); 
      Thread.sleep(1000); 
      return "hot water"; 
     } 
    } 

    static class GrindBeans implements Callable<String> { 
     @Override 
     public String call() throws Exception { 
      System.out.println("Grinding Beans"); 
      Thread.sleep(2000); 
      return "grinded beans"; 
     } 
    } 

    static class Brew implements Callable<String> { 

     final Future<String> grindedBeans; 
     final Future<String> hotWater; 

     public Brew(Future<String> grindedBeans, Future<String> hotWater) { 
      this.grindedBeans = grindedBeans; 
      this.hotWater = hotWater; 
     } 

     @Override 
     public String call() throws Exception 
     { 
      System.out.println("brewing coffee with " + grindedBeans.get() 
        + " and " + hotWater.get()); 
      Thread.sleep(1000); 
      return "brewed coffee"; 
     } 
    } 

    static class FrothMilk implements Callable<String> { 

     @Override 
     public String call() throws Exception { 
      Thread.sleep(1000); 
      return "some milk"; 
     } 
    } 

    static class Combine implements Callable<String> { 

     public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) { 
      super(); 
      this.frothedMilk = frothedMilk; 
      this.brewedCoffee = brewedCoffee; 
     } 

     final Future<String> frothedMilk; 
     final Future<String> brewedCoffee; 

     @Override 
     public String call() throws Exception { 
      Thread.sleep(1000); 
      System.out.println("Combining " + frothedMilk.get() + " " 
        + brewedCoffee.get()); 
      return "Final Coffee"; 
     } 

    } 

    public static void main(String[] args) { 

     ExecutorService executor = Executors.newFixedThreadPool(2); 

     FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater()); 
     FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans()); 
     FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture)); 
     FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk()); 
     FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee)); 

     executor.execute(heatWaterFuture); 
     executor.execute(grindBeans); 
     executor.execute(brewCoffee); 
     executor.execute(frothMilk); 
     executor.execute(combineCoffee); 


     try { 

      /** 
      * Warning this code is blocking !!!!!!! 
      */   
      System.out.println(combineCoffee.get(20, TimeUnit.SECONDS)); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } catch (ExecutionException e) { 
      e.printStackTrace(); 
     } catch (TimeoutException e) { 
      System.out.println("20 SECONDS FOR A COFFEE !!!! I am [email protected]#! leaving!!"); 
      e.printStackTrace(); 
     } finally{ 
       executor.shutdown(); 
      } 
     } 
    } 

Upewnij się, że dodatek out razem jednak w celu zapewnienia, że ​​kod nie będzie czekać wiecznie na coś kompletne, odbywa się to za pomocą Future.get (long, TimeUnit), a następnie odpowiednio poradzić sobie z awarią.

Jest o wiele ładniejsza w Scala jednak tutaj jest to jak to jest na blogu: Kod przygotować kawę będzie wyglądać mniej więcej tak:

def prepareCappuccino(): Try[Cappuccino] = for { 
    ground <- Try(grind("arabica beans")) 
    water <- Try(heatWater(Water(25))) 
    espresso <- Try(brew(ground, water)) 
    foam <- Try(frothMilk("milk")) 
} yield combine(espresso, foam) 

gdzie wszystkie metody zwracają przyszłość (wpisane przyszłość), na przykład harówki byłoby coś takiego:

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future { 
    // grinding function contents 
} 

dla wszystkich implementacje sprawdzeniu bloga, ale to wszystko jest do niego. Możesz także łatwo zintegrować Scala i Javę. Naprawdę zalecam robienie tego w Scali zamiast w Javie. Scala wymaga znacznie mniej kodu, znacznie czystszego i napędzanego zdarzeniami.

0

Jest biblioteka java specjalnie do tego celu (Zastrzeżenie: Jestem właścicielem tej biblioteki) o nazwie Dexecutor

Oto w jaki sposób można osiągnąć pożądany rezultat, można przeczytać więcej na ten temat here

@Test 
public void testDependentTaskExecution() { 

    DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor(); 

    executor.addDependency("A", "B"); 
    executor.addIndependent("C"); 

    executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING); 

} 

private DefaultDependentTasksExecutor<String, String> newTaskExecutor() { 
    return new DefaultDependentTasksExecutor<String, String>(newExecutor(), new SleepyTaskProvider()); 
} 

private ExecutorService newExecutor() { 
    return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize()); 
} 

private static class SleepyTaskProvider implements TaskProvider<String, String> { 

    public Task<String, String> provid(final String id) { 

     return new Task<String, String>() { 

      @Override 
      public String execute() { 
       try { 
        //Perform some task 
        Thread.sleep(500); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       String result = id + "processed"; 
       return result; 
      } 

      @Override 
      public boolean shouldExecute(ExecutionResults<String, String> parentResults) { 
       ExecutionResult<String, String> firstParentResult = parentResults.getFirst(); 
       //Do some logic with parent result 
       if ("B".equals(id) && firstParentResult.isSkipped()) { 
        return false; 
       } 
       return true; 
      } 
     };   
    } 

} 
Powiązane problemy