2016-02-22 18 views
8

Chcę przeanalizować kolumny z datami w DataFrame, a dla każdej kolumny daty może się zmienić rozdzielczość daty (tj. 2011/01/10 => 2011/01, jeśli rozdzielczość jest ustawiona na "Miesiąc").Jak mogę przekazać dodatkowe parametry do UDF w SparkSql?

Napisałem następujący kod:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame = 
{ 
    import org.apache.spark.sql.functions._ 
    val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)} 
    val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)} 

    val allColNames = dataframe.columns 
    val allCols = allColNames.map(name => dataframe.col(name)) 

    val mappedCols = 
    { 
    for(i <- allCols.indices) yield 
    { 
     schema(i) match 
     { 
     case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i))) 
     case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i)) 
     case _ => allCols(i) 
     } 
    } 
    } 

    dataframe.select(mappedCols:_*) 

}} 

Jednak to nie działa. Wygląda na to, że mogę przekazać tylko Column s do UDF. I zastanawiam się, czy to będzie bardzo powolne, jeśli przekonwertuję DataFrame na RDD i zastosuję funkcję w każdym rzędzie.

Czy ktoś zna prawidłowe rozwiązanie? Dziękuję Ci!

Odpowiedz

25

Wystarczy użyć trochę zmiękczania:

def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
    SparkDateTimeConverter.convertDate(x, resolution)) 

i używać go w sposób następujący:

case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i)) 

Na marginesie należy przyjrzeć sql.functions.trunc i sql.functions.date_format. Powinny one co najmniej część pracy bez korzystania UDF w ogóle.

Uwaga:

W Spark 2.2 lub nowszej można korzystać typedLit funkcja:

import org.apache.spark.sql.functions.typedLit 

które obsługują szerszy zakres literałów jak Seq lub Map.

+1

Dziękuję za odpowiedź i intuicji currying! – DarkZero

+4

Napisałem samouczek dotyczący używania currying do tworzenia UDF Spark, który akceptuje dodatkowe parametry w czasie wywołania. https://gist.github.com/andrearota/5910b5c5ac65845f23856b2415474c38 –

10

Można tworzyć dosłownym Column przejść do UDF przy użyciu funkcji lit(...) zdefiniowany w org.apache.spark.sql.functions

Na przykład:

val takeRight = udf((s: String, i: Int) => s.takeRight(i)) 
df.select(takeRight($"stringCol", lit(1))) 
+1

Dziękuję, początkowo użyłem również "lit", ale okazuje się, że jego wydajność nie jest tak dobra, jak druga odpowiedź ... – DarkZero

Powiązane problemy