2015-08-06 28 views
6

Próbuję przeprowadzić analizę na zestawach. Mam przykładowy zestaw danych, który wygląda tak:Spark grupowanie danych grupowych na listę

orders.json

{"items":[1,2,3,4,5]} 
{"items":[1,2,5]} 
{"items":[1,3,5]} 
{"items":[3,4,5]} 

Wszystko to jest, to pojedyncze pole, które znajduje się lista numerów, które reprezentują identyfikatory.

Oto skrypt Spark próbuję uruchomić:

val sparkConf = new SparkConf() 
    .setMaster("local[*]") 
    .setAppName("Dataframe Test") 

val sc = new SparkContext(sparkConf) 
val sql = new SQLContext(sc) 

val dataframe = sql.read.json("orders.json") 

val expanded = dataframe 
    .explode[::[Long], Long]("items", "item1")(row => row) 
    .explode[::[Long], Long]("items", "item2")(row => row) 

val grouped = expanded 
    .where(expanded("item1") !== expanded("item2")) 
    .groupBy("item1", "item2") 
    .count() 

val recs = grouped 
    .groupBy("item1") 

Tworzenie expanded i grouped jest w porządku, w skrócie expanded znajduje się lista wszystkich możliwych zestawów dwóch identyfikatorów, gdzie dwa identyfikatory były w ten sam oryginalny zestaw. grouped filtruje identyfikatory, które zostały dopasowane do siebie, a następnie grupuje wszystkie unikalne pary identyfikatorów i generuje liczbę dla każdego. Schemat i dane próbka grouped są:

root 
|-- item1: long (nullable = true) 
|-- item2: long (nullable = true) 
|-- count: long (nullable = false) 

[1,2,2] 
[1,3,2] 
[1,4,1] 
[1,5,3] 
[2,1,2] 
[2,3,1] 
[2,4,1] 
[2,5,2] 
... 

Więc moje pytanie brzmi: w jaki sposób mogę teraz grupa na pierwszej pozycji w każdym związku, tak że mam listę krotek? Na przykład dane powyżej, chciałbym się spodziewać czegoś podobnego do tego:

[1, [(2, 2), (3, 2), (4, 1), (5, 3)]] 
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]] 

Jak widać w moim skrypcie z recs, myślałem, że zaczniesz wykonując GroupBy na „item1”, który jest pierwszym elementem w każdy rząd. Ale po tym jesteś z tym obiektem GroupedData, który ma bardzo ograniczone działania na nim. Naprawdę, pozostaje ci tylko robić agregacje, takie jak suma, śr. Itp. Chcę tylko wyświetlić listę krotek z każdego wyniku.

Mogę łatwo korzystać z funkcji RDD w tym momencie, ale to odbiega od korzystania z Dataframes. Czy istnieje sposób, aby to zrobić z funkcjami ramek danych.

Odpowiedz

5

Można zbudować że z org.apache.spark.sql.functions (collect_list i struct) dostępny od 1,6

val recs =grouped.groupBy('item1).agg(collect_list(struct('item2,'count)).as("set")) 


+-----+----------------------------+ 
|item1|set       | 
+-----+----------------------------+ 
|1 |[[5,3], [4,1], [3,2], [2,2]]| 
|2 |[[4,1], [1,2], [5,2], [3,1]]| 
+-----+----------------------------+ 

Można użyć collect_set również

Edycja: informacji, tuples nie istnieją w dataframes. Najbliższe struktury to struct, ponieważ są odpowiednikiem klas case w interfejsie API bez typu dataset.

Powiązane problemy