2015-07-26 42 views
19

Mam tabelę z dwoma kolumnami typu string (nazwa użytkownika, przyjaciel) i dla każdej nazwy użytkownika chcę zebrać wszystkich jej znajomych w jednym wierszu, połączonych jako ciągi ("nazwa użytkownika1", "przyjaciele1, przyjaciele2, przyjaciele3"). Wiem, że MySql robi to przez GROUP_CONCAT, czy jest jakiś sposób to zrobić z SPARK SQL?Zastępowanie SPARK SQL dla funkcji agregującej mysql GROUP_CONCAT

Dzięki

Odpowiedz

32

Przed rozpoczęciem operacji: To jest jeszcze jeden inny groupByKey. Chociaż ma wiele legalnych aplikacji, jest stosunkowo drogi, więc należy go używać tylko wtedy, gdy jest to wymagane.


Niezupełnie zwięzły lub wydajne rozwiązanie, ale można użyć UserDefinedAggregateFunction wprowadzony Spark 1.5.0:

object GroupConcat extends UserDefinedAggregateFunction { 
    def inputSchema = new StructType().add("x", StringType) 
    def bufferSchema = new StructType().add("buff", ArrayType(StringType)) 
    def dataType = StringType 
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = { 
     buffer.update(0, ArrayBuffer.empty[String]) 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) = { 
     if (!input.isNullAt(0)) 
     buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0)) 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
     buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)) 
    } 

    def evaluate(buffer: Row) = UTF8String.fromString(
     buffer.getSeq[String](0).mkString(",")) 
} 

Przykład użycia:

val df = sc.parallelize(Seq(
    ("username1", "friend1"), 
    ("username1", "friend2"), 
    ("username2", "friend1"), 
    ("username2", "friend3") 
)).toDF("username", "friend") 

df.groupBy($"username").agg(GroupConcat($"friend")).show 

## +---------+---------------+ 
## | username|  friends| 
## +---------+---------------+ 
## |username1|friend1,friend2| 
## |username2|friend1,friend3| 
## +---------+---------------+ 

Można również utworzyć otoki Python jako wyświetlane w Spark: How to map Python with Scala or Java User Defined Functions?

W praktyce może to być wyodrębnić RDD, groupByKey, i odbudować DataFrame.

można uzyskać podobny efekt, łącząc collect_list funkcji (Spark> = 1.6.0) z concat_ws:

import org.apache.spark.sql.functions.{collect_list, udf, lit} 

df.groupBy($"username") 
    .agg(concat_ws(",", collect_list($"friend")).alias("friends")) 
+0

What If Chcę go użyć w SQL Jak mogę zarejestrować to UDF w Spark SQL? –

+0

@MurtazaKanchwala [Istnieje metoda "register", która akceptuje UDAFS] (https://github.com/apache/spark/blob/37c617e4f580482b59e1abbe3c0c27c7125cf605/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration .scala # L63-L69), więc powinien działać jako standardowy UDF. – zero323

+0

@ zero323 dowolne podejście zrobić to samo w iskrze sql 1.4.1 –

2

Jednym ze sposobów, aby to zrobić z pyspark < 1.6, która niestety nie obsługuje user- funkcja określona łączna:

byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y) 

a jeśli chcesz, aby to dataframe jeszcze:

sqlContext.createDataFrame(byUsername, ["username", "friends"]) 

Począwszy od 1.6, można użyć collect_list a następnie przystąpić do utworzonej listy:

from pyspark.sql import functions as F 
from pyspark.sql.types import StringType 
join_ = F.udf(lambda x: ", ".join(x), StringType()) 
df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends")) 
10

Można spróbować funkcję collect_list

sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A 

Albo można regieter UDF coś podobnego

sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b)) 

i możesz użyć tej funkcji w zapytaniu

sqlConttext.sql("select A,collect_list(myzip(B,C)) from tbl group by A") 
+1

Próbowałem, ale działa tylko z HiveContext –

2

Język: Scala Spark wersja: 1.5.2

miałem ten sam problem, a także starał się rozwiązać go za pomocą udfs ale, niestety, to doprowadziło do większej liczby problemów później w kodzie z powodu niespójności typu.I był w stanie pracować moją drogę dokoła to najpierw przekształcenie DF do RDD następnie zgrupowania przez i manipulowania danymi w pożądany sposób, a następnie przekształcenie RDD powrotem do DF następująco:

val df = sc 
    .parallelize(Seq(
     ("username1", "friend1"), 
     ("username1", "friend2"), 
     ("username2", "friend1"), 
     ("username2", "friend3"))) 
    .toDF("username", "friend") 

+---------+-------+ 
| username| friend| 
+---------+-------+ 
|username1|friend1| 
|username1|friend2| 
|username2|friend1| 
|username2|friend3| 
+---------+-------+ 

val dfGRPD = df.map(Row => (Row(0), Row(1))) 
    .groupByKey() 
    .map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))} 
    .toDF("username", "groupOfFriends") 

+---------+---------------+ 
| username| groupOfFriends| 
+---------+---------------+ 
|username1|friend2,friend1| 
|username2|friend3,friend1| 
+---------+---------------+ 
Powiązane problemy