2014-07-17 11 views
28

Powiedz, że mam system dystrybucji na 3 węzłach, a moje dane są dystrybuowane między tymi węzłami. na przykład mam pliku test.csv który istnieje na wszystkich 3 węzłów i zawiera 2 kolumny:Jak działa funkcja agregacji Spark - działa funkcja aggregateByKey?

**row | id, c.** 
--------------- 
row1 | k1 , c1 
row2 | k1 , c2 
row3 | k1 , c3 
row4 | k2 , c4 
row5 | k2 , c5 
row6 | k2 , c6 
row7 | k3 , c7 
row8 | k3 , c8 
row9 | k3 , c9 
row10 | k4 , c10 
row11 | k4 , c11 
row12 | k4 , c12 

Następnie używam SparkContext.textFile odczytać pliku jako RDD i tak. O ile rozumiem, każdy węzeł roboczy iskrzenia odczyta fragment z pliku. Więc teraz powiedzmy, że każdy węzeł będzie przechowywać:

  • węzeł 1: wiersz 1 ~ 4
  • węzła 2: wiersz 5 ~ 8
  • węzła 3: wiersz 9 ~ 12

My pytanie brzmi: powiedzmy, że chcę wykonać obliczenia na tych danych i jest jeden krok, który muszę zgrupować klucz razem, więc kluczowa para wartości będzie [k1 [{k1 c1} {k1 c2} {k1 c3}]].. i tak dalej.

Istnieje funkcja o nazwie groupByKey(), która jest bardzo kosztowna w użyciu i zalecana jest wersja aggregateByKey(). Zastanawiam się więc, w jaki sposób działa groupByKey() i aggregateByKey() pod maską? Czy ktoś, kto może posłużyć się przykładem podanym powyżej, może to wyjaśnić? Po przetasowaniu gdzie znajdują się wiersze na każdym węźle?

Odpowiedz

40

aggregateByKey() jest niemal identyczna reduceByKey() (zarówno wywołującego combineByKey() za kulisami), z wyjątkiem podać wartość początkową dla aggregateByKey(). Większość ludzi zna reduceByKey(), więc użyję tego w wyjaśnieniu.

Powód: reduceByKey() jest o wiele lepszy, ponieważ korzysta z funkcji MapReduce nazywanej kombinatorem. Dowolna funkcja taka jak + lub * może być używana w ten sposób, ponieważ kolejność elementów, do których jest wywoływana, nie ma znaczenia. Dzięki temu Spark może rozpocząć "zmniejszanie" wartości za pomocą tego samego klucza, nawet jeśli nie są jeszcze na tej samej partycji.

Po drugiej stronie groupByKey() zapewnia większą wszechstronność, ponieważ pisze się funkcję, która ma funkcję Iterable, co oznacza, że ​​można nawet przeciągnąć wszystkie elementy do tablicy. Jest to jednak nieefektywne, ponieważ aby działał, pełny zestaw par (K,V,) musi znajdować się w jednej partycji.

kroku, który przenosi dane wokół w operacji zmniejszenia typu jest zwykle nazywany losowo na bardzo najprostszej dane rozdziela się do każdego węzła (często z partycjonowania mieszania), a następnie sortowane w każdym węźle .

+2

ok, więc pozwala wrócić do mojego przykładu, jeśli node1 został row1 ~ row3 Node2 został row4 ~ row6 i Node3 został row7 do row12. i kiedy robię groupByKey, czy dane będą się poruszać w ogóle, czy nic się nie poruszy, skoro rdd z tym samym kluczem jest już w tym samym węźle? dzięki – EdwinGuo

+1

@EdwinGuo żadne dane nie mogą wciąż się przemieszczać, załóżmy, że używasz partycji mieszającej, jeśli wszystkie k1 znajdują się w węźle 1, ale wynik mieszania k1 na poziomie 3 to nadal będzie trzeci węzeł. – aaronman

+0

Ale jeśli nie zwracaj uwagi na kolejność, po prostu chcę zwrócić tablicę ze wszystkimi wartościami, na przykład groupByKey. Czy jest to możliwe z inną składnią niż groupbykey? –

40

aggregateByKey() różni się od reduBeyKey. Co się dzieje, to, że reduceByKey jest pewnego rodzaju przypadkiem aggregateByKey.

aggregateByKey() połączy wartości dla określonego klucza, a wynikiem takiej kombinacji może być dowolny określony obiekt. Musisz określić, w jaki sposób wartości są łączone ("dodane") wewnątrz jednej partycji (która jest wykonywana w tym samym węźle) i jak połączyć wynik z różnych partycji (które mogą znajdować się w różnych węzłach). reduceByKey jest szczególnym przypadkiem, w tym sensie, że wynik kombinacji (np. suma) jest tego samego typu, co wartości, i że operacja, gdy jest połączona z różnych partycji, jest również taka sama jak operacja podczas łączenia wartości wewnątrz przegroda.

Przykład: Wyobraź sobie, że masz listę par. Równolegle go:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5))) 

Teraz chcesz "połączyć" je przez klucz, tworząc sumę. W tym przypadku reduceByKey i aggregateByKey są takie same:

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything 
resReduce.collect 
res3: Array[(String, Int)] = Array((b,7), (a,9)) 

//0 is initial value, _+_ inside partition, _+_ between partitions 
val resAgg = pairs.aggregateByKey(0)(_+_,_+_) 
resAgg.collect 
res4: Array[(String, Int)] = Array((b,7), (a,9)) 

Teraz wyobraź sobie, że chcesz agregację być zbiorem wartości, to jest inny typ, że wartości, które są liczbami całkowitymi (suma liczb całkowitych jest również całkowite):

import scala.collection.mutable.HashSet 
//the initial value is a void Set. Adding an element to a set is the first 
//_+_ Join two sets is the _++_ 
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_) 
sets.collect 
res5: Array[(String, scala.collection.mutable.HashSet[Int])] =Array((b,Set(7)), (a,Set(1, 5, 3))) 
+0

Bardzo dokładna odpowiedź na to, jak działają te dwie funkcje, doceń to! – SparkleGoat

+0

Czy możesz również wysłać kod java, trudno zrozumieć scala – rohanagarwal