2013-07-06 11 views
6

Więc moje dane wejściowe ma dwa pola/kolumny: ID1 & ID2, a mój kod jest następujący:Skalowanie: Jak zachować inne pole po grupie ("pole").

TextLine(args("input")) 
.read 
.mapTo('line->('id1,'id2)) {line: String => 
    val fields = line.split("\t") 
     (fields(0),fields(1)) 
} 
.groupBy('id2){.size} 
.write(Tsv(args("output"))) 

Wyniki uzyskane w (co zakładam) dwóch dziedzinach: ID2 size *. Trochę utknąłem na tym, aby dowiedzieć się, czy możliwe jest zachowanie wartości id1, która została również zgrupowana z id2 i dodać ją jako inne pole?

Odpowiedz

8

Nie możesz tego zrobić w miły sposób, obawiam się. Zastanów się, jak to działa pod maską - dzieli dane, które mają być policzone na kawałki i wysyła je do różnych procesów, każdy proces zlicza jego fragment, a następnie jeden reduktor dodaje je wszystkie na końcu. Podczas gdy każdy proces się liczy, nie zna całego rozmiaru, więc nie może dodać pola. Jedynym sposobem jest powrót i dodanie danych do danych, gdy znany jest cały rozmiar (tj. Połączenie).

Jeśli każda grupa wpisuje się w pamięci (można skonfigurować pamięć), można:

Tsv(args("input"), ('id1, 'id2)) 
.groupBy('id2)(_.size.toList[(String, String)](('id1, 'id2) -> 'list)) 
.flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)](('list, 'size) -> ('id1, 'id2, 'size)) { 
    case (list, size) => list.map(record => (record._1, record._2, size)) 
} 
.write(Tsv(args("output"))) 

Ale jeśli system nie ma wystarczającej ilości pamięci, będziesz musiał użyć drogie dołączyć.

Uwaga: Możesz użyć Tsv zamiast TextLine, a następnie mapTo i dzielenia.

+0

Proszę zobaczyć, czy to ma sens, czuję ten sam ból. http://stackoverflow.com/questions/25994879/scalding-flatten-fields-after-groupby – Sergey