2016-01-22 20 views
13

Mam trzy tablice typu string zawierający następujące informacje:Wiele agregujące operacje na tej samej kolumnie dataframe zapłonowej

  • GroupBy tablicy: zawierające nazwy kolumn chcę grupy moich danych przez.
  • agregowana tablica: zawierająca nazwy kolumn, które chcę agregować. Tablica
  • Operacje zbiorcze zawierające operacje, które chcę wykonać

Próbuję użyć ramek danych zapłonowe do osiągnięcia tego celu. Ramki danych programu Spark udostępniają metodę agg(), w której można przekazać Map [String, String] (nazwy kolumny i odpowiedniej operacji agregującej) jako dane wejściowe, ale chcę wykonywać różne operacje agregacji w tej samej kolumnie danych. Wszelkie sugestie, jak to osiągnąć?

Odpowiedz

26

Scala:

Można na przykład mapę nad listą funkcji o określonej mapping z nazwy na funkcję:

import org.apache.spark.sql.functions.{col, min, max, mean} 
import org.apache.spark.sql.Column 

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v") 
val mapping: Map[String, Column => Column] = Map(
    "min" -> min, "max" -> max, "mean" -> avg) 

val groupBy = Seq("k") 
val aggregate = Seq("v") 
val operations = Seq("min", "max", "mean") 
val exprs = aggregate.flatMap(c => operations .map(f => mapping(f)(col(c)))) 

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show 
// +---+------+------+------+ 
// | k|min(v)|max(v)|avg(v)| 
// +---+------+------+------+ 
// | 1| 3.0| 3.0| 3.0| 
// | 2| -5.0| -5.0| -5.0| 
// +---+------+------+------+ 

lub

df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show 

Niestety parser, który jest używany wewnętrznie SQLContext nie jest publicznie dostępny, ale zawsze możesz spróbować zbudować zwykły SQL q ueries:

df.registerTempTable("df") 
val groupExprs = groupBy.mkString(",") 
val aggExprs = aggregate.flatMap(c => operations.map(
    f => s"$f($c) AS ${c}_${f}") 
).mkString(",") 

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs") 

Python:

from pyspark.sql.functions import mean, sum, max, col 

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"]) 
groupBy = ["k"] 
aggregate = ["v"] 
funs = [mean, sum, max] 

exprs = [f(col(c)) for f in funs for c in aggregate] 

# or equivalent df.groupby(groupBy).agg(*exprs) 
df.groupby(*groupBy).agg(*exprs) 
+0

To działa bardzo dobrze. Dzięki wielkie. :) –

+0

@ zero323 Czy wiesz przez przypadek, jak to zrobić z Pythona API? – lanenok

+0

@lanenok W podobny sposób. Wystarczy zastąpić flatMap ze zrozumieniem. – zero323

Powiązane problemy