2014-09-03 14 views
5

Mam kilka dużych plików tekstowych, które chcę przetworzyć, grupując ich linie.Czy w java 8 można zrobić leniwego groupby, zwracając strumień?

Starałem się korzystać z nowych możliwości streamingu, jak

return FileUtils.readLines(...) 
      .parallelStream() 
      .map(...) 
      .collect(groupingBy(pair -> pair[0])); 

Problem polega na tym, że AFAIK ten generuje mapę.

Czy istnieje sposób na uzyskanie kodu wysokiego poziomu, takiego jak powyższy, który generuje na przykład strumień wpisów?

UPDATE: To, czego szukam, to coś w stylu pytona: itertools.groupby. Moje pliki są już posortowane (parami [0]), chcę tylko ładować grupy jeden po drugim.

Mam już rozwiązanie iteracyjne. Zastanawiam się tylko, czy istnieje bardziej deklaratywny sposób na zrobienie tego. Przy okazji używanie guawy lub innej biblioteki stron trzecich nie stanowiłoby dużego problemu.

+2

Jak można robisz leniwego Grupuj według? Aby pogrupować przez jakąś właściwość obiektu zawartego w strumieniu, należy powtórzyć wszystkie elementy w strumieniu. – Eran

+0

Co masz na myśli przez "zgrupowanie linii?" masz na myśli binning jak w metodzie Stream 'groupBy' lub masz na myśli czytanie wielu linii jednocześnie? – dkatzel

+0

Dzięki za komentarze, dodano AKTUALIZACJĘ do pytania. –

Odpowiedz

3

Zadanie, które chcesz osiągnąć, różni się od tego, jakie ma grupowanie. groupingBy nie opiera się na kolejności elementów Stream, ale na algorytmie Map zastosowanym do wyniku klasyfikatora Function.

Należy złożyć sąsiednie przedmioty o wspólnej wartości właściwości w jeden element List. Nie jest nawet konieczne sortowanie według tej właściwości, o ile można zagwarantować, że wszystkie elementy mające tę samą wartość właściwości są klastrowane.

Być może możliwe jest sformułowanie tego zadania jako redukcji, ale dla mnie wynikowa struktura wygląda na zbyt skomplikowaną.

Więc chyba bezpośrednie wsparcie dla tej funkcji zostanie dodana do Stream s, podejście iterator siedzibą w wygląda najbardziej pragmatyczne do mnie:

class Folding<T,G> implements Spliterator<Map.Entry<G,List<T>>> { 
    static <T,G> Stream<Map.Entry<G,List<T>>> foldBy(
      Stream<? extends T> s, Function<? super T, ? extends G> f) { 
     return StreamSupport.stream(new Folding<>(s.spliterator(), f), false); 
    } 
    private final Spliterator<? extends T> source; 
    private final Function<? super T, ? extends G> pf; 
    private final Consumer<T> c=this::addItem; 
    private List<T> pending, result; 
    private G pendingGroup, resultGroup; 

    Folding(Spliterator<? extends T> s, Function<? super T, ? extends G> f) { 
     source=s; 
     pf=f; 
    } 
    private void addItem(T item) { 
     G group=pf.apply(item); 
     if(pending==null) pending=new ArrayList<>(); 
     else if(!pending.isEmpty()) { 
      if(!Objects.equals(group, pendingGroup)) { 
       if(pending.size()==1) 
        result=Collections.singletonList(pending.remove(0)); 
       else { 
        result=pending; 
        pending=new ArrayList<>(); 
       } 
       resultGroup=pendingGroup; 
      } 
     } 
     pendingGroup=group; 
     pending.add(item); 
    } 
    public boolean tryAdvance(Consumer<? super Map.Entry<G, List<T>>> action) { 
     while(source.tryAdvance(c)) { 
      if(result!=null) { 
       action.accept(entry(resultGroup, result)); 
       result=null; 
       return true; 
      } 
     } 
     if(pending!=null) { 
      action.accept(entry(pendingGroup, pending)); 
      pending=null; 
      return true; 
     } 
     return false; 
    } 
    private Map.Entry<G,List<T>> entry(G g, List<T> l) { 
     return new AbstractMap.SimpleImmutableEntry<>(g, l); 
    } 
    public int characteristics() { return 0; } 
    public long estimateSize() { return Long.MAX_VALUE; } 
    public Spliterator<Map.Entry<G, List<T>>> trySplit() { return null; } 
} 

Leniwy charakter uzyskany składany Stream najlepiej można wykazać poprzez zastosowanie to do nieskończonego strumienia:

Folding.foldBy(Stream.iterate(0, i->i+1), i->i>>4) 
     .filter(e -> e.getKey()>5) 
     .findFirst().ifPresent(e -> System.out.println(e.getValue())); 
+1

drobna korekta: "groupingBy" rzeczywiście zachowuje porządek oryginalnego strumienia, jeśli współpracujący kolektor dalej współpracuje (większość robi, oprócz tych z cechą UNORDERED); podzbiór elementów w danym kuble jest prezentowany kolektorowi w dół w takiej samej kolejności, w jakiej znajdowały się na wejściu. –

+1

@Brian Goetz: tak, to * utrzymuje * kolejność, ale wszystko, co powiedziałem w mojej odpowiedzi, to to, że * nie polega * na kolejności tworzenia grup. Btw. to było jedno z przypadków testowych, które przygotowałem dla mojego rozwiązania: zbieranie strumienia zwróconego przez moje rozwiązanie do 'Mapy' musi produkować dokładnie tę samą' Mapę', co 'groupingBy' przy użyciu tego samego klasyfikatora. – Holger

1

cyclops-react, ja Library przyczynić się do oferuje zarówno sharding i grupowanie funcitonality że może robić, co chcesz.

ReactiveSeq<ListX<TYPE>> grouped = ReactiveSeq.fromCollection(FileUtils.readLines(...)) 
      .groupedStatefullyWhile((batch,next) -> batch.size()==0 ? true : next.equals(batch.get(0))); 

Operator groupedStatefullyWhile pozwala grupować elementy na podstawie bieżącego stanu partii. ReactiveSeq to potok sekwencyjny z pojedynczym gwintem.

Map<Key, Stream<Value> sharded = 
        new LazyReact() 
       .fromCollection(FileUtils.readLines(...)) 
       .map(..) 
       .shard(shards, pair -> pair[0]); 

To utworzy LazyFutureStream (to realizuje java.util.stream.Stream), który przetwarza dane w pliku, w sposób asynchroniczny i równoległy. Jest leniwy i nie rozpocznie przetwarzania, dopóki dane nie zostaną przeciągnięte.

Jedynym zastrzeżeniem jest to, że trzeba wcześniej zdefiniować odłamki. To znaczy. parametr "shards", powyżej którego jest mapa Async.Queue jest kluczowana przez klucz do fragmentu (ewentualnie dowolna para [0]?).

np.

Map<Integer,Queue<String>> shards; 

There is a sharding example with video here i test code here

0

To może być wykonane przez collapse z StreamEx

final int[][] aa = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 4, 4 } }; 

StreamEx.of(aa) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .forEach(System.out::println); 

Możemy dodać peek i limit celu sprawdzenia, czy jest leniwy kalkulacja:

StreamEx.of(aa) 
     .peek(System.out::println) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .limit(1) 
     .forEach(System.out::println); 
Powiązane problemy