2015-07-21 14 views
6

Próbuję wczytać plik CSV do ramki danych Spark za pomocą programu spark-csv [1] przy użyciu notebooka Apache Zeppelin i podczas ładowania pola liczbowego, które nie ma wartości parser kończy się niepowodzeniem dla tej linii, a linia zostaje pominięta.Jak określić brakującą wartość w ramce danych

Spodziewałbym się, że linia zostanie załadowana, a wartość w ramce danych wczytuje linię i ma wartość ustawioną na NULL, tak aby agregacje po prostu zignorowały tę wartość.

%dep 
z.reset() 
z.addRepo("my-nexus").url("<my_local_nexus_repo_that_is_a_proxy_of_public_repos>") 
z.load("com.databricks:spark-csv_2.10:1.1.0") 


%spark 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.types._ 
import com.databricks.spark.csv._ 
import org.apache.spark.sql.functions._ 

val schema = StructType(
    StructField("identifier", StringType, true) :: 
    StructField("name", StringType, true) :: 
    StructField("height", DoubleType, true) :: 
    Nil) 

val sqlContext = new SQLContext(sc) 
val df = sqlContext.read.format("com.databricks.spark.csv") 
         .schema(schema) 
         .option("header", "true") 
         .load("file:///home/spark_user/data.csv") 

df.describe("height").show() 

Oto zawartość pliku danych: /home/spark_user/data.csv

identifier,name,height 
1,sam,184 
2,cath,180 
3,santa,  <-- note that there is not height recorded for Santa ! 

Oto wynik:

+-------+------+ 
|summary|height| 
+-------+------+ 
| count|  2| <- 2 of 3 lines loaded, ie. sam and cath 
| mean| 182.0| 
| stddev| 2.0| 
| min| 180.0| 
| max| 184.0| 
+-------+------+ 

w dziennikach Zeppelin mogę zobacz następujący błąd podczas analizowania linii Santa:

ERROR [2015-07-21 16:42:09,940] ({Executor task launch worker-45} CsvRelation.scala[apply]:209) - Exception while parsing line: 3,santa,. 
     java.lang.NumberFormatException: empty String 
     at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842) 
     at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) 
     at java.lang.Double.parseDouble(Double.java:538) 
     at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232) 
     at scala.collection.immutable.StringOps.toDouble(StringOps.scala:31) 
     at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:42) 
     at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:198) 
     at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:180) 
     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
     at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129) 
     at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126) 
     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) 
     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:70) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Więc możesz mi powiedzieć aż tak dobrze ... i masz rację;)

Teraz chcę dodać dodatkową kolumnę, np. Wiek i zawsze mam dane w tym polu.

identifier,name,height,age 
1,sam,184,30 
2,cath,180,32 
3,santa,,70 

Teraz poprosić grzecznie dla niektórych statystyki dotyczące wieku:

%spark 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.types._ 
import com.databricks.spark.csv._ 
import org.apache.spark.sql.functions._ 

val schema = StructType(
    StructField("identifier", StringType, true) :: 
    StructField("name", StringType, true) :: 
    StructField("height", DoubleType, true) :: 
    StructField("age", DoubleType, true) :: 
    Nil) 

val sqlContext = new SQLContext(sc) 
val df = sqlContext.read.format("com.databricks.spark.csv") 
         .schema(schema) 
         .option("header", "true") 
         .load("file:///home/spark_user/data2.csv") 

df.describe("age").show() 

Wyniki

+-------+----+ 
|summary| age| 
+-------+----+ 
| count| 2| 
| mean|31.0| 
| stddev| 1.0| 
| min|30.0| 
| max|32.0| 
+-------+----+ 

wszystko źle! Ponieważ wysokość Mikołaja nie jest znana, cała linia zostaje utracona, a obliczenia wieku opierają się tylko na Sam i Cath, podczas gdy Mikołaj ma doskonale uzasadniony wiek.

Moje pytanie to wartość, której potrzebuję do podłączenia wysokości Świętego Mikołaja, aby można było załadować plik CSV. Próbowałem ustawić schemat będzie wszystko StringType ale potem

Kolejne pytanie jest więcej o

znalazłem w API, które można obsługiwać wartości n/a za pomocą iskry. Więc pomyślałem, może mógłbym załadować moich danych ze wszystkich kolumn ustawionych na StringType a następnie zrobić porządki, a następnie ustawić tylko schemat poprawnie napisane jak poniżej:

%spark 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.types._ 
import com.databricks.spark.csv._ 
import org.apache.spark.sql.functions._ 

val schema = StructType(
StructField("identifier", StringType, true) :: 
StructField("name", StringType, true) :: 
StructField("height", StringType, true) :: 
StructField("age", StringType, true) :: 
Nil) 

val sqlContext = new SQLContext(sc) 
val df = sqlContext.read.format("com.databricks.spark.csv").schema(schema).option("header", "true").load("file:///home/spark_user/data.csv") 

// eg. for each column of my dataframe, replace empty string by null 
df.na.replace("*", Map("" -> null)) 

val toDouble = udf[Double, String](_.toDouble) 
df2 = df.withColumn("age", toDouble(df("age"))) 

df2.describe("age").show() 

Ale df.na.replace() zgłasza wyjątek i zatrzymuje się:

java.lang.IllegalArgumentException: Unsupported value type java.lang.String(). 
     at org.apache.spark.sql.DataFrameNaFunctions.org$apache$spark$sql$DataFrameNaFunctions$$convertToDouble(DataFrameNaFunctions.scala:417) 
     at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337) 
     at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
     at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
     at org.apache.spark.sql.DataFrameNaFunctions.replace0(DataFrameNaFunctions.scala:337) 
     at org.apache.spark.sql.DataFrameNaFunctions.replace(DataFrameNaFunctions.scala:304) 

Każda pomoc, & wskazówki bardzo doceniane !!

[1] https://github.com/databricks/spark-csv

Odpowiedz

5

iskrowym CSV nie ma tę opcję. To has been fixed w gałęzi master. Myślę, że powinieneś go użyć lub poczekać na kolejną stabilną wersję.

+0

Mam teraz przetestowane wersji Last w Master gałęzi i rzeczywiście rozwiązuje problem. Dzięki ! –

Powiązane problemy