2015-08-11 19 views
30

Komenda ta działa z HiveQL:Jak eksportować dane z Spark SQL do pliku CSV

insert overwrite directory '/data/home.csv' select * from testtable; 

Ale Spark SQL Dostaję błąd ze śladu org.apache.spark.sql.hive.HiveQl stosu:

java.lang.RuntimeException: Unsupported language features in query: 
    insert overwrite directory '/data/home.csv' select * from testtable 

proszę kierować mi napisać eksport do funkcji CSV w Spark SQL.

Odpowiedz

45

Możesz skorzystać z poniższego zestawienia napisać zawartość dataframe w formacie CSV df.write.csv("/data/home/csv")

Jeśli trzeba napisać całą dataframe w jednym pliku CSV, a następnie użyć df.coalesce(1).write.csv("/data/home/sample.csv")

Dla iskry 1 .x, można użyć spark-csv aby zapisać wyniki do plików CSV

Poniżej Scala fragment będzie Hel p

import org.apache.spark.sql.hive.HiveContext 
// sc - existing spark context 
val sqlContext = new HiveContext(sc) 
val df = sqlContext.sql("SELECT * FROM testtable") 
df.write.format("com.databricks.spark.csv").save("/data/home/csv") 

Aby zapisać zawartość do jednego pliku

import org.apache.spark.sql.hive.HiveContext 
// sc - existing spark context 
val sqlContext = new HiveContext(sc) 
val df = sqlContext.sql("SELECT * FROM testtable") 
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv") 
+0

Próbowałem rzeczy, o której wspomniałeś. Tworzy katalog o podanej ścieżce z plikiem "part" i plikiem o nazwie "_SUCCESS". Czy znasz sposób, aby uzyskać tylko jeden plik? –

+0

Nie, myślę, że nie ma sposobu, aby to zrobić. – sag

1

Komunikat o błędzie sugeruje, że nie jest to obsługiwana funkcja w języku zapytań. Ale możesz zapisać ramkę DataFrame w dowolnym formacie jak zwykle za pośrednictwem interfejsu RDD (df.rdd.saveAsTextFile). Lub możesz sprawdzić https://github.com/databricks/spark-csv.

+0

scala> df.write.format ("com.databricks.spark.csv") zapisz ("/ data/home.csv") . 18: error: wartość zapisu nie jest członkiem org.apache.spark.sql.SchemaRDD Czy muszę ponownie skompilować obecny słoik z pakietem databricks? – shashankS

+0

'DataFrame.write' dodano w Apache Spark 1.4.0. –

8

Najprostszym sposobem jest mapa nad RDD w DataFrame i użyć mkString:

df.rdd.map(x=>x.mkString(",")) 

Od Spark 1.5 (lub jeszcze przed tym) df.map(r=>r.mkString(",")) zrobiłby to samo , jeśli chcesz, aby CSV uciekł, możesz użyć do tego apache commons lang. na przykład Oto kod używamy

def DfToTextFile(path: String, 
        df: DataFrame, 
        delimiter: String = ",", 
        csvEscape: Boolean = true, 
        partitions: Int = 1, 
        compress: Boolean = true, 
        header: Option[String] = None, 
        maxColumnLength: Option[Int] = None) = { 

    def trimColumnLength(c: String) = { 
     val col = maxColumnLength match { 
     case None => c 
     case Some(len: Int) => c.take(len) 
     } 
     if (csvEscape) StringEscapeUtils.escapeCsv(col) else col 
    } 
    def rowToString(r: Row) = { 
     val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters 
     st.split("~-~").map(trimColumnLength).mkString(delimiter) 
    } 

    def addHeader(r: RDD[String]) = { 
     val rdd = for (h <- header; 
        if partitions == 1; //headers only supported for single partitions 
        tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1) 
     rdd.getOrElse(r) 
    } 

    val rdd = df.map(rowToString).repartition(partitions) 
    val headerRdd = addHeader(rdd) 

    if (compress) 
     headerRdd.saveAsTextFile(path, classOf[GzipCodec]) 
    else 
     headerRdd.saveAsTextFile(path) 
    } 
+2

Chociaż jest to najprostsza odpowiedź (i dobra), jeśli tekst ma podwójne cudzysłowy, musisz je uwzględnić. – devonlazarus

+0

Po prostu błąd po utworzeniu RDD dla tabeli scala> df.rdd.map (x => x.mkString (",")); : 18: błąd: wartość rdd nie jest członkiem org.apache.spark.sql.SchemaRDD df.rdd.map (x => x.mkString (",")); – shashankS

22

odpowiedź powyżej z zapłonem CSV jest poprawna, ale nie jest to problem - biblioteka tworzy kilka plików w oparciu o podział ramki danych. I to nie jest to, czego zwykle potrzebujemy. Tak, można połączyć wszystkie partycje na jednym:

df.coalesce(1). 
    write. 
    format("com.databricks.spark.csv"). 
    option("header", "true"). 
    save("myfile.csv") 

i zmienić wyjście lib (nazwa „część-00000”) do nazwy pliku pragnienie.

Ten blogu zapewnia Więcej szczegółów: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

+2

Czy powinien to być df.repartition.write zamiast df.write.repartition? –

+0

@Cedric masz rację, dziękuję! Przedsprzedaż! Edytowane. –

+2

Można również dodać model, jeśli chce się pisać do istniejącego pliku. 'resultDF.repartition (1) .write.mode (" append ") .format (" com.databricks.spark.csv "). option (" header "," true ") .save (" s3: // .. ")' – Pramit

24

Od Spark 2.Xspark-csv jest zintegrowany jako native datasource. W związku z tym konieczne stwierdzenie upraszcza się (Windows)

df.write 
    .option("header", "true") 
    .csv("file:///C:/out.csv") 

lub UNIX

df.write 
    .option("header", "true") 
    .csv("/var/out.csv") 
+1

To powinna być teraz akceptowana odpowiedź. –

+0

Witam wszystkich, Czy istnieje sposób na zastąpienie pliku, ponieważ nie powiedzie się, gdy próbuje przepisać plik. – user3341078

+0

Pewnie! '.mode (" overwrite "). csv ("/var/out.csv ")' – Boern

0

Z pomocą zapłonie CSV możemy zapisać do pliku CSV.

val dfsql = sqlContext.sql("select * from tablename") 
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")` 
Powiązane problemy