2015-05-28 13 views
19

Mam plik json z niektórych danych, jestem w stanie stworzyć DataFrame z niego i schemat konkretnej części tego jestem zainteresowany wygląda jak następuje:Czy struktura zagnieżdżona DataFrame Spark jest ograniczona do selekcji?

val json: DataFrame = sqlc.load("entities_with_address2.json", "json")

root 
|-- attributes: struct (nullable = true) 
| |-- Address2: array (nullable = true) 
| | |-- value: struct (nullable = true) 
| | | |-- Zip: array (nullable = true) 
| | | | |-- element: struct (containsNull = true) 
| | | | | |-- value: struct (nullable = true) 
| | | | | | |-- Zip5: array (nullable = true) 
| | | | | | | |-- element: struct (containsNull = true) 
| | | | | | | | |-- value: string (nullable = true) 

kiedy staram się po prostu wybrać najgłębsze murawę: json.select("attributes.Address2.value.Zip.value.Zip5").collect()

To daje mi wyjątek: org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value, StructType(StructField(Zip5, ArrayType(StructType(StructField(value, StringType, true)), true), true)), true)), true), true);

Patrząc na metodę resolveGetField LogicalPlan widzę, że można wybrać z StructType lub z ArrayType (StructType), ale czy jest jakiś sposób, aby wybrać głębiej? Jak mogę wybrać pole, którego potrzebuję?

Oto pełny wyjątek.

org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value,StructType(StructField(Zip5,ArrayType(StructType(StructField(value,StringType,true)),true),true)),true)),true),true); 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveGetField(LogicalPlan.scala:265) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214) 
     at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
     at scala.collection.immutable.List.foldLeft(List.scala:84) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:214) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:117) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:50) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:46) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252) 
     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252) 
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:251) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.immutable.List.foreach(List.scala:318) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
     at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:122) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
     at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
     at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:46) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:44) 
     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:89) 
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:44) 
     at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:40) 
     at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080) 
     at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133) 
     at org.apache.spark.sql.DataFrame.logicalPlanToDataFrame(DataFrame.scala:157) 
     at org.apache.spark.sql.DataFrame.select(DataFrame.scala:476) 
     at org.apache.spark.sql.DataFrame.select(DataFrame.scala:491) 
     at com.reltio.analytics.PREDF.test(PREDF.scala:55) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) 
     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) 
     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) 
     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) 
     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) 
     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) 
     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) 
     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) 
     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) 
     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) 
     at org.junit.runners.ParentRunner.run(ParentRunner.java:309) 
     at org.junit.runner.JUnitCore.run(JUnitCore.java:160) 
     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74) 
     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211) 
     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67) 

Odpowiedz

28

Problemem jest ArrayType - można odtworzyć ten błąd bardzo prosto:

val df = Seq(Tuple1(Array[String]())).toDF("users") 

W tym momencie df.printSchema pokazuje:

root 
|-- users: array (nullable = true) 
| |-- element: string (containsNull = true) 

A teraz jeśli Spróbuj:

df.select($"users.element") 

Otrzymasz dokładnie ten sam wyjątek - GetField is not valid...

Masz kilka różnych opcji, aby się zrelaksować Array. Można dostać na poszczególne pozycje z getItem tak:

df.select($"users".getItem(0)) 

a od getItem powraca inny Column można kopać tak głęboko jak chcesz:

df.select($"attributes.Address2".getItem(0).getField("value").getField("Zip").getItem(...) 
// etc 

Ale z tablicy, prawdopodobnie chcesz programowo rozwiń całą tablicę. Jeśli spojrzysz na sposób, w jaki Hive to obsługuje, musisz wykonać LATERAL VIEW. W Spark, będziesz musiał użyć explode stworzyć odpowiednik Hive LATERAL VIEW:

case class User(name: String) 
df.explode($"users"){ case Row(arr: Array[String]) => arr.map(User(_)) } 

pamiętać, że użycie klasy Sprawa w moim mapie - to jest to, co mają docs. Jeśli nie chcesz, aby utworzyć klasę przypadku można po prostu wrócić do Tuple1 (lub Tuple2 lub Tuple3 etc):

df.explode($"users"){ case Row(arr: Array[String]) => arr.map(Tuple1(_)) } 
+1

David, dzięki za odpowiedź. Było jasne, dlaczego nie działa - możliwe jest projektowanie tylko z Struct lub Array (Struct) (jest to klasa LogicalPlan). Nie chciałem przegapić czegoś, czego nie całkiem rozumiem. Chociaż odpowiedź nie jest tym, czego się spodziewałem, jestem naprawdę wdzięczny, ponieważ widzę kogoś, kto próbował i zawiódł. Wygląda na to, że jedynym sposobem jest eksplozja, a następnie projekt. – evgenii

Powiązane problemy