2017-02-27 36 views
9

Próbuję zrównoleglować niektóre prace ze strumieniami Java. Rozważmy ten prosty przykład:Dlaczego generator Java Stream jest nieuporządkowany?

Stream.generate(new Supplier<Integer>() { 
     @Override 
     public Integer get() { 
      return generateNewInteger(); 
     } 
    }) 
    .parallel() 
    .forEachOrdered(new Consumer<Integer>() { 
     @Override 
     public void accept(Integer integer) { 
      System.out.println(integer); 
     } 
    }); 

Problemem jest to, że nie wywołać metodę accept dla forEachOrdered, to działa tylko wtedy, gdy używam forEach. Domyślam się, że problem polega na tym, że Stream.generate tworzy wewnętrznie InfiniteSupplyingSpliterator, który nie ma charakterystyki ORDERED.

Pytanie brzmi, dlaczego? Wygląda na to, że wiemy, w jakiej kolejności generowane są dane. Drugie pytanie brzmi: jak wykonać forEachOrdered na równoległym strumieniu z generowaniem elementów strumienia?

+3

Jest nieuporządkowany, ponieważ specyfikacja mówi tak. Przyczyną, dla której nie została ukończona, jest fakt, że jest * nieskończony * w połączeniu ze szczegółami implementacji. – Holger

+2

Można to napisać znacznie lenniej z lambdami. –

+0

Niepowiązane, ale możesz po prostu zrobić '.forEachOrdered (System.out :: println)' –

Odpowiedz

10

Najprostsza odpowiedź brzmi: Stream.generate jest nieuporządkowana, ponieważ it’s specification mówi tak.

To nie jest tak, że implementacja próbowała przetwarzać elementy w kolejności, gdy jest to możliwe, w rzeczywistości jest odwrotnie. Po zdefiniowaniu operacji, która ma być nieuporządkowana, implementacja będzie starała się czerpać korzyści z nieuporządkowanego charakteru, gdy tylko będzie to możliwe. Jeśli wystąpi coś, co wygląda jak porządek źródłowy w operacji nieuporządkowanej, być może nie było sposobu na uzyskanie korzyści z nieuporządkowanego przetwarzania lub implementacja nie wykorzystała jeszcze wszystkich możliwości. Ponieważ może się to zmienić w przyszłej wersji lub alternatywnej implementacji, nie można polegać na zamówieniu, jeśli operacja została określona jako nieuporządkowana.

Zamiar zdefiniowania Stream.generate jako nieuporządkowany może stać się jaśniejszy w porównaniu z zamówieniem Stream.iterate. Funkcja przekazana do iterate otrzyma swój poprzedni element, więc istnieje wcześniejsza zależność między elementami, a więc zamówienie. Dostawca przeszedł Stream.generate nie otrzymał poprzedniego elementu, innymi słowy, nie ma związku z poprzednim elementem, gdy bierze pod uwagę tylko podpis funkcjonalny. To działa dla Stream.generate(() -> constant) lub Stream.generate(Type::new) jak przypadki użycia, ale mniej dla Stream.generate(instance::statefulOp), co wydaje się nie być zamierzonym podstawowym przypadkiem użycia. Nadal działa, jeśli operacja jest bezpieczna dla wątków i możesz żyć z nieuporządkowaną naturą strumienia.

Powodem, dla którego twój przykład nigdy nie robi postępów, jest fakt, że implementacja forEachOrdered w rzeczywistości nie uwzględnia natury nieuporządkowanej, ale próbuje przetwarzać porcje po podzieleniu w kolejności spotkań, tzn. Wszystkie podzadania próbują buforować swoje elementy, aby mogli przekazać je do działania, gdy podzadania po lewej stronie zostały zakończone. Oczywiście, buforowanie i nieskończone źródła nie grają ze sobą dobrze, szczególnie, że podstawowy InfiniteSupplyingSpliterator podzieli się na pod-zadania, które są nieskończone na własną rękę. Zasadniczo istnieje lewostronne zadanie, które może zasilać jego elementy bezpośrednio do akcji, ale zadania wydają się być gdzieś w kolejce, czekając na aktywację, co nigdy nie nastąpi, ponieważ wszystkie wątki robocze są już zajęty przetwarzaniem innego nieskończonego subu -dasks. Ostatecznie cała operacja zostanie przerwana z OutOfMemoryError, jeśli pozwolisz jej działać wystarczająco długo ...

Powiązane problemy