2015-07-13 18 views

Odpowiedz

58

Jeśli ramka danych zmieści się w pamięci sterownika i chcesz zapisać się do lokalnego systemu plików można przekonwertować Spark DataFrame lokalnym Pandas DataFrame użyciu toPandas metodę, a następnie po prostu użyć to_csv:

df.toPandas().to_csv('mycsv.csv') 

W przeciwnym wypadku można użyć spark-csv:

  • zapłonowa 1,3

    df.save('mycsv.csv', 'com.databricks.spark.csv') 
    
  • Spark 1.4+

    df.write.format('com.databricks.spark.csv').save('mycsv.csv') 
    

W Spark 2.0+ można używać csv źródło danych bezpośrednio:

df.write.csv('mycsv.csv') 
+0

Super odpowiedź. W przypadku pierwszej opcji, czy jest to możliwe, jeśli chcę pisać do pliku rozdzielanego potokami, a nie do pliku CSV z oddzielonymi przecinkami? –

+2

Jeśli masz iskrowskie ramki danych, możesz użyć 'df.write.csv ('/ tmp/lookatme /')', co spowoduje upuszczenie zestawu plików csv w '/ tmp/lookatme' Używanie iskry jest znacznie szybsze niż serializowanie w pandach. Jedyną wadą jest to, że otrzymasz zestaw csv zamiast pojedynczego, a jeśli narzędzie docelowe nie wie, jak je łączyć, musisz to zrobić samodzielnie. – Txangel

+0

Co za wielki problem, aby uzyskać csv z iskry. Coś interesującego w tym pierwszym rozwiązaniu polega na tym, że 'to_csv' działa bez potrzeby importowania Pand. '.toPandas' jest częścią Sparka, może to domyślnie importuje .. – cardamom

12

Jeśli nie można skorzystać z zapłonem CSV, można wykonać następujące czynności:

df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv") 

Jeśli potrzebujesz obsługi łańcuchów z linebreaks lub przecinek, które nie będą działać. Użyj tego:

import csv 
import cStringIO 

def row2csv(row): 
    buffer = cStringIO.StringIO() 
    writer = csv.writer(buffer) 
    writer.writerow([str(s).encode("utf-8") for s in row]) 
    buffer.seek(0) 
    return buffer.read().strip() 

df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv") 
3

Co powiesz na to (jeśli nie chcesz jednej linijki)?

for row in df.collect(): 
    d = row.asDict() 
    s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"]) 
    f.write(s) 

f jest otwartym deskryptorem pliku. Również separator jest znakiem TAB, ale łatwo można go zmienić na cokolwiek zechcesz.

8

Dla Apache Spark 2+, w celu zapisania ramek danych w pojedynczym pliku csv. Użyj następującego polecenia

query.repartition(1).write.csv("cc_out.csv", sep='|') 

Tutaj 1 wskazują, że muszę tylko jedną partycję z pliku CSV. możesz go zmienić zgodnie z własnymi wymaganiami.

Powiązane problemy