2016-05-11 18 views
5

Moje dane jest jak:SPARK DataFrame: Usuń wartość MAX w grupie

id | val 
---------------- 
a1 | 10 
a1 | 20 
a2 | 5 
a2 | 7 
a2 | 2 

próbuję usunąć wiersz, który ma MAX (Val) w grupie, jeśli grupa I na "id".

wynik powinien być podobny:

id | val 
---------------- 
a1 | 10 
a2 | 5 
a2 | 2 

Używam SPARK DataFrame i SqlContext. Potrzebuję coś w stylu:

DataFrame df = sqlContext.sql("SELECT * FROM jsontable WHERE (id, val) NOT IN (SELECT is,MAX(val) from jsontable GROUP BY id)"); 

Jak mogę to zrobić?

Odpowiedz

0

Oto jak to zrobić przy użyciu RDD i więcej Scala smaku podejście:

// Let's first get the data in key-value pair format 
val data = sc.makeRDD(Seq(("a",20), ("a", 1), ("a",8), ("b",3), ("b",10), ("b",9))) 

// Next let's find the max value from each group 
val maxGroups = data.reduceByKey(Math.max(_,_)) 

// We join the max in the group with the original data 
val combineMaxWithData = maxGroups.join(data) 

// Finally we filter out the values that agree with the max 
val finalResults = combineMaxWithData.filter{ case (gid, (max,curVal)) => max != curVal }.map{ case (gid, (max,curVal)) => (gid,curVal) } 


println(finalResults.collect.toList) 
>List((a,1), (a,8), (b,3), (b,9)) 
+0

Czy możesz prosić e wysłać kod równoważny JAVA? – user3802925

+0

Nie zrobiłem żadnego Sparka na Javie ... Powinien być odpowiednik Javy dla każdego z tych API. To może nie być trudne tłumaczenie. Główne idee powinny obowiązywać. – marios

+0

Dzięki Marios! Poniżej znajduje się implementacja Java, którą miałem z pewnymi modyfikacjami podczas dołączania, gdzie łączę się przy użyciu klucza zamiast pełnego łączenia kartezjańskiego. – user3802925

2

można to zrobić za pomocą operacji dataframe i funkcji okna. Zakładając, że masz swoje dane w dataframe df1:

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.Window 

val maxOnWindow = max(col("val")).over(Window.partitionBy(col("id"))) 
val df2 = df1 
    .withColumn("max", maxOnWindow) 
    .where(col("val") < col("max")) 
    .select("id", "val") 

w Javie, odpowiednik byłoby coś jak:

import org.apache.spark.sql.functions.Window; 
import static org.apache.spark.sql.functions.*; 

Column maxOnWindow = max(col("val")).over(Window.partitionBy("id")); 
DataFrame df2 = df1 
    .withColumn("max", maxOnWindow) 
    .where(col("val").lt(col("max"))) 
    .select("id", "val"); 

Oto miły artykuł o funkcji okna: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

+0

Dzięki Daniel! Próbuję teraz Twojego podejścia. Zastanawiasz się, czy to lepsze niż podejście "dołącz"? – user3802925

+0

@ user3802925 Ponieważ dane pierwotnie znajdują się w DataFrame, to podejście pozwala uniknąć konwersji na RDD, co jest już osiągnięciem zarówno pod względem wydajności, jak i czytelności kodu. Jednak uważam, że faktyczna różnica w wydajności obu metod zależy od danych i wymaga pewnych testów, aby wyciągnąć wnioski. –

+0

@ user3802925: Niestety, nie widziałem wersji łączącej z DataFrame. W tym przypadku tylko moje drugie zdanie jest prawidłowe. –

1

Poniżej znajduje się Implementacja Java kodu scala Mario:

DataFrame df = sqlContext.read().json(input); 
DataFrame dfMaxRaw = df.groupBy("id").max("val"); 
DataFrame dfMax = dfMaxRaw.select(
    dfMaxRaw.col("id").as("max_id"), dfMaxRaw.col("max(val)").as("max_val") 
); 
DataFrame combineMaxWithData = df.join(dfMax, df.col("id") 
    .equalTo(dfMax.col("max_id"))); 
DataFrame finalResult = combineMaxWithData.filter(
    combineMaxWithData.col("id").equalTo(combineMaxWithData.col("max_id")) 
     .and(combineMaxWithData.col("val").notEqual(combineMaxWithData.col("max_val"))) 
); 
+0

wygląda dobrze, aczkolwiek trochę gadatliwie, ale tego oczekuje się od Javy :) – marios

Powiązane problemy