2015-10-07 14 views
14

Mam klasę Python, której używam do ładowania i przetwarzania niektórych danych w Sparku. Wśród różnych rzeczy, które muszę zrobić, generuję listę zmiennych fikcyjnych pochodzących z różnych kolumn w ramce danych Spark. Mój problem polega na tym, że nie jestem pewien, jak poprawnie zdefiniować funkcję zdefiniowaną przez użytkownika, aby osiągnąć to, czego potrzebuję.Kodowanie i łączenie wielu funkcji w PySpark

zrobić mają obecnie metodę, która, kiedy odwzorowane na podstawowej dataframe RDD, rozwiązuje połowę problemu (należy pamiętać, że jest to metoda w większym data_processor klasie):

def build_feature_arr(self,table): 
    # this dict has keys for all the columns for which I need dummy coding 
    categories = {'gender':['1','2'], ..} 

    # there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file 
    if table == 'users': 
     iter_over = self.config.dyadic_features_to_include 
    elif table == 'activty': 
     iter_over = self.config.user_features_to_include 

    def _build_feature_arr(row): 
     result = [] 
     row = row.asDict() 
     for col in iter_over: 
      column_value = str(row[col]).lower() 
      cats = categories[col] 
      result += [1 if column_value and cat==column_value else 0 for cat in cats] 
     return result 
    return _build_feature_arr 

Zasadniczo, co to oznacza, że ​​dla określonej ramki danych przyjmuje wartości zmiennych kategorialnych dla określonych kolumn i zwraca listę wartości tych nowych zmiennych fikcyjnych. Oznacza to, że podany kod:

data = data_processor(init_args) 
result = data.user_data.rdd.map(self.build_feature_arr('users')) 

powraca coś takiego:

In [39]: result.take(10) 
Out[39]: 
[[1, 0, 0, 0, 1, 0], 
[1, 0, 0, 1, 0, 0], 
[1, 0, 0, 0, 0, 0], 
[1, 0, 1, 0, 0, 0], 
[1, 0, 0, 1, 0, 0], 
[1, 0, 0, 1, 0, 0], 
[0, 1, 1, 0, 0, 0], 
[1, 0, 1, 1, 0, 0], 
[1, 0, 0, 1, 0, 0], 
[1, 0, 0, 0, 0, 1]] 

To jest dokładnie to, co chcę pod względem generowania listy zmiennych binarnych chcę, ale tutaj jest moje pytanie: Jak mogę albo (a) utworzyć UDF z podobną funkcjonalnością, której mogę użyć w zapytaniu Spark SQL (lub w inny sposób, jak przypuszczam), lub (b) wziąć RDD wynikający z mapy opisanej powyżej i dodać ją jako nową kolumnę do ramka danych user_data?

Tak czy inaczej, potrzebuję wygenerować nową ramkę danych zawierającą kolumny z user_data, wraz z nową kolumną (nazwijmy ją feature_array) zawierającą wyjście powyższej funkcji (lub coś funkcjonalnie równoważnego).

Odpowiedz

27

Cóż, możesz napisać UDF, ale dlaczego miałbyś to zrobić? Istnieje już sporo narzędzi zaprojektowanych do obsługi tej kategorii zadań:

from pyspark.sql import Row 
from pyspark.ml.linalg import DenseVector 

row = Row("gender", "foo", "bar") 

df = sc.parallelize([ 
    row("0", 3.0, DenseVector([0, 2.1, 1.0])), 
    row("1", 1.0, DenseVector([0, 1.1, 1.0])), 
    row("1", -1.0, DenseVector([0, 3.4, 0.0])), 
    row("0", -3.0, DenseVector([0, 4.1, 0.0])) 
]).toDF() 

Przede wszystkim StringIndexer.

from pyspark.ml.feature import StringIndexer 

indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df) 
indexed_df = indexer.transform(df) 
indexed_df.drop("bar").show() 

## +------+----+--------------+ 
## |gender| foo|gender_numeric| 
## +------+----+--------------+ 
## |  0| 3.0|   0.0| 
## |  1| 1.0|   1.0| 
## |  1|-1.0|   1.0| 
## |  0|-3.0|   0.0| 
## +------+----+--------------+ 

Następny OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder 

encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector") 
encoded_df = encoder.transform(indexed_df) 
encoded_df.drop("bar").show() 

## +------+----+--------------+-------------+ 
## |gender| foo|gender_numeric|gender_vector| 
## +------+----+--------------+-------------+ 
## |  0| 3.0|   0.0|(1,[0],[1.0])| 
## |  1| 1.0|   1.0| (1,[],[])| 
## |  1|-1.0|   1.0| (1,[],[])| 
## |  0|-3.0|   0.0|(1,[0],[1.0])| 
## +------+----+--------------+-------------+ 

VectorAssembler:

from pyspark.ml.feature import VectorAssembler 

assembler = VectorAssembler(
    inputCols=["gender_vector", "bar", "foo"], outputCol="features") 

encoded_df_with_indexed_bar = (vector_indexer 
    .fit(encoded_df) 
    .transform(encoded_df)) 

final_df = assembler.transform(encoded_df) 

Jeśli bar zawierał zmienne kategoryczne można użyć VectorIndexer ustawić wymaganą Metadane:

from pyspark.ml.feature import VectorIndexer 

vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed") 

, ale tak nie jest w tym przypadku.

Wreszcie można owinąć wszystkich, że za pomocą rurociągów:

from pyspark.ml import Pipeline 
pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler]) 
model = pipeline.fit(df) 
transformed = model.transform(df) 

Prawdopodobnie jest o wiele solidne i czyste podejście niż pisanie wszystkiego od podstaw. Istnieją pewne zastrzeżenia, szczególnie gdy potrzebujesz spójnego kodowania między różnymi zestawami danych. Więcej informacji można znaleźć w oficjalnej dokumentacji dla StringIndexer i VectorIndexer.

Innym sposobem, aby uzyskać porównywalną moc jest RFormulawhich:

RFormula produkuje kolumnę wektora cech oraz podwójną lub ciąg kolumnę etykiecie. Podobnie, gdy formuły są używane w regresji liniowej R, kolumny wejściowe ciągów będą kodowane jeden raz, a kolumny liczbowe będą rzutowane na podwójne. Jeśli kolumna z etykietą jest typu ciąg, zostanie najpierw przekształcona na podwójną z StringIndexer. Jeśli kolumna etykiety nie istnieje w DataFrame, kolumna etykiety wyjściowej zostanie utworzona z określonej zmiennej odpowiedzi w formule.

from pyspark.ml.feature import RFormula 

rf = RFormula(formula="~ gender + bar + foo - 1") 
final_df_rf = rf.fit(df).transform(df) 

Jak widać to jest o wiele bardziej zwięzłe, ale trudniejsze do komponowania nie pozwala dużo dostosowywania. Niemniej wynik prostego rurociągu jak ten będzie identyczny:

final_df_rf.select("features").show(4, False) 

## +----------------------+ 
## |features    | 
## +----------------------+ 
## |[1.0,0.0,2.1,1.0,3.0] | 
## |[0.0,0.0,1.1,1.0,1.0] | 
## |(5,[2,4],[3.4,-1.0]) | 
## |[1.0,0.0,4.1,0.0,-3.0]| 
## +----------------------+ 


final_df.select("features").show(4, False) 

## +----------------------+ 
## |features    | 
## +----------------------+ 
## |[1.0,0.0,2.1,1.0,3.0] | 
## |[0.0,0.0,1.1,1.0,1.0] | 
## |(5,[2,4],[3.4,-1.0]) | 
## |[1.0,0.0,4.1,0.0,-3.0]| 
## +----------------------+ 

Odnośnie pytania:

dokonać UDF o podobnej funkcjonalności, które można wykorzystać w zapytaniu Spark SQL (lub jakiś inny sposób, jak przypuszczam)

To tylko UDF, jak każdy inny. Upewnij się, że używasz obsługiwanych typów, a poza tym wszystko powinno działać dobrze.

pobrać RDD wynikające z mapy opisanej powyżej i dodać ją jako nową kolumnę do karty danych user_data?

from pyspark.ml.linalg import VectorUDT 
from pyspark.sql.types import StructType, StructField 

schema = StructType([StructField("features", VectorUDT(), True)]) 
row = Row("features") 
result.map(lambda x: row(DenseVector(x))).toDF(schema) 

Uwaga:

iskry 1.x zastąpić pyspark.ml.linalg z pyspark.mllib.linalg.

+1

@DavidArenburg W tym szczególnym kontekście dzieje się tak dlatego, że OP chce uzyskać fałszywe zmienne (np. 'Model.matrix' w R). Najprawdopodobniej trenować jakiś typ modelu liniowego. Wyjaśnienie Rish - typ indeksu ciągów tworzy kolumnę typu factor z łańcuchów, jeden gorący wywołania 'model.matrix' :) – zero323

+1

dzięki @ zero323! Tylko jedna uwaga: od Spark 2.0+ 'od pyspark.mllib.linalg import DenseVector' powinien zostać zamieniony na' from pyspark.ml.linalg import DenseVector', w przeciwnym razie możesz dostać błąd typów w etapie 'VectorIndexer' – EnriqueH

+0

@EnriqueH Clarified , dzięki. – zero323