2014-11-20 9 views
15

Przy zmniejszaniu liczby partycji można użyć coalesce, co jest wspaniałe, ponieważ nie powoduje przetasowania i wydaje się działać natychmiast (nie wymaga dodatkowego etapu pracy).Spark: zwiększyć liczbę partycji bez powodowania tasowania?

Chciałbym czasami robić coś odwrotnego, ale repartition indukuje przetasowanie. Myślę, że kilka miesięcy temu faktycznie działało to przy użyciu CoalescedRDD z balanceSlack = 1.0 - więc to, co by się stało, to podzieliłoby partycję tak, aby wynikowa lokalizacja partycji, gdzie wszystkie w tym samym węźle (tak małe netto IO).

Ten rodzaj funkcjonalności jest automatyczny w Hadoop, jeden po prostu zmienia rozmiar podzielonego. Wydaje się, że nie działa tak w Spark, chyba że zmniejsza się liczba partycji. Myślę, że rozwiązaniem może być napisanie niestandardowego partycjonera wraz z niestandardowym RDD, gdzie zdefiniujemy getPreferredLocations ... ale myślałem, że jest to takie proste i powszechne, aby zrobić to z pewnością musi być prosty sposób na zrobienie tego?

Co próbowałem:

.set("spark.default.parallelism", partitions) na moim SparkConf, a gdy w kontekście czytania parkietu Próbowałem sqlContext.sql("set spark.sql.shuffle.partitions= ..., który na 1.0.0 powoduje błąd i nie ma naprawdę chcę, chcę partycji liczba do zmiany we wszystkich rodzajach pracy, a nie tylko w tasowaniu.

+0

Przy odrobinie szczęścia znalezienie rozwiązania tego problemu? – nbubis

Odpowiedz

0

Nie do końca rozumiem, o co ci chodzi. Czy masz na myśli, że masz teraz 5 partycji, ale po następnej operacji potrzebujesz danych dystrybuowanych do 10? Ponieważ posiadanie 10, ale wciąż używanie 5 nie ma większego sensu ... Proces wysyłania danych do nowych partycji musi kiedyś nastąpić.

Kiedy robisz coalesce, możesz pozbyć się niepotrzebnych partycji, na przykład: jeśli miałeś początkowo 100, ale potem po zmniejszeniu KeyKey masz 10 (jak tam, gdzie tylko 10 klawiszy), możesz ustawić coalesce.

Jeśli chcesz proces, aby przejść na drugą stronę, można po prostu zmusić jakąś partycjonowania:

[RDD].partitionBy(new HashPartitioner(100)) 

nie jestem pewien, że to co szukasz, ale nadzieję, że tak.

+3

Każda partycja ma lokalizację, tj. Węzeł, przypuśćmy, że mam 5 partycji i 5 węzłów. Jeśli wywołasz 'partycję' lub twój kod, do 10 partycji, przetasuje dane - to znaczy, że dane dla każdego z 5 węzłów mogą przejść przez sieć do innych węzłów. To, czego chcę, to to, że Spark po prostu dzieli każdą partycję na 2 bez przenoszenia jakichkolwiek danych - tak dzieje się w Hadoop podczas modyfikowania ustawień podziału. – samthebest

+0

Nie jestem pewien, czy możesz to zrobić. Domyślam się, że potrzebujesz jakiejś funkcji '.forEachNode'. Ale nigdy nie widziałem czegoś takiego. I nie jestem pewien, czy można to łatwo wdrożyć. Partycja musi za każdym razem zwrócić tę samą partycję dla tego samego obiektu. Domyślnie Spark używa 'HashPartitioner', który ma ** hashCode modulo number_of_partitions **. Jeśli podzielisz dane na dwie nowe partycje, na pewno skończy się to nie ich miejscem. Dlatego tasowanie jest konieczne. Może jeśli masz własny program partycjonujący, może to zwiększyć liczbę partycji bez przetasowania przez sieć. – szefuf

Powiązane problemy