2016-02-04 8 views
11

W dokumentacji Sparka mówi, że metoda RDDs reduce wymaga asocjacyjnej i komutatywnej funkcji binarnej.Spark: różnica semantyki między redukuj i redukujByKey

Jednak metoda TYLKO wymaga reduceByKey asocjacyjnej funkcji binarnej.

sc.textFile("file4kB", 4) 

Zrobiłem kilka testów i najwyraźniej to zachowanie, które otrzymuję. Skąd ta różnica? Dlaczego reduceByKey zapewnia, że ​​funkcja binarna jest zawsze stosowana w określonej kolejności (w celu dostosowania do braku komutatywności), gdy nie ma ona wartości reduce?

przykład, jeżeli obciążenie część (mały) tekst na 4 strefy (minimalne):

val r = sc.textFile("file4k", 4) 

następnie:

r.reduce(_ + _) 

powraca do ciągu, w którym elementy nie są zawsze w tym samym porządku, mając na uwadze, że:

r.map(x => (1,x)).reduceByKey(_ + _).first 

zawsze zwraca ten sam ciąg (gdzie wszystko jest w tej samej kolejności, co w oryginale) l plik).

(Sprawdziłem z r.glom, a zawartość pliku jest rzeczywiście rozłożona na 4 partycje, nie ma pustej partycji).

+2

Chyba pomysł z 'reduceByKey' jest to, że prawdopodobnie masz wiele różnych kluczy, więc można zredukować wszystko dla pojedynczego klucza w jednym wątku, co oznacza, że ​​zawsze można uruchomić obliczenia od lewej do prawej. Natomiast "redukcja" często będzie używana na dużym zestawie danych, więc nie należy przejmować się kolejnością operacji. –

+0

Ile executorów używasz w swoich eksperymentach? – gprivitera

Odpowiedz

7

Jeśli chodzi o mnie, jest to błąd w dokumentacji, a wyniki, które widzisz, są po prostu przypadkowe. Practice, other resources i prosty analysis of the code pokazują, że funkcja przekazana do reduceByKey powinna być nie tylko asocjacyjna, ale również przemienna.

  • praktyka - podczas gdy wygląda kolejność jest zachowana w trybie lokalnym nie jest już prawdą, po uruchomieniu Spark w klastrze, w tym trybie autonomicznym.

  • innych zasobów - by zacytować Data Exploration Using Spark z AmpCamp 3:

    Jest to wygodna metoda zwana reduceByKey w Spark dokładnie tego wzorca. Zauważ, że drugi argument do zmniejszeniaByKey określa liczbę reduktorów do użycia. Domyślnie Spark zakłada, że ​​funkcja redukcji jest przemienna i asocjacyjna, i stosuje kombinatory po stronie odwzorowującej. Kod

  • - reduceByKey jest realizowany za pomocą combineByKeyWithClassTag i tworzy ShuffledRDD. Ponieważ Spark nie gwarantuje kolejności po przetasowaniu, jedynym sposobem na przywrócenie byłoby dołączenie niektórych metadanych do częściowo zredukowanych rekordów. O ile mogę powiedzieć, nic takiego się nie dzieje.

Na marginesie reduce jak to jest realizowane w PySpark będzie działać dobrze z funkcją, która jest tylko przemienne. Jest to oczywiście tylko detal realizacji, a nie część umowy.

+3

Dodam, że ograniczenie to działanie, zwracanie danych do sterownika, podczas gdy reduceByKey to transformacja, zwracanie kolejnego RDD – rhernando

+0

Dzięki! Ale czy jest jakiś sposób w Sparku, aby zapewnić poprawność nie-przemiennego traktowania? Czy to wykracza poza zakres Sparka? –

+0

Nie jestem pewien, czy rozumiem pytanie. Czy pytasz, czy możliwe jest automatyczne przetestowanie/udowodnienie przemienności lub po prostu chcesz użyć funkcji nieprzemiennej z 'zredukować'? Jeśli jest to drugi przypadek, który naśladuje zachowanie PySpark ('mapPartitions (reduceFunc)' => 'collect' => reduce (reduceFunc)') powinien działać z pewną karą wydajności. – zero323

1

Zgodnie z dokumentacją kodu, niedawno zaktualizowaną/poprawioną.(dzięki @ zero323):

reduceByKey scala wartości dla każdego klawisza za pomocą asocjacyjnej i komutatywnej funkcji redukcji. Spowoduje to również połączenie lokalnie na każdym programie odwzorowującym przed wysłaniem wyników do reduktora, podobnie jak "sumator" w MapReduce.

Tak naprawdę był to błąd dokumentacji, np. @ Zero323 wskazany w odpowiedzi.

Można sprawdzić następujące łącza do kodu, aby upewnić się, że: