2015-05-02 13 views
17

Mam tabeli parkiet z jednej z kolumn jestJak czytać zagnieżdżony w kolekcji Spark

struct, array < < col1, col2 .. Cöln >>

można uruchomić zapytań przeciwko tej tabela w Hive przy użyciu składni LATERAL VIEW.

Jak odczytać tę tabelę w RDD, a co ważniejsze, jak filtrować, mapować itp. Tę zagnieżdżoną kolekcję w Spark?

Nie można znaleźć żadnych odniesień do tego w dokumentacji Spark. Z góry dziękuję za wszelkie informacje!

ps. Felt może być pomocne, aby podać pewne statystyki na stole. Liczba kolumn w głównej tabeli ~ 600. Liczba rzędów ~ 200m. Liczba "kolumn" w kolekcji zagnieżdżonej ~ 10. Średnia liczba rekordów w kolekcji zagnieżdżonej ~ 35.

Odpowiedz

18

W przypadku kolekcji zagnieżdżonej nie ma magii. Spark będzie obsługiwać w ten sam sposób co RDD[(String, String)] i RDD[(String, Seq[String])].

Odczytywanie takiej zagnieżdżonej kolekcji z plików Parkietu może być trudne.

Weźmy przykład z spark-shell (1.3.1):

scala> import sqlContext.implicits._ 
import sqlContext.implicits._ 

scala> case class Inner(a: String, b: String) 
defined class Inner 

scala> case class Outer(key: String, inners: Seq[Inner]) 
defined class Outer 

Zapisz plik parkiet

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b"))))) 
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25 

scala> outers.toDF.saveAsParquetFile("outers.parquet") 

odczytać pliku parkiet

scala> import org.apache.spark.sql.catalyst.expressions.Row 
import org.apache.spark.sql.catalyst.expressions.Row 

scala> val dataFrame = sqlContext.parquetFile("outers.parquet") 
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>] 

scala> val outers = dataFrame.map { row => 
    | val key = row.getString(0) 
    | val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1))) 
    | Outer(key, inners) 
    | } 
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848 

Ważną część to row.getAs[Seq[Row]](1). Wewnętrzna reprezentacja zagnieżdżonej sekwencji struct jest ArrayBuffer[Row], możesz użyć dowolnego jej supertypu zamiast Seq[Row]. 1 to indeks kolumny w zewnętrznym wierszu. Użyłem tutaj metody getAs, ale są alternatywne wersje w najnowszych wersjach Sparka. Zobacz kod źródłowy Row trait.

Teraz, gdy masz już RDD[Outer], możesz zastosować dowolną pożądaną transformację lub akcję.

Pamiętaj, że użyliśmy biblioteki iskrowo-SQL tylko do odczytu pliku parkietu. Można na przykład wybrać tylko żądane kolumny bezpośrednio w DataFrame, przed odwzorowaniem ich na RDD.

dataFrame.select('col1, 'col2).map { row => ... } 
+1

Dziękuję Lomig do szczegółowej odpowiedzi. Zaznaczam to jako poprawną odpowiedź. Chociaż nie jesteśmy jeszcze w Spark 1.3, planujemy upgrade w tym miesiącu. Czy można to zrobić bez API ramki danych w Sparku 1.2? Proszę dać mi znać, jak działa getAs [Seq [Row]] (1)? Indeks [1] jest pozycją kolumny zawierającej zagnieżdżoną tablicę, czy to prawda? – Tagar

+1

Zobacz moją edycję. W przypadku Sparka 1.2 możesz użyć tego samego kodu do transformacji z 'Wiersza' do swojej klasy spraw. Aby odczytać plik parkietu w starszych wersjach, zapoznaj się z oficjalną dokumentacją, która jest bardzo bliska. –

+0

Mam to. Wielkie dzięki. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala#L268 GetSeq [Wiersz] (1) zrobiłby także? – Tagar

8

Podam odpowiedź w oparciu o Python, ponieważ to właśnie używam. Myślę, że Scala ma coś podobnego.

Funkcja explode została dodana w Spark 1.4.0 do obsługi zagnieżdżonych tablic w DataFrames, zgodnie z Python API docs.

Tworzenie dataframe badawczej:

from pyspark.sql import Row 

df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])]) 
df.show() 

## +-+--------------------+ 
## |a|    intlist| 
## +-+--------------------+ 
## |1|ArrayBuffer(1, 2, 3)| 
## |2|ArrayBuffer(4, 5, 6)| 
## +-+--------------------+ 

Korzystając explode spłaszczyć kolumnę listy:

from pyspark.sql.functions import explode 

df.select(df.a, explode(df.intlist)).show() 

## +-+---+ 
## |a|_c0| 
## +-+---+ 
## |1| 1| 
## |1| 2| 
## |1| 3| 
## |2| 4| 
## |2| 5| 
## |2| 6| 
## +-+---+ 
+0

Dzięki dnlbrky. Wygląda na to, że jest łatwiejszy do odczytania niż Scala. Zdecydowanie wypróbuję twój pythonowy przykład. Prawdopodobnie nie będziemy mieć Sparka 1.4, ale do pewnego końca tego roku, gdy Cloudera wyda CDH 5.5 :-) Mamy nadzieję, że do tego czasu Spark 1.5. – Tagar

3

Innym podejściem byłoby przy użyciu wzorca dopasowania tak:

val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
    case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match { 
    case List(a:String, b: String) => (a, b) 
    }).toList 
}) 

Można Dopasowanie do wzorca bezpośrednio w wierszu, ale najprawdopodobniej zakończy się niepowodzeniem z kilku powodów.

0

Powyższe odpowiedzi są wspaniałymi odpowiedziami i rozwiązują to pytanie z różnych stron; Spark SQL to również całkiem przydatny sposób uzyskiwania dostępu do zagnieżdżonych danych.

Oto przykład jak użyć explode() w SQL bezpośrednio do zapytania na zagnieżdżoną kolekcję.

SELECT hholdid, tsp.person_seq_no 
FROM ( SELECT hholdid, explode(tsp_ids) as tsp 
     FROM disc_mrt.unified_fact uf 
    ) 

tsp_ids jest zagnieżdżony w elemencie, który ma wiele atrybutów, w tym person_seq_no który mam wyboru w zewnętrznej kwerendy powyżej.

Powyższy test został przetestowany w programie Spark 2.0. Zrobiłem mały test i nie działa on w Sparku 1.6. To pytanie zadano, gdy Spark 2 nie było w pobliżu, więc ta odpowiedź dodaje ładnie do listy dostępnych opcji do obsługi struktur zagnieżdżonych.

wyczuwalny nie rozwiązane JIRAs na eksplozję() za dostęp do SQL: