2015-04-12 11 views
8

Szukam sposobu implementacji operacji zgrupowania nieterminowego, tak aby narzut pamięci był minimalny.Strumienie Java - efektywne grupowanie elementów w posortowanych strumieniach

Na przykład rozważ odrębność(). W ogólnym przypadku nie ma innego wyjścia, jak zebrać wszystkie odrębne przedmioty, a dopiero potem przesyłać je dalej. Jednakże, jeśli wiemy, że strumień wejściowy jest już posortowany, operację można wykonać "w locie", używając minimalnej pamięci.

Wiem, że mogę to osiągnąć dla iteratorów za pomocą opakowania iteratora i implementując logikę grupowania. Czy istnieje prostszy sposób zaimplementowania tego przy użyciu API strumieniowego?

- EDIT -

znalazłem drogę do nadużyć Stream.flatMap (..) w celu osiągnięcia to:

private static class DedupSeq implements IntFunction<IntStream> { 
    private Integer prev; 

    @Override 
    public IntStream apply(int value) { 
     IntStream res = (prev != null && value == prev)? IntStream.empty() : IntStream.of(value); 
     prev = value; 
     return res; 
    }  
    } 

, a następnie:

IntStream.of(1,1,3,3,3,4,4,5).flatMap(new DedupSeq()).forEach(System.out::println); 

Która wydruki:

1 
3 
4 
5 

Po wprowadzeniu pewnych zmian tę samą technikę można zastosować do dowolnego efektywnego w pamięci grupowania sekwencji strumieni. W każdym razie, nie podoba mi się to rozwiązanie i szukałem czegoś bardziej naturalnego (na przykład sposobu, w jaki działa mapowanie lub filtrowanie). Co więcej, zerwam kontrakt tutaj, ponieważ funkcja dostarczana do flatMap (..) jest stateful.

+2

Zawsze można ".filter (someSet :: add) ', ale czy próbowałeś i porównałeś wydajność takiego rozwiązania ze zwykłym' distinct() '? Mówisz też "w ogólnym przypadku", ale może być tak, że istnieje zoptymalizowana implementacja w przypadku, gdy 'Stream' _is_' ORDERED', właśnie (lub dokładniej, jego bazowy 'Spliterator') – fge

+0

@fge: Nie jestem pewien, czy jest tam jakaś optymalizacja. Kod: IntStream.range (0, 100000000) .distinct(). ForEach (x -> {}); Uruchamia się pamięć, pomimo tego, że leżący u jej podstaw Spliterator zgłasza się jako ORDERED. –

+1

Czy próbowałeś z '.forEachOrdered()'? – fge

Odpowiedz

4

Jeśli chcesz rozwiązanie, które nie dodać stan zmienny do funkcji, które nie powinno mieć to, można odwołać się do collect:

static void distinctForSorted(IntStream s, IntConsumer action) { 
    s.collect(()->new long[]{Long.MIN_VALUE}, 
       (a, i)->{ if(a[0]!=i) { action.accept(i); assert i>a[0]; a[0]=i; }}, 
       (a, b)->{ throw new UnsupportedOperationException(); }); 
} 

To działa, ponieważ jest to zamierzony sposób przy użyciu zmiennych kontenerów nie może jednak działać równolegle, ponieważ dzielenie w dowolnych pozycjach strumienia implikuje możliwość napotkania wartości w dwóch (lub nawet więcej) wątkach.

Jeśli chcesz uzyskać ogólny cel IntStream, a nie działanie forEach, preferowane jest rozwiązanie niskopoziomowe o wartości Spliterator, pomimo zwiększonej złożoności.

static IntStream distinctForSorted(IntStream s) { 
    Spliterator.OfInt sp=s.spliterator(); 
    return StreamSupport.intStream(
     new Spliterators.AbstractIntSpliterator(sp.estimateSize(), 
     Spliterator.DISTINCT|Spliterator.SORTED|Spliterator.NONNULL|Spliterator.ORDERED) { 
     long last=Long.MIN_VALUE; 
     @Override 
     public boolean tryAdvance(IntConsumer action) { 
      long prev=last; 
      do if(!sp.tryAdvance(distinct(action))) return false; while(prev==last); 
      return true; 
     } 
     @Override 
     public void forEachRemaining(IntConsumer action) { 
      sp.forEachRemaining(distinct(action)); 
     } 
     @Override 
     public Comparator<? super Integer> getComparator() { 
      return null; 
     } 
     private IntConsumer distinct(IntConsumer c) { 
      return i-> { 
       if(i==last) return; 
       assert i>last; 
       last=i; 
       c.accept(i); 
      }; 
     } 
    }, false); 
} 

nawet dziedziczy równoległą obsługę chociaż to działa przez preselekcji pewne wartości przed przetworzeniem ich w innym wątku, więc nie przyspieszy wyraźną operację , ale być może kolejnym operacjom, jeśli są intensywne obliczenia te.


Na zakończenie, jest tu wyraźna praca za arbitralne, tj nieposortowane, IntStream s, które nie opierają się na „boksu powiększonej HashMap” co może mieć znacznie lepsze zużycie pamięci:

static IntStream distinct(IntStream s) { 
    boolean parallel=s.isParallel(); 
    s=s.collect(BitSet::new, BitSet::set, BitSet::or).stream(); 
    if(parallel) s=s.parallel(); 
    return s; 
} 

Działa tylko dla pozytywnych wartości int; rozszerzenie do pełnego 32-bitowego zakresu wymagałoby dwóch BitSet s nie będzie więc wyglądać tak zwięźle, ale często przypadek użycia pozwala ograniczyć przechowywanie do zakresu 31 bitów lub nawet niższego ...

+0

Dzięki. Widzę teraz, że niestandardowy Spliterator jest sposobem na zrobienie tego (tak jak w stackoverflow.com/q/28363323/1441122, sugerowanym przez ** Stuart Marks **). Rozwiązanie bitsetowe na końcu jest eleganckie, nawiasem mówiąc (choć wciąż O (n) w użyciu pamięci). –

1

Sposób to zrobić właściwie byłoby włączyć strumień do spliterator, następnie owinąć go w zależności od właściwości zwróconego spliterator

  • wykonuje naiwny deduplikacji za pomocą jednoczesnego zestaw jeśli źródłem jest nie posortowane ani różne
  • wykonuje optymalizację zoptymalizowanej dedukcji, jeśli segregator źródłowy jest posortowany.
    Obsługa operacji trySplit może być trudna, ponieważ może wymagać kilku kroków do przesunięcia podrozdzielacza, aby upewnić się, że nie widzi ogona biegu elementów nierozróżnialnych.
  • prostu zwraca spliterator jak jest, jeśli źródłem jest już odrębna

Kiedy już, że spliterator można włączyć go z powrotem do strumienia z tych samych właściwościach i nadal to robić operacji strumieniowych na nim

Ponieważ nie możemy modyfikować istniejących interfejsów jdk, pomocniczy interfejs API musiał wyglądać mniej więcej tak: dedup(IntStream.of(...).map(...)).collect(...).


Jeśli skontrolować źródło java.util.stream.DistinctOps.makeRef(AbstractPipeline<?, T, ?>) można zauważyć, że JDK mniej więcej robi dla strumieni odniesienia oparte.

To tylko implementacja IntStream (java.util.stream.IntPipeline.distinct()), która przyjmuje nieefektywne podejście, które nie wykorzystuje wartości DISTINCT lub SORTED.

Po prostu ślepo konwertuje IntStream do pudełkowego strumienia Integer i korzysta z deduplikacji referencyjnej bez przekazywania odpowiednich flag, które sprawiają, że pamięć jest wydajna.

Jeśli nie jest to już naprawione w jdk9, może być warte błędu, ponieważ jest to w zasadzie niepotrzebne zużycie pamięci i zmarnowany potencjał optymalizacji dla operacji strumieniowych, jeśli niepotrzebnie odrzucają flagi stream.

Powiązane problemy