5

Mam DataFrame że wygląda następująco:Spark, DataFrame: zastosować transformator/Estymator na grupach

+-----------+-----+------------+ 
|  userID|group| features| 
+-----------+-----+------------+ 
|12462563356| 1| [5.0,43.0]| 
|12462563701| 2| [1.0,8.0]| 
|12462563701| 1| [2.0,12.0]| 
|12462564356| 1| [1.0,1.0]| 
|12462565487| 3| [2.0,3.0]| 
|12462565698| 2| [1.0,1.0]| 
|12462565698| 1| [1.0,1.0]| 
|12462566081| 2| [1.0,2.0]| 
|12462566081| 1| [1.0,15.0]| 
|12462566225| 2| [1.0,1.0]| 
|12462566225| 1| [9.0,85.0]| 
|12462566526| 2| [1.0,1.0]| 
|12462566526| 1| [3.0,79.0]| 
|12462567006| 2| [11.0,15.0]| 
|12462567006| 1| [10.0,15.0]| 
|12462567006| 3| [10.0,15.0]| 
|12462586595| 2| [2.0,42.0]| 
|12462586595| 3| [2.0,16.0]| 
|12462589343| 3| [1.0,1.0]| 
+-----------+-----+------------+ 

Gdzie typy kolumn są: identyfikator użytkownika: Długi, grupa: Int, i cechy: vector.

Jest to już pogrupowana DataFrame, tzn. Identyfikator użytkownika pojawi się w danej grupie w tym samym czasie.

Moim celem jest skalowanie kolumny features dla grupy.

Czy istnieje sposób, aby zastosować feature transformer (w moim przypadku chciałbym zastosować StandardScaler) na grupę zamiast stosowania jej do pełnej DataFrame.

P.S. używanie ML nie jest obowiązkowe, więc nie ma problemu, jeśli rozwiązanie jest oparte na MLlib.

+0

W jaki sposób planujesz zainstalować StandardScaler? W każdej grupie? – eliasah

+0

Chciałbym skalować każdy wymiar wektora funkcji dla każdej grupy. – Rami

+1

AFAIK to nie robi, ale zawsze możesz zastosować wszystkie operacje bezpośrednio. Scaler działa na RDD, więc jest to tylko kwestia obliczania statystyk i transformacji na grupę. – zero323

Odpowiedz

5

Można obliczyć statystyki według grup używając niemal ten sam kod jako domyślnie Scaler:

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer 
import org.apache.spark.mllib.linalg.{Vector, Vectors} 
import org.apache.spark.sql.Row 

// Compute Multivariate Statistics 
val summary = data.select($"group", $"features") 
    .rdd 
    .map { 
     case Row(group: Int, features: Vector) => (group, features) 
    } 
    .aggregateByKey(new MultivariateOnlineSummarizer)(/* Create an empty new MultivariateOnlineSummarizer */ 
     (agg, v) => agg.add(v), /* seqOp : Add a new sample Vector to this summarizer, and update the statistical summary. */ 
     (agg1, agg2) => agg1.merge(agg2)) /* combOp : As MultivariateOnlineSummarizer accepts a merge action with another MultivariateOnlineSummarizer, and update the statistical summary. */ 
    .mapValues { 
     s => (
     s.variance.toArray.map(math.sqrt(_)), /* compute the square root variance for each key */ 
     s.mean.toArray /* fetch the mean for each key */ 
    ) 
    }.collectAsMap 

Jeżeli przewidywana liczba grup jest stosunkowo niska można nadawać takie:

val summaryBd = sc.broadcast(summary) 

i przekształcić swoje dane :

val scaledRows = df.map{ case Row(userID, group: Int, features: Vector) => 
    val (stdev, mean) = summaryBd.value(group) 
    val vs = features.toArray.clone() 
    for (i <- 0 until vs.size) { 
    vs(i) = if(stdev(i) == 0.0) 0.0 else (vs(i) - mean(i)) * (1/stdev(i)) 
    } 
    Row(userID, group, Vectors.dense(vs)) 
} 
val scaledDf = sqlContext.createDataFrame(scaledRows, df.schema) 

W przeciwnym razie możesz po prostu dołączyć. Nie powinno być trudno opakować go jako transformator ML z kolumną grupy jako parametrem.

+1

To jest doskonała odpowiedź! – eliasah

Powiązane problemy