2016-02-05 18 views
18

Próbuję użyć ramek danych Spark zamiast RDD, ponieważ wydają się być bardziej zaawansowane niż RDD i mają tendencję do tworzenia bardziej czytelnego kodu, ale byłbym bardziej niż szczęśliwy, aby uzyskać sugestie dotyczące czegoś bardziej idiomatycznego dla danego zadania.Znajdź maksymalną liczbę wierszy na grupę w Spark DataFrame

W klastrze Google Dataproc z 14 węzłami mam około 6 milionów nazw przetłumaczonych na ids przez dwa różne systemy: sa i sb. Każdy zawiera name,i id_sb. Moim celem jest wygenerowanie mapowania z id_sa do id_sb, tak aby dla każdego id_sa odpowiadający id_sb był najczęstszym identyfikatorem spośród wszystkich nazw dołączonych do id_sa.

Spróbujmy wyjaśnić na przykładzie. Jeśli mam następujące wiersze:

[Row(name='n1', id_sa='a1', id_sb='b1'), 
Row(name='n2', id_sa='a1', id_sb='b2'), 
Row(name='n3', id_sa='a1', id_sb='b2'), 
Row(name='n4', id_sa='a2', id_sb='b2')] 

Moim celem jest stworzenie mapowania z a1 do b2. Rzeczywiście, nazwy związane z a1n1, n2 i n3, które mapują odpowiednio b1, b2 i b2 tak b2 jest najczęstszą w mapowaniu nazwy związane z a1. W ten sam sposób, a2 zostanie zmapowany do b2. Można założyć, że zawsze będzie zwycięzca: nie trzeba łamać więzi.

Miałem nadzieję, że będę mógł używać groupBy(df.id_sa) na mojej ramce danych, ale nie wiem, co robić dalej. Miałem nadzieję na agregacji, które mogłyby produkować, w końcu się następujące wiersze:

[Row(id_sa=a1, max_id_sb=b2), 
Row(id_sa=a2, max_id_sb=b2)] 

Ale może Próbuję użyć niewłaściwego narzędzia i należy po prostu wrócić do korzystania z RDD.

+1

Jakie jest twoje pytanie? – eliasah

+0

@eliasah wszelkie wskazówki (linki, przykład kodu) na temat sposobów, aby to zrobić z ramkami danych? –

+1

Chcesz przeprowadzić grupowanie według i maksimum agregacji? – eliasah

Odpowiedz

18

Stosując join (spowoduje to więcej niż jeden wiersz w grupy, w przypadku tych):

import pyspark.sql.functions as F 
from pyspark.sql.functions import count, col 

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts") 
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs") 

cnts.join(maxs, 
    (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa")) 
).select(col("cnts.id_sa"), col("cnts.id_sb")) 

przy użyciu funkcji okna (spadnie związki)

from pyspark.sql.functions import rowNumber 
from pyspark.sql.window import Window 

w = Window().partitionBy("id_sa").orderBy(col("cnt").desc()) 

(cnts 
    .withColumn("rn", rowNumber().over(w)) 
    .where(col("rn") == 1) 
    .select("id_sa", "id_sb")) 

Stosując struct Kolejność:

from pyspark.sql.functions import struct 

(cnts 
    .groupBy("id_sa") 
    .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max")) 
    .select(col("id_sa"), col("max.id_sb"))) 

Zobacz także SPARK DataFrame: select the first row of each group

+1

@QuentinPradet Podałem inne rozwiązanie, które nie jest zbyt dobrze przetestowane, ale może być interesujące pod względem wydajności – zero323

+0

Dzięki! Dostaję taką samą wydajność z wersją funkcji okna i kolejnością struct (między 15s a 20s), może zobaczę różnicę, kiedy przełożę się na większy zbiór danych. Mam też nadzieję, że nie masz nic przeciwko, ale zmieniłem maksimum na F.max, ponieważ to mnie potknęło. Zapraszam do wycofania. –

+1

Nie mam nic przeciwko. Jeśli zauważysz jakieś błędy lub uważasz, że istnieje lepszy sposób wyrażenia czegoś, możesz edytować któryś z moich wpisów :) Zgadzam się - mieszanie "maksimum" może być mylące. Zajęło mi trochę czasu, aby dowiedzieć się, co się dzieje, gdy spotkałem się z tym po raz pierwszy. – zero323

2

myślę, co może być patrząc na to funkcje okien: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Oto przykład w Scala (nie mam iskry Shell z Hive dostępnych w tej chwili, więc nie był w stanie przetestować kod, ale myślę, że powinno działać):

case class MyRow(name: String, id_sa: String, id_sb: String) 

val myDF = sc.parallelize(Array(
    MyRow("n1", "a1", "b1"), 
    MyRow("n2", "a1", "b2"), 
    MyRow("n3", "a1", "b2"), 
    MyRow("n1", "a2", "b2") 
)).toDF("name", "id_sa", "id_sb") 

import org.apache.spark.sql.expressions.Window 

val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc) 

myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb") 

Istnieje prawdopodobnie bardziej skuteczne sposoby, aby osiągnąć te same rezultaty z funkcji okna, ale mam nadzieję, że to punktów w prawo kierunek.

Powiązane problemy