Chcę przeanalizować kolumny z datami w DataFrame
, a dla każdej kolumny daty może się zmienić rozdzielczość daty (tj. 2011/01/10 => 2011/01, jeśli rozdzielczość jest ustawiona na "Miesiąc").Jak mogę przekazać dodatkowe parametry do UDF w SparkSql?
Napisałem następujący kod:
def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
import org.apache.spark.sql.functions._
val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}
val allColNames = dataframe.columns
val allCols = allColNames.map(name => dataframe.col(name))
val mappedCols =
{
for(i <- allCols.indices) yield
{
schema(i) match
{
case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
case _ => allCols(i)
}
}
}
dataframe.select(mappedCols:_*)
}}
Jednak to nie działa. Wygląda na to, że mogę przekazać tylko Column
s do UDF. I zastanawiam się, czy to będzie bardzo powolne, jeśli przekonwertuję DataFrame
na RDD
i zastosuję funkcję w każdym rzędzie.
Czy ktoś zna prawidłowe rozwiązanie? Dziękuję Ci!
Dziękuję za odpowiedź i intuicji currying! – DarkZero
Napisałem samouczek dotyczący używania currying do tworzenia UDF Spark, który akceptuje dodatkowe parametry w czasie wywołania. https://gist.github.com/andrearota/5910b5c5ac65845f23856b2415474c38 –