Mam klasę Python, której używam do ładowania i przetwarzania niektórych danych w Sparku. Wśród różnych rzeczy, które muszę zrobić, generuję listę zmiennych fikcyjnych pochodzących z różnych kolumn w ramce danych Spark. Mój problem polega na tym, że nie jestem pewien, jak poprawnie zdefiniować funkcję zdefiniowaną przez użytkownika, aby osiągnąć to, czego potrzebuję.Kodowanie i łączenie wielu funkcji w PySpark
zrobić mają obecnie metodę, która, kiedy odwzorowane na podstawowej dataframe RDD, rozwiązuje połowę problemu (należy pamiętać, że jest to metoda w większym data_processor
klasie):
def build_feature_arr(self,table):
# this dict has keys for all the columns for which I need dummy coding
categories = {'gender':['1','2'], ..}
# there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file
if table == 'users':
iter_over = self.config.dyadic_features_to_include
elif table == 'activty':
iter_over = self.config.user_features_to_include
def _build_feature_arr(row):
result = []
row = row.asDict()
for col in iter_over:
column_value = str(row[col]).lower()
cats = categories[col]
result += [1 if column_value and cat==column_value else 0 for cat in cats]
return result
return _build_feature_arr
Zasadniczo, co to oznacza, że dla określonej ramki danych przyjmuje wartości zmiennych kategorialnych dla określonych kolumn i zwraca listę wartości tych nowych zmiennych fikcyjnych. Oznacza to, że podany kod:
data = data_processor(init_args)
result = data.user_data.rdd.map(self.build_feature_arr('users'))
powraca coś takiego:
In [39]: result.take(10)
Out[39]:
[[1, 0, 0, 0, 1, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 0],
[1, 0, 1, 0, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[0, 1, 1, 0, 0, 0],
[1, 0, 1, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 1]]
To jest dokładnie to, co chcę pod względem generowania listy zmiennych binarnych chcę, ale tutaj jest moje pytanie: Jak mogę albo (a) utworzyć UDF z podobną funkcjonalnością, której mogę użyć w zapytaniu Spark SQL (lub w inny sposób, jak przypuszczam), lub (b) wziąć RDD wynikający z mapy opisanej powyżej i dodać ją jako nową kolumnę do ramka danych user_data?
Tak czy inaczej, potrzebuję wygenerować nową ramkę danych zawierającą kolumny z user_data, wraz z nową kolumną (nazwijmy ją feature_array
) zawierającą wyjście powyższej funkcji (lub coś funkcjonalnie równoważnego).
@DavidArenburg W tym szczególnym kontekście dzieje się tak dlatego, że OP chce uzyskać fałszywe zmienne (np. 'Model.matrix' w R). Najprawdopodobniej trenować jakiś typ modelu liniowego. Wyjaśnienie Rish - typ indeksu ciągów tworzy kolumnę typu factor z łańcuchów, jeden gorący wywołania 'model.matrix' :) – zero323
dzięki @ zero323! Tylko jedna uwaga: od Spark 2.0+ 'od pyspark.mllib.linalg import DenseVector' powinien zostać zamieniony na' from pyspark.ml.linalg import DenseVector', w przeciwnym razie możesz dostać błąd typów w etapie 'VectorIndexer' – EnriqueH
@EnriqueH Clarified , dzięki. – zero323