W przypadku kolekcji zagnieżdżonej nie ma magii. Spark będzie obsługiwać w ten sam sposób co RDD[(String, String)]
i RDD[(String, Seq[String])]
.
Odczytywanie takiej zagnieżdżonej kolekcji z plików Parkietu może być trudne.
Weźmy przykład z spark-shell
(1.3.1):
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Inner(a: String, b: String)
defined class Inner
scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer
Zapisz plik parkiet
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> outers.toDF.saveAsParquetFile("outers.parquet")
odczytać pliku parkiet
scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row
scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]
scala> val outers = dataFrame.map { row =>
| val key = row.getString(0)
| val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
| Outer(key, inners)
| }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
Ważną część to row.getAs[Seq[Row]](1)
. Wewnętrzna reprezentacja zagnieżdżonej sekwencji struct
jest ArrayBuffer[Row]
, możesz użyć dowolnego jej supertypu zamiast Seq[Row]
. 1
to indeks kolumny w zewnętrznym wierszu. Użyłem tutaj metody getAs
, ale są alternatywne wersje w najnowszych wersjach Sparka. Zobacz kod źródłowy Row trait.
Teraz, gdy masz już RDD[Outer]
, możesz zastosować dowolną pożądaną transformację lub akcję.
Pamiętaj, że użyliśmy biblioteki iskrowo-SQL tylko do odczytu pliku parkietu. Można na przykład wybrać tylko żądane kolumny bezpośrednio w DataFrame, przed odwzorowaniem ich na RDD.
dataFrame.select('col1, 'col2).map { row => ... }
Dziękuję Lomig do szczegółowej odpowiedzi. Zaznaczam to jako poprawną odpowiedź. Chociaż nie jesteśmy jeszcze w Spark 1.3, planujemy upgrade w tym miesiącu. Czy można to zrobić bez API ramki danych w Sparku 1.2? Proszę dać mi znać, jak działa getAs [Seq [Row]] (1)? Indeks [1] jest pozycją kolumny zawierającej zagnieżdżoną tablicę, czy to prawda? – Tagar
Zobacz moją edycję. W przypadku Sparka 1.2 możesz użyć tego samego kodu do transformacji z 'Wiersza' do swojej klasy spraw. Aby odczytać plik parkietu w starszych wersjach, zapoznaj się z oficjalną dokumentacją, która jest bardzo bliska. –
Mam to. Wielkie dzięki. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala#L268 GetSeq [Wiersz] (1) zrobiłby także? – Tagar