Najprostszym sposobem jest mapa nad RDD w DataFrame i użyć mkString:
df.rdd.map(x=>x.mkString(","))
Od Spark 1.5 (lub jeszcze przed tym) df.map(r=>r.mkString(","))
zrobiłby to samo , jeśli chcesz, aby CSV uciekł, możesz użyć do tego apache commons lang. na przykład Oto kod używamy
def DfToTextFile(path: String,
df: DataFrame,
delimiter: String = ",",
csvEscape: Boolean = true,
partitions: Int = 1,
compress: Boolean = true,
header: Option[String] = None,
maxColumnLength: Option[Int] = None) = {
def trimColumnLength(c: String) = {
val col = maxColumnLength match {
case None => c
case Some(len: Int) => c.take(len)
}
if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
}
def rowToString(r: Row) = {
val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
st.split("~-~").map(trimColumnLength).mkString(delimiter)
}
def addHeader(r: RDD[String]) = {
val rdd = for (h <- header;
if partitions == 1; //headers only supported for single partitions
tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
rdd.getOrElse(r)
}
val rdd = df.map(rowToString).repartition(partitions)
val headerRdd = addHeader(rdd)
if (compress)
headerRdd.saveAsTextFile(path, classOf[GzipCodec])
else
headerRdd.saveAsTextFile(path)
}
Próbowałem rzeczy, o której wspomniałeś. Tworzy katalog o podanej ścieżce z plikiem "part" i plikiem o nazwie "_SUCCESS". Czy znasz sposób, aby uzyskać tylko jeden plik? –
Nie, myślę, że nie ma sposobu, aby to zrobić. – sag