Próbowałem użyć Sparka do pracy nad prostym problemem grafowym. Znalazłem przykładowy program w folderze źródłowym Spark: transitive_closure.py, który oblicza zamknięcie przechodnie na wykresie z nie więcej niż 200 krawędziami i wierzchołkami. Ale w moim własnym laptopie działa więcej niż 10 minut i się nie kończy. Wiersz poleceń, którego używam, to: spark-submit transitive_closure.py.Przykładowy program Sparka działa bardzo wolno
Zastanawiam się, dlaczego iskra jest tak powolna, nawet przy obliczaniu tak małego przechodniego wyniku zamknięcia? Czy to powszechny przypadek? Czy jest jakaś konfiguracja, za którą tęsknię?
Program pokazano poniżej i można go znaleźć w folderze instalacyjnym iskry na ich stronie internetowej.
from __future__ import print_function
import sys
from random import Random
from pyspark import SparkContext
numEdges = 200
numVertices = 100
rand = Random(42)
def generateGraph():
edges = set()
while len(edges) < numEdges:
src = rand.randrange(0, numEdges)
dst = rand.randrange(0, numEdges)
if src != dst:
edges.add((src, dst))
return edges
if __name__ == "__main__":
"""
Usage: transitive_closure [partitions]
"""
sc = SparkContext(appName="PythonTransitiveClosure")
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
tc = sc.parallelize(generateGraph(), partitions).cache()
# Linear transitive closure: each round grows paths by one edge,
# by joining the graph's edges with the already-discovered paths.
# e.g. join the path (y, z) from the TC with the edge (x, y) from
# the graph to obtain the path (x, z).
# Because join() joins on keys, the edges are stored in reversed order.
edges = tc.map(lambda x_y: (x_y[1], x_y[0]))
oldCount = 0
nextCount = tc.count()
while True:
oldCount = nextCount
# Perform the join, obtaining an RDD of (y, (z, x)) pairs,
# then project the result to obtain the new (x, z) paths.
new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
tc = tc.union(new_edges).distinct().cache()
nextCount = tc.count()
if nextCount == oldCount:
break
print("TC has %i edges" % tc.count())
sc.stop()
Dziękuję. Naprawdę pomocny. Mam jeszcze jedno pytanie, jeśli możesz pomóc, będę bardzo wdzięczny. Załóżmy, że mam program, który używa wielu operacji relacyjnych, takich jak łączenie, wybieranie, łączenie, aktualizowanie itp. W pętli, aż do faktów w relacjach do punktu stałego. Nawet przy całkowitych krotkach nie więcej niż 50, utknąłem na drugiej iteracji i wyjątku wielkości sterty Java. Użyłem cache() i koalesce (1) na każdej operacji na ramce danych. Jaki może być problem, który myślisz? – c21