2017-10-21 29 views
5

Zakładając mamy następujący plik tekstowy (wyjście polecenia df.show()):Jak odczytać dane wyjściowe operatora show do zestawu danych?

+----+---------+--------+ 
|col1|  col2| col3| 
+----+---------+--------+ 
| 1|pi number|3.141592| 
| 2| e number| 2.71828| 
+----+---------+--------+ 

Teraz chcę czytać/przetworzy go jako DataFrame/zestawów danych. Jaki jest najbardziej "iskrzący" sposób na zrobienie tego?

p.s. Jestem zainteresowany rozwiązaniami dla zarównoscala i pyspark, dlatego oba znaczniki są używane.

Odpowiedz

4

UPDATE: użyciu "jednoznaczność" parsera lib mogę pozbyć jednej linii, gdzie byłem usuwając spacje w nazwach kolumn:

Scala:

// read Spark Output Fixed width table: 
def readSparkOutput(filePath: String) : org.apache.spark.sql.DataFrame = { 
    val t = spark.read 
       .option("header","true") 
       .option("inferSchema","true") 
       .option("delimiter","|") 
       .option("parserLib","UNIVOCITY") 
       .option("ignoreLeadingWhiteSpace","true") 
       .option("ignoreTrailingWhiteSpace","true") 
       .option("comment","+") 
       .csv(filePath) 
    t.select(t.columns.filterNot(_.startsWith("_c")).map(t(_)):_*) 
} 

PySpark:

def read_spark_output(file_path): 
    t = spark.read \ 
      .option("header","true") \ 
      .option("inferSchema","true") \ 
      .option("delimiter","|") \ 
      .option("parserLib","UNIVOCITY") \ 
      .option("ignoreLeadingWhiteSpace","true") \ 
      .option("ignoreTrailingWhiteSpace","true") \ 
      .option("comment","+") \ 
      .csv("file:///tmp/spark.out") 
    # select not-null columns 
    return t.select([c for c in t.columns if not c.startswith("_")]) 

Przykład użycia:

scala> val df = readSparkOutput("file:///tmp/spark.out") 
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string ... 1 more field] 

scala> df.show 
+----+---------+--------+ 
|col1|  col2| col3| 
+----+---------+--------+ 
| 1|pi number|3.141592| 
| 2| e number| 2.71828| 
+----+---------+--------+ 


scala> df.printSchema 
root 
|-- col1: integer (nullable = true) 
|-- col2: string (nullable = true) 
|-- col3: double (nullable = true) 

Old odpowiedź:

Oto moja próba w Scala (Spark 2.2):

// read Spark Output Fixed width table: 
val t = spark.read 
    .option("header","true") 
    .option("inferSchema","true") 
    .option("delimiter","|") 
    .option("comment","+") 
    .csv("file:///temp/spark.out") 
// select not-null columns 
val cols = t.columns.filterNot(c => c.startsWith("_c")).map(a => t(a)) 
// trim spaces from columns 
val colsTrimmed = t.columns.filterNot(c => c.startsWith("_c")).map(c => c.replaceAll("\\s+","")) 
// reanme columns using 'colsTrimmed' 
val df = t.select(cols:_*).toDF(colsTrimmed:_*) 

To działa, ale mam wrażenie, że nie musi być dużo Bardziej elegancki sposób na zrobienie tego.

scala> df.show 
+----+---------+--------+ 
|col1|  col2| col3| 
+----+---------+--------+ 
| 1.0|pi number|3.141592| 
| 2.0| e number| 2.71828| 
+----+---------+--------+ 

scala> df.printSchema 
root 
|-- col1: double (nullable = true) 
|-- col2: string (nullable = true) 
|-- col3: double (nullable = true) 
+0

Zawsze myślałem o napisaniu niestandardowego źródła Sparka, ale twoje rozwiązanie jest po prostu sprytne! Dzięki. –

+1

@JacekLaskowski, nie, dziękuję !!! Dużo się uczę od twojej [Mastering Apache Spark 2] (https://www.gitbook.com/book/jaceklaskowski/mastering-apache-spark/details) i od twoich odpowiedzi. – MaxU

Powiązane problemy