2016-04-12 11 views
13

W uproszczonym przykładzie mam dataframe „DF” z kolumny „kol1, kolumna2” i chce obliczenie maksimum w rzędach po zastosowaniu funkcji każdej kolumny:PySpark wierszami funkcją składu

def f(x): 
    return (x+1) 

max_udf=udf(lambda x,y: max(x,y), IntegerType()) 
f_udf=udf(f, IntegerType()) 

df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2))) 

Więc jeśli DF:

col1 col2 
1  2 
3  0 

Następnie

df2:

col1 col2 result 
1  2  3 
3  0  4 

Powyższe nie wydają się działać i produkuje „Nie można ocenić wyrażenia: PythonUDF # f ...”

jestem absolutnie pozytywne „f_udf” działa dobrze na moim stole, a głównym problemem jest to, ze max_udf.

Bez tworzenia dodatkowych kolumn lub używania podstawowej mapy/zmniejszenia, czy istnieje sposób na wykonanie powyższych czynności w całości przy użyciu ramek danych i udfs? Jak mam zmodyfikować "max_udf"?

Próbowałem również:

max_udf=udf(max, IntegerType()) 

która produkuje ten sam błąd.

Ja również potwierdziła, że ​​następujące utwory:

df2=(df.withColumn("temp1", f_udf(df.col1)) 
     .withColumn("temp2", f_udf(df.col2)) 

df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2)) 

Dlaczego jest to, że nie mogę zrobić to za jednym zamachem?

Chciałbym zobaczyć odpowiedź, która uogólnia każdą funkcję "f_udf" i "max_udf".

Odpowiedz

21

miałem podobny problem i znalazł rozwiązanie w odpowiedzi na this stackoverflow question

zdać wiele kolumn lub cały rząd do UDF użyć struct:

from pyspark.sql.functions import udf, struct 
from pyspark.sql.types import IntegerType 

df = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b")) 

count_empty_columns = udf(lambda row: len([x for x in row if x == None]), IntegerType()) 

new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns]))) 

new_df.show() 

zwrócone:

+----+----+----------+ 
| a| b|null_count| 
+----+----+----------+ 
|null|null|   2| 
| 1|null|   1| 
|null| 2|   1| 
+----+----+----------+ 
+0

Dzięki, to pierwsza prawdziwa odpowiedź na to pytanie! –

+0

@AlexR. - jeśli jesteś zadowolony z tej odpowiedzi, zaakceptuj to! – proinsias

7

UserDefinedFunction rzuca błąd akceptując UDF jako ich argumenty.

Możesz zmodyfikować max_udf jak poniżej, aby działał.

df = sc.parallelize([(1, 2), (3, 0)]).toDF(["col1", "col2"]) 

max_udf = udf(lambda x, y: max(x + 1, y + 1), IntegerType()) 

df2 = df.withColumn("result", max_udf(df.col1, df.col2)) 

Albo

def f_udf(x): 
    return (x + 1) 

max_udf = udf(lambda x, y: max(x, y), IntegerType()) 
## f_udf=udf(f, IntegerType()) 

df2 = df.withColumn("result", max_udf(f_udf(df.col1), f_udf(df.col2))) 

Uwaga:

Drugie podejście jest ważna tylko wtedy, gdy funkcje wewnętrzne (tutaj f_udf) generowanie poprawnych wyrażeń SQL.

Działa tutaj ponieważ f_udf(df.col1) i f_udf(df.col2) są oceniane jako Column<b'(col1 + 1)'> i Column<b'(col2 + 1)'> odpowiednio, zanim przeszedł do max_udf. Nie działałby z funkcją dowolną.

To nie będzie działać, jeśli spróbujemy na przykład coś takiego:

from math import exp 

df.withColumn("result", max_udf(exp(df.col1), exp(df.col2))) 
+0

Dzięki za odpowiedź! Czy możesz wyjaśnić drugie podejście? Jestem zdezorientowany, jak nie potrzebujesz f_udf, aby być popularnym UDF, aby zastosować go do kolumny danych? –

+0

Również druga odpowiedź wydaje się wykorzystywać fakt, że kolumny danych ramek odpowiadają na operacje "+". Czy istnieje coś, co uogólnia to na inne "f_udf"? Ogólnie, jeśli mam wiele różnych funkcji "f_udf", czy musiałbym napisać osobny zestaw funkcji max_udf dla każdego z nich? –

+0

Przykro mi, jestem też nowy, by iskrzyć. Zauważyłem, że mogę wykonywać operacje na kolumnach według normalnych funkcji bez przekształcania ich w UDF. Czy możesz to podnieść jako oddzielne pytanie? Muszę znać ans też – Mohan