2014-06-05 13 views
20

Mam listę Tupli typu: (identyfikator użytkownika, nazwisko, liczba).Używanie reduceByKey w Apache Spark (Scala)

Na przykład

val x = sc.parallelize(List(
    ("a", "b", 1), 
    ("a", "b", 1), 
    ("c", "b", 1), 
    ("a", "d", 1)) 
) 

Ja próbując zmniejszyć tę kolekcję do typu, gdzie każda nazwa element jest liczony. w powyższym val x

Więc jest konwertowany do:

(a,ArrayBuffer((d,1), (b,2))) 
(c,ArrayBuffer((b,1))) 

Oto kod Obecnie używam:

val byKey = x.map({case (id,uri,count) => (id,uri)->count}) 

val grouped = byKey.groupByKey 
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))} 
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey 

grouped2.foreach(println) 

Ja próbuje użyć reduceByKey jak wykonuje się szybciej niż groupByKey.

W jaki sposób można zaimplementować metodę lowerBeyKey zamiast powyższego kodu, aby zapewnić takie samo odwzorowanie?

Odpowiedz

26

Po kodzie:

val byKey = x.map({case (id,uri,count) => (id,uri)->count}) 

Można zrobić:

val reducedByKey = byKey.reduceByKey(_ + _) 

scala> reducedByKey.collect.foreach(println) 
((a,d),1) 
((a,b),2) 
((c,b),1) 

PairRDDFunctions[K,V].reduceByKey bierze asocjacyjną funkcji, które mogą być stosowane do zmniejszenia do typu V z RDD [(K, V) ]. Innymi słowy, potrzebujesz funkcji f[V](e1:V, e2:V) : V. W tym konkretnym przypadku z sumą na Ints: (x:Int, y:Int) => x+y lub _ + _ w krótkiej notacji podkreślenia.

Dla zapisu: reduceByKey działa lepiej niż groupByKey, ponieważ ma na celu zastosowanie funkcji redukcji lokalnie przed fazą losowania/redukcji. groupByKey wymusza wymieszanie wszystkich elementów przed zgrupowaniem.

+1

Więc, w zasadzie, reduceByKey ma taki sam skutek jak zrobienie groupBy, a następnie zastosowanie funkcji Custom Dise? – Savvas

+5

@Savvas wynik końcowy jest równy, ale 'reduceByKey' ma wymaganie pamięci O (1) na executor, podczas gdy' groupByKey' zachowuje wszystkie pogrupowane wartości w pamięci, które mogą prowadzić do OOM. – maasg

5

Struktura danych wyjściowych to: RDD [(String, String, Int)], a reduceByKey może być używany tylko wtedy, gdy struktura danych to RDD [(K, V)].

val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)] 
val reduced = kv.reduceByKey(_ + _)  // reduced is RDD[((String, String), Int)] 
val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))] 
val grouped = kv2.groupByKey()   // grouped is RDD[(String, Iterable[(String, Int)])] 
grouped.foreach(println) 
+0

Nie ma takiego ograniczenia, że ​​'V' musi być numeryczne. Jedynym wymaganiem jest to, że funkcja f (V, V) => V musi być asocjacyjna. Otrzymasz niespójne wyniki, jeśli tak nie jest. – maasg

+0

To jest błąd ... Myślałem o (_ + _) w tym Momeent: P, aktualizacja. – cloud

0

Składnia jest poniżej:

reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V], 

która mówi o tym samym kluczu w RDD potrzebny wartości (co będzie na pewno z tego samego typu) wykonuje operację dostarczone jako część funkcji i zwraca wartość tego samego typu co nadrzędny RDD.