2016-05-29 11 views
10

Domyślne "paralellStream()" w Java 8 używa wspólnego ForkJoinPool, co może być problemem z opóźnieniem, jeśli wspólne wątki puli zostaną wyczerpane po przesłaniu zadania. Jednak w wielu przypadkach dostępna jest wystarczająca moc procesora, a zadania są na tyle krótkie, że nie stanowi to problemu. Jeśli mamy jakieś długo działające zadania, będzie to oczywiście wymagało starannego rozważenia, ale na to pytanie przyjmijmy, że to nie jest problem.Czy jest coś złego w korzystaniu z I/O + ManagedBlocker w Java8 parallelStream()?

Jednak wypełnianie ForkJoinPool z zadaniami I/O, które w rzeczywistości nie wykonują żadnych prac związanych z CPU, jest sposobem na wprowadzenie wąskiego gardła, nawet jeśli dostępna jest wystarczająca moc procesora. I understood that. Jednak to właśnie mamy dla ManagedBlocker. Więc jeśli mamy zadanie I/O, po prostu pozwólmy ForkJoinPool zarządzać tym w ramach ManagedBlocker. Brzmi niewiarygodnie łatwo. Jednak ku mojemu zaskoczeniu użycie ManagedBlocker jest raczej skomplikowanym API dla prostej rzeczy, którą jest. I w końcu myślę, że jest to powszechny problem. Więc po prostu zbudowany prostą metodę narzędziowy, który sprawia ManagedBlocker łatwe w użyciu dla wspólnej sprawy:

public class BlockingTasks { 

    public static<T> T callInManagedBlock(final Supplier<T> supplier) { 
     final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier); 
     try { 
      ForkJoinPool.managedBlock(managedBlock); 
     } catch (InterruptedException e) { 
      throw new Error(e); 
     } 
     return managedBlock.getResult(); 
    } 

    private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker { 
     private final Supplier<T> supplier; 
     private T result; 
     private boolean done = false; 

     private SupplierManagedBlock(final Supplier<T> supplier) { 
      this.supplier = supplier; 
     } 

     @Override 
     public boolean block() { 
      result = supplier.get(); 
      done = true; 
      return true; 
     } 

     @Override 
     public boolean isReleasable() { 
      return done; 
     } 

     public T getResult() { 
      return result; 
     } 
    } 
} 

Teraz, jeśli chcę, aby pobrać kod HTML z kilku stron internetowych w paralell mogłem, żeby go tak nie I/o powoduje żadnych kłopotów:

public static void main(String[] args) { 
    final List<String> pagesHtml = Stream 
     .of("https://google.com", "https://stackoverflow.com", "...") 
     .map((url) -> BlockingTasks.callInManagedBlock(() -> download(url))) 
     .collect(Collectors.toList()); 
} 

Jestem trochę zaskoczony, że nie ma klasy jak BlockingTasks powyżej dostarczany z Java (? lub nie go znaleźć), ale to nie było to, że trudno budować.

Kiedy google dla "Java 8 strumień równolegle" mam w czterech pierwszych wyników tych artykułach, które twierdzą, że ze względu na I/O problemu Widelec/join zasysa Java:

Zmieniłem nieco moje kryteria wyszukiwania i chociaż wiele osób narzeka na to, jak okropne jest życie, nie znalazłem nikogo, kto mógłby mówić o takim rozwiązaniu jak wyżej. Ponieważ nie czuję się jak Marvin (mózg jak planeta), a Java 8 jest dostępna od dłuższego czasu, podejrzewam, że jest coś nie w porządku z tym, co proponuję na górze.

Uderzyłem razem mały test:

public static void main(String[] args) { 
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start"); 
    IntStream.range(0, 10).parallel().forEach((x) -> sleep()); 
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End"); 
} 

public static void sleep() { 
    try { 
     System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName()); 
     Thread.sleep(10000); 
    } catch (InterruptedException e) { 
     throw new Error(e); 
    } 
} 

wpadłem, że mam następujący wynik:

18:41:29.021: Start 
18:41:29.033: Sleeping main 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5 
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3 
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7 
18:41:39.034: Sleeping main 
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1 
18:41:49.035: End 

Więc na moim 8 komputerze CPU ForkJoinPool naturalnie wyboru 8 wątków, zakończono pierwszy 8 zadań i wreszcie dwa ostatnie zadania, co oznacza, że ​​zajęło to 20 sekund, a jeśli w kolejce były inne zadania, pulpit mógł nadal nie używać wyraźnie bezczynnych procesorów (z wyjątkiem 6 rdzeni w ciągu ostatnich 10 sekund).

Następnie użyłem ...

IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; })); 

... zamiast ...

IntStream.range(0, 10).parallel().forEach((x) -> sleep()); 

... i mam następujący wynik:

18:44:10.93: Start 
18:44:10.945: Sleeping main 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6 
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3 
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0 
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11 
18:44:20.957: End 

wygląda mi jak to działa, dodatkowe wątki zaczęto zrekompensować moje makiety "blokowanie działania I/O" (uśpienia). Czas został skrócony do 10 sekund i przypuszczam, że gdybym miał kolejkę więcej zadań, które mogą nadal korzystać z dostępnej mocy procesora.

Czy jest coś nie tak z tym rozwiązaniem lub ogólnie z użyciem operacji we/wy w strumieniach, jeśli operacja wejścia/wyjścia jest opakowana w ManagedBlock?

Odpowiedz

6

Krótko mówiąc, istnieją pewne problemy z rozwiązaniem. Zdecydowanie poprawia to użycie kodu blokującego w strumieniu równoległym, a niektóre biblioteki innych firm zapewniają podobne rozwiązanie (patrz na przykład klasa Blocking w bibliotece jOOλ). Jednak to rozwiązanie nie zmienia wewnętrznej strategii podziału używanej w Stream API. Liczba podzadań stworzonych przez Stream API jest kontrolowana przez określoną stałą w AbstractTask klasy:

/** 
* Default target factor of leaf tasks for parallel decomposition. 
* To allow load balancing, we over-partition, currently to approximately 
* four tasks per processor, which enables others to help out 
* if leaf tasks are uneven or some processors are otherwise busy. 
*/ 
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; 

Jak widać to jest cztery razy większe niż zwykłe równoległości basenie (która jest domyślnie liczby rdzeni CPU). Prawdziwy algorytm podziału jest trochę trudniejszy, ale z grubsza nie można wykonywać zadań 4x-8x, nawet jeśli wszystkie są blokowane.

Na przykład, jeśli masz 8 rdzeni procesora, Twój test Thread.sleep() będzie dobrze działać do IntStream.range(0, 32) (jako 32 = 8 * 4). Jednak dla IntStream.range(0, 64) będziesz mieć 32 równoległe zadania, z których każdy przetwarza dwa numery wejściowe, więc całe przetwarzanie zajmie 20 sekund, a nie 10.

+0

Dobra rada z rozkładem. To oczywiście ograniczy czas wykonania jednego zadania, ale nie ograniczy całkowitej przepustowości, jeśli w kolejce znajdzie się wystarczająca liczba innych zadań obliczeniowych. Wniosek: Jeśli problemem jest tylko przepustowość, rozwiązanie jest w porządku. Jeśli czas odpowiedzi pojedynczego zadania wejścia/wyjścia stanowi problem i pojedyncze zadanie we/wy można rozłożyć na większą liczbę kroków, należy rozważyć inne rozwiązanie. – yankee

+3

I nie należy zapominać: wykorzystanie przez Stream API funkcji Fork/Join jest szczegółem implementacji. Tak długo, jak Strumienie nie będą gwarantować korzystania z tej struktury, nie ma gwarancji, że użycie 'ManagedBlocker' poprawi współbieżność w ogóle ... – Holger

Powiązane problemy