2016-03-30 15 views
7

Mam mały zestaw danych, który będzie wynikiem pracy Sparka. Zastanawiam się nad konwersją tego zestawu danych na ramkę danych pod kątem wygody na końcu zadania, ale starałem się poprawnie zdefiniować schemat. Problem stanowi ostatnie pole poniżej (topValues); jest to ArrayBuffer krotek - klawiszy i zliczeń.Spark: Programowe tworzenie schematu danych w scala

val innerSchema = 
    StructType(
     Array(
     StructField("value", StringType), 
     StructField("count", LongType) 
    ) 
    ) 
    val outputSchema = 
    StructType(
     Array(
     StructField("name", StringType, nullable=false), 
     StructField("index", IntegerType, nullable=false), 
     StructField("count", LongType, nullable=false), 
     StructField("empties", LongType, nullable=false), 
     StructField("nulls", LongType, nullable=false), 
     StructField("uniqueValues", LongType, nullable=false), 
     StructField("mean", DoubleType), 
     StructField("min", DoubleType), 
     StructField("max", DoubleType), 
     StructField("topValues", innerSchema) 
    ) 
    ) 

    val result = stats.columnStats.map{ c => 
    Row(c._2.name, c._1, c._2.count, c._2.empties, c._2.nulls, c._2.uniqueValues, c._2.mean, c._2.min, c._2.max, c._2.topValues.topN) 
    } 

    val rdd = sc.parallelize(result.toSeq) 

    val outputDf = sqlContext.createDataFrame(rdd, outputSchema) 

    outputDf.show() 

Błąd Dostaję to MatchError: scala.MatchError: ArrayBuffer((10,2), (20,3), (8,1)) (of class scala.collection.mutable.ArrayBuffer)

Kiedy debugowania i sprawdzić swoje obiekty, widzę to:

rdd: ParallelCollectionRDD[2] 
rdd.data: "ArrayBuffer" size = 2 
rdd.data(0): [age,2,6,0,0,3,14.666666666666666,8.0,20.0,ArrayBuffer((10,2), (20,3), (8,1))] 
rdd.data(1): [gender,3,6,0,0,2,0.0,0.0,0.0,ArrayBuffer((M,4), (F,2))] 

Wydaje mi się, że” dokładnie opisałem ArrayBuffer krotek w mojej wewnętrznej scenie, ale Spark nie zgadza się.

Każdy pomysł, jak powinienem zdefiniować schemat?

+0

Przydałoby się, jeśli podasz przykładowe dane lub przynajmniej dokładny typ 'rdd'. – zero323

Odpowiedz

10
val rdd = sc.parallelize(Array(Row(ArrayBuffer(1,2,3,4)))) 
val df = sqlContext.createDataFrame(
    rdd, 
    StructType(Seq(StructField("arr", ArrayType(IntegerType, false), false) 
) 

df.printSchema 
root 
|-- arr: array (nullable = false) 
| |-- element: integer (containsNull = false) 

df.show 
+------------+ 
|   arr| 
+------------+ 
|[1, 2, 3, 4]| 
+------------+ 
+0

Tak, ArrayType jest właściwym podejściem. Dzięki! Mój ostateczny schemat jest w mojej odpowiedzi. – Stuart

4

Jak zaznaczył David, musiałem użyć typu ArrayType. Spark jest zadowolony z tego:

val outputSchema = 
    StructType(
     Array(
     StructField("name", StringType, nullable=false), 
     StructField("index", IntegerType, nullable=false), 
     StructField("count", LongType, nullable=false), 
     StructField("empties", LongType, nullable=false), 
     StructField("nulls", LongType, nullable=false), 
     StructField("uniqueValues", LongType, nullable=false), 
     StructField("mean", DoubleType), 
     StructField("min", DoubleType), 
     StructField("max", DoubleType), 
     StructField("topValues", ArrayType(StructType(Array(
      StructField("value", StringType), 
      StructField("count", LongType) 
     )))) 
    ) 
    ) 
Powiązane problemy