2016-02-22 13 views
6

Próbowałem użyć Sparka do pracy nad prostym problemem grafowym. Znalazłem przykładowy program w folderze źródłowym Spark: transitive_closure.py, który oblicza zamknięcie przechodnie na wykresie z nie więcej niż 200 krawędziami i wierzchołkami. Ale w moim własnym laptopie działa więcej niż 10 minut i się nie kończy. Wiersz poleceń, którego używam, to: spark-submit transitive_closure.py.Przykładowy program Sparka działa bardzo wolno

Zastanawiam się, dlaczego iskra jest tak powolna, nawet przy obliczaniu tak małego przechodniego wyniku zamknięcia? Czy to powszechny przypadek? Czy jest jakaś konfiguracja, za którą tęsknię?

Program pokazano poniżej i można go znaleźć w folderze instalacyjnym iskry na ich stronie internetowej.

from __future__ import print_function 

import sys 
from random import Random 

from pyspark import SparkContext 

numEdges = 200 
numVertices = 100 
rand = Random(42) 


def generateGraph(): 
    edges = set() 
    while len(edges) < numEdges: 
     src = rand.randrange(0, numEdges) 
     dst = rand.randrange(0, numEdges) 
     if src != dst: 
      edges.add((src, dst)) 
    return edges 


if __name__ == "__main__": 
    """ 
    Usage: transitive_closure [partitions] 
    """ 
    sc = SparkContext(appName="PythonTransitiveClosure") 
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 
    tc = sc.parallelize(generateGraph(), partitions).cache() 

    # Linear transitive closure: each round grows paths by one edge, 
    # by joining the graph's edges with the already-discovered paths. 
    # e.g. join the path (y, z) from the TC with the edge (x, y) from 
    # the graph to obtain the path (x, z). 

    # Because join() joins on keys, the edges are stored in reversed order. 
    edges = tc.map(lambda x_y: (x_y[1], x_y[0])) 

    oldCount = 0 
    nextCount = tc.count() 
    while True: 
     oldCount = nextCount 
     # Perform the join, obtaining an RDD of (y, (z, x)) pairs, 
     # then project the result to obtain the new (x, z) paths. 
     new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0])) 
     tc = tc.union(new_edges).distinct().cache() 
     nextCount = tc.count() 
     if nextCount == oldCount: 
      break 

    print("TC has %i edges" % tc.count()) 

    sc.stop() 

Odpowiedz

4

nie może wiele powodów, dlaczego ten kod nie działa szczególnie dobrze na komputerze, ale najprawdopodobniej jest to tylko kolejny wariant problemu opisanego w Spark iteration time increasing exponentially when using join. Najprostszym sposobem, aby sprawdzić, czy jest to rzeczywiście sprawa jest zapewnienie spark.default.parallelism parametr na przedłożyć:

bin/spark-submit --conf spark.default.parallelism=2 \ 
    examples/src/main/python/transitive_closure.py 

Jeżeli nie ogranicza się inaczej, SparkContext.union, RDD.join i RDD.union ustawić szereg przegród dziecka do całkowitej liczby partycji w rodzicach. Zwykle jest to pożądane zachowanie, ale może być niezwykle nieefektywne, jeśli jest stosowane iteracyjnie.

+1

Dziękuję. Naprawdę pomocny. Mam jeszcze jedno pytanie, jeśli możesz pomóc, będę bardzo wdzięczny. Załóżmy, że mam program, który używa wielu operacji relacyjnych, takich jak łączenie, wybieranie, łączenie, aktualizowanie itp. W pętli, aż do faktów w relacjach do punktu stałego. Nawet przy całkowitych krotkach nie więcej niż 50, utknąłem na drugiej iteracji i wyjątku wielkości sterty Java. Użyłem cache() i koalesce (1) na każdej operacji na ramce danych. Jaki może być problem, który myślisz? – c21

0

useage mówi wiersz poleceń jest

transitive_closure [partitions] 

domyślne ustawienie równoległość pomoże tylko z przyłącza w każdej partycji, a nie przeczuciom podziału pracy.

Zamierzam argumentować, że należy użyć WIĘCEJ partycji. Ustawienie domyślnego paralelizmu może nadal pomóc, ale opublikowany kod ustawia liczbę wyraźnie (argument przekazany lub 2, w zależności od tego, która wartość jest większa). Absolutnym minimum powinny być rdzenie dostępne dla Sparka, w przeciwnym razie zawsze pracujesz na poziomie poniżej 100%.

+0

Tutaj nie ma żadnej wartości w zwiększaniu równoległości. Właściwie podana ilość danych możesz zyskać więcej, redukując ją do 1 :) Nie wspominając już o upuszczeniu Sparka. – zero323

Powiązane problemy