2015-06-26 8 views
11

Dokumentacja API Apache Spark pyspark.RDD wspomina, że ​​groupByKey() jest nieefektywna. Zamiast tego zaleca się użycie zamiast tego reduceByKey(), aggregateByKey(), , , , , . Spowoduje to dokonanie części agregacji w pracownikach przed przetasowaniem, co zmniejszy przetasowanie danych przez pracowników.Apache Spark: Jaka jest odpowiednia implementacja RDD.groupByKey() przy użyciu RDD.aggregateByKey()?

Biorąc pod uwagę następujący zestaw danych i wyrażenie groupByKey(), co jest równoważną i wydajną implementacją (zmniejszone tasowanie danych między pracownikami), które nie korzysta z groupByKey(), ale dostarcza taki sam wynik?

dataset = [("a", 7), ("b", 3), ("a", 8)] 
rdd = (sc.parallelize(dataset) 
     .groupByKey()) 
print sorted(rdd.mapValues(list).collect()) 

wyjściowa:

[('a', [7, 8]), ('b', [3])] 
+0

Czy dane są podzielone losowo, czy według klucza? Jeśli możesz upewnić się, że wszystkie rekordy z a._1 = "a" są na tej samej partycji, możesz znacznie przyspieszyć rzeczy - możesz być w stanie uciec bez potrzeby tasowania innych niż te potrzebne do początkowego partycjonowania . Może spróbuj użyć partycjonera mieszającego? –

Odpowiedz

18

miarę Mogę powiedzieć, że nie ma nic do zyskania * w tym konkretnym przypadku przy użyciu aggregateByKey lub podobnej funkcji. Ponieważ budujesz listę, nie ma "prawdziwej" redukcji, a ilość danych, które trzeba przetasować, jest mniej więcej taka sama.

Aby naprawdę zaobserwować pewne zwiększenie wydajności, potrzebne są transformacje, które faktycznie zmniejszają ilość przekazywanych danych, na przykład zliczanie, obliczanie statystyk podsumowujących, wyszukiwanie unikatowych elementów.

Odnośnie różnic zalet korzystania reduceByKey(), combineByKey() lub foldByKey() istnieje istotna różnica koncepcyjna, która jest lepiej widoczny, gdy uznają Scala singatures API.

Zarówno reduceByKey, jak i foldByKey mapa od RDD[(K, V)] do RDD[(K, V)], podczas gdy druga zapewnia dodatkowy element zerowy.

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)] 

combineByKey (nie ma aggregateByKey, ale jest to ten sam typ transformacji) przekształca od RDD[(K, V)] do RDD[(K, C)]:

combineByKey[C](
    createCombiner: (V) ⇒ C, 
    mergeValue: (C, V) ⇒ C, 
    mergeCombiners: (C, C) ⇒ C): RDD[(K, C)] 

Wracając do przykładu combineByKey (aw PySpark aggregateByKey) jest naprawdę obowiązujące od czasu transformacji z RDD[(String, Int)] do RDD[(String, List[Int])].

Choć w dynamicznym języku Python jak to jest faktycznie możliwe do przeprowadzenia takiej operacji przy użyciu foldByKey lub reduceByKey to sprawia, że ​​semantyka kodu niejasne i cytować @ Tim-Peters „Nie powinno być jedno- i korzystnie tylko jeden - czysty sposób na zrobienie tego: "[1].

Różnica między aggregateByKey i combineByKey jest prawie taka sama jak między reduceByKey i foldByKey więc na liście najczęściej jest to kwestia gustu:

def merge_value(acc, x): 
    acc.append(x) 
    return acc 

def merge_combiners(acc1, acc2): 
    acc1.extend(acc2) 
    return acc1 

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
    .combineByKey(
     lambda x: [x], 
     lambda u, v: u + [v], 
     lambda u1,u2: u1+u2)) 

W praktyce powinno wolisz groupByKey chociaż. Implementacja PySpark jest znacznie bardziej zoptymalizowana w porównaniu do implementacji naiwnej, takiej jak przedstawiona powyżej.

1.Peters, T. PEP 20 - The Zen of Python. (2004). w https://www.python.org/dev/peps/pep-0020/


* W praktyce nie jest całkiem dużo do stracenia tutaj, zwłaszcza przy użyciu PySpark. Implementacja Pythona groupByKey jest znacznie bardziej zoptymalizowana niż naiwna kombinacja przez klucz. Możesz sprawdzić Be Smart About groupByKey, utworzony przeze mnie i @eliasah dla dodatkowej dyskusji.

+0

Jeśli używasz partycjonera (powiedzmy, partycji za pomocą skrótu klucza), czy możesz uciec bez potrzeby innych tasowań? –

+0

@GlennStrycker O ile mi wiadomo, odpowiedź jest pozytywna. Jeśli RDD jest podzielony na partycje według klucza, wszystkie wartości dla danego klucza powinny być przetwarzane lokalnie w jednym węźle. Możliwym problemem jest jednak wypaczona dystrybucja kluczy. – zero323

3

Oto jedna opcja, która wykorzystuje aggregateByKey(). Chciałbym usłyszeć, jak można to zrobić za pomocą reduceByKey(), combineByKey() lub foldByKey() i jaki jest koszt/korzyść dla każdej alternatywy.

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
     .aggregateByKey(list(), 
         lambda u,v: u+[v], 
         lambda u1,u2: u1+u2)) 
print sorted(rdd.mapValues(list).collect()) 

wyjściowa:

[('a', [7, 8]), ('b', [3])] 

Poniżej znajduje się nieco więcej pamięci efektywne wdrożenie, choć mniej czytelny dla początkujących Pythona, który daje ten sam wynik:

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
     .aggregateByKey(list(), 
         lambda u,v: itertools.chain(u,[v]), 
         lambda u1,u2: itertools.chain(u1,u2))) 
print sorted(rdd.mapValues(list).collect())