2014-12-12 23 views
20

Mam mały program Scala, który działa dobrze na pojedynczym węźle. Jednak skaluję go, aby działał na wielu węzłach. To moja pierwsza taka próba. Próbuję tylko zrozumieć, jak RDD działają w Spark, więc to pytanie jest oparte na teorii i może nie być w 100% poprawne.Spark RDD's - jak działają

Powiedzmy utworzyć RDD: val rdd = sc.textFile(file)

Teraz kiedy już to zrobisz, to znaczy, że plik w file jest teraz podzielona w poprzek węzłów (zakładając, że wszystkie węzły mają dostęp do ścieżki pliku) ?

Po drugie, chcę policzyć liczbę obiektów w RDD (dość proste), ale muszę użyć tego numeru w obliczeniach, które muszą być stosowane do obiektów w RDD - a pseudokod przykład:

rdd.map(x => x/rdd.size) 

Można powiedzieć, że są 100 obiekty w rdd, i powiedzieć, że są 10 węzłów, więc liczyć się z 10 obiektów na węźle (zakładając ten sposób prace pojęcie RDD), teraz gdy zgłoszę metoda jest każdy węzeł będzie wykonać obliczenia z rdd.size jako 10 lub 100? Ponieważ ogólnie RDD ma rozmiar 100, ale lokalnie w każdym węźle jest tylko 10. Czy przed wykonaniem obliczeń muszę wprowadzić zmienną emisji? To pytanie jest powiązane z poniższym pytaniem.

Wreszcie, jeśli dokonam transformacji do RDD, np. rdd.map(_.split("-")), a następnie chciałem nowego size z RDD, czy muszę wykonać działanie na RDD, takie jak count(), więc wszystkie informacje są wysyłane z powrotem do węzła sterownika?

+1

"To pytanie jest powiązane z poniższym pytaniem." ->? – gsamaras

+0

Myślę, że chodziło o 'rdd.flatMap (_. Split (" - "))' – lovasoa

Odpowiedz

6

Zwykle plik (lub jego części, jeśli jest zbyt duży) jest replikowany do N węzłów w klastrze (domyślnie N = 3 w HDFS). Nie jest intencją dzielenia każdego pliku pomiędzy wszystkie dostępne węzły.

Jednak dla Ciebie (tzn. Klienta) praca z plikiem przy użyciu Sparka powinna być przezroczysta - nie powinna być żadnej różnicy w rdd.size, bez względu na to, ile węzłów jest podzielonych i/lub zreplikowanych. Istnieją metody (przynajmniej w Hadoop), aby dowiedzieć się, w którym momencie można zlokalizować węzły (części). Jednak w prostych przypadkach najprawdopodobniej nie będziesz musiał korzystać z tej funkcji.

UPDATE: artykuł opisujący RDD wewnętrzne: https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf

+0

Dzięki za odpowiedź. Tak więc dla obliczeń takich jak: 'rdd.filter (...). Map (x => x * rdd.count)' jest krokiem 'filtru' przeprowadzanym na każdym węźle, zanim dowolny węzeł może wykonać krok' map'? Ponieważ, oczywiście, krok 'map' zależy od kroku' filter' już wykonywanego na każdym węźle, ponieważ 'map' zawiera' rdd.count'. Dzięki jeszcze raz. – monster

+0

Oczywiście, ponieważ 'map' jest zbudowana na' filter' (przeczytaj artykuł o koncepcji "lineage" w artykule). – Ashalynd

+0

Dzięki za informację, to dobra lektura, jednak zastanawiam się, jaki jest cel zmiennej Broadcast? Jeszcze raz dziękuję, docenione! – monster

18
val rdd = sc.textFile(file) 

to znaczy, że plik jest teraz podzielona w poprzek węzłów?

Plik pozostaje w dowolnym miejscu. Elementy wynikowego RDD[String] są liniami pliku. RDD jest partycjonowany, aby dopasować się do naturalnego partycjonowania podstawowego systemu plików. Liczba partycji nie zależy od liczby posiadanych węzłów.

Ważne jest, aby zrozumieć, że po wykonaniu tego wiersza nie czyta czyta pliku (ów). RDD jest leniwym obiektem i zrobi coś tylko wtedy, gdy musi. To jest świetne, ponieważ pozwala uniknąć niepotrzebnego zużycia pamięci.

Na przykład, jeśli napiszesz val errors = rdd.filter(line => line.startsWith("error")), nadal nic się nie dzieje.Jeśli następnie napiszesz val errorCount = errors.count, teraz twoja sekwencja operacji będzie musiała zostać wykonana, ponieważ wynik count jest liczbą całkowitą. To, co każdy rdzeń roboczy (wątek executora) będzie robił równolegle, jest odczytywane z pliku (lub fragmentu pliku), iteruje za pośrednictwem linii i liczy linie zaczynające się od "błędu". Buforowanie i GC na bok, tylko jedna linia na rdzeń będzie w pamięci na raz. Umożliwia to pracę z bardzo dużymi danymi bez użycia dużej ilości pamięci.

Chcę policzyć liczbę obiektów w RDD jednak muszę korzystać z tej liczby w obliczeniach, które muszą być stosowane do obiektów w RDD - jest pseudokod przykład:

rdd.map(x => x/rdd.size) 

Nie ma metody rdd.size. Istnieje rdd.count, który zlicza liczbę elementów w RDD. rdd.map(x => x/rdd.count) nie będzie działać. Kod spróbuje wysłać zmienną rdd do wszystkich pracowników i zakończy się niepowodzeniem z NotSerializableException. Co można zrobić, to:

val count = rdd.count 
val normalized = rdd.map(x => x/count) 

To działa, ponieważ count jest Int i może być w odcinkach.

Jeśli dokonam transformacji do RDD, np. rdd.map(_.split("-")), a następnie chciałem nowego rozmiaru RDD, czy muszę wykonać działanie na RDD, takie jak count(), więc wszystkie informacje są wysyłane z powrotem do węzła sterownika?

map nie zmienia liczby elementów. Nie wiem, co masz na myśli przez "rozmiar". Ale tak, musisz wykonać akcję, taką jak count, aby uzyskać wszystko poza RDD. Widzisz, żadna praca nie jest wykonywana, dopóki nie wykonasz jakiegoś działania. (Gdy wykonasz count, tylko liczba poszczególnych partycji zostanie odesłana do sterownika, oczywiście, nie "wszystkie informacje".)

+0

Zrobiłem przykład [tag: Python] na podstawie Twojej odpowiedzi w [dokumentacji] (http://stackoverflow.com/documentation/apache-spark/833/introduction-to-apache-spark#t=20160817171702245426), jeśli podoba ci się, możesz dołączyć to do swojej odpowiedzi! – gsamaras

+0

To powinna być zaakceptowana odpowiedź. Odpowiada na wszystkie części całkowicie i poprawnie. – tejaskhot