2015-02-05 12 views
12

Zakładając, że mam aplikację Java IntStream, czy można ją przekonwertować na IntStream z łącznymi sumami? Na przykład strumień rozpoczynający się od [4, 2, 6, ...] powinien zostać przekonwertowany na [4, 6, 12, ...].Strumień obliczeń statycznych: suma skumulowana

Bardziej ogólnie, w jaki sposób należy postępować w przypadku realizacji operacji z użyciem strumienia stanowego? To powinno być możliwe:

Z oczywistym ograniczeniem, że działa to tylko na sekwencyjnych strumieniach. Jednak Stream.map wyraźnie wymaga funkcji mapowania bezstanowego. Czy mam rację, jeśli brakuje operacji Stream.statefulMap lub Stream.cumulative, czy brakuje punktu strumieni Java?

Porównaj na przykład do Haskell, gdzie funkcja scanl1 rozwiązuje dokładnie ten przykład:

scanl1 (+) [1 2 3 4] = [1 3 6 10] 
+2

Strumienie są dość celowo zaprojektowane, aby obsługiwać tylko operacje równoległe, co nie jest tak naprawdę w przypadku skanowania. –

+0

Strumienie mają więcej sensu, jeśli udajesz, że są w 'java.util.concurrent.stream' zamiast' java.util.stream'. –

+6

Zauważ, że jeśli twoje źródło jest tablicą, możesz po prostu użyć ['Arrays.parallelPrefix (array, Integer :: sum);]] (http://docs.oracle.com/javase/8/docs/api/java /util/Arrays.html#parallelPrefix-int:A-java.util.function.IntBinaryOperator-)... – Holger

Odpowiedz

3

Możesz to zrobić, używając numeru atomowego. Na przykład:

import java.util.concurrent.atomic.AtomicLong; 
import java.util.stream.IntStream; 
import java.util.stream.LongStream; 

public class Accumulator { 
    public static LongStream toCumulativeSumStream(IntStream ints){ 
     AtomicLong sum = new AtomicLong(0); 
     return ints.sequential().mapToLong(sum::addAndGet); 
    } 

    public static void main(String[] args){ 
     LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5)); 
     sums.forEachOrdered(System.out::println); 
    } 
} 

This Wyjścia:

1 
3 
6 
10 

Użyłem Long do przechowywania sum, bo to całkiem możliwe, że dwa ints sumują się do ponad Integer.MAX_VALUE i długa mniej szansy na przelanie.

+2

Uważam, że ta odpowiedź jest interesująca i nieoczekiwana. Czy możesz odpowiedzieć na kilka pytań na ten temat? Dlaczego używałeś AtomicReference zamiast AtomicInteger - z addAndGet? Ale co ważniejsze, dlaczego to zmienia fakt, że jeśli strumień został wykonany równolegle, nie ma gwarancji kolejności, w jakiej występuje akumulacja? Czy AtomicReference w jakiś sposób zmienia zachowanie strumieni? Jeśli tak, możesz wskazać na samouczek lub dokumentację na ten temat? dzięki. – sprinter

+2

Tylko dodatek do poprzedniego pytania, myślałem, że spróbuję sam z równoległym IntStream. To nie działa. Więc to nie jest dobra odpowiedź. – sprinter

+2

@sprinter Tak, to nie działa w przypadku strumieni równoległych, ale ponieważ operacja nie może zostać zrównoleglona, ​​można po prostu wywołać .sequential() przed uruchomieniem. –

5

Jest to możliwe do zrobienia z kolektorem, który następnie tworzy nowy strumień:

class Accumulator { 
    public static void accept(List<Integer> list, Integer value) { 
     list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1))); 
    } 

    public static List<Integer> combine(List<Integer> list1, List<Integer> list2) { 
     int total = list1.get(list1.size() - 1); 
     list2.stream().map(n -> n + total).forEach(list1::add); 
     return list1; 
    } 
} 

ten jest używany jako:

myIntStream.parallel() 
    .collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine) 
    .stream(); 

Mam nadzieję, że widać, że ważną cechą tego Kolektor jest taki, że nawet jeśli strumień jest równoległy, gdy instancje Accumulator są łączone, dostosowuje sumy.

To oczywiście nie jest tak wydajne, jak operacja na mapie, ponieważ zbiera cały strumień, a następnie tworzy nowy strumień. Ale to nie jest tylko szczegół implementacji: jest to niezbędna funkcja faktu, że strumienie mają być potencjalnie przetwarzane jednocześnie.

Przetestowałem to z IntStream.range(0, 10000).parallel() i działa poprawnie.

+0

Muszę powiedzieć, że to bardzo dobra odpowiedź. Może nie być idealną odpowiedzią na to pytanie, ale na równoległą akumulację. – Jatin

+1

@ Dziękuję dzięki. Dopiero później zdałem sobie sprawę, że może to być trochę prostsze niż to - całkowita suma nie jest wymagana, ponieważ jest to tylko ostatni element na liście. Zaktualizuję, aby pokazać Ci, co mam na myśli - daj mi znać, co myślisz. – sprinter

+0

Jest to bardziej czytelne. Dzięki. Nie myślałem, że biorę pod uwagę równoległe linie, które można blokować, a następnie można je łączyć. wspaniały. – Jatin

Powiązane problemy