2014-06-10 22 views
9

Piszę program związany z wykresem w Scala z Spark. Zbiór danych ma 4 miliony węzłów i 4 miliony krawędzi (można traktować to jako drzewo), ale za każdym razem (Iteration), edytuję tylko jego część, a mianowicie pod drzewko zakorzenione przez dany węzeł, oraz węzły w ścieżce między danym węzłem a rootem.Jaki jest skuteczny sposób aktualizowania wartości w RDD Sparka?

Numer Iteration jest zależny, co oznacza, że ​​wymagany jest wynik z i. Muszę więc zapisać wynik każdego następnego kroku.

próbuję znaleźć skuteczny sposób, aby zaktualizować RDD, ale nie mają pojęcia, że ​​tak far.I znaleźć PairRDD mają lookup funkcji, które mogą skrócić czas obliczeń z O(N), O (M), N Oznaczmy całkowita liczba obiektów w numerach RDD i M oznacza liczbę elementów w każdej partycji.

Więc myślę, że jest tak, że mogę zaktualizować obiekt w RDD z O(M)? Lub więcej idealnie, O (1)? (Widzę wiadomość e-mail na liście poczty Sparka mówiąc, że lookup mogą być modyfikowane w celu osiągnięcia O (1))

Inną rzeczą jest to, czy mogę osiągnąć O(M) uaktualniania RDD, czy mogę zwiększyć partycję do liczby większej niż liczba rdzeni, jaką mam i osiągnąć lepszą wydajność?

+1

RDD jest niezmienna, możesz jedynie utworzyć nowy RDD przez transformację, ale nie można go zaktualizować. – cloud

+0

@cloud Dzięki za komentarz, czy to znaczy, że muszę utworzyć zupełnie nowy RDD zamiast tylko partycji? – bxshi

+1

To wszystko. Napiszę odpowiedź, aby wyjaśnić to szczegółowo. – cloud

Odpowiedz

4

RDD jest zbiorem danych rozproszonych, partycja jest jednostką pamięci RDD, a jednostka do przetwarzania i RDD jest elementem.

Na przykład odczytano duży plik z HDFS jako RDD, a następnie element tego RDD to String (linie w tym pliku), a iskra przechowuje ten RDD w klastrze według partycji. Dla ciebie, jako użytkownika iskrzenia, musisz tylko dbać o to, jak radzić sobie z liniami tych plików, tak jak piszesz normalny program i czytasz plik z lokalnego systemu plików wiersz po linii. To jest moc iskry :)

W każdym razie nie masz pojęcia, które elementy będą przechowywane na określonej partycji, więc nie ma sensu aktualizować określonej partycji.

+0

Tak więc, na podstawie twojej i maasgowej odpowiedzi, powinienem traktować RDD jako normalny obiekt i nie próbować "dostrajać" wydajności na niższym poziomie, ponieważ framework zrobi to za mnie i stworzy nowy RDD z ponownym wykorzystaniem obiektu (tak zasadniczo tworzenie tylko iteracji i zastąpienie jakiegoś obiektu nowymi) nie jest tak powolne, jak myślałem? – bxshi

+4

@bxshi Obiekt RDD jest tani, ale dane w nim zawarte są drogie. Na przykład piszesz aplikację: data_source -> rdd1 -> rdd2 -> rdd3 -> get_result. Czym właściwie jest iskra: zapamiętaj transformację t1, t2, t3 i zastosuj transformację do źródła danych i uzyskaj wynik. Spark nie będzie przechowywać danych RDD, chyba że wywołasz 'RDD.cache()'. – cloud

+0

@cloud: Czy to znaczy, że tylko jedno RDD będzie istniało w danym czasie? – Shankar

6

Jako funkcjonalne struktury danych, RDD są niezmienne, a operacja na RDD generuje nowy RDD.

Niestabilność konstrukcji niekoniecznie oznacza pełną replikację. Perspektywiczne struktury danych są powszechnym wzorcem funkcjonalnym, w którym operacje na niezmiennych strukturach dają nową strukturę, ale poprzednie wersje są utrzymywane i często ponownie wykorzystywane.

GraphX ​​(A „moduł” w górę iskry) jest API wykres na górze iskrę, która wykorzystuje taką koncepcję: Z dokumentów:

zmiany wartości lub struktury wykres osiąga się przez tworzenie nowego wykresu z żądanymi zmianami. Należy zauważyć, że znaczące części oryginalnego wykresu (tj. Nienaruszona struktura, atrybuty, i oznaczenia) są ponownie wykorzystywane na nowym wykresie, co zmniejsza koszt tej funkcjonalnej struktury danych w postaci .

To może być rozwiązanie problemu pod ręką: http://spark.apache.org/docs/1.0.0/graphx-programming-guide.html

+0

Tak, są one ponownie używane, ale nadal trzeba wykonać iterację wszystkich elementów, aby utworzyć nowy obiekt. – bxshi

+0

Kiedy mówisz "Próbuję znaleźć skuteczny sposób na aktualizację RDD", chociaŜ miałeś na myśli mutacje na miejscu. Czy raczej mówisz o wyszukiwaniu? – maasg

+0

@massg Cóż, chciałbym porozmawiać o aktualizacji RDD, ale popełniłem błąd w definicji "iteracji". Kiedy wykonujesz mapę lub inne operacje manipulacyjne, aby utworzyć nowe RDD, masz równoległość w takich operacjach, ale nadal potrzebujesz dostępu do wszystkich elementów w starym RDD. – bxshi

1

Model programowania MapReduce (i FP) naprawdę nie obsługują aktualizacji pojedynczych wartości. Raczej należy zdefiniować sekwencję przekształceń.

Teraz, gdy masz współzależne wartości, tj. Nie możesz przeprowadzić transformacji prostym map, ale musisz agregować wiele wartości i aktualizować w oparciu o tę wartość, to musisz zastanowić się nad sposobem grupowania tych wartości razem następnie transformując każdą grupę - lub definiując operację monoidalną, aby operacja mogła zostać rozdzielona i podzielona na etapy.

Grupa podejściem

Teraz postaram się być nieco bardziej specyficzne dla danego przypadku. Mówisz, że masz poddrzew, czy można najpierw odwzorować każdy węzeł na klucz, który wskazuje odpowiedni poddrzewo? Jeśli tak można zrobić coś takiego:

nodes.map(n => (getSubTreeKey(n), n)).grouByKey().map ...

monoid

(ściśle mówiąc chcesz przemienności monoid) Najlepszy czytasz http://en.wikipedia.org/wiki/Monoid#Commutative_monoid

Na przykład + jest monoidal operacja ponieważ kiedy ktoś chce obliczyć sumę, powiedzmy, RDD z Ints, wówczas bazowy framework może odciąć dane na kawałki, wykonać sumę na każdej porcji, a następnie zsumować sumy wynikowe (być może w większej liczbie tylko 2 kroki). Jeśli znajdziesz monoid, który ostatecznie przyniesie takie same wyniki, jakich potrzebujesz od pojedynczych aktualizacji, masz sposób na dystrybucję swojego przetwarzania. Na przykład.

nodes.reduce(_ myMonoid _)

Powiązane problemy