2016-09-13 13 views
7

Próbuję utworzyć DataFrame przy użyciu RDD.Utwórz DataFrame z wartością pustą dla kilku kolumn

Pierwszy Tworzę RDD za pomocą poniższego kodu -

val account = sc.parallelize(Seq(
           (1, null, 2,"F"), 
           (2, 2, 4, "F"), 
           (3, 3, 6, "N"), 
           (4,null,8,"F"))) 

To działa bez zarzutu -

konto: org.apache.spark.rdd.RDD [(int, int, dowolny łańcuch)] = ParallelCollectionRDD [0] w parallelize na: 27

ale podczas próby utworzenia DataFrame z RDD za pomocą poniższego kodu

account.toDF("ACCT_ID", "M_CD", "C_CD","IND") 

otrzymuję poniżej błędu

java.lang.UnsupportedOperationException: schema dla typu Dowolna nie obsługiwane

analizowałem że gdy kładę null wartości w Seq to tylko dostałem błąd.

Czy istnieje sposób na dodanie wartości pustej?

+0

użycie '(1, null: Integer, 2, "F")' – dk14

Odpowiedz

4

Problem polega na tym, że Dowolny jest zbyt ogólny, a Spark po prostu nie ma pojęcia, jak go przekształcić do postaci szeregowej. Powinieneś wyraźnie podać określony typ, w twoim przypadku Integer. Ponieważ wartości null nie można przypisać do typów pierwotnych w Scali, można zamiast tego użyć java.lang.Integer. Więc spróbuj tego:

val account = sc.parallelize(Seq(
           (1, null.asInstanceOf[Integer], 2,"F"), 
           (2, new Integer(2), 4, "F"), 
           (3, new Integer(3), 6, "N"), 
           (4, null.asInstanceOf[Integer],8,"F"))) 

Oto wyjściowy:

rdd: org.apache.spark.rdd.RDD[(Int, Integer, Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:24 

i odpowiadająca DataFrame:

scala> val df = rdd.toDF("ACCT_ID", "M_CD", "C_CD","IND") 

df: org.apache.spark.sql.DataFrame = [ACCT_ID: int, M_CD: int ... 2 more fields] 

scala> df.show 
+-------+----+----+---+ 
|ACCT_ID|M_CD|C_CD|IND| 
+-------+----+----+---+ 
|  1|null| 2| F| 
|  2| 2| 4| F| 
|  3| 3| 6| N| 
|  4|null| 8| F| 
+-------+----+----+---+ 

Ponadto można rozważyć trochę czystszy sposób zadeklarować zerową wartość całkowitą takiego:

object Constants { 
    val NullInteger: java.lang.Integer = null 
} 
+1

Powyższy kod działa poprawnie. – Avijit

+0

Jak mam to zrobić, jeśli używam 'case class' do tworzenia' DataFrame', tj. Tworzę 'DataFrame' używając' spark.sparkContext.parallellize (Seq (A (_, _), A (_, _))). toDF() 'gdzie mam' case klasy A (_, _) '? Próbowałem powyżej techniki, ale 'null.asInstanceOf [T]' daje mi 'NullPointerException' i' null: T' (jak powiedział w komentarzu na pytanie) daje mi 'wyrażenie typu Null nie kwalifikuje się do niejawnej konwersji' –

3

Alternatywna droga bez użycia RDD:

import spark.implicits._ 

val df = spark.createDataFrame(Seq(
    (1, None, 2, "F"), 
    (2, Some(2), 4, "F"), 
    (3, Some(3), 6, "N"), 
    (4, None, 8, "F") 
)).toDF("ACCT_ID", "M_CD", "C_CD","IND") 

df.show 
+-------+----+----+---+ 
|ACCT_ID|M_CD|C_CD|IND| 
+-------+----+----+---+ 
|  1|null| 2| F| 
|  2| 2| 4| F| 
|  3| 3| 6| N| 
|  4|null| 8| F| 
+-------+----+----+---+ 

df.printSchema 
root 
|-- ACCT_ID: integer (nullable = false) 
|-- M_CD: integer (nullable = true) 
|-- C_CD: integer (nullable = false) 
|-- IND: string (nullable = true) 
Powiązane problemy