2015-01-24 13 views
11

Mam uruchomiła moją klaster ten sposób:Spark: strategia Repartition po przeczytaniu pliku tekstowego

/usr/lib/spark/bin/spark-submit --class MyClass --master yarn-cluster--num-executors 3 --driver-memory 10g --executor-memory 10g --executor-cores 4 /path/to/jar.jar 

Pierwszą rzeczą, którą robię jest odczytywany duży plik tekstowy, a liczyć go:

val file = sc.textFile("/path/to/file.txt.gz") 
println(file.count()) 

Kiedy robiąc to widzę, że tylko jeden z moich węzłów faktycznie czyta plik i wykonuje liczenie (ponieważ widzę tylko jedno zadanie). Czy to jest oczekiwane? Czy powinienem ponownie rozdzielić mój RDD lub kiedy używam funkcji zmniejszania mapy, czy Spark zrobi to za mnie?

+0

Jakie są twoje "defaultMinPartitions"? Jak wyraźnie wskazuje dokument, textFile przyjmuje opcjonalną liczbę parametrów partycji, która domyślnie jest równa. –

+0

Moje defaultMinPartitions jest większe niż jeden. Wygląda na to, że nie mogę wymusić określonej liczby partycji, ponieważ jest to tylko jeden plik tekstowy ... działa .... val file = sc.textFile ("/ path/to/file.txt.gz", 8) println (file.partitions.length) zwraca 1 – Stephane

+0

Cóż, to musi zrobić czytanie w jednym miejscu, ponieważ jest to z natury szeregowe. Ale nie rozumiem, dlaczego miałby ten opcjonalny parametr, gdyby coś takiego nie zrobiło. –

Odpowiedz

20

Wygląda na to, że używasz spakowanego pliku gzip.

Cytując my answer here:

Myślę, że uderzył w dość typowy problem z spakowane gzipem plików, które nie mogą być ładowane równolegle. Dokładniej, pojedynczy plik gzipowany nie może być ładowany równolegle przez wiele zadań, więc Spark załaduje go z 1 zadaniem, a tym samym da RDD z 1 partycją.

Po załadowaniu trzeba jawnie je ponownie podzielić na partycje RDD, aby można było wykonywać równolegle więcej zadań.

Na przykład:

val file = sc.textFile("/path/to/file.txt.gz").repartition(sc.defaultParallelism * 3) 
println(file.count()) 

Jeśli chodzi o komentarze na swoje pytanie, powodem ustawienie minPartitions nie pomaga tu dlatego a gzipped file is not splittable, więc Spark będzie zawsze używać 1 zadanie odczytać pliku.

Jeśli ustawisz minPartitions podczas czytania zwykłego pliku tekstowego lub pliku skompresowanego z możliwym do podzielenia formatem kompresji, jak bzip2, zobaczysz, że Spark faktycznie wdroży tę liczbę zadań równolegle (do liczby dostępnych rdzeni w klastrze), aby odczytać plik.

+0

Dzięki! Czy poleciłbyś więc bzip2 over gzip? Jeśli często ładuję ten plik, jaka powinna być moja strategia optymalizacji każdego uruchomienia? – Stephane

+0

@Stephane - To zależy od tego, ile danych przychodzi i ile czasu klastra spędza na ponownej partycjonowaniu danych. Pojedynczy plik zgzipowany może być w porządku. Jeśli jeden plik jest zbyt duży, prawdopodobnie można również użyć wielu spakowanych plików gzip (tj. Podziału przed kompresją), ponieważ każdy plik skompresowany gzip może zostać załadowany równolegle do tego samego dokumentu RDD (jedno zadanie na plik). To prawdopodobnie ścieżka najmniejszego oporu. –

+0

bardzo, bardzo ciekawe dzięki! Więc .gz.001 podzielone pliki lub bzip2 ... Będę eksperymentować z obydwoma!Myślę, że tak, dużym wąskim gardłem jest pierwszy podział, więc jeśli uda mi się już podzielić moje pliki, kiedy przyjdą, może zaoszczędzić mi trochę czasu – Stephane

Powiązane problemy