2015-06-25 13 views
5

Mam następujący kod w Spark:Spark: zlewają się bardzo powoli nawet dane wyjściowe jest bardzo małe

myData.filter(t => t.getMyEnum() == null) 
     .map(t => t.toString) 
     .saveAsTextFile("myOutput") 

Istnieje 2000+ pliki w folderze myOutput, ale tylko niewielu t.getMyEnum() == null, więc istnieje tylko kilka rekordów wyjściowych. Ponieważ nie chcę szukać tylko kilka wyjść w 2000 + plików wyjściowych, próbowałem połączyć wyjście używając zlewają się jak poniżej:

myData.filter(t => t.getMyEnum() == null) 
     .map(t => t.toString) 
     .coalesce(1, false) 
     .saveAsTextFile("myOutput") 

Wtedy praca staje się bardzo powolny! Zastanawiam się, dlaczego jest tak powolny? Było tylko kilka rekordów wyjściowych rozpraszających w 2000+ partycjach? Czy istnieje lepszy sposób na rozwiązanie tego problemu?

Odpowiedz

8

jeśli robisz drastyczne połączenie, np. do numPartitions = 1, może to spowodować, że twoje obliczenia będą wykonywane na mniejszej liczbie węzłów, niż chcesz (na przykład jeden węzeł w przypadku numPartitions = 1). Aby tego uniknąć, możesz przekazać shuffle = true. Spowoduje to dodanie etapu losowania, ale oznacza, że ​​bieżące partycje upstream będą wykonywane równolegle (niezależnie od bieżącego partycjonowania).

Uwaga: Z shuffle = true, można rzeczywiście połączyć się z większą liczbą partycji . Jest to przydatne, jeśli masz małą liczbę partycji, powiedzmy 100, potencjalnie z kilkoma partycjami, które są wyjątkowo duże. Wywołanie koalescji (1000, shuffle = true) da 1000 partycji z danymi rozproszonymi za pomocą partycji mieszającej.

Spróbuj więc przekazać funkcję true do coalesce. tj

myData.filter(_.getMyEnum == null) 
     .map(_.toString) 
     .coalesce(1, shuffle = true) 
     .saveAsTextFile("myOutput") 
+2

jest 'COALESCE (1, Shuffle = true)' 'podziale równoważne (1)'? – asmaier

+1

Tak, to jest to samo: jeśli spojrzysz na kod źródłowy, partycja (1) domyślnie przetasowała wartość na true. – sversch

Powiązane problemy